Hiredis 发布者只在 while 循环中发送第一条消息

Hiredis publisher only sending first message in a while loop

我是 Hiredis/Redis 的新手,无法在 while 循环中为 Pub/Sub 架构创建工作发布者。
我成功地创建了一个只触发一条消息然后退出的发布者。但我正在尝试让发布者定期发送消息。这是我的发布者:

#include <signal.h>
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>

using namespace std;

void pubCallback(redisAsyncContext *c, void *r, void *privdata) {

  redisReply *reply = (redisReply*)r;
  if (reply == NULL){
    cout<<"Response not recev"<<endl; 
    return;
  }
  cout<<"message published"<<endl;
  redisAsyncDisconnect(c);
}

int main(int argc, char* argv[])
{
    signal(SIGPIPE, SIG_IGN);
    struct event_base* base = event_base_new();
    int status;
    int i = 0;
    redisAsyncContext* _redisContext = redisAsyncConnect("172.17.0.2", 6379);

    if (_redisContext->err) {
        /* Let context leak for now... */
        cout<<"Error: "<< _redisContext->errstr<<endl;
        return 1;
    }
    redisLibeventAttach(_redisContext,base);

    while(1) {
        string command ("publish ");
        command.append("test_channel");
        command.append (" ");
        command.append(to_string(i));
        cout << command << endl;
        status = redisAsyncCommand(_redisContext, 
            pubCallback, 
            (char*)"pub", command.c_str()
        );
        event_base_dispatch(base);
        i+=1;
        usleep(1000000);
    }
}

使用此发布者,仅收到第一条消息 "0",后续命令似乎被忽略。
是否可以在 while 循环中创建发布者发布?我是否必须为每条消息创建一个新连接或 disconnect/reconnect?

我已经成功了。
首先,pubCallback函数中的redisAsyncDisconnect禁止我的程序发送后续消息。此行需要删除。
这导致了另一个问题,因为程序在第一条消息发布后开始挂起。发生这种情况是因为事件循环将挂起,等待新事件的派发。我需要一种方法来在消息发布后立即打破此挂起。
方法是在 pubCallback.

中调用 event_base_loopbreak

这是工作代码:

#include <signal.h>
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>

using namespace std;

void pubCallback(redisAsyncContext *c, void *r, void *privdata) {

  redisReply *reply = (redisReply*)r;
  if (reply == NULL){
    cout<<"Response not recev"<<endl; 
    return;
  }
  cout<<"message published"<<endl;

  redisLibeventEvents *e = (redisLibeventEvents*) c->ev.data;
  event_base_loopbreak(e->base);
}

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        return;
    }
    printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        return;
    }
    printf("Disconnected...\n");
}

int main(int argc, char* argv[])
{
    signal(SIGPIPE, SIG_IGN);
    
    int status;
    int i = 0;
    
    redisAsyncContext* _redisContext = redisAsyncConnect("172.17.0.2", 6379);
    if (_redisContext->err) {
        /* Let context leak for now... */
        cout<<"Error: "<< _redisContext->errstr<<endl;
        return 1;
    }
    struct event_base* base = event_base_new();

    redisAsyncSetConnectCallback(_redisContext,connectCallback);
    redisAsyncSetDisconnectCallback(_redisContext,disconnectCallback);
    redisLibeventAttach(_redisContext,base);

    while(1) {
        string command ("publish ");
        command.append("test_channel");
        command.append (" ");
        command.append(to_string(i));
        cout << command << endl;

        status = redisAsyncCommand(_redisContext, 
            pubCallback, 
            (char*)"pub", command.c_str()
        );

        event_base_dispatch(base);
        i+=1;
        usleep(500000);
    }
}