Python + ZMQ: 当前状态无法完成操作
Python + ZMQ: Operation cannot be accomplished in current state
我正在尝试让一个 python 程序使用请求-回复模式通过 zeromq 与另一个 python 程序通信。客户端程序应该向服务器程序发送请求,服务器程序会回复。
我有两台服务器,当一台服务器出现故障时另一台服务器接管。当第一台服务器工作时,通信工作完美,但是,当第一台服务器发生故障并且当我向第二台服务器发出请求时,我看到错误:
zmp.error.ZMQError: Operation cannot be accomplished in current state
服务器1的代码:
# Run the server
while True:
# Define the socket using the "Context"
sock = context.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5677")
data = sock.recv().decode("utf-8")
res = "Recvd"
sock.send(res.encode('utf-8'))
服务器2的代码:
# Run the server
while True:
# Define the socket using the "Context"
sock = context.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5877")
data = sock.recv().decode("utf-8")
res = "Recvd"
sock.send(res.encode('utf-8'))
客户代码:
# ZeroMQ Context For distributed Message amogst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")
try:
sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
socks_1.setsockopt(zmq.RCVTIMEO, 1000)
socks_1.setsockopt(zmq.LINGER, 0)
data = socks_1.recv().decode('utf-8') #receive data from the main node
except:
try:
#when server one fails
sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
socks_2.setsockopt(zmq.RCVTIMEO, 1000)
socks_2.setsockopt(zmq.LINGER, 0)
data = socks_2.recv().decode('utf-8')
except Exception as e:
print(str(e))
这种方法有什么问题?
我该如何解决?
问:我该如何解决这个问题?
答:避免已知的 REQ/REP
死锁风险!
虽然 ZeroMQ 是一个强大的框架,但了解其内部组成对于稳健和可靠分布式系统设计和原型设计是必要的。
仔细观察后,使用常见的 REQ/REP
正式沟通模式可能会让(并且确实会让)对方陷入相互僵局:一个人期望另一个人做某事,这将永远无法完成,无法摆脱僵局。
更多
Next,故障转移系统必须能够承受其自身组件的任何冲突。因此,必须设计好分布式系统状态信号,并尽可能避免对 element-FSA-design/stepping/blocking 的依赖,否则,故障安全行为只是一种幻想。
始终 小心处理资源,不要将 ZeroMQ smart-signalling/messaging 的组件视为任何一种 "expendable disposables",学者可能会容忍这样做示例,而不是在生产系统环境中。您仍然需要支付费用(时间、资源分配/取消分配/垃圾收集)。如评论中所述,切勿在没有适当控制的情况下让资源 creation/allocation。 while True: .socket(); .bind(); .send();
在原则上是严重错误的,并且会恶化设计的其余部分。
实施lazy pirate pattern。在捕获到错误时,在尝试再次发送消息之前,从您的上下文中创建一个新套接字。
The pretty good brute force solution is to close and reopen the REQ
socket after an error
Here 是一个 python 例子。
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from __future__ import print_function
import zmq
REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"
context = zmq.Context(1)
print("I: Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)
poll = zmq.Poller()
poll.register(client, zmq.POLLIN)
sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
sequence += 1
request = str(sequence).encode()
print("I: Sending (%s)" % request)
client.send(request)
expect_reply = True
while expect_reply:
socks = dict(poll.poll(REQUEST_TIMEOUT))
if socks.get(client) == zmq.POLLIN:
reply = client.recv()
if not reply:
break
if int(reply) == sequence:
print("I: Server replied OK (%s)" % reply)
retries_left = REQUEST_RETRIES
expect_reply = False
else:
print("E: Malformed reply from server: %s" % reply)
else:
print("W: No response from server, retrying…")
# Socket is confused. Close and remove it.
client.setsockopt(zmq.LINGER, 0)
client.close()
poll.unregister(client)
retries_left -= 1
if retries_left == 0:
print("E: Server seems to be offline, abandoning")
break
print("I: Reconnecting and resending (%s)" % request)
# Create new connection
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)
poll.register(client, zmq.POLLIN)
client.send(request)
context.term()
在服务器端,"receive" 和 "send" 对很关键。我遇到了类似的问题,而 socket.send 被遗漏了。
def zmq_listen():
global counter
message = socket_.recv().decode("utf-8")
logger.info(f"[{counter}] Message: {message}")
request = json.loads(message)
request["msg_id"] = f"m{counter}"
ack = {"msg_id": request["msg_id"]}
socket_.send(json.dumps(ack).encode("utf-8"))
return request
我正在尝试让一个 python 程序使用请求-回复模式通过 zeromq 与另一个 python 程序通信。客户端程序应该向服务器程序发送请求,服务器程序会回复。
我有两台服务器,当一台服务器出现故障时另一台服务器接管。当第一台服务器工作时,通信工作完美,但是,当第一台服务器发生故障并且当我向第二台服务器发出请求时,我看到错误:
zmp.error.ZMQError: Operation cannot be accomplished in current state
服务器1的代码:
# Run the server
while True:
# Define the socket using the "Context"
sock = context.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5677")
data = sock.recv().decode("utf-8")
res = "Recvd"
sock.send(res.encode('utf-8'))
服务器2的代码:
# Run the server
while True:
# Define the socket using the "Context"
sock = context.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5877")
data = sock.recv().decode("utf-8")
res = "Recvd"
sock.send(res.encode('utf-8'))
客户代码:
# ZeroMQ Context For distributed Message amogst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")
try:
sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
socks_1.setsockopt(zmq.RCVTIMEO, 1000)
socks_1.setsockopt(zmq.LINGER, 0)
data = socks_1.recv().decode('utf-8') #receive data from the main node
except:
try:
#when server one fails
sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
socks_2.setsockopt(zmq.RCVTIMEO, 1000)
socks_2.setsockopt(zmq.LINGER, 0)
data = socks_2.recv().decode('utf-8')
except Exception as e:
print(str(e))
这种方法有什么问题? 我该如何解决?
问:我该如何解决这个问题?
答:避免已知的 REQ/REP
死锁风险!
虽然 ZeroMQ 是一个强大的框架,但了解其内部组成对于稳健和可靠分布式系统设计和原型设计是必要的。
仔细观察后,使用常见的 REQ/REP
正式沟通模式可能会让(并且确实会让)对方陷入相互僵局:一个人期望另一个人做某事,这将永远无法完成,无法摆脱僵局。
更多
Next,故障转移系统必须能够承受其自身组件的任何冲突。因此,必须设计好分布式系统状态信号,并尽可能避免对 element-FSA-design/stepping/blocking 的依赖,否则,故障安全行为只是一种幻想。
始终 小心处理资源,不要将 ZeroMQ smart-signalling/messaging 的组件视为任何一种 "expendable disposables",学者可能会容忍这样做示例,而不是在生产系统环境中。您仍然需要支付费用(时间、资源分配/取消分配/垃圾收集)。如评论中所述,切勿在没有适当控制的情况下让资源 creation/allocation。 while True: .socket(); .bind(); .send();
在原则上是严重错误的,并且会恶化设计的其余部分。
实施lazy pirate pattern。在捕获到错误时,在尝试再次发送消息之前,从您的上下文中创建一个新套接字。
The pretty good brute force solution is to close and reopen the REQ socket after an error
Here 是一个 python 例子。
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from __future__ import print_function
import zmq
REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"
context = zmq.Context(1)
print("I: Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)
poll = zmq.Poller()
poll.register(client, zmq.POLLIN)
sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
sequence += 1
request = str(sequence).encode()
print("I: Sending (%s)" % request)
client.send(request)
expect_reply = True
while expect_reply:
socks = dict(poll.poll(REQUEST_TIMEOUT))
if socks.get(client) == zmq.POLLIN:
reply = client.recv()
if not reply:
break
if int(reply) == sequence:
print("I: Server replied OK (%s)" % reply)
retries_left = REQUEST_RETRIES
expect_reply = False
else:
print("E: Malformed reply from server: %s" % reply)
else:
print("W: No response from server, retrying…")
# Socket is confused. Close and remove it.
client.setsockopt(zmq.LINGER, 0)
client.close()
poll.unregister(client)
retries_left -= 1
if retries_left == 0:
print("E: Server seems to be offline, abandoning")
break
print("I: Reconnecting and resending (%s)" % request)
# Create new connection
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)
poll.register(client, zmq.POLLIN)
client.send(request)
context.term()
在服务器端,"receive" 和 "send" 对很关键。我遇到了类似的问题,而 socket.send 被遗漏了。
def zmq_listen():
global counter
message = socket_.recv().decode("utf-8")
logger.info(f"[{counter}] Message: {message}")
request = json.loads(message)
request["msg_id"] = f"m{counter}"
ack = {"msg_id": request["msg_id"]}
socket_.send(json.dumps(ack).encode("utf-8"))
return request