并行 python 迭代
Parallel python iteration
我想根据 pandas.DataFrame
中的值创建 class 的多个实例。这个我记下来了。
import itertools
import multiprocessing as mp
import pandas as pd
class Toy:
id_iter = itertools.count(1)
def __init__(self, row):
self.id = self.id_iter.next()
self.type = row['type']
if __name__ == "__main__":
table = pd.DataFrame({
'type': ['a', 'b', 'c'],
'number': [5000, 4000, 30000]
})
for index, row in table.iterrows():
[Toy(row) for _ in range(row['number'])]
多处理尝试
我已经能够通过添加以下内容来并行化这个(某种程度上):
pool = mp.Pool(processes=mp.cpu_count())
m = mp.Manager()
q = m.Queue()
for index, row in table.iterrows():
pool.apply_async([Toy(row) for _ in range(row['number'])])
如果 row['number']
中的数字比 table
中的数字长得多,这似乎会更快。但是在我的实际情况下,table
有几千行,而每个 row['number']
都比较小。
尝试将 table
分解为 cpu_count()
块并在 table 中迭代似乎更明智。但现在我们处于我的 python 技能的边缘。
我已经尝试过 python 口译员对我尖叫的事情,例如:
pool.apply_async(
for index, row in table.iterrows():
[Toy(row) for _ in range(row['number'])]
)
还有 "can't be pickled"
Parallel(n_jobs=4)(
delayed(Toy)([row for _ in range(row['number'])]) \
for index, row in table.iterrows()
)
编辑
这可能让我更接近了一点,但仍然没有。我在一个单独的函数中创建了 class 个实例,
def create_toys(row):
[Toy(row) for _ in range(row['number'])]
....
Parallel(n_jobs=4, backend="threading")(
(create_toys)(row) for i, row in table.iterrows()
)
但我被告知 'NoneType' 对象不可迭代。
我有点不清楚你期望的输出是什么。你只想要一个大列表
[Toy(row_1) ... Toy(row_n)]
每个 Toy(row_i)
出现的次数是多少 row_i.number
?
根据@JD Long 提到的答案,我认为你可以这样做:
def process(df):
L = []
for index, row in table.iterrows():
L += [Toy(row) for _ in range(row['number'])]
return L
table = pd.DataFrame({
'type': ['a', 'b', 'c']*10,
'number': [5000, 4000, 30000]*10
})
p = mp.Pool(processes=8)
split_dfs = np.array_split(table,8)
pool_results = p.map(process, split_dfs)
p.close()
p.join()
# merging parts processed by different processes
result = [a for L in pool_results for a in L]
我想根据 pandas.DataFrame
中的值创建 class 的多个实例。这个我记下来了。
import itertools
import multiprocessing as mp
import pandas as pd
class Toy:
id_iter = itertools.count(1)
def __init__(self, row):
self.id = self.id_iter.next()
self.type = row['type']
if __name__ == "__main__":
table = pd.DataFrame({
'type': ['a', 'b', 'c'],
'number': [5000, 4000, 30000]
})
for index, row in table.iterrows():
[Toy(row) for _ in range(row['number'])]
多处理尝试
我已经能够通过添加以下内容来并行化这个(某种程度上):
pool = mp.Pool(processes=mp.cpu_count())
m = mp.Manager()
q = m.Queue()
for index, row in table.iterrows():
pool.apply_async([Toy(row) for _ in range(row['number'])])
如果 row['number']
中的数字比 table
中的数字长得多,这似乎会更快。但是在我的实际情况下,table
有几千行,而每个 row['number']
都比较小。
尝试将 table
分解为 cpu_count()
块并在 table 中迭代似乎更明智。但现在我们处于我的 python 技能的边缘。
我已经尝试过 python 口译员对我尖叫的事情,例如:
pool.apply_async(
for index, row in table.iterrows():
[Toy(row) for _ in range(row['number'])]
)
还有 "can't be pickled"
Parallel(n_jobs=4)(
delayed(Toy)([row for _ in range(row['number'])]) \
for index, row in table.iterrows()
)
编辑
这可能让我更接近了一点,但仍然没有。我在一个单独的函数中创建了 class 个实例,
def create_toys(row):
[Toy(row) for _ in range(row['number'])]
....
Parallel(n_jobs=4, backend="threading")(
(create_toys)(row) for i, row in table.iterrows()
)
但我被告知 'NoneType' 对象不可迭代。
我有点不清楚你期望的输出是什么。你只想要一个大列表
[Toy(row_1) ... Toy(row_n)]
每个 Toy(row_i)
出现的次数是多少 row_i.number
?
根据@JD Long 提到的答案,我认为你可以这样做:
def process(df):
L = []
for index, row in table.iterrows():
L += [Toy(row) for _ in range(row['number'])]
return L
table = pd.DataFrame({
'type': ['a', 'b', 'c']*10,
'number': [5000, 4000, 30000]*10
})
p = mp.Pool(processes=8)
split_dfs = np.array_split(table,8)
pool_results = p.map(process, split_dfs)
p.close()
p.join()
# merging parts processed by different processes
result = [a for L in pool_results for a in L]