为什么我的 celery python 模块在注册任务之前被阻塞?
Why is my celery python module blocked until a task is registered?
我正在尝试在微服务架构中实现 celery。
我有 ServiceA 需要调用其他服务 X、Y、Z 上的任务。
在 python 中,当我在 X 注册任务之前加载我的模块时,整个模块似乎被阻塞了。
期望:
无论队列状态如何,当加载 python 模块时,我应该看到 'Starting caller.py.....' 后跟 blocked/waiting 任务
实际结果:
我没有看到打印语句。稍后,当另一个服务注册一个任务工作者时,整个模块然后加载...
服务A
from celery import Celery
from time import sleep
from celery.execute import send_task
from retry import retry
# this does not get called until a task is registered from service X
print('Starting caller.py.....')
app = Celery('tasks',
broker='redis://zredis:6379',
backend='redis://zredis:6379'
)
def register():
print("HELLLLLLOOOOOOOO REGISTER")
result = send_task('tasks.getNodeResults', kwargs={})
node_response = result.get()
print('NODE_RESPONSE', node_response)
register()
问题:
我如何在工作人员注册之前将工作添加到队列中以及为什么这会阻止整个模块加载?
观察:
删除 register 函数中的 'send_task' 行和相关代码可以解除整个脚本的阻塞。出现打印语句等。不确定函数内的语句如何在默认范围内阻止打印语句。
编辑 1:
我尝试过以多种方式实例化这些方法,1 作为额外的工作人员,1 作为直接 python 模块。 (带和不带 pool/concurrency 标志)
celery -A caller worker --loglevel=CRITICAL --pool=solo --concurrency=1
python caller
这是 STDOUT 和基本 python 配置的问题,与 celery 没有直接关系。
解决方法:
函数 register() 实际上是与初始打印语句一起被调用的。您可以通过在不同点添加 sys.stdout.flush()
来查看。
您可以设置以下 ENV 参数:
PYTHONUNBUFFERED=1
这是做什么的:
PYTHONUNBUFFERED
If this is set to a non-empty string it is equivalent to specifying the -u option.
命令行选项:
-u
Force stdin, stdout and stderr to be totally unbuffered. On systems where it matters, also put stdin, stdout and stderr in binary mode.
Note that there is internal buffering in file.readlines() and File Objects (for line in sys.stdin) which is not influenced by this option. To work around this, you will want to use file.readline() inside a while 1: loop.
我正在尝试在微服务架构中实现 celery。
我有 ServiceA 需要调用其他服务 X、Y、Z 上的任务。
在 python 中,当我在 X 注册任务之前加载我的模块时,整个模块似乎被阻塞了。
期望: 无论队列状态如何,当加载 python 模块时,我应该看到 'Starting caller.py.....' 后跟 blocked/waiting 任务
实际结果: 我没有看到打印语句。稍后,当另一个服务注册一个任务工作者时,整个模块然后加载...
服务A
from celery import Celery
from time import sleep
from celery.execute import send_task
from retry import retry
# this does not get called until a task is registered from service X
print('Starting caller.py.....')
app = Celery('tasks',
broker='redis://zredis:6379',
backend='redis://zredis:6379'
)
def register():
print("HELLLLLLOOOOOOOO REGISTER")
result = send_task('tasks.getNodeResults', kwargs={})
node_response = result.get()
print('NODE_RESPONSE', node_response)
register()
问题: 我如何在工作人员注册之前将工作添加到队列中以及为什么这会阻止整个模块加载?
观察: 删除 register 函数中的 'send_task' 行和相关代码可以解除整个脚本的阻塞。出现打印语句等。不确定函数内的语句如何在默认范围内阻止打印语句。
编辑 1:
我尝试过以多种方式实例化这些方法,1 作为额外的工作人员,1 作为直接 python 模块。 (带和不带 pool/concurrency 标志)
celery -A caller worker --loglevel=CRITICAL --pool=solo --concurrency=1
python caller
这是 STDOUT 和基本 python 配置的问题,与 celery 没有直接关系。
解决方法:
函数 register() 实际上是与初始打印语句一起被调用的。您可以通过在不同点添加 sys.stdout.flush()
来查看。
您可以设置以下 ENV 参数:
PYTHONUNBUFFERED=1
这是做什么的:
PYTHONUNBUFFERED
If this is set to a non-empty string it is equivalent to specifying the -u option.
命令行选项:
-u
Force stdin, stdout and stderr to be totally unbuffered. On systems where it matters, also put stdin, stdout and stderr in binary mode.
Note that there is internal buffering in file.readlines() and File Objects (for line in sys.stdin) which is not influenced by this option. To work around this, you will want to use file.readline() inside a while 1: loop.