Writing to a CSV file using map_async

I have the following code:
#!/usr/bin/env python

def do_job(row):
    # COMPUTING INTENSIVE OPERATION
    sleep(1)
    row.append(int(row[0])**2)

    # WRITING TO FILE - ATOMICITY ENSURED
    semaphore.acquire()
    print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
    csvWriter.writerow(row)
    print "Inside semaphore after writing to file"
    semaphore.release()

    # RETURNING VALUE
    return row

def parallel_csv_processing(inputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
    # OPEN FH FOR READING INPUT FILE
    inputFH = open(inputFile, "rb")
    csvReader = csv.reader(inputFH, delimiter=separator)

    # SKIP HEADERS
    for skip in xrange(skipRows):
        csvReader.next()

    # WRITE HEADER TO OUTPUT FILE
    csvWriter.writerow(header)

    # COMPUTING INTENSIVE OPERATIONS
    try:
        p = Pool(processes = cpuCount)
        # results = p.map(do_job, csvReader, chunksize = 10)
        results = p.map_async(do_job, csvReader, chunksize = 10)
    except KeyboardInterrupt:
        p.close()
        p.terminate()
        p.join()

    # WAIT FOR RESULTS
    # results.get()
    p.close()
    p.join()

    # CLOSE FH FOR READING INPUT
    inputFH.close()

if __name__ == '__main__':
    import csv
    from time import sleep
    from multiprocessing import Pool
    from multiprocessing import cpu_count
    from multiprocessing import current_process
    from multiprocessing import Semaphore
    from pprint import pprint as pp
    import calendar
    import time

    SCRIPT_START_TIME = calendar.timegm(time.gmtime())

    inputFile = "input.csv"
    outputFile = "output.csv"
    semaphore = Semaphore(1)

    # OPEN FH FOR WRITING OUTPUT FILE
    outputFH = open(outputFile, "wt")
    csvWriter = csv.writer(outputFH, lineterminator='\n')
    csvWriter.writerow(["before","calling","multiprocessing"])

    parallel_csv_processing(inputFile, cpuCount = cpu_count())
    csvWriter.writerow(["after","calling","multiprocessing"])

    # CLOSE FH FOR WRITING OUTPUT
    outputFH.close()

    SCRIPT_STOP_TIME = calendar.timegm(time.gmtime())
    SCRIPT_DURATION = SCRIPT_STOP_TIME - SCRIPT_START_TIME
    print "Script duration: %s seconds" % SCRIPT_DURATION
After running it, the output on the terminal is as follows:
Inside semaphore before writing to file: (0,0,0)
Inside semaphore after writing to file
Inside semaphore before writing to file: (1,3,1)
Inside semaphore after writing to file
Inside semaphore before writing to file: (2,6,4)
Inside semaphore after writing to file
Inside semaphore before writing to file: (3,9,9)
Inside semaphore after writing to file
Inside semaphore before writing to file: (4,12,16)
Inside semaphore after writing to file
Inside semaphore before writing to file: (5,15,25)
Inside semaphore after writing to file
Inside semaphore before writing to file: (6,18,36)
Inside semaphore after writing to file
Inside semaphore before writing to file: (7,21,49)
Inside semaphore after writing to file
Inside semaphore before writing to file: (8,24,64)
Inside semaphore after writing to file
Inside semaphore before writing to file: (9,27,81)
Inside semaphore after writing to file
Script duration: 10 seconds
The contents of input.csv are as follows:
0,0
1,3
2,6
3,9
4,12
5,15
6,18
7,21
8,24
9,27
The created output.csv contains the following:
before,calling,multiprocessing
Default,header,please,change
after,calling,multiprocessing
Why does parallel_csv_processing, i.e. the do_job method, not write anything to output.csv?
Your processes are failing silently with an exception. Specifically, in the spawned processes the script has no value for csvWriter, because each of them runs in a separate Python interpreter and does not run main(). This is deliberate; you don't want the child processes running main. The do_job() function only has access to the values you explicitly pass to it in the map_async() call, and you are not passing csvWriter. Even if you were, I'm not sure it would work; file handles are not necessarily shared between main and the processes created by multiprocessing.
Put a try/except around the code in do_job and you will see the exception.
def do_job(row):
    try:
        # COMPUTING INTENSIVE OPERATION
        sleep(1)
        row.append(int(row[0])**2)

        # WRITING TO FILE - ATOMICITY ENSURED
        semaphore.acquire()
        print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
        csvWriter.writerow(row)
        print "Inside semaphore after writing to file"
        semaphore.release()

        # RETURNING VALUE
        return row
    except:
        print "exception"
Obviously the exception should be handled properly in real code, but if you run it now you will see that "exception" is printed on every call to do_job.

See the multiprocessing documentation under the heading "16.6.1.4. Sharing state between processes" in the Python 2.7 standard library docs for further guidance.
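For reference, here is one way to restructure the script along those lines. It is only a minimal sketch, assuming it is acceptable for the parent process alone to write output.csv: do_job just computes and returns the row, and the parent collects the rows via results.get() and writes them itself (the header names are illustrative).

#!/usr/bin/env python
# Minimal sketch (Python 2.7): the workers only compute, the parent writes.
import csv
from time import sleep
from multiprocessing import Pool, cpu_count

def do_job(row):
    # COMPUTING INTENSIVE OPERATION - no shared csvWriter or semaphore needed
    sleep(1)
    row.append(int(row[0])**2)
    return row

def parallel_csv_processing(inputFile, outputFile, header, separator=",", skipRows=0, cpuCount=1):
    # READ THE INPUT ROWS
    inputFH = open(inputFile, "rb")
    csvReader = csv.reader(inputFH, delimiter=separator)
    for skip in xrange(skipRows):
        csvReader.next()

    # COMPUTE IN THE POOL AND COLLECT THE RESULTS IN THE PARENT
    p = Pool(processes=cpuCount)
    try:
        results = p.map_async(do_job, csvReader, chunksize=10)
        rows = results.get()      # blocks until all workers are done
        p.close()
        p.join()
    finally:
        inputFH.close()

    # WRITE EVERYTHING FROM THE PARENT PROCESS, WHICH OWNS THE FILE HANDLE
    outputFH = open(outputFile, "wt")
    csvWriter = csv.writer(outputFH, lineterminator='\n')
    csvWriter.writerow(header)
    csvWriter.writerows(rows)
    outputFH.close()

if __name__ == '__main__':
    parallel_csv_processing("input.csv", "output.csv",
                            header=["col1", "col2", "col1_squared"],
                            cpuCount=cpu_count())

Collecting the rows in the parent also removes the need for the Semaphore, and the output rows come back in input order because map_async preserves the order of its results.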