将 dask map_partitions 与打印功能同步

Synchronize dask map_partitions with print functions

我有以下代码:

def func1(df):
    """Print a progress message for func1 and return the constant 1.

    The ``df`` argument is accepted but not used.
    """
    print('Processing func1')
    return 1

def func2(df):
    """Print a progress message for func2 and return the constant 2.

    The ``df`` argument is accepted but not used.
    """
    print('Processing func2')
    return 2

# Build a 3-partition dask DataFrame from an existing pandas DataFrame `df`.
ddf = from_pandas(df, npartitions=3)

print('func1 processing started')
# NOTE(review): the return value of map_partitions is discarded here, so
# this call only describes work; presumably nothing runs on the
# partitions at this point because dask evaluates lazily -- confirm.
ddf.map_partitions(func1)
print('func1 processing ended')

print('func2 processing started')
# Same pattern as above: result not assigned, evaluation deferred.
ddf.map_partitions(func2)
print('func2 processing ended')

# The only explicit computation trigger in this script; it comes after
# all four surrounding print() calls have already executed.
ddf.compute()

我正在寻找一种方法来记录(或在本例中为打印)每次 map_partitions 执行之前、期间和之后的步骤。

但是,由于 map_partitions 是惰性执行的,要等到所有 print 语句之后的 ddf.compute() 才会真正被触发,所以我得到如下输出:

func1 processing started
func1 processing ended
func2 processing started
func2 processing ended
Processing func1
Processing func1
Processing func1
Processing func2
Processing func2
Processing func2

相反,我需要

func1 processing started
Processing func1
Processing func1
Processing func1
func1 processing ended
func2 processing started
Processing func2
Processing func2
Processing func2
func2 processing ended

我该如何实现这一点?注意:我的示例使用 print,但我想让 map_partitions 与任意 Python 函数同步。

更新

更现实的场景:

def func1():
    """Read ``file.csv``, build the func11 pipeline on it and compute it.

    Logs 'In func1' before the pipeline is built and 'func1 ended' after
    it has been built, then triggers the computation. Returns nothing.
    """
    df = dd.read_csv('file.csv', npartitions=3)
    log('In func1')
    df = func11(df, 123)
    log('func1 ended')
    # The frame is bound to ``df`` in this function, so compute that name;
    # no ``ddf`` exists in this scope.
    df.compute()


def func11(df, x):
    """Log entry into func11, then hand ``df`` and ``x`` on to func111.

    Returns whatever func111 returns.
    """
    log('In func11')
    # ... do stuff with df
    return func111(df, x)
 

def func111(df, x):
    """Log entry into func111 and apply func1111 through ``df.map_partitions``.

    ``x`` is accepted but not used. Returns the result of the
    ``map_partitions`` call.
    """
    log('In func111')
    return df.map_partitions(func1111)


def func1111(df):
    """Double the 'abc' column of ``df``, log a progress message, return ``df``.

    The doubled values are assigned back onto the same ``df`` object that
    was passed in.
    """
    doubled = df['abc'] * 2
    df['abc'] = doubled
    log('Processing func1111')
    return df

def log(msg):
    """Emit a log message; this reference implementation prints it to stdout."""
    print(msg) # or log in file or DB

要求是打印:

In func1
In func11
In func111
Processing func1111
Processing func1111
Processing func1111
func1 ended
   

一种可能性是创建一个包含所需操作序列的新函数:

def my_flow(df):
    """Run func1 and then func2 on ``df``, printing a marker around each.

    The output of func1 is fed into func2; the value returned by func2 is
    returned to the caller.
    """
    # Each stage: (message before, callable, message after), in run order.
    stages = (
        ('func1 processing started', func1, 'func1 processing ended'),
        ('func2 processing started', func2, 'func2 processing ended'),
    )
    for before, stage, after in stages:
        print(before)
        df = stage(df)
        print(after)
    return df

# Hand the combined flow to map_partitions as one function, so the
# start/end messages are printed inside my_flow itself.
ddf_new = ddf.map_partitions(my_flow)

或许您可以尝试用装饰器包装您的函数。如果您将所需的日志记录行为放在装饰器中,那么您就不必在代码中放置一堆打印语句。

import numpy as np
import pandas as pd
import dask.dataframe as dd

import time
from functools import wraps


def timing(f):
    """Decorate ``f`` so each call prints a start line and a timed end line.

    The wrapper prints ``Starting :'<name>'`` before calling ``f`` and
    ``Completed :'<name>' took: <seconds> sec`` afterwards, then returns
    the value ``f`` returned. If ``f`` raises, the exception propagates
    and no completion line is printed.
    """
    @wraps(f)
    def wrap(*args, **kw):
        # perf_counter() is a monotonic, high-resolution clock, so the
        # measured interval is not affected by system clock adjustments.
        ts = time.perf_counter()
        print("Starting :%r" % f.__name__)
        result = f(*args, **kw)
        te = time.perf_counter()
        print('Completed :%r took: %2.2f sec' % (f.__name__, te-ts))
        return result
    return wrap


@timing
def func1():
    """Build a random 3000x10 frame, run it through func11 and compute it.

    The pandas frame is split into a 3-partition dask frame, a one-second
    sleep stands in for work, and the computed result is returned.
    """
    frame = pd.DataFrame(np.random.random([3000, 10]))
    dask_frame = dd.from_pandas(frame, npartitions=3)
    time.sleep(1)
    dask_frame = func11(dask_frame, None)
    return dask_frame.compute()


@timing
def func11(ddf, x):
    """Sleep for one second, then pass ``ddf`` on to func111.

    ``x`` is accepted but not used; func111 is always called with None
    as its second argument. Returns what func111 returns.
    """
    time.sleep(1)
    return func111(ddf, None)


@timing
def func1111(df):
    """Return ``df * 2`` after a one-second sleep that stands in for work."""
    doubled = df * 2
    time.sleep(1)
    return doubled


@timing
def func111(ddf, x):
    """Apply func1111 via ``ddf.map_partitions``, sleep a second, return it.

    ``x`` is accepted but not used.
    """
    mapped = ddf.map_partitions(func1111)
    time.sleep(1)
    return mapped


# Run the whole decorated pipeline. func1 returns the output of
# ddf.compute(), so despite its name this variable holds the computed
# result (presumably a pandas DataFrame -- confirm), not a lazy frame.
ddf = func1()

这给出了输出:

Starting :'func1'
Starting :'func11'
Starting :'func111'
Starting :'func1111'
Completed :'func1111' took: 1.00 sec
Completed :'func111' took: 2.01 sec
Completed :'func11' took: 3.01 sec
Starting :'func1111'
Starting :'func1111'
Starting :'func1111'
Completed :'func1111' took: 1.00 sec
Completed :'func1111' took: 1.00 sec
Completed :'func1111' took: 1.00 sec
Completed :'func1' took: 5.02 sec

您可以把 ddf 包装起来,将 ddf.map_partitions() 和 log() 调用按顺序排入队列;在 compute() 时依次执行队列,并在每次调用 log() 之前先执行 ddf.persist() 和 wait(ddf),等待此前排队的 map_partitions 完成:

from dask.distributed import wait


class QueuedMapPartitionsWrapper:
    """Queue map_partitions and log steps on a frame until compute() is called.

    Each queue entry is a tuple ``(is_map_partitions, func, args, kwargs)``.
    ``map_partitions()`` and ``log()`` leave the current wrapper untouched
    and return a new wrapper whose queue has one more entry.
    """

    def __init__(self, ddf, queue=None):
        self.ddf = ddf
        self.queue = queue if queue else []

    def _enqueue(self, entry):
        # Build a fresh wrapper of the same class with the entry appended.
        return type(self)(self.ddf, self.queue + [entry])

    def map_partitions(self, func, *args, **kwargs):
        """Queue a ``map_partitions(func, *args, **kwargs)`` step."""
        return self._enqueue((True, func, args, kwargs))

    def log(self, *args, **kwargs):
        """Queue a call to the module-level ``log`` function."""
        return self._enqueue((False, log, args, kwargs))

    def compute(self):
        """Replay the queue in order and return the computed result.

        For a log entry the frame is persisted and waited on before the
        log function is called, so the call happens only after ``wait``
        returns for everything queued ahead of it.
        """
        current = self.ddf
        for is_map, func, args, kwargs in self.queue:
            if not is_map:
                current = current.persist()
                wait(current)
                func(*args, **kwargs)
                continue
            current = current.map_partitions(func, *args, **kwargs)
        return current.compute()

用法 1:

# Wrap the dask frame so that log and map_partitions steps are queued in
# the order they are written here.
ddf = dd.from_pandas(df, npartitions=3)
ddf = QueuedMapPartitionsWrapper(ddf)

# Each call returns a new wrapper, so the result must be re-assigned.
ddf = ddf.log('func1 processing started')
ddf = ddf.map_partitions(func1)
ddf = ddf.log('func1 processing ended')

ddf = ddf.log('func2 processing started')
ddf = ddf.map_partitions(func2)
ddf = ddf.log('func2 processing ended')

# Replays the queued steps in order and computes the final frame.
ddf.compute()

用法 2:

def func1():
   """Read ``file.csv`` and run the func11 pipeline through the queued wrapper.

   'In func1' is logged immediately through the plain ``log`` function,
   while 'func1 ended' is queued on the wrapper and so is only emitted
   during ``compute()``. The inline comments mark the two places where
   the frame may be wrapped.
   """
   ddf = dd.read_csv('file.csv', npartitions=3)
   ddf = QueuedMapPartitionsWrapper(ddf)    # Either here

   log('In func1') 

   ddf = func11(ddf, 123)
   # ddf = QueuedMapPartitionsWrapper(ddf)  # or here (just before ddf.log)
   ddf = ddf.log('func1 ended')

   ddf.compute()