无法使用 zeroMQ 在拉式套接字上接收数据,"Address in use" - 错误

Can't receive data on pull socket using zeroMQ, "Address in use" - Error

我目前尝试使用 zeroMQ 设置一个简单的推拉式套接字架构,而 Metatrader 4 (MQL) 充当 publisher,我的 Python 后端充当 consumer

我每秒从 Metatrader 4 终端推送数据,效果很好。但是,我很难通过拉式插座接收数据。一旦我尝试从线路中提取数据,原子 script 包就会引发错误 address already in use.

我在开发过程中 运行 MT 4 终端和本地机器上的 Python 脚本。

Metatrader 4:

extern string PROJECT_NAME = "Dashex.Feeder";
extern string ZEROMQ_PROTOCOL = "tcp";
extern string HOSTNAME = "*";
extern int PUSH_PORT = 32220;

extern string t0 = "--- Feeder Parameters ---";
input string DID = "insert your DID here";
extern string t1 = "--- ZeroMQ Configuration ---";
extern bool Publish_MarketData = false;

// ZeroMQ environment //

// CREATE ZeroMQ Context
Context context(PROJECT_NAME);

// CREATE ZMQ_PUSH SOCKET
Socket pushSocket(context, ZMQ_PUSH);

string Publish_Symbols[7] = {
   "EURUSD","GBPUSD","USDJPY","USDCAD","AUDUSD","NZDUSD","USDCHF"
};

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+

int OnInit()
  {
//---

   EventSetTimer(1);     // Set Millisecond Timer to get client socket input

   context.setBlocky(false);

   // Send responses to PULL_PORT that client is listening on.
   Print("[PUSH] Connecting MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
   pushSocket.connect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));

   pushSocket.setSendHighWaterMark(1);
   pushSocket.setLinger(0);   

//---
   return(INIT_SUCCEEDED);
  }

//+------------------------------------------------------------------+
//| Expert deinitialization function                                 |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
//---

   Print("[PUSH] Disconnecting MT4 Server from Socket on Port " + IntegerToString(PUSH_PORT) + "..");
   pushSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));

   // Shutdown ZeroMQ Context
   context.shutdown();
   context.destroy(0);

   EventKillTimer();
}
//+------------------------------------------------------------------+
//| Expert tick function                                             |
//+------------------------------------------------------------------+
void OnTimer()
{
   /*
      Use this OnTimer() function to send market data to consumer.
   */
   if(!IsStopped() && Publish_MarketData == true)
   {
      for(int s = 0; s < ArraySize(Publish_Symbols); s++)
      {
         // Python clients can subscribe to a price feed by setting
         // socket options to the symbol name. For example:

         string _tick = GetBidAsk(Publish_Symbols[s]);
         Print("Sending " + Publish_Symbols[s] + " " + _tick + " to PUSH Socket");
         ZmqMsg reply(StringFormat("%s %s", Publish_Symbols[s], _tick));
         pushSocket.send(reply, true);
      }
   }
}
//+------------------------------------------------------------------+

string GetBidAsk(string symbol) {

   MqlTick last_tick;

   if(SymbolInfoTick(symbol,last_tick))
   {
       return(StringFormat("%f;%f", last_tick.bid, last_tick.ask));
   }

   // Default
   return "";
}

推送数据按预期工作:

Python 基拉插座:

import zmq
import time

context = zmq.Context()
zmq_socket = context.socket(zmq.PULL)
zmq_socket.bind("tcp://*:32220")
time.sleep(1)

while True:
    result = zmq_socket.recv()
    print(result)
    time.sleep(1)

这是 script 在控制台中报告的内容:

Netstat 输出:

注意:当我同时终止 Metatrader 推送脚本和 python 脚本时,端口在 netstats 中仍标记为 "listened"。当我在两个实例中将端口更改为 32225(或任何其他端口)并重新 运行 它们时,我再次遇到相同的错误。如果我首先 运行 拉动实例,我会在原子 script 中弹出一个沙漏,然后我 运行 MT4 推动实例在拉动侧没有任何反应。当我重新 运行 拉实例时,我再次遇到同样的错误。

更新:

后台的python.exe实例占用了端口。我关闭了 python 执行,端口又被释放了。当我现在 运行 我的 pull 实例时,我收到以下控制台反馈:

1.)

2.)

然后我 运行 工作正常的推送实例。

3.)

拉动实例仍然显示沙漏并且不在控制台中打印任何数据:

4.)

然后当我重新运行 拉取实例时,它会引发错误 address in use 这现在很有意义,因为 Python 仍在使用该端口背景。

但是为什么在拉取端没有打印任何数据?我是否必须更改拉取客户端代码才能"grab"推送数据?

问题在于您的 PUSH 代码,此处:

extern string HOSTNAME = "*";

虽然您可以在 bind URL 中合法地使用 * 作为主机名组件(在这种情况下它意味着 "listen on all addresses"),但它不会产生任何影响connect 调用中的意义:您必须提供有效的主机名或 IP 地址。

如果您将代码修改为:

extern string HOSTNAME = "localhost";

它可能会按预期工作。

这是我用来测试您的 PULL 代码的简单 Python PUSH 客户端;如果你 运行 这个和 运行 你的 PULL 代码(在你的问题中发布),一切都有效:

import time
import zmq

c = zmq.Context()
s = c.socket(zmq.PUSH)

s.connect('tcp://localhost:32220')

i = 0
while True:
    s.send_string('this is message {}'.format(i))
    i += 1
    time.sleep(0.5)