如何使用 N 个生产者(连接)和 1 个消费者(绑定)构建一个简单的 zeroMQ 单向推拉模式 - MT4 到 Python

How to structure a plain zeroMQ one way Push-Pull pattern with N-producers (connect) and 1-consumer (bind) - MT4 to Python

多个用户将数据从 Metatrader 4 推送到 Python 后端。因此,Metatrader 4(= 生产者)的 运行ning 个实例的数量是动态的。 MT4 的每个实例都会 运行 在用户本地机器上,创建它们自己的 zmq.context 和 zmq.socket(推送)。所以我们基本上得到了一个 N 对 1 的结构:

|--------------------生产者1-----生产者2-----生产者3-----生产者4-----生产者X------------------------------------ ----------|--------------------|----------------|-- --------------|--------------------|------------ --------------
................................ ...............|----------------Consumer/Backend---------------- --|................................................

也就是说,消费者将 运行 24/7 并等待来自任何生产者的数据。如果有 160 个用户在线使用该服务,则消费者将必须处理来自这 160 个节点的数据输入,依此类推。接收消息的顺序无关紧要,如果某些消息偶尔丢失也无关紧要。

当前状态:
现在我的代码(见下文)运行在我自己的本地机器上使用一个 Metatrader 4 实例非常好。但是,如果有 X 个 MT4 实例,并且所有实例都有自己的 zmq 上下文并在本地 metatrader 4 实例中推送套接字以发送数据,会发生什么情况?它也会按预期工作吗?

代码生产者端(Metatrader 4 实例),每个用户将在他的本地计算机上运行:

extern string ZEROMQ_PROTOCOL = "tcp";
extern string HOSTNAME = "localhost";
extern int PUSH_PORT = 32225;

// CREATE ZeroMQ Context
Context context(PROJECT_NAME);

// CREATE ZMQ_PUSH SOCKET
Socket pushSocket(context, ZMQ_PUSH);

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+

int OnInit()
  {

//---

   EventSetTimer(1);     // Set Second Timer as push intervall

   context.setBlocky(false);

   // Send data to PULL_PORT that consumer is listening to.
   Print("[PUSH] Connecting MT4 Dashex Feeder to Dashboard on Port " + IntegerToString(PUSH_PORT) + "..");
   pushSocket.connect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));

   pushSocket.setSendHighWaterMark(1);
   pushSocket.setLinger(0);   

   return(INIT_SUCCEEDED);
  }

//---

void OnDeinit(const int reason)
{

//---

   Print("[PUSH] Disconnecting MT4 Dashex Feeder on Port " + IntegerToString(PUSH_PORT) + "..");
   pushSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));

   // Shutdown ZeroMQ Context
   context.shutdown();
   context.destroy(0);

   EventKillTimer();
}

[...]

pushSocket.send(StringFormat("%s", account_info, true));

Python 后端消费者:

# create zeroMQ Pull Socket to fetch data out of the wire
context = zmq.Context()
zmq_socket = context.socket(zmq.PULL)
zmq_socket.bind("tcp://*:32225")
time.sleep(1)

while True:
    # check 24/7 for available data in the pull socket
    try:
        [...]

问题:

1.) 当前代码在用户本地计算机上为每个 metatrader 4 实例创建一个新的上下文和套接字 运行,并在 MT4 启动后再次关闭它们关闭。多个 MT4 instances/push 套接字是否可以同时向同一个消费者发送数据? (这是我想要的结果)

2.) 另外,每个终端实例是否可以使用相同的套接字端口,或者每个终端实例是否需要一个唯一的套接字端口?

3.) 我应该使用不同的模式吗?

提前非常感谢您的想法和意见。

  1. 如果您使用 inproc 传输,您只需要共享上下文。当您使用 tcp 时,它将按您预期的方式工作。
  2. 终端将 connect() 与 python 后端指定的端口 bind()。所以是的,所有终端都将连接到代码中指定的端口 32225。
  3. push/pull 模式似乎是一个不错的选择,应该可以正常工作。如果您担心 python 后端的开销(许多连接),您可以在终端和后端之间放置一个 zeromq 代理(PULL/PUSH)代理进程。