Python Asyncio blocked coroutine
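I am trying to write a simple program based on Asyncio and a Publish/Subscribe design pattern implemented with ZeroMQ. The publisher has 2 coroutines: one listens for incoming subscriptions, and the other publishes a value (obtained through an HTTP request) to the subscriber. The subscriber subscribes to a specific parameter (the name of a city in this case) and waits for the value (the temperature in that city).
Here is my code: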
publisher.py
#!/usr/bin/env python

import json
import aiohttp
import aiozmq
import asyncio
import zmq


class Publisher:
    BIND_ADDRESS = 'tcp://*:10000'

    def __init__(self):
        self.stream = None
        self.parameter = ""

    @asyncio.coroutine
    def main(self):
        self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=Publisher.BIND_ADDRESS)
        tasks = [
            asyncio.async(self.subscriptions()),
            asyncio.async(self.publish())]
        print("before wait")
        yield from asyncio.wait(tasks)
        print("after wait")

    @asyncio.coroutine
    def subscriptions(self):
        print("Entered subscriptions coroutine")
        while True:
            print("New iteration of subscriptions loop")
            received = yield from self.stream.read()
            first_byte = received[0][0]
            self.parameter = received[0][-len(received[0])+1:].decode("utf-8")
            # Subscribe request
            if first_byte == 1:
                print("subscription request received for parameter "+self.parameter)
            # Unsubscribe request
            elif first_byte == 0:
                print("Unsubscription request received for parameter "+self.parameter)

    @asyncio.coroutine
    def publish(self):
        print("Entered publish coroutine")
        while True:
            if self.parameter:
                print("New iteration of publish loop")
                # Make HTTP request
                url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter
                response = yield from aiohttp.request('GET', url)
                assert response.status == 200
                content = yield from response.read()
                # Decode JSON string
                decoded_json = json.loads(content.decode())
                # Get parameter value
                value = decoded_json["main"]["temp"]
                # Publish fetched values to subscribers
                message = bytearray(self.parameter+":"+str(value),"utf-8")
                print(message)
                pack = [message]
                print("before write")
                yield from self.stream.write(pack)
                print("after write")
            yield from asyncio.sleep(10)


test = Publisher()
loop = asyncio.get_event_loop()
loop.run_until_complete(test.main())
subscriber.py
#!/usr/bin/env python

import zmq


class Subscriber:
    XSUB_CONNECT = 'tcp://localhost:10000'

    def __init__(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.XSUB)
        self.socket.connect(Subscriber.XSUB_CONNECT)

    def loop(self):
        print(self.socket.recv())
        self.socket.close()

    def subscribe(self, parameter):
        self.socket.send_string('\x01'+parameter)
        print("Subscribed to parameter "+parameter)

    def unsubscribe(self, parameter):
        self.socket.send_string('\x00'+parameter)
        print("Unsubscribed to parameter "+parameter)


test = Subscriber()
test.subscribe("London")
while True:
    print(test.socket.recv())
And here is the output:
Subscriber side:
$ python3 subscriber.py
Subscribed to parameter London
b'London:288.15'
Publisher side:
$ python3 publisher.py
before wait
Entered subscriptions coroutine
New iteration of subscriptions loop
Entered publish coroutine
subscription request received for parameter London
New iteration of subscriptions loop
New iteration of publish loop
bytearray(b'London:288.15')
before write
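The program hangs there.
As you can see, "before write" appears in the output and the message is sent, but "after write" never appears. So I suspected that an exception was raised and caught somewhere in the self.stream.write(pack) call stack.
If I send a KeyboardInterrupt to the publisher, this is what I get: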
Traceback (most recent call last):
  File "publisher.py", line 73, in <module>
    loop.run_until_complete(test.main())
  File "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever
    self._run_once()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.4/selectors.py", line 432, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished coro=<publish() done, defined at publisher.py:43> exception=TypeError("'NoneType' object is not iterable",)>
Traceback (most recent call last):
  File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "publisher.py", line 66, in publish
    yield from self.stream.write(pack)
TypeError: 'NoneType' object is not iterable
Task was destroyed but it is pending!
task: <Task pending coro=<subscriptions() running at publisher.py:32> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
So I guess my problem is actually this error: TypeError: 'NoneType' object is not iterable, but I have no idea what is causing it.
What is going wrong here?
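The problem is that you are trying to yield from the call to self.stream.write(), but stream.write isn't actually a coroutine. When you use yield from on an item, Python internally calls iter(item). In this case, the call to write() returns None, so Python ends up trying to do iter(None), which is why you see the exception.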
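The failure can be reproduced without aiozmq at all. The sketch below uses two hypothetical stand-ins, plain_write() and broken_delegate(), which are not part of the original code: a plain function that returns None, and a generator that wrongly delegates to it with yield from.

```python
def plain_write():
    # Hypothetical stand-in for a regular (non-coroutine) method:
    # it does its work synchronously and returns None.
    return None


def broken_delegate():
    # yield from needs an iterable; plain_write() hands it None instead.
    result = yield from plain_write()
    return result


gen = broken_delegate()
try:
    # Advancing the generator evaluates the yield from expression.
    next(gen)
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

The message printed here is the same TypeError that shows up in the "Task exception was never retrieved" part of the traceback.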
To fix it, call write() like a normal function. If you want to actually wait until the write has been flushed and sent to the reader, use yield from stream.drain() after the call to write():
print("before write")
self.stream.write(pack)
yield from self.stream.drain()
print("after write")
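The standard library's asyncio streams use the same write-then-drain shape, so the pattern can be tried without ZeroMQ. This is only an analogy, not aiozmq code: it uses asyncio.StreamWriter over a loopback TCP connection, is written with the newer async/await syntax rather than yield from, and the echo_once handler is a name made up for the sketch.

```python
import asyncio


async def echo_once(reader, writer):
    # Hypothetical handler: echo a single line back, then close.
    data = await reader.readline()
    writer.write(data)        # plain call, nothing to await
    await writer.drain()      # wait until the buffer has been flushed
    writer.close()


async def main():
    # Port 0 lets the OS pick any free port on the loopback interface.
    server = await asyncio.start_server(echo_once, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    result = writer.write(b"London:288.15\n")   # write() returns None
    await writer.drain()                        # the awaitable part
    reply = await reader.readline()

    writer.close()
    await writer.wait_closed()
    server.close()
    return result, reply


write_result, reply = asyncio.run(main())
print(write_result, reply)
```

Here write() only queues the data, and drain() is the step that can suspend the caller when the transport's buffer is full.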
Also, to make sure that exceptions in publish get raised without needing Ctrl+C, use asyncio.gather instead of asyncio.wait:
yield from asyncio.gather(*tasks)
With asyncio.gather, any exception thrown by a task in tasks will be re-raised.
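The difference between the two calls can be seen in a small, self-contained sketch. It uses the newer async/await syntax and asyncio.ensure_future (the successor of asyncio.async), and the failing() and sleeper() coroutines are hypothetical stand-ins for publish() and subscriptions(). Unlike the real subscriptions(), sleeper() finishes, so wait() does return here; in the question it never returns because one task loops forever.

```python
import asyncio


async def failing():
    # Stand-in for publish(): fails immediately.
    raise ValueError("boom")


async def sleeper():
    # Stand-in for the other task; finishes after a short delay.
    await asyncio.sleep(0.01)
    return "done"


async def with_wait():
    tasks = [asyncio.ensure_future(failing()),
             asyncio.ensure_future(sleeper())]
    # wait() returns normally; the exception stays stored on the task.
    await asyncio.wait(tasks)
    names = []
    for task in tasks:
        exc = task.exception()
        names.append(type(exc).__name__ if exc else None)
    return names


async def with_gather():
    tasks = [asyncio.ensure_future(failing()),
             asyncio.ensure_future(sleeper())]
    try:
        await asyncio.gather(*tasks)
    except ValueError as exc:
        # gather() re-raised the task's exception in the caller.
        await tasks[1]  # let the remaining task finish cleanly
        return str(exc)
    return None


wait_result = asyncio.run(with_wait())
gather_result = asyncio.run(with_gather())
print(wait_result, gather_result)
```

With wait(), the error is only visible if the caller inspects each task; with gather(), it surfaces at the await itself.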