Writing to a CSV file using map_async

I have the following code:

#!/usr/bin/env python

def do_job(row):
  # COMPUTING INTENSIVE OPERATION
  sleep(1)
  row.append(int(row[0])**2)

  # WRITING TO FILE - ATOMICITY ENSURED
  semaphore.acquire()
  print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
  csvWriter.writerow(row)
  print "Inside semaphore after writing to file"
  semaphore.release()

  # RETURNING VALUE
  return row

def parallel_csv_processing(inputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # COMPUTING INTENSIVE OPERATIONS
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    results = p.map_async(do_job, csvReader, chunksize = 10)

  except KeyboardInterrupt:
    p.close()
    p.terminate()
    p.join()

  # WAIT FOR RESULTS
  # results.get()
  p.close()
  p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

if __name__ == '__main__':
  import csv
  from time import sleep
  from multiprocessing import Pool
  from multiprocessing import cpu_count
  from multiprocessing import current_process
  from multiprocessing import Semaphore
  from pprint import pprint as pp
  import calendar
  import time

  SCRIPT_START_TIME  = calendar.timegm(time.gmtime())
  inputFile  = "input.csv"
  outputFile = "output.csv"
  semaphore = Semaphore(1)

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')
  csvWriter.writerow(["before","calling","multiprocessing"])
  parallel_csv_processing(inputFile, cpuCount = cpu_count())
  csvWriter.writerow(["after","calling","multiprocessing"])

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  SCRIPT_STOP_TIME   = calendar.timegm(time.gmtime())
  SCRIPT_DURATION    = SCRIPT_STOP_TIME - SCRIPT_START_TIME
  print "Script duration:    %s seconds" % SCRIPT_DURATION

After running it, the output on the terminal is as follows:

Inside semaphore before writing to file: (0,0,0)
Inside semaphore after writing to file
Inside semaphore before writing to file: (1,3,1)
Inside semaphore after writing to file
Inside semaphore before writing to file: (2,6,4)
Inside semaphore after writing to file
Inside semaphore before writing to file: (3,9,9)
Inside semaphore after writing to file
Inside semaphore before writing to file: (4,12,16)
Inside semaphore after writing to file
Inside semaphore before writing to file: (5,15,25)
Inside semaphore after writing to file
Inside semaphore before writing to file: (6,18,36)
Inside semaphore after writing to file
Inside semaphore before writing to file: (7,21,49)
Inside semaphore after writing to file
Inside semaphore before writing to file: (8,24,64)
Inside semaphore after writing to file
Inside semaphore before writing to file: (9,27,81)
Inside semaphore after writing to file
Script duration:    10 seconds

The contents of input.csv are as follows:

0,0
1,3
2,6
3,9
4,12
5,15
6,18
7,21
8,24
9,27

output.csv is created with the following contents:

before,calling,multiprocessing
Default,header,please,change
after,calling,multiprocessing

Why does the do_job method in parallel_csv_processing not write anything to output.csv?

Your processes fail silently with an exception - specifically, the spawned processes have no value for csvWriter, because each of them runs in a separate Python interpreter and none of them has run the main block - which is deliberate; you don't want the child processes re-running main. The do_job() function only has access to the values you explicitly pass to it in the map_async() call, and you didn't pass csvWriter. Even if you had, I'm not sure it would work, nor whether file handles are shared between main and the processes created by multiprocessing.
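The straightforward fix, as a minimal sketch reusing the question's names (this restructuring is one possible approach, not the only one): do only the computation in the worker, return the row, and let the parent process - the only one holding csvWriter - write every row once map_async() has delivered the results. This also makes the semaphore unnecessary:

#!/usr/bin/env python
import csv
from time import sleep
from multiprocessing import Pool

def do_job(row):
  # COMPUTING INTENSIVE OPERATION - no file access in the worker
  sleep(1)
  row.append(int(row[0])**2)
  return row

def parallel_csv_processing(inputFile, outputFile, separator=",", cpuCount=1):
  inputFH   = open(inputFile,  "rb")
  outputFH  = open(outputFile, "wt")
  csvReader = csv.reader(inputFH, delimiter=separator)
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  p = Pool(processes=cpuCount)
  results = p.map_async(do_job, csvReader, chunksize=10)
  # get() blocks until all workers finish and re-raises any worker exception
  for row in results.get():
    csvWriter.writerow(row)
  p.close()
  p.join()

  inputFH.close()
  outputFH.close()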

Put a try/except around the code in do_job and you will see the exception.

def do_job(row):
  try:
    # COMPUTING INTENSIVE OPERATION
    sleep(1)
    row.append(int(row[0])**2)

    # WRITING TO FILE - ATOMICITY ENSURED
    semaphore.acquire()
    print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
    csvWriter.writerow(row)
    print "Inside semaphore after writing to file"
    semaphore.release()

    # RETURNING VALUE
    return row
  except:
    print "exception"

Obviously in real code the exception should be handled properly, but if you run it now you will see an exception printed for every call to do_job.
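To see what the exception actually is, rather than just the word "exception", the standard traceback module can print the full stack trace from inside the worker - a small sketch of that variation:

import traceback

def do_job(row):
  try:
    sleep(1)
    row.append(int(row[0])**2)
    semaphore.acquire()
    csvWriter.writerow(row)
    semaphore.release()
    return row
  except:
    traceback.print_exc()  # prints the real error and stack trace to stderr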

See the multiprocessing documentation in the Python 2.7 standard library, under the heading "16.6.1.4. Sharing state between processes", for more guidance.
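If the workers really do need to trigger the writes themselves, one common pattern - sketched below, with the structure being an assumption rather than the only option - is to funnel every row through a queue to a single dedicated writer process, so that exactly one process ever holds the output file handle:

import csv
from multiprocessing import Manager, Pool, Process

def writer_process(queue, outputFile):
  # The only process that ever touches the output file.
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')
  while True:
    row = queue.get()
    if row is None:  # sentinel: no more rows are coming
      break
    csvWriter.writerow(row)
  outputFH.close()

def do_job(args):
  row, queue = args
  row.append(int(row[0])**2)
  queue.put(row)  # hand the row to the writer instead of writing it here

if __name__ == '__main__':
  manager = Manager()
  queue   = manager.Queue()  # a plain multiprocessing.Queue cannot be passed to Pool workers
  writer  = Process(target=writer_process, args=(queue, "output.csv"))
  writer.start()

  rows = [["0", "0"], ["1", "3"], ["2", "6"]]  # stand-in for the csv.reader rows
  p = Pool(processes=2)
  p.map(do_job, [(row, queue) for row in rows])
  p.close()
  p.join()

  queue.put(None)  # tell the writer it can finish
  writer.join()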