Dask worker post-processing

I'm new to dask and am trying to run some post-processing tasks when a worker shuts down. I'm currently using an EC2Cluster with n_workers=5.

The cluster is created each time I need to run my big task. The task outputs a bunch of files that I want to send to AWS S3.
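
For reference, the cluster is created roughly like this (a minimal sketch using dask_cloudprovider; the instance and region settings are placeholders, omitted here):

from dask_cloudprovider.aws import EC2Cluster

# placeholder settings -- real usage passes region, instance type, etc.
cluster = EC2Cluster(n_workers=5)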

How would I implement a "post-processing" hook that each worker runs on shutdown, so that any logs and outputs get sent to my AWS S3?

Thanks in advance. Here is what I have so far (using LocalCluster as a reproducible example):

import threading
import time

from dask.distributed import Client, LocalCluster, WorkerPlugin, get_worker


def complex():
    time.sleep(10)
    print('hard')
    print(get_worker().id)
    return 'hard'


class DaskWorkerHandler(WorkerPlugin):
    """
    Worker life-cycle handler
    """
    def __init__(self):
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id

    def teardown(self, worker):
        print(f"tearing down - {self.worker_id}. thread={threading.get_ident()}")

        # some post processing on the worker server
        # eg. post files to S3 etc...


if __name__ == '__main__':
    cluster = LocalCluster(n_workers=5)
    print(f"cluster_name={cluster.name}")

    shutdown_handler = DaskWorkerHandler()
    client = Client(cluster)
    client.register_worker_plugin(shutdown_handler)

    future = client.submit(complex)
    result = future.result()

You can use Python's standard logging module to log whatever you'd like while the workers are running, and then use the worker plugin you wrote to save those logs to an S3 bucket on teardown (check out the docs on logging in Dask for more details). Here's an example:

import logging

import fsspec
from dask.distributed import Client, LocalCluster, WorkerPlugin

def complex():
    # log via the "distributed.worker" logger so the message is
    # captured and retrievable via worker.get_logs() in teardown
    logger = logging.getLogger("distributed.worker")
    logger.error("Got here")
    return 'hard'


class DaskWorkerHandler(WorkerPlugin):
    """Worker life-cycle handler."""
    def __init__(self):
        self.worker_id = None

    def setup(self, worker):
        self.worker_id = worker.id

    def teardown(self, worker):
        logs = worker.get_logs()
        # replace with S3 path
        with fsspec.open(f"worker-{self.worker_id}-logs.txt", "w") as f:
            f.write("\n".join([str(log) for log in logs]))


cluster = LocalCluster(n_workers=5)
client = Client(cluster)

shutdown_handler = DaskWorkerHandler()
client.register_worker_plugin(shutdown_handler)

future = client.submit(complex)
result = future.result()


client.close()
cluster.close()
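
To write to an actual S3 bucket, point fsspec at an s3:// URL. This is a sketch of the teardown body under a few assumptions: the s3fs package is installed on the workers, credentials resolve the usual boto way (environment variables, instance profile, etc.), and "my-bucket" is a placeholder bucket name:

# inside teardown() -- same as above, but targeting S3 directly
with fsspec.open(f"s3://my-bucket/worker-{self.worker_id}-logs.txt", "w") as f:
    f.write("\n".join(str(log) for log in logs))

One caveat: teardown only runs when a worker shuts down cleanly, so logs from workers that are killed abruptly may not get uploaded. Also note that in recent distributed releases, client.register_worker_plugin is deprecated in favour of client.register_plugin.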