Reading and processing large text file in batches using islice and multiprocessing
The code doesn't return anything and just keeps running forever. Please help with the code snippet. FYI: this is my first time using multiprocessing.

My local machine doesn't have enough memory, so I'm extracting the data from a zip file. My idea is to read n lines at a time using islice and process them with process_logBatch().

I'm running this code on a Windows machine, in a Jupyter Notebook.
import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd  # Unused.

def process_logBatch(next_n_lines):
    l = [random.randint(0,100) for i in range(5)]
    print(l)
    return l

def collect_results(result):
    results.extend(result)

pool = mp.Pool(processes=(mp.cpu_count()-1))
results = []

with zipfile.ZipFile('log.zip', 'r') as z:
    with z.open('log.txt') as f:
        while True:
            print(f.closed)
            next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]
            if not next_n_lines:
                break
            try:
                pool.apply_async(process_logBatch, args=(next_n_lines, ), callback=collect_results)
            except Exception as e:
                print(e)
            if counter == 2:
                break

pool.close()
pool.join()

print(results)
There are a couple of problems. One is that on Windows you need an if __name__ == '__main__': statement guarding the main module, as shown and discussed in the section titled "Safe importing of main module" of the multiprocessing module's documentation.
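For reference, here is a minimal sketch of that guard, using a toy worker function rather than your process_logBatch:

import multiprocessing as mp

# Worker defined at module level so the spawned child processes can import it.
def work(x):
    return x * x

# On Windows the pool must only be created under this guard; otherwise each
# child re-imports the module and tries to start its own pool.
if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        print(pool.map(work, range(4)))  # [0, 1, 4, 9]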
The second one, however, isn't as easy to fix. Each process runs in its own memory space, so they don't all share the same results list. To avoid that, I switched to using Pool.map_async() and collect the results when all the subprocesses have finished.
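To illustrate, here is a minimal sketch (with a hypothetical square() worker, not part of your code) of how map_async() hands the return values back to the parent process:

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=2)
    # map_async() returns an AsyncResult; no shared results list is needed.
    async_result = pool.map_async(square, [1, 2, 3, 4])
    pool.close()
    pool.join()
    # .get() collects the workers' return values in the parent process.
    print(async_result.get())  # [1, 4, 9, 16]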
Here's what I think will work (based on your sample code):
import multiprocessing as mp
import zipfile
from itertools import islice
import time
#import pandas as pd  # Unused.
import random  # Added.

def process_logBatch(next_n_lines):
    l = [random.randint(0,100) for i in range(5)]
    print(l)
    return l

if __name__ == '__main__':

    # No longer needed.
    # def collect_results(result):
    #     results.extend(result)

    pool = mp.Pool(processes=(mp.cpu_count()-1))

    with zipfile.ZipFile('log.zip', 'r') as z:
        with z.open('log.txt') as f:

            counter = 0  # Added to avoid a NameError because it was undefined.
            while True:
                next_n_lines = [x.decode("utf-8").strip() for x in islice(f, 2)]
                if not next_n_lines:
                    break

                try:
                    results = pool.map_async(process_logBatch, next_n_lines)
                except Exception as e:
                    print(e)

                if counter == 2:
                    break

    pool.close()
    pool.join()

    print(results.get())