如何防止进程间生产者消费者消息传递机制因一侧崩溃而损坏?

How can an interprocess producer consumer message passing mechanism be protected against corruption due to one side crashing?

我在 Windows 上为一个生产者和一个消费者在共享内存中实现了一个进程间消息队列。

我正在使用一个命名信号量来计算空槽,一个命名信号量来计算满槽,一个命名互斥量来保护共享内存中的数据结构。

例如考虑消费者方面。生产方类似。 首先,它等待完整的信号量,然后 (1) 它从互斥量下的队列中获取消息,然后它向空信号量发出信号 (2)

问题:

如果消费者进程在 (1) 和 (2) 之间崩溃,那么实际上队列中进程可以使用的槽数会减少一个。 假设当消费者宕机时,生产者可以处理队列被填满的情况。 (它可以在等待空信号量时指定超时,甚至可以指定 0 表示不等待)。

消费者重启后可以继续从队列中读取数据。数据不会溢出,但即使在它清空所有已满槽后,生产者也将少一个可用槽。

多次这样的重启后,队列将没有可用的插槽,也无法发送消息。

问题:

如何避免或恢复这种情况?

这里概述了一种使用事件而不是信号量的简单方法:

DWORD increment_offset(DWORD offset)
{
    offset++;
    if (offset == QUEUE_LENGTH*2) offset = 0;
    return offset;
}

void consumer(void)
{
    for (;;)
    {
        DWORD current_write_offset = InterlockedCompareExchange(write_offset, 0, 0);

        if ((current_write_offset != *read_offset + QUEUE_LENGTH) && 
            (current_write_offset + QUEUE_LENGTH != *read_offset))
        {
            // Queue is not full, make sure producer is awake
            SetEvent(signal_producer_event);
        }

        if (*read_offset == current_write_offset)
        {
            // Queue is empty, wait for producer to add a message
            WaitForSingleObject(signal_consumer_event, INFINITE);
            continue;
        }

        MemoryBarrier();
        _ReadWriteBarrier;

        consume((*read_offset) % QUEUE_LENGTH);

        InterlockedExchange(read_offset, increment_offset(*read_offset));
    }
}

void producer(void)
{
    for (;;)
    {
        DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);

        if (current_read_offset != *write_offset)
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal_consumer_event);
        }

        if ((*write_offset == current_read_offset + QUEUE_LENGTH) ||
            (*write_offset + QUEUE_LENGTH == current_read_offset))
        {
            // Queue is full, wait for consumer to remove a message
            WaitForSingleObject(signal_producer_event, INFINITE);
            continue;
        }

        produce((*write_offset) % QUEUE_LENGTH);

        MemoryBarrier();
        _ReadWriteBarrier;

        InterlockedExchange(write_offset, increment_offset(*write_offset));
    }
}

备注:

  • 发布的代码可以编译(给出适当的声明)但我还没有测试它。

  • read_offset是指向共享内存中的一个DWORD的指针,表示接下来应该从哪个slot读取。类似地,write_offset 指向共享内存中的一个 DWORD,指示下一个应该写入哪个插槽。

  • QUEUE_LENGTH + x的偏移量与x的偏移量指代相同的时隙,以消除满队列和空队列之间的歧义。这就是 increment_offset() 函数检查 QUEUE_LENGTH*2 而不仅仅是 QUEUE_LENGTH 的原因,也是我们在调用 consume()produce() 函数时取模的原因。 (这种方法的一种替代方法是修改生产者,使其从不使用最后一个可用插槽,但这会浪费一个插槽。)

  • signal_consumer_eventsignal_producer_event 必须是自动重置事件。请注意,设置已设置的事件是空操作。

  • 消费者仅在队列实际为空时才等待其事件,而生产者仅在队列实际已满时才等待其事件。

  • 当任一进程被唤醒时,它必须重新检查队列的状态,因为存在可能导致虚假唤醒的竞争条件。

  • 因为我使用互锁操作,并且因为一次只有一个进程使用任何特定槽,所以不需要互斥体。我加入了内存屏障以确保生产者写入插槽的更改将被消费者看到。如果您对无锁代码不满意,您会发现将所示算法转换为使用互斥锁是微不足道的。

  • 注意 InterlockedCompareExchange(pointer, 0, 0); 看起来有点复杂,但它只是线程安全的等同于 *pointer,即它读取指针处的值。同样,InterlockedExchange(pointer, value);*pointer = value; 相同,但线程安全。根据编译器和目标体系结构,互锁操作可能不是绝对必要的,但性能影响可以忽略不计,因此我建议进行防御性编程。

考虑消费者在调用 consume() 函数期间(或之前)崩溃的情况。当消费者重新启动时,它将再次拾取相同的消息并正常处理它。就生产者而言,没有发生任何异常,只是处理消息的时间比平时长。如果生产者在创建消息时崩溃,则会发生类似的情况;重启后,生成的第一条消息会覆盖掉不完整的消息,消费者不受影响

显然,如果崩溃发生在生产者或消费者调用 InterlockedExchange 之后但调用 SetEvent 之前,并且队列先前分别为空或满,则此时不会唤醒其他进程。但是,一旦崩溃的进程重新启动,它就会被唤醒。您不能丢失队列中的槽,进程也不能死锁。

我认为简单的多生产者单消费者案例看起来像这样:

void producer(void)
{
    for (;;)
    {
        DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);

        if (current_read_offset != *write_offset)
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal_consumer_event);
        }

        produce_in_local_cache();

        claim_mutex();

        // read offset may have changed, re-read it
        current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);

        if ((*write_offset == current_read_offset + QUEUE_LENGTH) ||
            (*write_offset + QUEUE_LENGTH == current_read_offset))
        {
            // Queue is full, wait for consumer to remove a message
            WaitForSingleObject(signal_producer_event, INFINITE);
            continue;
        }

        copy_from_local_cache_to_shared_memory((*write_offset) % QUEUE_LENGTH);

        MemoryBarrier();
        _ReadWriteBarrier;

        InterlockedExchange(write_offset, increment_offset(*write_offset));

        release_mutex();
    }
}

如果活跃的生产者崩溃了,mutex会被检测为被遗弃;您可以将这种情况视为互斥量已正确释放。如果崩溃的进程达到增加写入偏移量的程度,它添加的条目将照常处理;如果没有,它将被下一个要求互斥量的生产者覆盖。在这两种情况下都不需要任何特殊操作。