带有 aiozmq 流的简单 PUB/SUB
Simple PUB/SUB with aiozmq stream
我正在尝试使用 aiozmq 流(由于某些原因我不想使用 aiozmq rpc)使工作变得简单 PUB/SUB 但没有成功:
pub.py
# coding: utf-8
import asyncio
import time
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.PUB,
bind='tcp://127.0.0.1:5556',
)
while True:
await asyncio.sleep(1)
msg = [str(time.time()).encode()]
print('write ', msg)
stream.write(msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
sub.py
# coding: utf-8
import asyncio
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.SUB,
connect='tcp://127.0.0.1:5556',
)
while True:
print('wait ...')
msg = await stream.read()
print('received ', msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
执行时pub.py:
python pub.py
write [b'1534927086.914483']
write [b'1534927087.9154818']
write [b'1534927088.9164672']
然后执行sub.py:
python sub.py
wait ...
我错过了什么?
只是错过了 sub.py
中的交通订阅线路。有一个工作 sub.py:
# coding: utf-8
import asyncio
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.SUB,
connect='tcp://127.0.0.1:5556',
)
stream.transport.subscribe(b'')
while True:
print('wait ...')
msg = await stream.read()
print('received ', msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
生产者:
python sub.py
wait ...
received [b'1534927504.0462704']
wait ...
received [b'1534927505.0478334']
我正在尝试使用 aiozmq 流(由于某些原因我不想使用 aiozmq rpc)使工作变得简单 PUB/SUB 但没有成功:
pub.py
# coding: utf-8
import asyncio
import time
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.PUB,
bind='tcp://127.0.0.1:5556',
)
while True:
await asyncio.sleep(1)
msg = [str(time.time()).encode()]
print('write ', msg)
stream.write(msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
sub.py
# coding: utf-8
import asyncio
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.SUB,
connect='tcp://127.0.0.1:5556',
)
while True:
print('wait ...')
msg = await stream.read()
print('received ', msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
执行时pub.py:
python pub.py
write [b'1534927086.914483']
write [b'1534927087.9154818']
write [b'1534927088.9164672']
然后执行sub.py:
python sub.py
wait ...
我错过了什么?
只是错过了 sub.py
中的交通订阅线路。有一个工作 sub.py:
# coding: utf-8
import asyncio
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.SUB,
connect='tcp://127.0.0.1:5556',
)
stream.transport.subscribe(b'')
while True:
print('wait ...')
msg = await stream.read()
print('received ', msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
生产者:
python sub.py
wait ...
received [b'1534927504.0462704']
wait ...
received [b'1534927505.0478334']