无法使用 zeroMQ 在拉式套接字上接收数据,"Address in use" - 错误
Can't receive data on pull socket using zeroMQ, "Address in use" - Error
我目前尝试使用 zeroMQ 设置一个简单的推拉式套接字架构,而 Metatrader 4 (MQL) 充当 publisher
,我的 Python 后端充当 consumer
。
我每秒从 Metatrader 4 终端推送数据,效果很好。但是,我很难通过拉式插座接收数据。一旦我尝试从线路中提取数据,原子 script
包就会引发错误 address already in use
.
我在开发过程中 运行 MT 4 终端和本地机器上的 Python 脚本。
Metatrader 4:
extern string PROJECT_NAME = "Dashex.Feeder";
extern string ZEROMQ_PROTOCOL = "tcp";
extern string HOSTNAME = "*";
extern int PUSH_PORT = 32220;
extern string t0 = "--- Feeder Parameters ---";
input string DID = "insert your DID here";
extern string t1 = "--- ZeroMQ Configuration ---";
extern bool Publish_MarketData = false;
// ZeroMQ environment //
// CREATE ZeroMQ Context
Context context(PROJECT_NAME);
// CREATE ZMQ_PUSH SOCKET
Socket pushSocket(context, ZMQ_PUSH);
string Publish_Symbols[7] = {
"EURUSD","GBPUSD","USDJPY","USDCAD","AUDUSD","NZDUSD","USDCHF"
};
//+------------------------------------------------------------------+
//| Expert initialization function |
//+------------------------------------------------------------------+
int OnInit()
{
//---
EventSetTimer(1); // Set Millisecond Timer to get client socket input
context.setBlocky(false);
// Send responses to PULL_PORT that client is listening on.
Print("[PUSH] Connecting MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pushSocket.connect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
pushSocket.setSendHighWaterMark(1);
pushSocket.setLinger(0);
//---
return(INIT_SUCCEEDED);
}
//+------------------------------------------------------------------+
//| Expert deinitialization function |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
//---
Print("[PUSH] Disconnecting MT4 Server from Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pushSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
// Shutdown ZeroMQ Context
context.shutdown();
context.destroy(0);
EventKillTimer();
}
//+------------------------------------------------------------------+
//| Expert tick function |
//+------------------------------------------------------------------+
void OnTimer()
{
/*
Use this OnTimer() function to send market data to consumer.
*/
if(!IsStopped() && Publish_MarketData == true)
{
for(int s = 0; s < ArraySize(Publish_Symbols); s++)
{
// Python clients can subscribe to a price feed by setting
// socket options to the symbol name. For example:
string _tick = GetBidAsk(Publish_Symbols[s]);
Print("Sending " + Publish_Symbols[s] + " " + _tick + " to PUSH Socket");
ZmqMsg reply(StringFormat("%s %s", Publish_Symbols[s], _tick));
pushSocket.send(reply, true);
}
}
}
//+------------------------------------------------------------------+
string GetBidAsk(string symbol) {
MqlTick last_tick;
if(SymbolInfoTick(symbol,last_tick))
{
return(StringFormat("%f;%f", last_tick.bid, last_tick.ask));
}
// Default
return "";
}
推送数据按预期工作:
Python 基拉插座:
import zmq
import time
context = zmq.Context()
zmq_socket = context.socket(zmq.PULL)
zmq_socket.bind("tcp://*:32220")
time.sleep(1)
while True:
result = zmq_socket.recv()
print(result)
time.sleep(1)
这是 script
在控制台中报告的内容:
Netstat 输出:
注意:当我同时终止 Metatrader 推送脚本和 python 脚本时,端口在 netstats 中仍标记为 "listened"。当我在两个实例中将端口更改为 32225(或任何其他端口)并重新 运行 它们时,我再次遇到相同的错误。如果我首先 运行 拉动实例,我会在原子 script
中弹出一个沙漏,然后我 运行 MT4 推动实例在拉动侧没有任何反应。当我重新 运行 拉实例时,我再次遇到同样的错误。
更新:
后台的python.exe
实例占用了端口。我关闭了 python 执行,端口又被释放了。当我现在 运行 我的 pull 实例时,我收到以下控制台反馈:
1.)
2.)
然后我 运行 工作正常的推送实例。
3.)
拉动实例仍然显示沙漏并且不在控制台中打印任何数据:
4.)
然后当我重新运行 拉取实例时,它会引发错误 address in use 这现在很有意义,因为 Python 仍在使用该端口背景。
但是为什么在拉取端没有打印任何数据?我是否必须更改拉取客户端代码才能"grab"推送数据?
问题在于您的 PUSH
代码,此处:
extern string HOSTNAME = "*";
虽然您可以在 bind
URL 中合法地使用 *
作为主机名组件(在这种情况下它意味着 "listen on all addresses"),但它不会产生任何影响connect
调用中的意义:您必须提供有效的主机名或 IP 地址。
如果您将代码修改为:
extern string HOSTNAME = "localhost";
它可能会按预期工作。
这是我用来测试您的 PULL
代码的简单 Python PUSH
客户端;如果你 运行 这个和 运行 你的 PULL
代码(在你的问题中发布),一切都有效:
import time
import zmq
c = zmq.Context()
s = c.socket(zmq.PUSH)
s.connect('tcp://localhost:32220')
i = 0
while True:
s.send_string('this is message {}'.format(i))
i += 1
time.sleep(0.5)
我目前尝试使用 zeroMQ 设置一个简单的推拉式套接字架构,而 Metatrader 4 (MQL) 充当 publisher
,我的 Python 后端充当 consumer
。
我每秒从 Metatrader 4 终端推送数据,效果很好。但是,我很难通过拉式插座接收数据。一旦我尝试从线路中提取数据,原子 script
包就会引发错误 address already in use
.
我在开发过程中 运行 MT 4 终端和本地机器上的 Python 脚本。
Metatrader 4:
extern string PROJECT_NAME = "Dashex.Feeder";
extern string ZEROMQ_PROTOCOL = "tcp";
extern string HOSTNAME = "*";
extern int PUSH_PORT = 32220;
extern string t0 = "--- Feeder Parameters ---";
input string DID = "insert your DID here";
extern string t1 = "--- ZeroMQ Configuration ---";
extern bool Publish_MarketData = false;
// ZeroMQ environment //
// CREATE ZeroMQ Context
Context context(PROJECT_NAME);
// CREATE ZMQ_PUSH SOCKET
Socket pushSocket(context, ZMQ_PUSH);
string Publish_Symbols[7] = {
"EURUSD","GBPUSD","USDJPY","USDCAD","AUDUSD","NZDUSD","USDCHF"
};
//+------------------------------------------------------------------+
//| Expert initialization function |
//+------------------------------------------------------------------+
int OnInit()
{
//---
EventSetTimer(1); // Set Millisecond Timer to get client socket input
context.setBlocky(false);
// Send responses to PULL_PORT that client is listening on.
Print("[PUSH] Connecting MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pushSocket.connect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
pushSocket.setSendHighWaterMark(1);
pushSocket.setLinger(0);
//---
return(INIT_SUCCEEDED);
}
//+------------------------------------------------------------------+
//| Expert deinitialization function |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
{
//---
Print("[PUSH] Disconnecting MT4 Server from Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pushSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
// Shutdown ZeroMQ Context
context.shutdown();
context.destroy(0);
EventKillTimer();
}
//+------------------------------------------------------------------+
//| Expert tick function |
//+------------------------------------------------------------------+
void OnTimer()
{
/*
Use this OnTimer() function to send market data to consumer.
*/
if(!IsStopped() && Publish_MarketData == true)
{
for(int s = 0; s < ArraySize(Publish_Symbols); s++)
{
// Python clients can subscribe to a price feed by setting
// socket options to the symbol name. For example:
string _tick = GetBidAsk(Publish_Symbols[s]);
Print("Sending " + Publish_Symbols[s] + " " + _tick + " to PUSH Socket");
ZmqMsg reply(StringFormat("%s %s", Publish_Symbols[s], _tick));
pushSocket.send(reply, true);
}
}
}
//+------------------------------------------------------------------+
string GetBidAsk(string symbol) {
MqlTick last_tick;
if(SymbolInfoTick(symbol,last_tick))
{
return(StringFormat("%f;%f", last_tick.bid, last_tick.ask));
}
// Default
return "";
}
推送数据按预期工作:
Python 基拉插座:
import zmq
import time
context = zmq.Context()
zmq_socket = context.socket(zmq.PULL)
zmq_socket.bind("tcp://*:32220")
time.sleep(1)
while True:
result = zmq_socket.recv()
print(result)
time.sleep(1)
这是 script
在控制台中报告的内容:
Netstat 输出:
注意:当我同时终止 Metatrader 推送脚本和 python 脚本时,端口在 netstats 中仍标记为 "listened"。当我在两个实例中将端口更改为 32225(或任何其他端口)并重新 运行 它们时,我再次遇到相同的错误。如果我首先 运行 拉动实例,我会在原子 script
中弹出一个沙漏,然后我 运行 MT4 推动实例在拉动侧没有任何反应。当我重新 运行 拉实例时,我再次遇到同样的错误。
更新:
后台的python.exe
实例占用了端口。我关闭了 python 执行,端口又被释放了。当我现在 运行 我的 pull 实例时,我收到以下控制台反馈:
1.)
2.)
然后我 运行 工作正常的推送实例。
3.)
拉动实例仍然显示沙漏并且不在控制台中打印任何数据:
4.)
然后当我重新运行 拉取实例时,它会引发错误 address in use 这现在很有意义,因为 Python 仍在使用该端口背景。
但是为什么在拉取端没有打印任何数据?我是否必须更改拉取客户端代码才能"grab"推送数据?
问题在于您的 PUSH
代码,此处:
extern string HOSTNAME = "*";
虽然您可以在 bind
URL 中合法地使用 *
作为主机名组件(在这种情况下它意味着 "listen on all addresses"),但它不会产生任何影响connect
调用中的意义:您必须提供有效的主机名或 IP 地址。
如果您将代码修改为:
extern string HOSTNAME = "localhost";
它可能会按预期工作。
这是我用来测试您的 PULL
代码的简单 Python PUSH
客户端;如果你 运行 这个和 运行 你的 PULL
代码(在你的问题中发布),一切都有效:
import time
import zmq
c = zmq.Context()
s = c.socket(zmq.PUSH)
s.connect('tcp://localhost:32220')
i = 0
while True:
s.send_string('this is message {}'.format(i))
i += 1
time.sleep(0.5)