ZMQ sockets缺乏线程安全如何处理?
How to deal with ZMQ sockets lack of thread safety?
我已经在一些 Python 应用程序中使用 ZMQ 一段时间了,但直到最近我才决定在 Go 中重新实现其中一个,并且我意识到 ZMQ 套接字不是线程安全的。
最初的 Python 实现使用如下所示的事件循环:
while running:
socks = dict(poller.poll(TIMEOUT))
if socks.get(router) == zmq.POLLIN:
client_id = router.recv()
_ = router.recv()
data = router.recv()
requests.append((client_id, data))
for req in requests:
rep = handle_request(req)
if rep:
replies.append(rep)
requests.remove(req)
for client_id, data in replies:
router.send(client_id, zmq.SNDMORE)
router.send(b'', zmq.SNDMORE)
router.send(data)
del replies[:]
问题是回复可能在第一次通过时还没有准备好,所以每当我有待处理的请求时,我必须以非常短的超时时间进行轮询,否则客户端等待的时间会超过他们应该等待的时间,并且应用程序最终使用大量 CPU 进行轮询。
当我决定用 Go 重新实现它时,我认为它会这么简单,通过在轮询上使用无限超时来避免问题:
for {
sockets, _ := poller.Poll(-1)
for _, socket := range sockets {
switch s := socket.Socket; s {
case router:
msg, _ := s.RecvMessage(0)
client_id := msg[0]
data := msg[2]
go handleRequest(router, client_id, data)
}
}
}
但只有当我连接了一个客户端或轻负载时,这个理想的实现才有效。在重负载下,我在 libzmq 中出现随机断言错误。我尝试了以下方法:
在 zmq4 docs 之后,我尝试在所有套接字操作上添加 sync.Mutex 和 lock/unlock。它失败。我认为这是因为 ZMQ 使用自己的线程进行刷新。
为 polling/receiving 创建一个 goroutine,为发送创建一个 goroutine,并按照我在 Python 版本中使用 req/rep 队列的方式使用通道。它失败了,因为我仍在共享套接字。
同2,但设置GOMAXPROCS=1
。它失败了,并且吞吐量非常有限,因为在 Poll()
调用返回之前回复被阻止。
像 2 一样使用 req/rep 通道,但使用 runtime.LockOSThread
将所有套接字操作保持在与套接字相同的线程中。有和上面一样的问题。它没有失败,但吞吐量非常有限。
与4相同,但使用Python版本的轮询超时策略。它有效,但与 Python 版本存在相同的问题。
共享上下文而不是套接字,并在单独的 goroutine 中创建一个用于发送的套接字和一个用于接收的套接字,与通道通信。它有效,但我必须重写客户端库以使用两个套接字而不是一个套接字。
摆脱 zmq 并使用线程安全的原始 TCP 套接字。它工作得很好,但我还必须重写客户端库。
所以,看起来 6 是 ZMQ 真正打算使用的方式,因为这是我让它与 goroutines 无缝工作的唯一方法,但我想知道是否还有其他我没有尝试过的方法。有什么想法吗?
更新
有了这里的答案,我意识到我可以只向轮询器添加一个 inproc
PULL 套接字,然后让一个 goroutine 连接并推送一个字节来打破无限等待。它不像这里建议的解决方案那么通用,但它可以工作,我什至可以将它移植到 Python 版本。
我opened an issue a 1.5 years ago to introduce a port of https://github.com/vaughan0/go-zmq/blob/master/channels.go到pebbe/zmq4。最终作者决定反对它,但我们已经在生产中(在非常繁重的工作负载下)使用它很长时间了。
这是必须添加到 pebbe/zmq4 包中的文件的 gist(因为它向 Socket 添加了方法)。可以这样重写,使 Socket 接收器上的方法采用 Socket
作为参数,但由于我们无论如何都要提供代码,所以这是一种简单的方法。
基本用法是像平常一样创建您的 Socket
(例如将其命名为 s
)然后您可以:
channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()
现在您有两个 [][]byte
类型的通道可以在 goroutine 之间使用,但是单个 goroutine - 在通道抽象中管理,负责管理 Poller
并与套接字通信.
用 pebbe/zmq4 做到这一点的好方法是 Reactor。 Reactor 能够监听 Go 通道,但您不想那样做,因为它们通过使用轮询超时定期轮询通道来这样做,这会重新引入与您完全相同的问题在您的 Python 版本中。相反,您可以使用 zmq inproc
套接字,一端由反应器控制,另一端由从通道传递数据的 goroutine 控制。它复杂、冗长且令人不愉快,但我已经成功地使用了它。
我已经在一些 Python 应用程序中使用 ZMQ 一段时间了,但直到最近我才决定在 Go 中重新实现其中一个,并且我意识到 ZMQ 套接字不是线程安全的。
最初的 Python 实现使用如下所示的事件循环:
while running:
socks = dict(poller.poll(TIMEOUT))
if socks.get(router) == zmq.POLLIN:
client_id = router.recv()
_ = router.recv()
data = router.recv()
requests.append((client_id, data))
for req in requests:
rep = handle_request(req)
if rep:
replies.append(rep)
requests.remove(req)
for client_id, data in replies:
router.send(client_id, zmq.SNDMORE)
router.send(b'', zmq.SNDMORE)
router.send(data)
del replies[:]
问题是回复可能在第一次通过时还没有准备好,所以每当我有待处理的请求时,我必须以非常短的超时时间进行轮询,否则客户端等待的时间会超过他们应该等待的时间,并且应用程序最终使用大量 CPU 进行轮询。
当我决定用 Go 重新实现它时,我认为它会这么简单,通过在轮询上使用无限超时来避免问题:
for {
sockets, _ := poller.Poll(-1)
for _, socket := range sockets {
switch s := socket.Socket; s {
case router:
msg, _ := s.RecvMessage(0)
client_id := msg[0]
data := msg[2]
go handleRequest(router, client_id, data)
}
}
}
但只有当我连接了一个客户端或轻负载时,这个理想的实现才有效。在重负载下,我在 libzmq 中出现随机断言错误。我尝试了以下方法:
在 zmq4 docs 之后,我尝试在所有套接字操作上添加 sync.Mutex 和 lock/unlock。它失败。我认为这是因为 ZMQ 使用自己的线程进行刷新。
为 polling/receiving 创建一个 goroutine,为发送创建一个 goroutine,并按照我在 Python 版本中使用 req/rep 队列的方式使用通道。它失败了,因为我仍在共享套接字。
同2,但设置
GOMAXPROCS=1
。它失败了,并且吞吐量非常有限,因为在Poll()
调用返回之前回复被阻止。像 2 一样使用 req/rep 通道,但使用
runtime.LockOSThread
将所有套接字操作保持在与套接字相同的线程中。有和上面一样的问题。它没有失败,但吞吐量非常有限。与4相同,但使用Python版本的轮询超时策略。它有效,但与 Python 版本存在相同的问题。
共享上下文而不是套接字,并在单独的 goroutine 中创建一个用于发送的套接字和一个用于接收的套接字,与通道通信。它有效,但我必须重写客户端库以使用两个套接字而不是一个套接字。
摆脱 zmq 并使用线程安全的原始 TCP 套接字。它工作得很好,但我还必须重写客户端库。
所以,看起来 6 是 ZMQ 真正打算使用的方式,因为这是我让它与 goroutines 无缝工作的唯一方法,但我想知道是否还有其他我没有尝试过的方法。有什么想法吗?
更新
有了这里的答案,我意识到我可以只向轮询器添加一个 inproc
PULL 套接字,然后让一个 goroutine 连接并推送一个字节来打破无限等待。它不像这里建议的解决方案那么通用,但它可以工作,我什至可以将它移植到 Python 版本。
我opened an issue a 1.5 years ago to introduce a port of https://github.com/vaughan0/go-zmq/blob/master/channels.go到pebbe/zmq4。最终作者决定反对它,但我们已经在生产中(在非常繁重的工作负载下)使用它很长时间了。
这是必须添加到 pebbe/zmq4 包中的文件的 gist(因为它向 Socket 添加了方法)。可以这样重写,使 Socket 接收器上的方法采用 Socket
作为参数,但由于我们无论如何都要提供代码,所以这是一种简单的方法。
基本用法是像平常一样创建您的 Socket
(例如将其命名为 s
)然后您可以:
channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()
现在您有两个 [][]byte
类型的通道可以在 goroutine 之间使用,但是单个 goroutine - 在通道抽象中管理,负责管理 Poller
并与套接字通信.
用 pebbe/zmq4 做到这一点的好方法是 Reactor。 Reactor 能够监听 Go 通道,但您不想那样做,因为它们通过使用轮询超时定期轮询通道来这样做,这会重新引入与您完全相同的问题在您的 Python 版本中。相反,您可以使用 zmq inproc
套接字,一端由反应器控制,另一端由从通道传递数据的 goroutine 控制。它复杂、冗长且令人不愉快,但我已经成功地使用了它。