如何在 python 中包装列表函数?
How to wrap list functions in python?
我无法将这个问题准确反映到标题中。
我想无误地使用 list
、func(*args)
和 Pool.map
。
请看下面。
▼代码
def df_parallelize_run(func, arguments):
p = Pool(psutil.cpu_count())
df = p.map(func, arguments)
p.close()
p.join()
return df
def make_lag(df: DataFrame, LAG_DAY: list):
for l in LAG_DAY:
df[f'lag{l}d'] = df.groupby(['id'])['target'].transform(lambda x: x.shift(l))
return df
def wrap_make_lag(args):
return make_lag(*args)
鉴于以上三个功能,我想做以下事情
# df: DataFrame
arguments = (df, [1, 3, 7, 13, 16])
df = df_parallelize_run(wrap_make_lag, arguments)
▼ 错误
in df_parallelize_run(func, arguments)
----> 7 df = pool.map(func, arguments)
in ..../python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
in ..../python3.7/multiprocessing/pool.py in get(self, timeout)
--> 657 raise self._value
TypeError: make_lag() takes 2 positional arguments but 5 were given
我知道这种不匹配的原因(由于拆包列表,[1, 3, 7, 13, 16]
,即 5)。
怎样做才是正确的?如果可能的话,我想让这个列表符合位置参数的约束。如果几乎不可能(list
or Pool.map
),什么是更合适、简单和灵活的方法?
使用pool.starmap
。您为函数的参数生成一个元组列表。在这里,看起来 df 每次都是相同的,arg 是参数中的每个元素。
arglist = [(df, arg) for arg in arguments]
with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
results = p.starmap(make_lag, arglist)
已解决。我按照以下方式重写了。
▼功能
def df_parallelize_run(func, arglist):
with Pool(psutil.cpu_count()) as p:
# concat((lots of returned df))
results = pd.concat(p.starmap(func, arglist), 1)
return results
def make_lag(df, lag):
if not isinstance(lag, list):
lag = [lag]
# it doesn't have to be for-loop when you use multiprocessing
for l in lag:
col_name = f'lag{l}d'
df[col_name] = df.groupby(['item_id', 'store_id'])['sales'].transform(lambda x: x.shift(l))
return df[[col_name]]
其他功能
def make_lag_roll(df, lag, roll):
col_name = f'lag{lag}_roll_mean_{roll}'
df[col_name] = df.groupby(['id'])['target'].transform(lambda x: x.shift(lag).rolling(roll).mean())
return df[[col_name]]
▼使用方法
arglist = [(df[['id', 'target']], arg) for arg in range(1, 36)]
lag_df = df_parallelize_run(make_lag, arglist)
arglist_roll = [(df[['id', 'target']], lag, roll)
for lag in range(1, 36)
for roll in [7, 14, 28]]
lag_roll_df = df_parallelize_run(make_lag_roll, arglist_roll)
我无法将这个问题准确反映到标题中。
我想无误地使用 list
、func(*args)
和 Pool.map
。
请看下面。
▼代码
def df_parallelize_run(func, arguments):
p = Pool(psutil.cpu_count())
df = p.map(func, arguments)
p.close()
p.join()
return df
def make_lag(df: DataFrame, LAG_DAY: list):
for l in LAG_DAY:
df[f'lag{l}d'] = df.groupby(['id'])['target'].transform(lambda x: x.shift(l))
return df
def wrap_make_lag(args):
return make_lag(*args)
鉴于以上三个功能,我想做以下事情
# df: DataFrame
arguments = (df, [1, 3, 7, 13, 16])
df = df_parallelize_run(wrap_make_lag, arguments)
▼ 错误
in df_parallelize_run(func, arguments)
----> 7 df = pool.map(func, arguments)
in ..../python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
--> 268 return self._map_async(func, iterable, mapstar, chunksize).get()
in ..../python3.7/multiprocessing/pool.py in get(self, timeout)
--> 657 raise self._value
TypeError: make_lag() takes 2 positional arguments but 5 were given
我知道这种不匹配的原因(由于拆包列表,[1, 3, 7, 13, 16]
,即 5)。
怎样做才是正确的?如果可能的话,我想让这个列表符合位置参数的约束。如果几乎不可能(list
or Pool.map
),什么是更合适、简单和灵活的方法?
使用pool.starmap
。您为函数的参数生成一个元组列表。在这里,看起来 df 每次都是相同的,arg 是参数中的每个元素。
arglist = [(df, arg) for arg in arguments]
with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
results = p.starmap(make_lag, arglist)
已解决。我按照以下方式重写了。
▼功能
def df_parallelize_run(func, arglist):
with Pool(psutil.cpu_count()) as p:
# concat((lots of returned df))
results = pd.concat(p.starmap(func, arglist), 1)
return results
def make_lag(df, lag):
if not isinstance(lag, list):
lag = [lag]
# it doesn't have to be for-loop when you use multiprocessing
for l in lag:
col_name = f'lag{l}d'
df[col_name] = df.groupby(['item_id', 'store_id'])['sales'].transform(lambda x: x.shift(l))
return df[[col_name]]
其他功能
def make_lag_roll(df, lag, roll):
col_name = f'lag{lag}_roll_mean_{roll}'
df[col_name] = df.groupby(['id'])['target'].transform(lambda x: x.shift(lag).rolling(roll).mean())
return df[[col_name]]
▼使用方法
arglist = [(df[['id', 'target']], arg) for arg in range(1, 36)]
lag_df = df_parallelize_run(make_lag, arglist)
arglist_roll = [(df[['id', 'target']], lag, roll)
for lag in range(1, 36)
for roll in [7, 14, 28]]
lag_roll_df = df_parallelize_run(make_lag_roll, arglist_roll)