从 celery worker/alternative 解决方案中创建分离进程?
Creating detached processes from celery worker/alternative solution?
我正在开发将用作 "database as a service" 提供程序的 Web 服务。目标是拥有一个基于烧瓶的小型 Web 服务,运行 在某些主机上,"worker" 在不同团队拥有的不同主机上处理 运行。每当团队成员过来请求新数据库时,我都应该在他们的主机上创建一个。现在的问题...我启动的服务必须是运行。然而,工人可能会重新启动。可能发生 5 分钟可能发生 5 天。一个简单的 Popen 不会解决这个问题,因为它会创建一个子进程,如果工作人员稍后停止,Popen 进程将被销毁(我试过这个)。
我有一个使用多处理的实现,它像冠军一样工作,遗憾的是我不能将它与芹菜一起使用。那里真倒霉。我试图通过双分叉和命名管道摆脱多处理库。我可以制作的最小样本:
def launcher2(working_directory, cmd, *args):
command = [cmd]
command.extend(list(args))
process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(f'{working_directory}/ipc.fifo', 'wb') as wpid:
wpid.write(process.pid)
@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
working_directory = '/var/tmp/workdir'
if not os.path.exists(working_directory):
os.makedirs(working_directory, mode=0o700)
ipc = f'{working_directory}/ipc.fifo'
if os.path.exists(ipc):
os.remove(ipc)
os.mkfifo(ipc)
pid1 = os.fork()
if pid1 == 0:
os.setsid()
os.umask(0)
pid2 = os.fork()
if pid2 > 0:
sys.exit(0)
os.setsid()
os.umask(0)
launcher2(working_directory, cmd, *args)
else:
with os.fdopen(os.open(ipc, flags=os.O_NONBLOCK | os.O_RDONLY), 'rb') as ripc:
readers, _, _ = select.select([ripc], [], [], 15)
if not readers:
raise TimeoutError(60, 'Timed out', ipc)
reader = readers.pop()
pid = struct.unpack('I', reader.read())[0]
pid, status = os.waitpid(pid, 0)
print(status)
if __name__ == '__main__':
async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
print(async_result.get())
我的用例更复杂,但我认为没有人愿意阅读 200 多行引导程序,但这完全以同样的方式失败了。另一方面,除非需要,否则我不会等待 pid,所以这就像根据请求启动进程并让它完成它的工作。引导数据库完成完整设置大约需要一分钟,我不希望客户端等待一分钟。请求进来,我生成进程并发回数据库实例的 id,客户端可以根据收到的实例 id 查询状态。然而,通过上述分叉解决方案,我得到:
[2020-01-20 18:03:17,760: INFO/MainProcess] Received task: Test[dbebc31c-7929-4b75-ae28-62d3f9810fd9]
[2020-01-20 18:03:20,859: ERROR/MainProcess] Process 'ForkPoolWorker-2' pid:16634 exited with 'signal 15 (SIGTERM)'
[2020-01-20 18:03:20,877: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM).')
Traceback (most recent call last):
File "/home/pupsz/PycharmProjects/provider/venv37/lib/python3.7/site-packages/billiard/pool.py", line 1267, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM).
这让我想知道,可能发生了什么。我尝试了一个更简单的任务:
@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
working_directory = '/var/tmp/workdir'
if not os.path.exists(working_directory):
os.makedirs(working_directory, mode=0o700)
command = [cmd]
command.extend(list(args))
process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return process.wait()
if __name__ == '__main__':
async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
print(async_result.get())
再次失败并出现同样的错误。现在我喜欢 Celery,但从这里感觉它不适合我的需求。我搞砸了吗?能不能实现,我需要一个worker做什么?我有其他选择吗,还是我应该自己编写任务队列?
Celery 对多处理不友好,所以尝试使用 billiard 而不是多处理(来自 billiard import Process 等...)我希望有一天 Celery 人员对该代码进行大量重构,删除 billiard,然后开始使用改为多处理...
因此,在他们转向多处理之前,我们只能使用台球。我的建议是在你的 Celery 任务中删除任何 multiprocessing 的使用,并开始使用 billiard.context.Process
和类似的,这取决于你的用例。
我正在开发将用作 "database as a service" 提供程序的 Web 服务。目标是拥有一个基于烧瓶的小型 Web 服务,运行 在某些主机上,"worker" 在不同团队拥有的不同主机上处理 运行。每当团队成员过来请求新数据库时,我都应该在他们的主机上创建一个。现在的问题...我启动的服务必须是运行。然而,工人可能会重新启动。可能发生 5 分钟可能发生 5 天。一个简单的 Popen 不会解决这个问题,因为它会创建一个子进程,如果工作人员稍后停止,Popen 进程将被销毁(我试过这个)。
我有一个使用多处理的实现,它像冠军一样工作,遗憾的是我不能将它与芹菜一起使用。那里真倒霉。我试图通过双分叉和命名管道摆脱多处理库。我可以制作的最小样本:
def launcher2(working_directory, cmd, *args):
command = [cmd]
command.extend(list(args))
process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(f'{working_directory}/ipc.fifo', 'wb') as wpid:
wpid.write(process.pid)
@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
working_directory = '/var/tmp/workdir'
if not os.path.exists(working_directory):
os.makedirs(working_directory, mode=0o700)
ipc = f'{working_directory}/ipc.fifo'
if os.path.exists(ipc):
os.remove(ipc)
os.mkfifo(ipc)
pid1 = os.fork()
if pid1 == 0:
os.setsid()
os.umask(0)
pid2 = os.fork()
if pid2 > 0:
sys.exit(0)
os.setsid()
os.umask(0)
launcher2(working_directory, cmd, *args)
else:
with os.fdopen(os.open(ipc, flags=os.O_NONBLOCK | os.O_RDONLY), 'rb') as ripc:
readers, _, _ = select.select([ripc], [], [], 15)
if not readers:
raise TimeoutError(60, 'Timed out', ipc)
reader = readers.pop()
pid = struct.unpack('I', reader.read())[0]
pid, status = os.waitpid(pid, 0)
print(status)
if __name__ == '__main__':
async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
print(async_result.get())
我的用例更复杂,但我认为没有人愿意阅读 200 多行引导程序,但这完全以同样的方式失败了。另一方面,除非需要,否则我不会等待 pid,所以这就像根据请求启动进程并让它完成它的工作。引导数据库完成完整设置大约需要一分钟,我不希望客户端等待一分钟。请求进来,我生成进程并发回数据库实例的 id,客户端可以根据收到的实例 id 查询状态。然而,通过上述分叉解决方案,我得到:
[2020-01-20 18:03:17,760: INFO/MainProcess] Received task: Test[dbebc31c-7929-4b75-ae28-62d3f9810fd9]
[2020-01-20 18:03:20,859: ERROR/MainProcess] Process 'ForkPoolWorker-2' pid:16634 exited with 'signal 15 (SIGTERM)'
[2020-01-20 18:03:20,877: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM).')
Traceback (most recent call last):
File "/home/pupsz/PycharmProjects/provider/venv37/lib/python3.7/site-packages/billiard/pool.py", line 1267, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM).
这让我想知道,可能发生了什么。我尝试了一个更简单的任务:
@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
working_directory = '/var/tmp/workdir'
if not os.path.exists(working_directory):
os.makedirs(working_directory, mode=0o700)
command = [cmd]
command.extend(list(args))
process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return process.wait()
if __name__ == '__main__':
async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
print(async_result.get())
再次失败并出现同样的错误。现在我喜欢 Celery,但从这里感觉它不适合我的需求。我搞砸了吗?能不能实现,我需要一个worker做什么?我有其他选择吗,还是我应该自己编写任务队列?
Celery 对多处理不友好,所以尝试使用 billiard 而不是多处理(来自 billiard import Process 等...)我希望有一天 Celery 人员对该代码进行大量重构,删除 billiard,然后开始使用改为多处理...
因此,在他们转向多处理之前,我们只能使用台球。我的建议是在你的 Celery 任务中删除任何 multiprocessing 的使用,并开始使用 billiard.context.Process
和类似的,这取决于你的用例。