为什么 ZeroMQ PUB 在没有连接订阅者的情况下排队消息? (嗯,"disconnected" SUB-s)

Why is ZeroMQ PUB enqueing messages with no connected subscribers? ( Well, "disconnected" SUB-s )

我发现使用 ZMQ_PUB.

时出现奇怪的行为

我有一个生产者,它 .connect()-s 到不同的进程
.bind()ZMQ_SUB 套接字上。

订阅者全部.bind(),发布者.connect()-s.

当生产者启动时,它会创建一个 ZMQ_PUB 套接字并将其 .connect()-s 到不同的进程。然后它会立即开始定期发送消息。

正如预期的那样,如果没有连接的订阅者,它会丢弃所有消息,直到订阅者启动。

流程正常,当订阅者启动时,它会从那一刻起接收消息。

现在的问题是:

  1. 我断开订阅者(停止进程)。
  2. 此时没有活跃的订阅者,因为我停止了唯一的订阅者。生产者继续发送消息,这些消息应该被丢弃,因为不再有连接的订阅者了……
  3. 我重新启动原始订阅者,它绑定,发布者重新连接...并且订阅者接收同时产生的所有消息!!

所以我看到的是生产者在订阅者关闭时将所有消息排队。一旦套接字重新连接,因为订阅者进程重新启动,它发送了所有排队的消息。

正如我从 here 了解到的,当没有连接的订阅者时,发布者应该丢弃所有已发送的消息:

ZeroMQ examples

"A publisher has no connected subscribers, then it will simply drop all messages."

为什么会这样?

顺便说一下,我在 linux 上使用 C++ 进行这些测试。

我尝试在订阅者绑定时设置不同的身份,但没有成功。发布者仍然对消息进行排队,并在订阅者重新启动时将它们全部传递。

提前致谢,

路易斯


更新:

IMPORTANT UPDATE!!!!!
Before posting this question
I had tried different solutions. One was to set ZMQ_LINGER to 0, which didn't work.
I added ZMQ:IMMEDIATE, and it worked, but I just found out that ZMQ:IMMEDIATE alone does not work. It requires also ZMQ_LINGER.

更新: 根据要求,我添加了一些简单的测试用例来表明我的观点。 一种是简单的订阅者,它在命令行上运行并接收要绑定的 uri,例如:

$ ./sub tcp://127.0.0.1:50001

另一个是发布者,它接收要连接的 uris 列表,例如:

./pub tcp://127.0.0.1:50001 tcp://127.0.0.1:50002

订阅者最多收到5条消息,然后关闭套接字并退出。我们可以在 wireshark 上看到双向 FIN/ACK 的交换,以及套接字如何移动到 TIME_WAIT 状态。然后,发布者开始发送 SYN,尝试重新连接(探测 ZMQ_PUB 知道连接已关闭)

我明确表示不会取消订阅套接字,只是关闭它。在我看来,如果套接字关闭,发布者应该自动结束对该连接的任何订阅。

所以我看到的是:我启动订阅者(一个或多个),我启动发布者,发布者开始发送消息。订阅者收到 5​​ 条消息并结束。与此同时,发布者继续发送消息,没有连接的订阅者。我重新启动订阅者,并立即收到几条消息,因为它们在发布者端排队。我认为那些排队的消息打破了 Publish/Subscribe 模型,其中消息应该只传递给连接的订阅者。如果订阅者关闭连接,则应该丢弃发送给该订阅者的消息。甚至,当订阅者重新启动时,它可能决定订阅其他消息,但它仍然会收到绑定在同一端口的 "previous encarnation" 订阅的消息。

我的建议是 ZMQ_PUB(在连接模式下),当检测到套接字断开连接时,应该清除该套接字上的所有订阅,直到它重新连接并且新订阅者决定重新订阅。

我为语言错误道歉,但英语不是我的母语。

酒吧代码:

#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc < 2 )
    {
        fprintf( stderr, "Usage : %s <remoteUri1> [remoteUri2...]\n",   
        basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_PUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int i;
    try
    {
        for ( i = 1; i < argc; i++ )
        {
            printf( "Connecting to [%s]\n", argv[i] );
            {
                pSocket->connect( argv[i] );
            }
        }
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't connect socket to %s. Aborting...\n", argv[i] );
        exit ( EXIT_FAILURE );
    }

    printf( "Publisher Up and running... sending messages\n" );
    fflush(NULL);

    int msgCounter = 0;
    do
    {
        try
        {
            char msgBuffer[1024];
            sprintf( msgBuffer, "Message #%d", msgCounter++ );
            zmq::message_t outTask( msgBuffer, strlen( msgBuffer ) + 1 );
            printf("Sending message [%s]\n", msgBuffer );
            pSocket->send ( outTask );
            sleep( 1 );
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( true );

    exit ( EXIT_SUCCESS );
}

订阅者代码

#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>

#include <string>
#include <zeromq/zmq.hpp>

int main( int argc, char *argv[] )
{
    if ( argc != 2 )
    {
        fprintf( stderr, "Usage : %s <localUri>\n", basename( argv[0] ) );
        exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_SUB );
    if ( NULL == pSocket )
    {
        fprintf( stderr, "Couldn't create socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }
    try
    {
        pSocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        pSocket->bind( pLocalUri.c_str() );
    }
    catch( ... )
    {
        fprintf( stderr, "Couldn't bind socket. Aborting...\n" );
        exit ( EXIT_FAILURE );
    }

    int msgCounter = 0;
    printf( "Subscriber Up and running... waiting for messages\n" );
    fflush( NULL );

    do
    {
        try
        {
            zmq::message_t inTask;
            pSocket->recv ( &inTask );
            printf( "Message received : [%s]\n", inTask.data() );
            fflush( NULL );
            msgCounter++;
        }
        catch( ... )
        {
            fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
            exit ( EXIT_FAILURE );
        }
    }
    while ( msgCounter < 5 );

    // pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 ); NOT UNSUBSCRIBING
    pSocket->close();
    exit ( EXIT_SUCCESS );
}

Bind和Connect虽然无所谓,但在这里有特定的含义。

选项 1:

把你的代码改成这样就没问题了:

  1. 发布者应该bind一个地址
  2. 订户应该connect到那个地址

'因为如果你绑定一个订阅者然后中断它,发布者就没有办法知道订阅者是未绑定的,所以它会将消息排队到绑定的端口,当你在同一个端口上再次重新启动时,排队的消息将被耗尽。

选项 2:

但是如果你想按照自己的方式去做,你需要做以下事情:

  1. 在订阅者代码中注册一个中断处理程序(SIGINT
  2. 在订阅者中断时执行以下操作:
    • unsubscribe 话题
    • close子套接字
    • 最好使用 0 return 代码干净地退出订阅者进程

更新:

关于标识这一点,不要假设设置标识将唯一标识一个连接。如果留给 zeromq,它将使用唯一的任意数字分配传入连接的标识。

身份通常不用于回复客户。它们用于在使用 ROUTER 套接字的情况下响应客户端。 '因为 ROUTERsocket 是异步的,而 REQ/REP 是同步的。在 Async 中,我们需要知道我们回应了谁。可以是n/w地址或随机数或uuid等

更新:

我不认为这是 zeromq 的问题,因为在整个指南中 PUB/SUB 的解释方式是发布者通常是静态的(服务器并绑定到端口)并且订阅者来来去去方式(连接到端口的客户端)。

还有一个选项完全符合您的要求 ZMQ_IMMEDIATEZMQ_DELAY_ATTACH_ON_CONNECT

在发布者上设置上述套接字选项不会让消息在没有活动连接时进入队列。

问:为什么会这样?

因为SUB实际上仍然连接(不够"disconnected")。

是的,可能令人惊讶,但是 杀死 SUB-进程,在 .bind() - 或 .connect() 插座的 transport-media、 并不意味着 , I/O-pump 的 Finite-State-Machine 有 "moved" 到 disconnected-state.

鉴于此,PUB方别无选择,只能考虑SUB- side 仍然存在并连接(即使进程在 PUB-side 的 line-of-sight 之外被静默杀死)并且对于这样的 "distributed"-state 有一个 ZeroMQ protocol-defined 行为(一个 PUB 方职责)为一个(是的,看不见的死了)SUB 抄写员收集所有临时消息,PUB 方仍然认为可以生存(但可能在某个低处,在运输 I/O-levels 或某些远程 CPU-resources 饥饿或 concurrency-introduced 短暂间歇性 { 本地 | 远程 } 阻塞状态等)。

所以它缓冲...

以防你暗杀SUB-side agent 看起来更优雅一点(使用归零ZMQ_LINGER + 足够的.close() 在 socket-resource 实例上) PUB 端将识别 "distributed" 系统 system-wide Finite-State-Automaton 转变为确实 "DISCONNECT"-ed 状态,并且到期的 change-of-behaviour 将发生在 "distributed-FSA" 的 PUB 侧,不存储此 [=126] 的任何消息=] 确实 "DISCONNECT"-ed SUB —— 正是文档所述。

"Distributed-FSA" 识别 state-change 事件的手段非常薄弱“超出了 localhost 控制范围。KILL-ing 一个远程进程,它实现了 "distributed-FSA" 的一些显着部分是一个毁灭性的事件,而不是如何保持系统工作的方法。对于这种外部风险的一个好的选择可能是


听起来很复杂?

哦,是的,确实很复杂。这正是 ZeroMQ 为我们解决这个问题的原因,让我们自由并享受基于这些(已经解决的)低级复杂性设计我们的应用程序架构。


Distributed-system FSA(system-wide FSA 的分层组合 sub-FSA-s )

想象一下引擎盖下悄悄发生的事情,想象一下只有一对简单的串联 FSA-FSA - 正是 .Context()[=104 对=] 实例试图在最简单的 1:1 PUB/SUB 场景中为我们处理,其中 use-case KILL-s 所有 sub-FSA-s 在 SUB 侧,而没有尝试承认 PUB 侧的意图。甚至 TCP-protocol(生活在 PUB 侧和 SUB 侧)也有几个 state-transition 来自 [ESTABLISHED]到[CLOSED]状态。


快速 X-ray 查看 distributed-systems' FSA-of-FSA-s

(为清楚起见,仅描述了 TCP-protocol FSA)

PUB-边:



.socket( .. ) 实例的行为 FSA:


SUB-边:

(由 nanomsg 提供)。