Synchronize dask map_partitions with print functions
I have the following code:
from dask.dataframe import from_pandas

def func1(df):
    x = 1
    print('Processing func1')
    return x

def func2(df):
    x = 2
    print('Processing func2')
    return x

# df is an existing pandas DataFrame
ddf = from_pandas(df, npartitions=3)

print('func1 processing started')
ddf.map_partitions(func1)
print('func1 processing ended')
print('func2 processing started')
ddf.map_partitions(func2)
print('func2 processing ended')

ddf.compute()
I am looking for a way to log (or, in this example, print) a step before, during, and after each map_partitions execution. However, because ddf.compute() only triggers the map_partitions work after the print calls have already run, I get something like:
func1 processing started
func1 processing ended
func2 processing started
func2 processing ended
Processing func1
Processing func1
Processing func1
Processing func2
Processing func2
Processing func2
Instead, I need:
func1 processing started
Processing func1
Processing func1
Processing func1
func1 processing ended
func2 processing started
Processing func2
Processing func2
Processing func2
func2 processing ended
How can I make this work? Note: my example uses print, but I want to synchronize map_partitions with any Python function.
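The underlying reason is that map_partitions is lazy: it only records tasks in a graph and returns immediately, and the partition function runs only when the graph executes. A minimal sketch illustrating this (the names are illustrative, and meta is passed so dask does not call the function eagerly to infer output metadata):

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'a': range(6)})
ddf = dd.from_pandas(df, npartitions=3)

def noisy(part):
    print('running partition')  # executes only when the graph runs
    return part

lazy = ddf.map_partitions(noisy, meta=df.iloc[:0])  # returns immediately, prints nothing
print('graph built, nothing has run yet')
lazy.compute()  # now 'running partition' prints once per partition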
Update
A more realistic scenario:
def func1():
    df = dd.read_csv('file.csv')  # read_csv has no npartitions argument; partitioning follows blocksize
    log('In func1')
    df = func11(df, 123)
    log('func1 ended')
    df.compute()

def func11(df, x):
    log('In func11')
    # ... do stuff with df
    df = func111(df, x)
    return df

def func111(df, x):
    log('In func111')
    df = df.map_partitions(func1111)
    return df

def func1111(df):
    df['abc'] = df['abc'] * 2
    log('Processing func1111')
    return df

def log(msg):
    print(msg)  # or log to a file or DB
The requirement is to print:
In func1
In func11
In func111
Processing func1111
Processing func1111
Processing func1111
func1 ended
One possibility is to create a new function containing the desired sequence of operations:
def my_flow(df):
    print('func1 processing started')
    df = func1(df)
    print('func1 processing ended')
    print('func2 processing started')
    df = func2(df)
    print('func2 processing ended')
    return df

ddf_new = ddf.map_partitions(my_flow)
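Note that my_flow runs once per partition, so with npartitions=3 each of those prints fires three times (once per partition) when ddf_new.compute() runs, rather than once around the whole stage: the per-partition ordering is correct, but you do not get a single 'started'/'ended' line per step.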
Alternatively, you could try wrapping your functions with a decorator. If you put the desired logging behavior in the decorator, you don't have to scatter print statements throughout your code.
import numpy as np
import pandas as pd
import dask.dataframe as dd
import time
from functools import wraps

def timing(f):
    @wraps(f)
    def wrap(*args, **kw):
        ts = time.time()
        print("Starting :%r" % f.__name__)
        result = f(*args, **kw)
        te = time.time()
        print('Completed :%r took: %2.2f sec' % (f.__name__, te - ts))
        return result
    return wrap

@timing
def func1():
    df = pd.DataFrame(np.random.random([3000, 10]))
    ddf = dd.from_pandas(df, npartitions=3)
    time.sleep(1)
    ddf = func11(ddf, None)
    df = ddf.compute()
    return df

@timing
def func11(ddf, x):
    time.sleep(1)
    ddf = func111(ddf, None)
    return ddf

@timing
def func1111(df):
    df = df * 2
    time.sleep(1)
    return df

@timing
def func111(ddf, x):
    ddf = ddf.map_partitions(func1111)
    time.sleep(1)
    return ddf

ddf = func1()
This gives the output:
Starting :'func1'
Starting :'func11'
Starting :'func111'
Starting :'func1111'
Completed :'func1111' took: 1.00 sec
Completed :'func111' took: 2.01 sec
Completed :'func11' took: 3.01 sec
Starting :'func1111'
Starting :'func1111'
Starting :'func1111'
Completed :'func1111' took: 1.00 sec
Completed :'func1111' took: 1.00 sec
Completed :'func1111' took: 1.00 sec
Completed :'func1' took: 5.02 sec
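Note the first Starting/Completed pair for func1111, printed before compute() is ever reached: when no meta= is passed, map_partitions calls the function once on an empty sample DataFrame to infer the output metadata, which is also why func111 appears to take 2 seconds (its own sleep plus the eager metadata call). The three later pairs are the real per-partition executions triggered by compute().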
You can wrap the ddf in a class that queues up the ddf.map_partitions() and log() calls, then replays the queue at compute time, using ddf.persist() and wait(ddf) so that each log call waits for the preceding partition work to finish.
from dask.distributed import wait

class QueuedMapPartitionsWrapper:
    def __init__(self, ddf, queue=None):
        self.ddf = ddf
        self.queue = queue or []

    def map_partitions(self, func, *args, **kwargs):
        return self.__class__(self.ddf, self.queue + [(True, func, args, kwargs)])

    def log(self, *args, **kwargs):
        return self.__class__(self.ddf, self.queue + [(False, log, args, kwargs)])

    def compute(self):
        ddf = self.ddf
        for (map_partitions, func, args, kwargs) in self.queue:
            if map_partitions:
                ddf = ddf.map_partitions(func, *args, **kwargs)
            else:
                ddf = ddf.persist()
                wait(ddf)
                func(*args, **kwargs)
        return ddf.compute()
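Note that wait comes from dask.distributed, so this assumes an active distributed Client; on the distributed scheduler persist() returns immediately, which is why the explicit wait(ddf) is needed before each queued log call runs.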
Usage 1:
ddf = dd.from_pandas(df, npartitions=3)
ddf = QueuedMapPartitionsWrapper(ddf)
ddf = ddf.log('func1 processing started')
ddf = ddf.map_partitions(func1)
ddf = ddf.log('func1 processing ended')
ddf = ddf.log('func2 processing started')
ddf = ddf.map_partitions(func2)
ddf = ddf.log('func2 processing ended')
ddf.compute()
Usage 2:
def func1():
    ddf = dd.read_csv('file.csv')
    ddf = QueuedMapPartitionsWrapper(ddf)  # Either here
    log('In func1')
    ddf = func11(ddf, 123)
    # ddf = QueuedMapPartitionsWrapper(ddf)  # or here (just before ddf.log)
    ddf = ddf.log('func1 ended')
    ddf.compute()
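If you don't need the wrapper, the same ordering can be achieved directly by persisting and waiting at each logging point. A minimal sketch, assuming a dask.distributed Client (processes=False keeps worker prints on the same stdout) and simplified partition functions standing in for func1 and func2:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, wait

def func1(part):
    print('Processing func1')
    return part

def func2(part):
    print('Processing func2')
    return part

client = Client(processes=False)  # persist()/wait() rely on the distributed scheduler

df = pd.DataFrame({'abc': range(6)})
ddf = dd.from_pandas(df, npartitions=3)
meta = df.iloc[:0]  # pass meta so dask skips the eager metadata-inference call

print('func1 processing started')
ddf = ddf.map_partitions(func1, meta=meta).persist()
wait(ddf)  # block until every partition has been processed by func1
print('func1 processing ended')

print('func2 processing started')
ddf = ddf.map_partitions(func2, meta=meta).persist()
wait(ddf)
print('func2 processing ended')

result = ddf.compute()  # partitions are already materialized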