Blogs

烧焦器,第3部分:具有双缓冲和旋转壁炉的裸机并发

杰森·萨克斯(Jason Sachs) 2020年7月25日

这是一篇简短的文章,内容涉及一种用于在异步进程之间进行通信的技术 裸机嵌入式系统.

问:为什么多线程鸡肉过马路?

答:到另一边。得到

— Jason Whittington

那里 are many reasons why 并发 难的 to manage — at least if you’re using an 具有共享可变状态的命令式编程语言。在孤立的进程之间使用消息传递(例如Go和 Erlang )是处理并发的一种方法…但是您需要一个支持消息传递作为原始操作的系统以及支持它的资源。

在资源受限的嵌入式系统上运行的任何人运行消息传递的概念,他们可能会看着您,好像您有两个头脑。你’需要某种具有足够空间来处理未决消息的硬件或软件队列。这种方法在高端嵌入式系统或通用PC上效果更好,您可以在其中利用操作系统的消息传递功能,并且所需的开销很小。 运行裸机且具有数千字节(或更少!)RAM而不是兆字节或千兆字节的典型8位或16位系统不会’无处不在使用队列—至少并非没有精心设计。

所以让’为一个替代方案奠定了基础,裸机系统具有两个需要通信的过程。

迅速 and Poky

让’s say we’重新使用具有以下功能的微控制器:

  • 标称数量的RAM(例如4096字节)
  • 单核处理器,其访问方式可以通过以下方式建模: 顺序一致性
  • 模数转换器(ADC),可以将其某些引脚上的一些信号数字化
  • 某种定时机制可以定期触发ADC,例如以10kHz触发
  • 当ADC完成对这些信号的数字化处理时可以触发的中断

这些都是很常见的。除了没有ADC的低端8位处理器之外,您’找到这样的微控制器将毫无困难。 (如果ADC的自动触发不’如果有可用的话,稍微不太理想的后备方法是在进入定时器中断服务程序后以软件手动触发它。)

可能更难理解和验证的唯一方面是顺序一致性。这是一个用于处理器核心的简单计算模型,该处理器核心一个接一个地执行一系列指令,其中读取存储器的每个指令都可以查看任何先前写入存储器的指令的结果。这不是唯一的 计算记忆模型 that exists, but it’是规则,而不是简单的嵌入式系统中的例外。在另一端是今天’的台式机和服务器处理器,它们不仅具有多个内核使用多级内存缓存访问共享内存,而且还具有 乱序执行。在那种系统中,内存访问的顺序保证放宽了,只有在使用时才获得更严格的保证。 屏障或栅栏 确保一次计算的结果 发生在之前 a read from another.

我们的微控制器的固件将使用C编程,并且不使用操作系统。我们有两个过程(我用术语“processes”从抽象的意义上讲,而不是在像Linux这样的操作系统中,它试图尝试调度一些进程。)’ll call 迅速ky:

  • ky starts executing from main(), and after some initialization steps, its only job is to repeat the following tasks in order:
    • 通过某种串行通信端口(CAN或UART)接受命令
    • 处理它们(可能需要与Speedy协调)
    • 通过串行通信端口将结果发送回
  • 迅速 只要有ADC采样可用,就在一个称为10kHz的中断服务程序中执行,执行以下任务:
    • 获取ADC样本的结果
    • 进行足够的信号处理以减轻Poky的负担
    • 与Poky交换信息
    • 在某些情况下,通过PWM信号或DAC改变微控制器的输出

这种Speedy / Poky系统并非人为设计:’s something I’在过去的25年中,我们一直与之合作,并在许多电机驱动器,数字电源转换器和其他工业系统中得到广泛应用。

因为这是单核处理器,所以在发生中断时:

  • ky’的执行停止(这可能不会立即发生;对于诸如此类的事情,通常需要花费一些指令周期 管道 to clear)
  • 程序计数器(PC)保存在硬件中
  • ISR开始运行,并保存可能需要更改的所有CPU寄存器
  • 迅速 executes
  • ISR将所有CPU寄存器恢复到其保存状态
  • CPU跳回到保存的PC并继续Poky’s execution.

从Poky的角度来看,Speedy在Poky的指令之间瞬间即刻执行’的固件。除了一个例外,Poky必须假设Speedy’执行可以随时发生,通常在最不方便的指令之间执行。一个例外是Poky可以在代码的关键部分暂时禁用ADC中断,并且在禁用ADC中断的同时,迅速’的执行被推迟。这意味着Poky不会’不必担心在关键部分快速更改RAM。但这意味着Speedy可能无法完全以10kHz的频率运行,特别是如果Poky长时间禁用中断。

这里的挑战是如何让Speedy和Poky相互交流。如果我们定义了一种机制,包括共享状态以及一些有关如何使用它的规则,那么我们可以做到。

阻塞和非阻塞并发

现在,在使用共享内存的计算机上管理并发的最安全方法是使用精心设计的库原语,例如 互斥体 。互斥锁是阻止并发的示例,其中两个进程通过等待获取互斥锁来争用资源。进程仅在获取互斥量后才访问共享内存,并且一旦完成,便释放它,以便另一个进程可以访问。该技术可以安全地序列化对共享内存的访问。

不过,对于Speedy和Poky,使用互斥可能不起作用:Speedy是一个示例 硬实时过程,其中某些内容必须经常执行,并带有一定的最大延迟。在迅速’s case, we haven’t指定必须严格管理中断处理程序的执行时间,以便它可以运行并完成其处理,但是我们’d希望它是每100微秒,也许这意味着我们可以容忍10或20μ中断开始前的等待时间。这是你的事’d必须逐案研究,看看迟到的后果。 (例如,在电动机驱动器中,在将ADC输入数字化和更新PWM输出之间有一定的时间,迟到会导致对控制环路的干扰,从而导致听得见的噪声,振动甚至失去控制。)快速不能容忍,但是,是无限期的延迟。在互斥锁上进行阻塞不是在ISR中应该执行的操作;即使您可以以某种方式保证最大延迟,’一个红旗。 ISR应该仅包含非阻塞代码。

此外,尽管在ISR中使用非阻塞代码是必要的,但这还不够。快速不仅可以’t get stuck; it 必须 完成其所需的任务。如果Speedy检测到由于Poky正在写东西而无法访问共享内存,则Speedy’任务失败,它所能做的就是设置某种标志,以指出它无法完成其处理,从而有可能危害所有将来计算的正确性。

那里 are a number of 不用等待 在并发进程需要访问共享内存的情况下可以使用的数据结构—用于链接列表,队列和哈希映射之类的东西。那些在这里过大了。您可能会在通用计算机上的多线程软件中看到它们。

We’将要利用Speedy / Poky系统的一些特性’存在于通用计算机中,并描述了一些简单的东西, 双缓冲 我称之为旋转壁炉。

旋转壁炉

图片如下:

毛茸茸 史酷比 在鬼屋里。他们试图与朋友们解开一个谜,但与此同时,他们又饿又有点冷,因此他们从餐具室掠过三明治固定物,然后带到另一个房间,壁炉前摆放着一些勃艮第的勃艮第天鹅绒椅​​子。有一团温暖的火焰在燃烧,壁炉旁有一张小桌子,所以他们每个人都开始在桌子上固定一个三明治。它们很快被堆满了他们发现的所有食物。

史酷比听到微弱的how叫声,跳到毛茸茸的’的手臂。他们走进大厅去调查,但是不要’什么都看不到。一阵石刮的声音,就像一个沉重的地穴盖滑开的声音,当它们回来时,桌子上光秃秃的:三明治和所有食物都没了。

毛茸茸的和史酷比搜索房间,寻找他们的三明治,当他们的背转过身时,又有刮擦的声音,这一次,他们露出了眼角,发誓他们看到桌子移动了一会儿。这次,一个大毛绒黑熊站在桌子旁边,动物标本师’的杰作,举起手臂和爪子,瞪着它们。毛茸茸的笑着。他在熊面前挥舞着手’s face. “It’s only a dummy,”他说,敲熊’的头骨。眼睛在熊内移动’的头,发出嘶哑的声音。“Zoinks!”大吼大叫。他和史酷比跑出房间,穿过大厅,胳膊和腿在走动时异常摆动。

史酷比跳上楼梯栏杆并滑落,随后沙吉紧追在身,随后他们降落在楼梯底部的一堆垃圾中。弗雷德,费尔玛和达芙妮不赞成地看着他们。“你们两个去哪了” says Fred. “We’在寻找这个谜的线索,” says Velma.

“We found a h-h-h…一个h鬼的壁炉!” says Shaggy.

“Raunted rireplace!” says Scooby.

弗雷德(Fred),维尔玛(Velma)和达芙妮(Daphne)持怀疑态度,但沙吉(Shaggy)和史酷比(Scooby)带头回到楼上,走进带壁炉的房间,他们的三明治在桌子上安静地坐着,就如同’d left them.

“But…但是这里有只熊!”毛茸茸的说。史酷比吃了两个三明治。

弗雷德,维尔玛和达芙妮不服气。

在剧集中的晚些时候,达芙妮从一个洗衣槽掉落到另一个房间,房间里摆放着富丽堂皇的蓝色天鹅绒椅子,放在同一个壁炉和桌子前。该团伙的其余成员设法找到达芙妮,以及一个秘密杠杆,使壁炉从一个房间到另一个房间串联旋转。事实证明,有一对伪造者只是在与傻子和史酷比开玩笑,这是一种恶作剧,尽管他们所有的伪造机器都安全地藏在了小镇的另一边的一家废弃糖果厂中,但该团伙抓住了他们并发现了伪造品20美元的钞票,这使地方检察官可能有理由发出搜查糖果工厂的手令。“如果没有的话,我们也将一事无成。’为您爱管闲事的孩子们!”一名造假者说,当警察戴上手铐时。

假设地说,就是这样。我本可以发誓有一个史酷比(Scooby-Doo)情节,上面有一对旋转壁炉(与哈林环球旅行者(Harlem Globetrotters)在一起)。不幸的是,似乎没有这样的事情。我似乎已经把这与 凯文船长与双子滑雪小屋的那集 或许是《印第安纳·琼斯》和《最后的十字军东征》中的火景:

旋转 什么 ?!

作为并发协议,旋转壁炉是个穷人’s messaging system.

一个真实的 信息传递 传输过程和接收过程之间相互隔离的体系结构需要以下元素:

  • 传输端的临时存储,以构造一条消息
  • 接收端的临时存储,以接收消息
  • 充当发送和接收过程之间的中介的某些实体(例如,操作系统,或者在裸机系统的情况下,包含用于促进消息传递的功能和数据结构的模块)
  • 允许发送端知道是否可以发送消息的某种方法
  • 允许发送端请求中介从发送器发送消息的某种方法’的临时存储,并知道何时传输完成
  • 允许接收端知道消息是否可以接收的某种方法
  • 允许接收端请求中介者将下一个可用消息传递到接收者的某种方法’的临时存储,并知道接收完成的时间
  • 中间人在其发送和接收之间保存消息的存储容量

这给中介带来了任何并发的负担。发送器和接收器本身解耦,在那里’没有共享状态,不需要互斥或使它们合作。

对于Speedy和Poky,我们不’t need all that.

迅速和Poky各自需要某种工作空间来准备他们的信息’打算互相分享。使用共享内存而不是消息传递,Poky需要自由地将信息写入适当的位置,并且只有在完成时才向Speedy发信号以读取信息。我们的“fireplaces”共享内存的两个部分可以在Speedy和Poky之间交换;我们还需要一些同步机制来通知事件(“ready to publish” or “要求更改所有权”),以及一些适当合作的规则。

可以使用单个共享变量和操作它的操作来实现同步机制,只要这些操作具有三个重要的属性即可:

  • 访问共享变量是原子的
  • 共享变量被认为是易变的
  • 禁止通过处理器或编译器对共享变量的访问与其他键操作进行重新排序。

这里’是棘手的地方,因为不幸的是我们没有’从C语言标准中获得很多帮助,而这是普通的普通程序员易于理解的。

原子和挥发性:再次访问地下室

易挥发的

The volatile property just requires use of the 易挥发的 keyword in C, which tells the compiler not to make any optimization based on an assumption that the compiler knows what is contained in a variable. For example:

int16_t ultimate_answer(void)
{
    volatile int16_t x = 6*9;
    x = 42;
    return x;
}

Without the 易挥发的 qualifier on the local variable x, the compiler could just have ultimate_answer() return 42 to the caller. Instead, the compiler is forced to store 54 into x, then store 42 into x, and then read the content of x before returning that value to the caller.

Furthermore, access to different 易挥发的 variables cannot be reordered (although the compiler can reorder non-易挥发的 variables with respect to 易挥发的 variables); the C standard’声明这的方法是 N1256草案第5.1.2.3节)

访问易失性对象,修改对象,修改文件或调用函数 这些操作中的任何一个都是副作用,即状态的变化 执行环境。评价表达可能会产生副作用。在 执行序列中的某些指定点称为序列点,所有副作用 以前的评估应完整,且以后的评估无副作用 应该发生了。 (序列点的摘要在附件C中给出。)

(最近的草稿,例如N1570,已经使水变得混乱,并以普通读者不太清楚的方式表达出来。)

I’ve mentioned 易挥发的 在上一篇文章中,因此,如果您想更深入地了解它,请阅读它或其中之一:

定购

只要计算的关键顺序由C确定’s rules on 易挥发的 variables, there’s no additional synchronization needed. Memory barriers (e.g. asm volatile ("" : : : "memory"); as outlined in John Regehr’s post) may be needed to constrain non-易挥发的 variables from being reordered.

原子性

对于原子性,我们只需要确保在一个不可分割的操作中读取或写入共享变量即可;如果Poky需要两步来完成某件事,而Speedy在这两步之间执行,则Speedy可能会在其共享状态下看到无效的值。 (例如,设置32位指针的16位一半。)

大多数(如果不是全部的话)处理器可以确保内存加载或存储的机器字大小是原子的:例如,在16位系统上,您可以在一条指令中加载或存储16位值在内存中,并且中断可以’t pop up in the middle of a load and store. Similarly, sometimes there are atomic instructions for setting or clearing bits, or even for loading/storing two words at once. (Read the fine print, though; on a dsPIC33 device, there is a MOV.D instruction that takes 2 cycles and is not interruptable, but because the device has a 16-bit bus, it is possible for DMA to sneak in between the cycles and ruin the atomicity, at least with respect to DMA.)

如果编译器没有’如果没有保证可以映射到这些原子指令之一的内建函数或内在函数,那么您可能需要依靠内联汇编才能使用它们。

那里’较弱的方法,那就是信任编译器,但是在这种情况下,负担就在于 信任但要验证 编译器正在执行您认为正在执行的操作。这很危险。

如果你’很幸运拥有现代C或C ++编译器— where “modern”表示C11 / C ++ 11或更高版本—有一些功能由 std::atomic library in C++, or <stdatomic.h> in C that can guarantee all three of these behaviors from the compiler:

  • 值将以不可分割的操作读取或更新
  • 对值的访问将以顺序一致的顺序进行
  • 禁止编译器根据值的假设进行优化

不幸的是,我’我对C11 / C ++ 11原子不够熟悉,无法对它们的正确使用提供很好的建议;您可以查看下面的小节 投机建议。如果你不这样做’没有现代化的C或C ++编译器,您’我将不得不看看您的C编译器是否具有可以保证原子操作的内建函数或内在函数。

投机建议

本节包含有关C语言中原子支持的有限建议,但在某些方面存在疑问。(警告!由于我的工作环境有限,我以前从未使用过这些建议,因此请稍加盐味并进行应有的尽职调查。)

如果你r C compiler supports C11 ’s <stdatomic.h> (compilers are allowed to define __STDC_NO_ATOMICS__ to say that they can’不要被这个东西困扰,你’re on your own), then you should be able to use any of the base types such as 原子_int or 原子_bool, or qualify any type with _Atomic() such as _Atomic(uint32_t). Here you just use the variables the way you want and the compiler will guarantee there aren’t数据争用,尽管即使您的C代码未指定使用锁,也可能需要在基础实现中使用锁才能保证这一点。例如:

#include <stdint.h>
#include <stdatomic.h>

_Atomic(uint32_t) shared_var1;
atomic_uint shared_count = 0;

// thread 1
void store_var1(uint32_t val)
{
    shared_var1 = val;
    ++shared_count;
}

// thread 2
uint32_t replace_old_with_44_maybe(unsigned int maxcount)
{
    uint32_t oldval = shared_var1;
    /* Warning: the code below doesn't perform the if-statement and body 
     * in an atomic manner; for that, you would need to use 
     * a critical section or mutex.
     */
    if (shared_count <= maxcount) 
    {
        shared_var1 = 44;
    }
    return oldval;
}

The accesses to shared_var1 might turn into a function call that allows only the currently-running thread to read or write it.

另外,您可以使用类似 原子_load原子_store to work with “regular”原子方式的变量:

#include <stdint.h>
#include <stdatomic.h>

uint32_t shared_var1;
unsigned int shared_count = 0;

// thread 1
void store_var1(uint32_t val)
{
    atomic_store(&shared_var1, val);
    atomic_fetch_add(&shared_count, 1);  
    // atomic version of ++shared_count
}

// thread 2
uint32_t replace_old_with_44_maybe(unsigned int maxcount)
{
    uint32_t oldval = atomic_load(&shared_var1);
    /* Warning: the code below doesn't perform the if-statement and body 
     * in an atomic manner; for that, you would need to use 
     * a critical section or mutex.
     */
    if (atomic_load(&shared_count) <= maxcount) 
    {
        atomic_store(&shared_var1, 44);
    }
    return oldval;
}

仅通过这些功能访问共享变量之间才能保证顺序一致性。如果一个线程直接从这些变量之一读取或写入,则所有选择均关闭。

如果你r compiler does not support C11 or does not provide <stdatomic.h> support, then some compilers provide intrinsic substitutes. For example, GCC 4.7.0 (and later) has __atomic_load_n__atomic_store_n:

#include <stdint.h>

uint32_t shared_var1;
unsigned int shared_count = 0;

// thread 1
void store_var1(uint32_t val)
{
    __atomic_store_n(&shared_var1, val, __ATOMIC_SEQ_CST);
    __atomic_fetch_add(&shared_count, 1, __ATOMIC_SEQ_CST);  
    // atomic version of ++shared_count
}

// thread 2
uint32_t replace_old_with_44_maybe(unsigned int maxcount)
{
    uint32_t oldval = __atomic_load_n(&shared_var1, __ATOMIC_SEQ_CST);
    /* Warning: the code below doesn't perform the if-statement and body 
     * in an atomic manner; for that, you would need to use
     * a critical section or mutex.
     */
    if (__atomic_load_n(&shared_count, __ATOMIC_SEQ_CST) <= maxcount) 
    {
        __atomic_store_n(&shared_var1, 44, __ATOMIC_SEQ_CST);
    }
    return oldval;
}

Clang appears to provide builtins like __c11_atomic_store__c11_atomic_load but it’s unclear if there’在任何情况下,都应使用此功能代替标准的C11功能。

至于在具有顺序一致的内存访问的典型低端单核处理器上,什么样的代码实际上在后台运行,我们可以看一下 MSP430上的编译器资源管理器在哪里<stdatomic.h> 是 supported. I wrote a handful of short functions with _a_ (atomic) and _v_ (volatile) variants to perform loads and stores of bool, uint16_t, and uint32_t values, along with a 16-bit increment:

#include <stdatomic.h>
#include <stdint.h>
#include <stdbool.h>

typedef _Atomic(uint16_t) atomic_uint16_t; 
typedef _Atomic(uint32_t) atomic_uint32_t;

void store_v_b(volatile bool *pshared, bool b) {
    *pshared = b;
}

void store_v_16(volatile uint16_t *pshared, uint16_t x) {
    *pshared = x;
}

void store_v_32(volatile uint32_t *pshared, uint32_t x) {
    *pshared = x;
}

void store_a_b(atomic_bool *pshared, bool b) {
    *pshared = b;
}

void store_a_16(atomic_uint16_t *pshared, uint16_t x) {
    *pshared = x;
}

void store_a_32(atomic_uint32_t *pshared, uint32_t x) {
    *pshared = x;
}

bool load_v_b(volatile bool *pshared) {
    return *pshared;
} 

uint16_t load_v_16(volatile uint16_t *pshared) {
    return *pshared;
}

uint16_t load_v_32(volatile uint32_t *pshared)
{
    return *pshared;
}

bool load_a_b(atomic_bool *pshared)
{
    return *pshared;
}

uint16_t load_a_16(atomic_uint16_t *pshared)
{
    return *pshared;
}

uint16_t load_a_32(atomic_uint32_t *pshared)
{
    return *pshared;
}

void inc_a_16(atomic_uint16_t *pshared)
{
    atomic_fetch_add(pshared, 1);
}

void inc_v_16(volatile uint16_t *pshared)
{
    ++*pshared;
}

With MSP430 gcc 6.2.1 -O2, this compiles to

store_v_b:
        MOV.B   R13, @R12
        RET
store_v_16:
        MOV.W   R13, @R12
        RET
store_v_32:
        MOV.W   R13, @R12
        MOV.W   R14, 2(R12)
        RET
store_a_b:
        AND     #0xff, R13
        MOV.B   R13, @R12
        RET
store_a_16:
        MOV.W   R13, @R12
        RET
store_a_32:
        MOV.B   #5, R15
        CALL    #__atomic_store_4
        RET
load_v_b:
        MOV.B   @R12, R12
        RET
load_v_16:
        MOV.W   @R12, R12
        RET
load_v_32:
        MOV.W   2(R12), R13
        MOV.W   @R12, R12
        RET
load_a_b:
        MOV.B   @R12, R12
        RET
load_a_16:
        MOV.W   @R12, R12
        RET
load_a_32:
        MOV.B   #5, R13
        CALL    #__atomic_load_4
        RET
inc_a_16:
        MOV.B   #5, R14
        MOV.B   #1, R13
        CALL    #__atomic_fetch_add_2
        RET
inc_v_16:
        ADD.W   #1, @R12
        RET

你’ll note that in most cases these are trivial, lightweight implementations, even for the _Atomic variants. The exceptions are:

  • 32位原子负载
  • 32位原子存储
  • 16位原子增量

在这些情况下,编译器将调用一个库函数,该函数可能会禁用中断足够长的时间以使其能够运行一些关键指令。 (任何人都对MSP430编译器足够熟悉,可以说出什么’这些里面?)32位加载和存储是有意义的,但是我’我对16位的增量很感兴趣— you can see there is a single instruction ADD.W #1, @R12 that works in the 易挥发的 case. Either there is something about the CPU execution or memory models on the MSP430 that makes this insufficient for the guarantees needed by C11 atomics, or the compiler writers haven’t finished tweaking the compiler to reduce the 16-bit fetch_add case to the ADD.W instruction.

扩展组装的DIY原子

如果你不这样做’t have access to <stdatomic.h> or an appropriate builtin, and you REALLY want to try implementing and testing your own atomic functions, you might try using extended assembly. Here’一个例子,用 pyxc16:

import pyxc16

src = r'''
#include <stdint.h>
#include <stdbool.h>

#define DECLARE_ATOMIC_READWRITE(T, M) \
inline static void _util_atomic_##T##_write(volatile T *location, T value) \
{ \
    /* atomic equivalent of *location = value; */                \
    asm volatile(";! BEGIN _util_atomic_" #T "_write\n"          \
                 "\t" M " %[val], [%[loc]]\n"                    \
                 "\t;! END   _util_atomic_" #T "_write"          \
                 : /* no outputs */                              \
                 : [val] "r" (value), [loc] "r" (location));     \
} \
\
inline static uint16_t _util_atomic_##T##_read(volatile uint16_t *location) \
{ \
    /* atomic equivalent of return *location; */                 \
    uint16_t result;                                             \
    asm volatile(";! BEGIN _util_atomic_" #T "_read\n"           \
                 "\t" M " [%[loc]], %[result]\n"                 \
                 "\t;! END   _util_atomic_" #T "_read"           \
                 : [result] "=r" (result)                        \
                 : [loc] "r" (location));                        \
    return result;                                               \
}

typedef volatile void *voidptr;
DECLARE_ATOMIC_READWRITE(uint16_t, "mov")
DECLARE_ATOMIC_READWRITE(voidptr, "mov")
DECLARE_ATOMIC_READWRITE(bool, "mov.b")

// just using volatile
uint16_t test0(volatile uint16_t *px, uint16_t v, 
               volatile voidptr *pv, 
               volatile bool *pb)
{
    *px = v;
    *px = ++v;
    *++px = v+37;
    *pv = px;
    uint16_t y = *px;
    *pb = y > 100;
    
    return y;
}

uint16_t test1(uint16_t *px, uint16_t v, 
               voidptr *pv, 
               bool *pb)
{
    _util_atomic_uint16_t_write(px, v);
    _util_atomic_uint16_t_write(px, ++v);
    _util_atomic_uint16_t_write(++px, v+37);
    _util_atomic_voidptr_write(pv, px);
    uint16_t y = _util_atomic_uint16_t_read(px);
    _util_atomic_bool_write(pb, y > 100);
    
    return y;
}
'''
pyxc16.compile(src, '-O2', comment_filter=';!')
_test0:
	mov	w1,[w0]
	inc	w1,[w0]
	mov	#38,w4
	add	w1,w4,[++w0]
	mov	w0,[w2]
	mov	[w0],w0
	mov.b	#1,w1
	mov	#100,w2
	sub	w0,w2,[w15]
	bra	gtu,.L2
	clr.b	w1
.L2:
	mov.b	w1,[w3]
	return
_test1:
	;!	BEGIN _util_atomic_uint16_t_write
	mov	w1, [w0]
	;!	END   _util_atomic_uint16_t_write
	inc	w1,w1
	;!	BEGIN _util_atomic_uint16_t_write
	mov	w1, [w0]
	;!	END   _util_atomic_uint16_t_write
	inc2	w0,w0
	add	#37,w1
	;!	BEGIN _util_atomic_uint16_t_write
	mov	w1, [w0]
	;!	END   _util_atomic_uint16_t_write
	;!	BEGIN _util_atomic_voidptr_write
	mov	w0, [w2]
	;!	END   _util_atomic_voidptr_write
	;!	BEGIN _util_atomic_uint16_t_read
	mov	[w0], w0
	;!	END   _util_atomic_uint16_t_read
	mov.b	#1,w1
	mov	#100,w2
	sub	w0,w2,[w15]
	bra	gtu,.L6
	clr.b	w1
.L6:
	;!	BEGIN _util_atomic_bool_write
	mov.b	w1, [w3]
	;!	END   _util_atomic_bool_write
	return

The DECLARE_ATOMIC_READWRITE macro takes in a typename T 和 a move instruction M, and generates functions that utilize inline assembly with a single line of implementation (surrounded by BEGIN and END comments to tell where it comes from). The single line is the key — if you can implement a load or store on a dsPIC with a single MOV.B or MOV or MOV.D then it is uninterruptable, and does not suffer from any read-modify-write problems. (although again: MOV.D 是 a 2-cycle instruction due to the use of a 16-bit data bus; it’不会中断,但是DMA事务可能会在总线访问之间潜入。)

(这里 ’s why read-modify-write is a tricky issue. The dsPIC33 architecture has pipelined execution where each instruction is handled with a fetch and execute phase. We have to be sure that an interrupt cannot sneak in between those fetch and execute stages for something like an INC (increment) instruction in indirect mode where contents of memory, specified by a pointer, are incremented, since it accesses memory both on the fetch and execute stages of the pipeline. The manuals on the dsPIC33 cores are mostly clear on 中断处理,即允许所有指令在中断发生之前完成。读取-修改-写入行为可能导致 停滞在指令管道中 为了防止下一条指令获取旧数据,但这是不间断的。如果您使用的是其他架构,则需要非常仔细地了解它如何执行代码。)

无论如何,你’ll note that the assembly generated for test0()test1() are almost the same — in test0() the compiler can save 2 instructions because it can optimize some of the generated code, whereas in test1() it is restricted to plunking down the inline assembly of the __util_atomic_uint16_t* functions as black boxes it cannot change. If I weren’t picky, I’d just use test0() because the compiler 应该 更好地了解并将16位分配视为原子操作。但是那里’至少在C标准方面,编译器不这样做。

仅重申一点:C11 和C ++ 11 原子<> 应该可以帮助您编写正确的程序。 如果可以使用可以为编译器提供更好,更安全的保证的东西,则不建议使用DIY方法。

如果你 really want to delve into the dark corners of memory models and the C11/C++11 atomics, check out 香草萨特’s talks on “atomic<> Weapons” 进入多核处理器的行为 获取/释放语义,以及你如何’应该使用C11或C ++ 11原子编写C代码。

反正’是地下室里的单词。 Yeccch,我感觉有些不舒服。如果您需要有关这些主题的建议,请咨询熟悉C标准的专业人员。

旋转壁炉,示例1

OK,现在到一些实际代码!这里’旋转壁炉实现的一个示例。我们 ’这将使Speedy累积ADC样本的总和,以获得电流和电压。 Poky将读取累积的总和并加以利用,并在需要时通过通信端口进行传输。

在此示例中,协议如下:

  • 在开关操作之外的任何给定时间,Poky和Speedy都可以使用自己的壁炉, 并且不允许访问其他壁炉的内容。
  • 仅Poky允许切换壁炉。
  • 每次ISR完成后,Speedy必须将其壁炉内容保留为有效状态。
/* fireplace.h */

#include <stdint.h>
#include <stdbool.h>

typedef struct {
    int32_t voltage_sum;
    int32_t current_sum;
    uint16_t count;
} FIREPLACE;

typedef struct {
    FIREPLACE fireplaces[2];
    struct {
        volatile FIREPLACE *speedy;
        volatile FIREPLACE *poky;
    } access;
} RAW_SHARED_MEMORY; // w/o volatile -- do not use directly

typedef volatile RAW_SHARED_MEMORY SHARED_MEMORY;

inline static void fireplace_init(volatile FIREPLACE *fp)
{
    fp->voltage_sum = 0;
    fp->current_sum = 0;
    fp->count = 0;
}

inline static void fireplace_switch(SHARED_MEMORY *shmem)
{
    volatile FIREPLACE *tmp = shmem->access.poky;
    shmem->access.poky = shmem->access.speedy;
    shmem->access.speedy = tmp;
}
/* poky.h */
typedef struct {
    int64_t voltage_sum;
    int64_t current_sum;
    uint32_t count;
} POKY_STATE;

inline static void poky_sum_init(POKY_STATE *pstate)
{
    pstate->voltage_sum = 0;
    pstate->current_sum = 0;
    pstate->count = 0;
}
/* poky.c */

#include "fireplace.h"
#include "poky.h"

void poky_init(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    poky_sum_init(pstate);
    fireplace_init(&shmem->fireplaces[0]);
    fireplace_init(&shmem->fireplaces[1]);
    shmem->access.poky   = &shmem->fireplaces[0];
    shmem->access.speedy = &shmem->fireplaces[1];
}

void poky_step(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    /* called during the main loop */
    fireplace_switch(shmem);
    volatile FIREPLACE *mine = shmem->access.poky;
    if (mine->count > 0)
    {
        // Speedy has accumulated samples! 
        // Let's accumulate that into an overall sum.
        pstate->voltage_sum += mine->voltage_sum;
        pstate->current_sum += mine->current_sum;
        pstate->count += mine->count;

        // Now zero out the accumulators 
        // so we can switch fireplaces next time.
        fireplace_init(mine);
    }

    // If we get a message asking for the sums, send them and zero them out.
    if (should_we_transmit_sums())
    {
        transmit_sums(pstate->voltage_sum, pstate->current_sum, pstate->count);
        poky_sum_init(pstate);
    }
}
/* speedy.c */

#include "fireplace.h"

void speedy_step(SHARED_MEMORY *shmem)
{
    // read ADC
    int16_t current = read_adc(ADC_CURRENT);
    int16_t voltage = read_adc(ADC_VOLTAGE);

    volatile FIREPLACE *mine = shmem->access.speedy;
    mine->voltage_sum += voltage;
    mine->current_sum += current;
    ++mine->count;
}

那里!我们有壁炉协议。迅速不’不必担心,他只是将ADC样本倒入壁炉中的累加器中,因为Speedy永远不会被Poky打断。

ky可以被Speedy打断,但是因为Speedy不’不允许切换壁炉,对于Poky来说,从Poky读写是安全的’s own fireplace.

Note that the 壁炉_switch function isn’t atomic — Speedy can interrupt between the read and writes to shmem->access — but this doesn’事Speedy始终可以使用一致的壁炉,而Poky仅在切换之前或之后才能使用壁炉。

唯一的其他要求是Poky必须在变量溢出之前切换壁炉,换句话说,每65536个样本至少要切换一次。

除此之外,Poky现在可以确保它具有每个ADC读数总数的汇总统计信息,而不会丢失任何样本或重复计算,即使

  • 它没有’t以ADC速率运行
  • 它没有’t run periodically
  • 迅速可以在任何地方打断它
  • 它可能在两次Speedy执行之间运行了好几次(有时主循环几乎无所事事,并且重复很快!)

旋转壁炉,示例2

这次,我们’我们会根据Poky的要求迅速更换壁炉。这里的区别是:

  • 迅速不再需要准备好壁炉内的物品以供Poky立即使用。相反,它可以在需要时将其数据按需提供给壁炉 ’s time to switch
  • 请求切换后,Poky在Speedy完成切换之前无法访问壁炉。 (一旦提出请求,Speedy实际上将拥有两个壁炉。)
/* fireplace.h */

#include <stdint.h>
#include <stdbool.h>

typedef struct {
    int32_t voltage_sum;
    int32_t current_sum;
    uint16_t count;
} FIREPLACE;

typedef struct {
    FIREPLACE fireplaces[2];
    struct {
        volatile FIREPLACE *speedy;
        volatile FIREPLACE *poky;
    } access;

    bool switch_request;  
    // only Poky is allowed to set, 
    // only Speedy is allowed to clear
} RAW_SHARED_MEMORY; // w/o volatile -- do not use directly

typedef volatile RAW_SHARED_MEMORY SHARED_MEMORY;

inline static void fireplace_init(volatile FIREPLACE *fp)
{
    fp->voltage_sum = 0;
    fp->current_sum = 0;
    fp->count = 0;
}

inline static void fireplace_switch(SHARED_MEMORY *shmem)
{
    volatile FIREPLACE *tmp = shmem->access.poky;
    shmem->access.poky = shmem->access.speedy;
    shmem->access.speedy = tmp;
}
/* poky.h */
typedef struct {
    int64_t voltage_sum;
    int64_t current_sum;
    uint32_t count;
} POKY_STATE;

inline static void poky_sum_init(POKY_STATE *pstate)
{
    pstate->voltage_sum = 0;
    pstate->current_sum = 0;
    pstate->count = 0;
}
/* poky.c */

#include "fireplace.h"
#include "poky.h"

void poky_init(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    poky_sum_init(pstate);
    fireplace_init(&shmem->fireplaces[0]);
    fireplace_init(&shmem->fireplaces[1]);
    shmem->access.poky   = &shmem->fireplaces[0];
    shmem->access.speedy = &shmem->fireplaces[1];
    shmem->switch_request = false;
}

void poky_step(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    /* called during the main loop */

    // Skip if Speedy hasn't processed the switch request
    if (!shmem->switch_request)
    {
        volatile FIREPLACE *mine = shmem->access.poky;
        if (mine->count > 0)
        {
            // Speedy has accumulated samples! 
            // Let's accumulate that into an overall sum.
            pstate->voltage_sum += mine->voltage_sum;
            pstate->current_sum += mine->current_sum;
            pstate->count += mine->count;

            // Now zero out the accumulators 
            // so we can switch fireplaces next time.
            fireplace_init(mine);
        }
        shmem->switch_request = true;
    }

    // If we get a message asking for the sums, send them and zero them out.
    if (should_we_transmit_sums())
    {
        transmit_sums(pstate->voltage_sum, pstate->current_sum, pstate->count);
        poky_sum_init(pstate);
    }
}
/* speedy.c */

#include "fireplace.h"

typedef struct {
    FIREPLACE private_fireplace;
} SPEEDY_STATE;

void speedy_init(SPEEDY_STATE *pstate)
{
    fireplace_init(&pstate->private_fireplace);
}

void speedy_step(SPEEDY_STATE *pstate, SHARED_MEMORY *shmem)
{
    // read ADC
    int16_t current = read_adc(ADC_CURRENT);
    int16_t voltage = read_adc(ADC_VOLTAGE);

    FIREPLACE *my_own = &pstate->private_fireplace;
    my_own->voltage_sum += voltage;
    my_own->current_sum += current;
    ++my_own->count;

    if (shmem->switch_request)
    {
        // Time to switch fireplaces! Put latest stats in the fireplace
        volatile FIREPLACE *mine = shmem->access.speedy;
        mine->voltage_sum = my_own->voltage_sum;
        mine->current_sum = my_own->current_sum;
        mine->count = my_own->count;
        fireplace_switch(shmem);
        shmem->switch_request = false;

        fireplace_init(my_own);
    }
}

通过这种方法,Speedy可以在自己的私人壁炉中进行堆积’t declared 易挥发的. When it’是时候换壁炉了’s when Speedy copies its accumulated statistics from its private fireplace into the shared fireplace, and triggers the switch, clearing the switch_request flag.

通过执行时间的权衡,示例2可以允许编译器在壁炉切换之外实现更快的ISR(因为它可以优化对Speedy的访问)’的私人壁炉’t 易挥发的), but slower worst-case when switching is necessary (because it has to copy the data). Example 1 has consistent ISR timing but it may be slower on average (when switching is not necessary).

完整的例子

我已经以 我的Github帐户上的MPLAB X项目. These also include a primitive testing facility where Speedy executes from a timer interrupt and Poky executes from the main loop, running some number iter_count_end of iterations which default to 500. Instead of using the real ADC, I provided a source of mock ADC readings from a 线性同余生成器 for a reproducible series of samples that I could repeat sequentially to check the correctness of the results. I ran both examples in the MPLAB X simulator with iter_count_end = 16384 和 got correct sums in both cases. Does this prove correctness? Absolutely not, but if I had found a bug it would have given me a chance to troubleshoot.

其他注意事项

大学教师’不要过度使用共享内存!

不管你’要使用这些方法之一或其他方法,请尽量减少共享内存中由多个线程访问的程序状态的数量。它’编写正确的程序要容易得多’不必担心共享变量。这不仅适用于您,还适用于编译器,当编译器可以假设只有一个线程访问内存区域时,它可以自由地创建优化的程序实现。所以如果你’即使在简单的Speedy / Poky系统中,也可以使用线程之间的共享内存,并且您’re using 易挥发的 or _Atomic or some kind of synchronization mechanism, don’只是丢掉你所有的程序’s state into the concurrency bucket and use 易挥发的 or _Atomic everywhere. That’s overkill and you’可能会遭受不必要的性能损失。

单向与双向

这些示例说明了单向数据流:Speedy正在向Poky提供信息。那里’s no reason it can’t be bidirectional —例如,Poky向Speedy发送电压命令(对于可变DC / DC转换器),而Speedy发送Poky电压和电流反馈。

In the bidirectional case, if you are really stingy about memory use, the fireplace can contain a union of the unidirectional data used, for example:

typedef struct {
    union {
        struct {
            int16_t voltage_command;
        } to_speedy;
        struct {
            int16_t voltage_feedback;
            int16_t current_feedback;
        } to_poky;
    } u;

    /* things that aren't unidirectional go here */
} FIREPLACE;

so that Poky only reads from mine->u.to_poky 和 only writes to mine->u.to_speedy, whereas Speedy only reads from mine->u.to_speedy 和 only writes to mine->u.to_poky. This adds a couple of requirements, so it’在大多数情况下,这可能不是一个好的解决方案,尤其是在仅交换几个字节的数据的情况下:

  • 两个进程都不能依赖壁炉作为持久状态变量的存储位置—相反,它只能用于通讯
  • 在切换壁炉之前,必须满足以下条件之一
    • 两个壁炉的内容必须有效(这意味着每个进程都必须更新内容)
    • 每个壁炉都包含一个“valid”指示壁炉中数据是否有效的标志,并且两个进程都必须设置或清除有效标志

原子度要求

我之前提到原子性。这两个示例中的原子性要求非常小:

  • 迅速’s code has 原子性要求,因为它永远不会被Poky打断。
  • ky’s代码仅对用于同步的数据具有原子性要求:
  • in example 1, this is the assignment to the pointer shmem->access.speedy in 壁炉_switch()
  • in example 2, this is reading and writing of the flag shmem->switch_request (it’s hard to imagine a bool read or write being non-atomic, but I try to make no assumptions here)

测试(是,在那里’s the Rub)

由于并发机制具有不确定性,因此很难测试。仅当进程A和B以相对特定的顺序执行其指令时,才会遇到一个微小的错误。

我将提出一个想法作为促进测试的一种方式。

壁炉可以包含一个保护变量,例如:

  • ky sets the guard to 0xDEAD just before it starts writing to data in its fireplace, and sets it back to 0x0000 just before it switches fireplaces (or signals a fireplace switch request). This sets up an invariant that Speedy should always see 0x0000 in its fireplace and never anything else, and that the guard is set to 0xDEAD when Poky is writing to Poky’在壁炉旁。
  • 快速应检查警卫;如果看到非零值,则表示存在错误,应采取适当的措施。 (在偏执的情况下,触发系统范围的错误;出于调查目的,增加一个可以报告给开发人员以进行进一步调查的计数器)

Isn’这和双缓冲一样吗?

双缓冲是计算机图形学中的经典技术 两个进程共享一对缓冲区以更新显示。缓冲器代表显示像素的网格。一个过程负责将形状绘制到一个缓冲区上。另一个过程显示其他缓冲区的内容。这样,绘制过程就可以花一些时间,直到缓冲区完成并准备好移交给显示过程为止。为了更新显示,绘图过程将切换缓冲区。结果,您可以减少显示闪烁的机会—特别是如果使用类似 页面交换 .

双缓冲的另一个用途是 DMA ,有时也称为乒乓缓冲区,其中一些硬件过程(如ADC)用于用一系列样本逐渐填充两个缓冲区中的一个。当缓冲区已满时,将交换缓冲区,ADC会继续填充另一个缓冲区,而其他软件将处理第一个缓冲区的内容。

从结构上讲,旋转壁炉技术是双重缓冲的一种形式,但它明确地针对并发性,并且壁炉并非真正“buffers”由元素的均质数组组成,就像我刚才概述的用于双缓冲的两种情况一样。

为什么这比使用同步标志保护共享内存更好?

旋转壁炉允许双方处理其数据,而不必担心数据损坏或数据传输无效;交换壁炉是交换发生的地方。

假设您要使用共享内存和一个非阻塞易失事件标志READY来将电压命令从Poky传输到Speedy:

  • 为了使Poky更新命令:
    • ky sets READY to FALSE
    • ky writes a new voltage command to shared memory
    • ky sets READY to TRUE
  • 为了让Speedy能够执行以下命令: 。快速检查
    • 如果READY为TRUE,则Speedy将命令复制到Speedy’s own internal state
    • 如果READY为FALSE,Speedy将忽略该命令,并依赖于其内部状态的最后一个副本

这在理论上是可行的,但在最坏的时序情况下,Speedy可能会反复看到READY = FALSE,并被迫处理陈旧的命令。

另一个方向需要类似的内容:

  • 为了让Poky从共享内存中读取反馈:
    • ky sets READY to FALSE
    • ky reads what it needs from shared memory
    • ky sets READY to TRUE
  • 为了让Speedy更新其反馈: 。快速检查
    • 如果READY为TRUE,Speedy将从其内部状态更新共享内存
    • 如果READY为FALSE,Speedy会默默地流下眼泪,因为它可以’不能做任何事情,必须等到下一个ISR再试一次

同样,在最差的计时情况下,Speedy可能会反复看到READY = FALSE,使Poky拥有非常陈旧的数据

旋转壁炉将发送者和接收者隔离开来,因此不会被迫共享单个存储区域。

免责声明

最后,重要的免责声明:

并发很难。您可以自由使用这些机制,但是我没有提供铁定的证据证明它们可以免受竞争条件或其他并发危害的影响。我也不保证此处的示例代码没有错误。我在实际系统上使用过这种类型的机制,但是它们是专有的,我不得不在这里解释这些想法,而不是发布专有代码。

关于内存模型和并发问题,我对C标准和微控制器的保证和挑战的表述并不具有权威性。在这里报告这些细微之处之前,我已经尽力了调查这些细微之处。如果我有任何错误或误导性陈述,请引起我注意–我深表歉意,并将尽其所能纠正。

在将新的并发机制合并到您自己的系统中之前,请执行您自己的尽职调查。 (或唐’t,并依靠RTOS的机制来满足您的需求。)

包起来

我们了解了嵌入式控制系统中的一种常见模式,以定期执行的中断服务例程为例— aka Speedy — and a main loop — aka Poky —需要交换数据。 Speedy可以打断Poky,但Poky永远不会打断Speedy。

我们探索了一种称为“旋转壁炉”的双缓冲变体,它允许Speedy和Poky之间的通信通过允许它们分别从单独的内存部分进行读取或写入(“fireplace”),并且通常在交换指针时切换对存储区的访问时发生数据交换。这使我们将并发工作集中在切换机制上。 Speedy可以直接切换壁炉,也可以升起一个标志以表示Poky可以切换壁炉。

我们谈到了支持旋转壁炉所需的一些并发要求:

  • 可通过顺序一致性建模的微控制器(无乱序执行)
  • the use of 易挥发的 to restrict the compiler from optimizing out reads or writes, and from reordering with respect to other reads and writes of 易挥发的 variables
  • 在壁炉切换机制中需要原子更新

我们深入研究了C11的地下室’s <stdatomic.h> 和 talked about alternatives for those of us who don’t have access to a C11 compiler. (Note again: if you can use the <stdatomic.h> or C++11’s std::atomic<>, it’可能值得这样做—编译器可以保证某些正确性,这些保证比手动替代方法更强大和更简单。)

I’d很想听听您在资源有限的嵌入式系统中使用此或其他并发机制的经验!

致谢

谢谢 马修·埃斯莱曼 和约翰·佩森(John Payson)的意见和建议。


©2020 Jason M. Sachs,保留所有权利。


要发布对评论的回复,请单击每个评论所附的“回复”按钮。要发布新评论(而不是回复评论),请查看评论顶部的“写评论”标签。

注册后,您可以参加所有相关网站上的论坛,并获得所有pdf下载的访问权限。

注册

我同意 使用条款 隐私政策 .

试试我们偶尔但很受欢迎的时事通讯。非常容易退订。
或登录