Dask:无法从单独的 multiprocessing.Process 向全局客户端提交任务

Dask: can't submit tasks to global client from separate multiprocessing.Process

我有 2 个进程:第一个用于创建全局分布式客户端;第二个用于创建全局分布式客户端。第二个进程是一个网络爬虫,它应该获取全局客户端并向它提交任务,当一切都完成后,它会向另一个进程发送一条消息,告诉它他可以继续。

from dask.distributed import Client, as_completed
from multiprocessing import Process
from time import sleep
import zmq


def get(url) -> dict:
   # downloads data from url
   time.sleep(3)

   return data


def save(data) -> None:
    # saves data locally
    time.sleep(3)

    return None


def scraper(urls):
    # global client
    client = get_client()

    # zeromq socket
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind('tcp://*:port')

    while True:
        for future, result in as_completed([client.submit(get, url=url) for url in urls], with_results=True):
        save(data=result)
        socket.send_string('All job is done for this minute, proceed.')
        sleep(60)


if __name__ == '__main__':

    client = Client()

    s = Process(target=scraper, *args, **kwargs)
    s.start()

问题是我可以从抓取功能中获取全局客户端(如果我打印它,我会正确地看到它),但是我不能向它提交任何类型的任务。控制台不打印任何错误,只是卡住了,什么也没做。我认为原因是刮板功能是 运行 在一个单独的 multiprocessing.Process.

任何解决方案或解决方法?谢谢。

dask 客户端保持与调度程序的打开连接。根据您的系统创建新进程的方式,您可能会获得连接的副本,这些副本指向新进程中没有任何用处,或者无法完全传输客户端(不可 pickleable)。

相反,您应该将连接信息发送给子进程

addr = c.scheduler_info()['address']

并在目标函数中做

client = Client(addr)