Dask:无法从单独的 multiprocessing.Process 向全局客户端提交任务
Dask: can't submit tasks to global client from separate multiprocessing.Process
我有 2 个进程:第一个用于创建全局分布式客户端;第二个用于创建全局分布式客户端。第二个进程是一个网络爬虫,它应该获取全局客户端并向它提交任务,当一切都完成后,它会向另一个进程发送一条消息,告诉它他可以继续。
from dask.distributed import Client, as_completed
from multiprocessing import Process
from time import sleep
import zmq
def get(url) -> dict:
# downloads data from url
time.sleep(3)
return data
def save(data) -> None:
# saves data locally
time.sleep(3)
return None
def scraper(urls):
# global client
client = get_client()
# zeromq socket
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://*:port')
while True:
for future, result in as_completed([client.submit(get, url=url) for url in urls], with_results=True):
save(data=result)
socket.send_string('All job is done for this minute, proceed.')
sleep(60)
if __name__ == '__main__':
client = Client()
s = Process(target=scraper, *args, **kwargs)
s.start()
问题是我可以从抓取功能中获取全局客户端(如果我打印它,我会正确地看到它),但是我不能向它提交任何类型的任务。控制台不打印任何错误,只是卡住了,什么也没做。我认为原因是刮板功能是 运行 在一个单独的 multiprocessing.Process.
任何解决方案或解决方法?谢谢。
dask 客户端保持与调度程序的打开连接。根据您的系统创建新进程的方式,您可能会获得连接的副本,这些副本指向新进程中没有任何用处,或者无法完全传输客户端(不可 pickleable)。
相反,您应该将连接信息发送给子进程
addr = c.scheduler_info()['address']
并在目标函数中做
client = Client(addr)
我有 2 个进程:第一个用于创建全局分布式客户端;第二个用于创建全局分布式客户端。第二个进程是一个网络爬虫,它应该获取全局客户端并向它提交任务,当一切都完成后,它会向另一个进程发送一条消息,告诉它他可以继续。
from dask.distributed import Client, as_completed
from multiprocessing import Process
from time import sleep
import zmq
def get(url) -> dict:
# downloads data from url
time.sleep(3)
return data
def save(data) -> None:
# saves data locally
time.sleep(3)
return None
def scraper(urls):
# global client
client = get_client()
# zeromq socket
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://*:port')
while True:
for future, result in as_completed([client.submit(get, url=url) for url in urls], with_results=True):
save(data=result)
socket.send_string('All job is done for this minute, proceed.')
sleep(60)
if __name__ == '__main__':
client = Client()
s = Process(target=scraper, *args, **kwargs)
s.start()
问题是我可以从抓取功能中获取全局客户端(如果我打印它,我会正确地看到它),但是我不能向它提交任何类型的任务。控制台不打印任何错误,只是卡住了,什么也没做。我认为原因是刮板功能是 运行 在一个单独的 multiprocessing.Process.
任何解决方案或解决方法?谢谢。
dask 客户端保持与调度程序的打开连接。根据您的系统创建新进程的方式,您可能会获得连接的副本,这些副本指向新进程中没有任何用处,或者无法完全传输客户端(不可 pickleable)。
相反,您应该将连接信息发送给子进程
addr = c.scheduler_info()['address']
并在目标函数中做
client = Client(addr)