zeromq + julia:如何设置标志
zeromq + julia: how to set a flag
我在 ZMQ.
包中使用 Julia
我已经成功发送和接收了 ZMQ 模式的消息 Dealer/Router。
这是我收到消息的方式:
dataRecv = bytestring(ZMQ.recv(sockRouter))
但它正在阻塞。我需要的是非阻塞。
在带有 ZeroMQ 的 C++ 中,我们可以这样做:
zmq_msg_recv(&message, socket, 0); // blocking
zmq_msg_recv(&message, socket, ZMQ_DONTWAIT); // non-blocking
在julia,我找到了一个关键字:ZMQ.ZMQ_DONTWAIT
,但我不知道如何使用它。我试过这样:
dataRecv = bytestring(ZMQ.recv(sockRouter, ZMQ.ZMQ_DONTWAIT))
但是我得到一个错误:
'recv' has no method matching recv(::Socket, ::Int64)
那么Julia是不是不能使用非阻塞模式了?
问题 1)
我提出了一个问题 .
如果我是对的,这个问题的答案是关于多线程+阻塞。
我知道这可行,但我更喜欢使用单线程 + 非阻塞。
问题 2)
@Chisholm 谢谢你给我 source of ZMQ.jl.
但是我做了这样的测试:
dataRecv = bytestring(ZMQ.recv(sockRouter))
println("after recv")
如果我执行上面的代码,它会阻塞在recv
。
换句话说,它不会打印 "after recv" 直到我给它发送一条消息。
所以我觉得完全是阻塞模式
临时回答:
根据的回答:
but it's exposed with a blocking API via tasks
我认为我们别无选择,因为只有一个阻塞 API。
如果有人立即发现如何使用非阻塞 API,请留下您的答案,我会接受您的答案。谢谢
查看 ZMQ.jl
处的代码,似乎阻塞是由 :zmq_msg_recv
之后的 wait
引起的,所以这里是 recv
的另一个定义,称为 pollrecv
可以在主模块中定义(不需要更改 ZMQ.jl
):
function pollrecv(socket::ZMQ.Socket,zmsg::Message)
rc = -1
while true
rc = ccall((:zmq_msg_recv, ZMQ.zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
&zmsg, socket.data, ZMQ.ZMQ_DONTWAIT)
if rc == -1
if !(ZMQ.zmq_errno() == Base.Libc.EAGAIN)
throw(ZMQ.StateError(ZMQ.jl_zmq_error_str()))
end
return false
else
ZMQ.get_events(socket) != 0 && notify(socket)
break
end
end
return true
end
您可以通过以下方式使用此功能:
msg = Message()
while !pollrecv(s1,msg)
sleep(3)
println("ZZzzzz...")
end
out = convert(IOStream,msg)
println(takebuf_string(out))
close(msg)
ZMQ.send(s1,"response important for next receive")
当然,while
使用轮询实现阻塞,sleep
应该换成其他处理。
我在 ZMQ.
包中使用 Julia我已经成功发送和接收了 ZMQ 模式的消息 Dealer/Router。
这是我收到消息的方式:
dataRecv = bytestring(ZMQ.recv(sockRouter))
但它正在阻塞。我需要的是非阻塞。
在带有 ZeroMQ 的 C++ 中,我们可以这样做:
zmq_msg_recv(&message, socket, 0); // blocking
zmq_msg_recv(&message, socket, ZMQ_DONTWAIT); // non-blocking
在julia,我找到了一个关键字:ZMQ.ZMQ_DONTWAIT
,但我不知道如何使用它。我试过这样:
dataRecv = bytestring(ZMQ.recv(sockRouter, ZMQ.ZMQ_DONTWAIT))
但是我得到一个错误:
'recv' has no method matching recv(::Socket, ::Int64)
那么Julia是不是不能使用非阻塞模式了?
问题 1)
我提出了一个问题
如果我是对的,这个问题的答案是关于多线程+阻塞。
我知道这可行,但我更喜欢使用单线程 + 非阻塞。
问题 2)
@Chisholm 谢谢你给我 source of ZMQ.jl.
但是我做了这样的测试:
dataRecv = bytestring(ZMQ.recv(sockRouter))
println("after recv")
如果我执行上面的代码,它会阻塞在recv
。
换句话说,它不会打印 "after recv" 直到我给它发送一条消息。
所以我觉得完全是阻塞模式
临时回答:
根据
but it's exposed with a blocking API via tasks
我认为我们别无选择,因为只有一个阻塞 API。
如果有人立即发现如何使用非阻塞 API,请留下您的答案,我会接受您的答案。谢谢
查看 ZMQ.jl
处的代码,似乎阻塞是由 :zmq_msg_recv
之后的 wait
引起的,所以这里是 recv
的另一个定义,称为 pollrecv
可以在主模块中定义(不需要更改 ZMQ.jl
):
function pollrecv(socket::ZMQ.Socket,zmsg::Message)
rc = -1
while true
rc = ccall((:zmq_msg_recv, ZMQ.zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
&zmsg, socket.data, ZMQ.ZMQ_DONTWAIT)
if rc == -1
if !(ZMQ.zmq_errno() == Base.Libc.EAGAIN)
throw(ZMQ.StateError(ZMQ.jl_zmq_error_str()))
end
return false
else
ZMQ.get_events(socket) != 0 && notify(socket)
break
end
end
return true
end
您可以通过以下方式使用此功能:
msg = Message()
while !pollrecv(s1,msg)
sleep(3)
println("ZZzzzz...")
end
out = convert(IOStream,msg)
println(takebuf_string(out))
close(msg)
ZMQ.send(s1,"response important for next receive")
当然,while
使用轮询实现阻塞,sleep
应该换成其他处理。