运行 Faust with kafka 因 ConsumerStoppedError 崩溃
Running Faust with kafka crashes with ConsumerStoppedError
我新安装了 Faust 和 运行 通过 Kafka.I 发送和接收消息的基本程序使用了 (Faust example of publishing to a kafka topic) 中提到的示例代码,而 运行 该程序最初它连接到 Kafka(在我的机器上也是 运行)。但是随后在尝试使用 Kafka 时断开连接并且应用程序崩溃并出现以下异常
[2020-11-11 07:08:26,623] [76392] [ERROR] [^---Fetcher]: Crashed reason=ConsumerStoppedError()
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/services.py", line 779, in _execute_task
await task
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 176, in _fetcher
await self._drainer
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
async for tp, message in ait:
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 640, in getmany
records, active_partitions = await self._wait_next_records(timeout)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
records = await self._getmany(
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 1269, in _getmany
return await self._thread.getmany(active_partitions, timeout)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
return await self.call_thread(
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/threads.py", line 436, in call_thread
result = await promise
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/threads.py", line 383, in _process_enqueued
result = await maybe_async(method(*args, **kwargs))
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/utils/futures.py", line 134, in maybe_async
return await res
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 822, in _fetch_records
raise ConsumerStoppedError()
在调试消费者断开连接的原因时,我看到在 alokafka 消费者的 fetcher.py 中,连接因以下异常而关闭
Unable to display children:Error resolving variables Traceback (most recent call last):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 205, in resolve
def resolve(self, dict, key):
KeyError: 'Exception'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 1227, in do_it
def do_it(self, dbg):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_vars.py", line 262, in resolve_compound_variable_fields
def resolve_compound_variable_fields(thread_id, frame_id, scope, attrs):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_vars.py", line 169, in getVariable
def getVariable(thread_id, frame_id, scope, attrs):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 205, in resolve
def resolve(self, dict, key):
AttributeError: 'dict' object has no attribute 'Exception'
软件版本如下
- Mac OS : 10.15.4
- 卡夫卡:2_12.2.1.1
- 艾奥卡夫卡:1.1.6
- Python : 3.9.0
- 浮士德:1.10.4
请帮忙。
我认为你的问题和我遇到的问题是一样的 python 3.9 我现在换成 3.8 可以了。
我新安装了 Faust 和 运行 通过 Kafka.I 发送和接收消息的基本程序使用了 (Faust example of publishing to a kafka topic) 中提到的示例代码,而 运行 该程序最初它连接到 Kafka(在我的机器上也是 运行)。但是随后在尝试使用 Kafka 时断开连接并且应用程序崩溃并出现以下异常
[2020-11-11 07:08:26,623] [76392] [ERROR] [^---Fetcher]: Crashed reason=ConsumerStoppedError()
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/services.py", line 779, in _execute_task
await task
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 176, in _fetcher
await self._drainer
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
async for tp, message in ait:
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 640, in getmany
records, active_partitions = await self._wait_next_records(timeout)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
records = await self._getmany(
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 1269, in _getmany
return await self._thread.getmany(active_partitions, timeout)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
return await self.call_thread(
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/threads.py", line 436, in call_thread
result = await promise
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/threads.py", line 383, in _process_enqueued
result = await maybe_async(method(*args, **kwargs))
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/utils/futures.py", line 134, in maybe_async
return await res
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 822, in _fetch_records
raise ConsumerStoppedError()
在调试消费者断开连接的原因时,我看到在 alokafka 消费者的 fetcher.py 中,连接因以下异常而关闭
Unable to display children:Error resolving variables Traceback (most recent call last):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 205, in resolve
def resolve(self, dict, key):
KeyError: 'Exception'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 1227, in do_it
def do_it(self, dbg):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_vars.py", line 262, in resolve_compound_variable_fields
def resolve_compound_variable_fields(thread_id, frame_id, scope, attrs):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_vars.py", line 169, in getVariable
def getVariable(thread_id, frame_id, scope, attrs):
File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 205, in resolve
def resolve(self, dict, key):
AttributeError: 'dict' object has no attribute 'Exception'
软件版本如下
- Mac OS : 10.15.4
- 卡夫卡:2_12.2.1.1
- 艾奥卡夫卡:1.1.6
- Python : 3.9.0
- 浮士德:1.10.4
请帮忙。
我认为你的问题和我遇到的问题是一样的 python 3.9 我现在换成 3.8 可以了。