在 ZMQ 中使用 send_multipart() 发送一系列不同数据类型的最有效方法是什么?
What is the most efficient way of sending a sequence of different data types with send_multipart() in ZMQ?
我正在尝试使用 ZeroMQ 进行多处理。我想从 tar 文件流式传输文件,所以我使用了流媒体。
下面是一个想要做的事的例子。
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
def server(frontend_port, number_of_workers):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%d" % frontend_port)
for i in range(0,10):
socket.send_json('#%s' % i)
for i in range(number_of_workers):
socket.send_json('STOP')
return True
def worker(work_num, backend_port):
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%d" % backend_port)
while True:
message = socket.recv_json()
if message == 'STOP':
break
print("Worker #%s got message! %s" % (work_num, message))
time.sleep(1)
def main():
frontend_port = 7559
backend_port = 7560
number_of_workers = 2
streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port )
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
streamerdevice.start()
processes = []
for work_num in range(number_of_workers):
w = Process(target=worker, args=(work_num,backend_port))
processes.append(w)
w.start()
time.sleep(1)
s = Process(target=server, args=(frontend_port,number_of_workers))
s.start()
# server(frontend_port)
s.join()
for w in processes:
w.join()
if __name__ == '__main__':
main()
这段代码工作正常。但是我想使用 send_multipart()
发送一个元组或一个列表,其中包含具有不同类型的项目,例如 [string, numpy_array, integer]
但 json 无法处理 numpy
数组。我避免使用泡菜,因为我需要它尽可能快。我也尝试将数组转换为字节,但没有成功。 (也许我做错了我不确定)。
如果您能提供一段有效的代码,我将不胜感激。
理想情况下,我想做这样的事情:
socket.send_multipart([string, numpy_array, integer])
所以我想知道最有效的方法是什么。
我正在使用 Python 3.6
msgpack
和 msgpack_numpy
是我能找到的最佳选择。
试试这个:
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import numpy as np
import msgpack
import msgpack_numpy as m
def server(frontend_port, number_of_workers):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%d" % frontend_port)
for i in range(0,10):
arr = np.array([[[i,i],[i,i]],[[i,i],[i,i]]])
file_name = 'image file name or any other srting'
number = 10 # just an instance of an integer
msg = msgpack.packb((arr, number, file_name), default=m.encode, use_bin_type=True)
socket.send(msg, copy=False)
time.sleep(1)
for i in range(number_of_workers):
msg = msgpack.packb((b'STOP', b'STOP'), default=m.encode, use_bin_type=True)
socket.send(msg, copy=False)
return True
def worker(work_num, backend_port):
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%d" % backend_port)
while True:
task = socket.recv()
task = msgpack.unpackb(task, object_hook= m.decode, use_list=False, max_bin_len=50000000, raw=False)
if task[1] == b'STOP':
break
(arr, number, file_name) = task
print("Worker ",work_num, 'got message!', file_name)
return True
def main():
m.patch()
frontend_port = 3559
backend_port = 3560
number_of_workers = 2
streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port )
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
streamerdevice.start()
processes = []
for work_num in range(number_of_workers):
w = Process(target=worker, args=(work_num,backend_port))
processes.append(w)
w.start()
time.sleep(1)
s = Process(target=server, args=(frontend_port,number_of_workers))
s.start()
s.join()
for w in processes:
w.join()
if __name__ == '__main__':
main()
我正在尝试使用 ZeroMQ 进行多处理。我想从 tar 文件流式传输文件,所以我使用了流媒体。 下面是一个想要做的事的例子。
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
def server(frontend_port, number_of_workers):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%d" % frontend_port)
for i in range(0,10):
socket.send_json('#%s' % i)
for i in range(number_of_workers):
socket.send_json('STOP')
return True
def worker(work_num, backend_port):
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%d" % backend_port)
while True:
message = socket.recv_json()
if message == 'STOP':
break
print("Worker #%s got message! %s" % (work_num, message))
time.sleep(1)
def main():
frontend_port = 7559
backend_port = 7560
number_of_workers = 2
streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port )
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
streamerdevice.start()
processes = []
for work_num in range(number_of_workers):
w = Process(target=worker, args=(work_num,backend_port))
processes.append(w)
w.start()
time.sleep(1)
s = Process(target=server, args=(frontend_port,number_of_workers))
s.start()
# server(frontend_port)
s.join()
for w in processes:
w.join()
if __name__ == '__main__':
main()
这段代码工作正常。但是我想使用 send_multipart()
发送一个元组或一个列表,其中包含具有不同类型的项目,例如 [string, numpy_array, integer]
但 json 无法处理 numpy
数组。我避免使用泡菜,因为我需要它尽可能快。我也尝试将数组转换为字节,但没有成功。 (也许我做错了我不确定)。
如果您能提供一段有效的代码,我将不胜感激。
理想情况下,我想做这样的事情:
socket.send_multipart([string, numpy_array, integer])
所以我想知道最有效的方法是什么。
我正在使用 Python 3.6
msgpack
和 msgpack_numpy
是我能找到的最佳选择。
试试这个:
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import numpy as np
import msgpack
import msgpack_numpy as m
def server(frontend_port, number_of_workers):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%d" % frontend_port)
for i in range(0,10):
arr = np.array([[[i,i],[i,i]],[[i,i],[i,i]]])
file_name = 'image file name or any other srting'
number = 10 # just an instance of an integer
msg = msgpack.packb((arr, number, file_name), default=m.encode, use_bin_type=True)
socket.send(msg, copy=False)
time.sleep(1)
for i in range(number_of_workers):
msg = msgpack.packb((b'STOP', b'STOP'), default=m.encode, use_bin_type=True)
socket.send(msg, copy=False)
return True
def worker(work_num, backend_port):
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%d" % backend_port)
while True:
task = socket.recv()
task = msgpack.unpackb(task, object_hook= m.decode, use_list=False, max_bin_len=50000000, raw=False)
if task[1] == b'STOP':
break
(arr, number, file_name) = task
print("Worker ",work_num, 'got message!', file_name)
return True
def main():
m.patch()
frontend_port = 3559
backend_port = 3560
number_of_workers = 2
streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port )
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
streamerdevice.start()
processes = []
for work_num in range(number_of_workers):
w = Process(target=worker, args=(work_num,backend_port))
processes.append(w)
w.start()
time.sleep(1)
s = Process(target=server, args=(frontend_port,number_of_workers))
s.start()
s.join()
for w in processes:
w.join()
if __name__ == '__main__':
main()