芹菜任务不会发送给经纪人
Celery task does not get send to broker
当我尝试将我的任务发送到代理 (RabbitMQ) 时它挂起。
# python shell
promise = foo.s(first_arg="2").apply_async()
# blocking indefinitely. I expected a promise object.
如果我 运行 同步任务,它会按预期工作。
# python shell
promise = foo.s(first_arg="2").apply()
>>> hello argument 2
如果我用 ctrl+c 打断 .apply_async()
,我会得到一些线索的回溯:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 32, in __call__
return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 173, in _connect
host, port, family, socket.SOCK_STREAM, SOL_TCP)
File "/usr/local/lib/python3.7/socket.py", line 752, in getaddrinfo
for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -9] Address family for hostname not supported
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
return fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 866, in _connection_factory
self._connection = self._establish_connection()
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 801, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py", line 128, in establish_connection
conn.connect()
File "/usr/local/lib/python3.7/site-packages/amqp/connection.py", line 323, in connect
self.transport.connect()
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 113, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 184, in _connect
"failed to resolve broker hostname"))
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 197, in _connect
self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.7/site-packages/celery/canvas.py", line 225, in apply_async
return _apply(args, kwargs, **options)
File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 565, in apply_async
**options
File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 749, in send_task
amqp.send_task_message(P, name, message, **options)
File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 532, in send_task_message
**properties
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 178, in publish
exchange_name, declare,
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 525, in _ensured
return fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 184, in _publish
channel = self.channel
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 206, in _get_channel
channel = self._channel = channel()
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 34, in __call__
value = self.__value__ = self.__contract__()
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 221, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 884, in default_channel
self._ensure_connection(**conn_opts)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 439, in _ensure_connection
callback, timeout=timeout
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 339, in retry_over_time
sleep(1.0)
经纪人连接字符串在系统中如下所示:
~$ env | grep BROKER
CELERY_BROKER=pyamqp://guest@172.23.0.3//
python中的代理连接字符串:
# python shell
from src.celery import app
app.pool.connection
>>> Connection: amqp://guest:**@localhost:5672//
之前你提示RabbitMQ不是运行ning,或者连接字符串坏;我的芹菜工人(消费者)进程能够使用相同的连接字符串进行连接。
-------------- celery@f9ab48fc6b63 v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-4.15.0-20-generic-x86_64-with-debian-9.12 2021-03-05 07:56:29
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_statst_api:0x7f15b6de0450
- ** ---------- .> transport: amqp://guest:**@my-rabbit:5672//
- ** ---------- .> results: postgresql://docker:**@pg_db:5432/
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. foo_task
. (long list of tasks)
[2021-03-05 07:56:30,564: INFO/MainProcess] Connected to amqp://guest:**@my-rabbit:5672//
[2021-03-05 07:56:30,581: INFO/MainProcess] mingle: searching for neighbors
[2021-03-05 07:56:31,622: INFO/MainProcess] mingle: all alone
[2021-03-05 07:56:31,647: INFO/MainProcess] celery@f9ab48fc6b63 ready.
这就是我 app/producer 连接到代理的方式。
文件 celeryconfig.py 包含代理 url 后端、并发性等设置
# celery_tasks.py
# imports...
app = Celery('celery_statst_api')
app.config_from_object(celeryconfig) # import config file
@app.task(name="foo")
def foo(first_arg: str) -> str:
print(f"thanks for {first_arg}")
return "OK"
问题出在我的配置文件中。 Celery 没有找到 broker_url
这个属性,也没有给出任何警告。相反,芹菜默默地设置了一个默认值 amqp://guest:**@localhost:5672//
。在此处查看详细信息 https://github.com/celery/celery/issues/6661
当我尝试将我的任务发送到代理 (RabbitMQ) 时它挂起。
# python shell
promise = foo.s(first_arg="2").apply_async()
# blocking indefinitely. I expected a promise object.
如果我 运行 同步任务,它会按预期工作。
# python shell
promise = foo.s(first_arg="2").apply()
>>> hello argument 2
如果我用 ctrl+c 打断 .apply_async()
,我会得到一些线索的回溯:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 32, in __call__
return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 173, in _connect
host, port, family, socket.SOCK_STREAM, SOL_TCP)
File "/usr/local/lib/python3.7/socket.py", line 752, in getaddrinfo
for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -9] Address family for hostname not supported
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
return fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 866, in _connection_factory
self._connection = self._establish_connection()
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 801, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python3.7/site-packages/kombu/transport/pyamqp.py", line 128, in establish_connection
conn.connect()
File "/usr/local/lib/python3.7/site-packages/amqp/connection.py", line 323, in connect
self.transport.connect()
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 113, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 184, in _connect
"failed to resolve broker hostname"))
File "/usr/local/lib/python3.7/site-packages/amqp/transport.py", line 197, in _connect
self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.7/site-packages/celery/canvas.py", line 225, in apply_async
return _apply(args, kwargs, **options)
File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 565, in apply_async
**options
File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 749, in send_task
amqp.send_task_message(P, name, message, **options)
File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 532, in send_task_message
**properties
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 178, in publish
exchange_name, declare,
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 525, in _ensured
return fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 184, in _publish
channel = self.channel
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 206, in _get_channel
channel = self._channel = channel()
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 34, in __call__
value = self.__value__ = self.__contract__()
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 221, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 884, in default_channel
self._ensure_connection(**conn_opts)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 439, in _ensure_connection
callback, timeout=timeout
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 339, in retry_over_time
sleep(1.0)
经纪人连接字符串在系统中如下所示:
~$ env | grep BROKER
CELERY_BROKER=pyamqp://guest@172.23.0.3//
python中的代理连接字符串:
# python shell
from src.celery import app
app.pool.connection
>>> Connection: amqp://guest:**@localhost:5672//
之前你提示RabbitMQ不是运行ning,或者连接字符串坏;我的芹菜工人(消费者)进程能够使用相同的连接字符串进行连接。
-------------- celery@f9ab48fc6b63 v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-4.15.0-20-generic-x86_64-with-debian-9.12 2021-03-05 07:56:29
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_statst_api:0x7f15b6de0450
- ** ---------- .> transport: amqp://guest:**@my-rabbit:5672//
- ** ---------- .> results: postgresql://docker:**@pg_db:5432/
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. foo_task
. (long list of tasks)
[2021-03-05 07:56:30,564: INFO/MainProcess] Connected to amqp://guest:**@my-rabbit:5672//
[2021-03-05 07:56:30,581: INFO/MainProcess] mingle: searching for neighbors
[2021-03-05 07:56:31,622: INFO/MainProcess] mingle: all alone
[2021-03-05 07:56:31,647: INFO/MainProcess] celery@f9ab48fc6b63 ready.
这就是我 app/producer 连接到代理的方式。 文件 celeryconfig.py 包含代理 url 后端、并发性等设置
# celery_tasks.py
# imports...
app = Celery('celery_statst_api')
app.config_from_object(celeryconfig) # import config file
@app.task(name="foo")
def foo(first_arg: str) -> str:
print(f"thanks for {first_arg}")
return "OK"
问题出在我的配置文件中。 Celery 没有找到 broker_url
这个属性,也没有给出任何警告。相反,芹菜默默地设置了一个默认值 amqp://guest:**@localhost:5672//
。在此处查看详细信息 https://github.com/celery/celery/issues/6661