Truncated file header while using multiprocessing

When I run the following code:

from multiprocessing import Pool
from functools import partial

def book_processing(pair, pool_length):
    p = Pool(len(pool_length)*3)
    temp_parameters = partial(book_call_mprocess, pair)
    p.map_async(temp_parameters, pool_length).get(999999)
    p.close()
    p.join()
    return exchange_books

I get the following error:

Traceback (most recent call last):
  File "test_code.py", line 214, in <module>
    current_books = book_call.book_processing(cp, book_list)
  File "/home/user/Desktop/book_call.py", line 155, in book_processing
    p.map_async(temp_parameters, pool_length).get(999999)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
zipfile.BadZipfile: Truncated file header

I feel like some resource that is in use is not being closed from the previous loop, but I'm not sure how to close it (I'm still learning the multiprocessing library). The error only appears when my code repeats this section relatively quickly (within the same minute). It doesn't happen often, but when it does it is obvious.

Edit (adding the book_call code):

def book_call_mprocess(currency_pair, ex_list):

    polo_error = 0
    live_error = 0
    kraken_error = 0
    gdax_error = 0

    ex_list = set([ex_list])

    ex_Polo = 'Polo'
    ex_Live = 'Live'
    ex_GDAX = 'GDAX'
    ex_Kraken = 'Kraken'

    cp_polo = 'BTC_ETH'
    cp_kraken = 'XETHXXBT'
    cp_live = 'ETH/BTC'
    cp_GDAX = 'ETH-BTC'

    # Instances
    polo_instance = poloapi.poloniex(polo_key, polo_secret)
    fookraken = krakenapi.API(kraken_key, kraken_secret)
    publicClient = GDAX.PublicClient()

    flag = False
    while not flag:
        flag = False
        err = False

        # Polo Book

        try:
            if ex_Polo in ex_list:
                polo_books = polo_instance.returnOrderBook(cp_polo)
                exchange_books['Polo'] = polo_books
        except:
            err = True
            polo_error = 1

        # Livecoin

        try:
            if ex_Live in ex_list:
                method = "/exchange/order_book"
                live_books = OrderedDict([('currencyPair', cp_live)])
                encoded_data = urllib.urlencode(live_books)
                sign = hmac.new(live_secret, msg=encoded_data, digestmod=hashlib.sha256).hexdigest().upper()
                headers = {"Api-key": live_key, "Sign": sign}
                conn = httplib.HTTPSConnection(server)
                conn.request("GET", method + '?' + encoded_data, '', headers)
                response = conn.getresponse()
                live_books = json.load(response)
                conn.close()
                exchange_books['Live'] = live_books
        except:
            err = True
            live_error = 1

        # Kraken

        try:
            if ex_Kraken in ex_list:
                kraken_books = fookraken.query_public('Depth', {'pair': cp_kraken})
                exchange_books['Kraken'] = kraken_books
        except:
            err = True
            kraken_error = 1

        # GDAX books

        try:
            if ex_GDAX in ex_list:
                gdax_books = publicClient.getProductOrderBook(level=2, product=cp_GDAX)
                exchange_books['GDAX'] = gdax_books
        except:
            err = True
            gdax_error = 1

        flag = True
        if err:
            flag = False
            err = False
            error_list = ['Polo', polo_error, 'Live', live_error, 'Kraken', kraken_error, 'GDAX', gdax_error]
            print_to_excel('excel/error_handler.xlsx', 'Book Call Errors', error_list)
            print "Holding..."
            time.sleep(30)
        return exchange_books


def print_to_excel(workbook, worksheet, data_list):
    ts = str(datetime.datetime.now()).split('.')[0]
    data_list = [ts] + data_list
    wb = load_workbook(workbook)
    if worksheet == 'active':
        ws = wb.active
    else:
        ws = wb[worksheet]
    ws.append(data_list)
    wb.save(workbook)

The problem lies in the function print_to_excel

and more specifically, here:

wb = load_workbook(workbook)

If two processes run this function at the same time, you run into the following race condition:

  • Process 1 wants to open error_handler.xlsx; since it does not exist, it creates an empty file
  • Process 2 wants to open error_handler.xlsx; it *does* exist, so it tries to read it, but the file is still empty. Since the xlsx format is just a ZIP archive containing a bunch of XML files, the process expects a valid ZIP header, does not find one, and bails out with zipfile.BadZipfile: Truncated file header (illustrated below)
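
To see why this surfaces as a zipfile error, here is a small, self-contained illustration (not the asker's code): an .xlsx file is only a ZIP container, and the standard-library zipfile module rejects one whose ZIP structure is missing or cut off. The exact message depends on how much of the file was written; a zero-byte file gives "File is not a zip file", while a partially written one can show up as "Truncated file header".

import zipfile

# Simulate the file that process 1 just created: zero bytes, no ZIP header yet.
open('broken.xlsx', 'w').close()

try:
    zipfile.ZipFile('broken.xlsx')       # openpyxl opens the workbook through zipfile as well
except zipfile.BadZipfile as e:          # spelled BadZipFile on Python 3; the old name still works
    print('zipfile rejects it: %s' % e)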

Your error message does look a bit odd, though; I would have expected to see print_to_excel and load_workbook in the call stack.

Anyway, since you have confirmed that the problem really is in the XLSX handling, you can either:

  • generate a new file name for each process via tempfile
  • use locking to make sure that only one process runs print_to_excel at a time (a minimal sketch follows below)
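
Below is a minimal sketch of the locking variant. It is not the asker's exact code: the worker function, the bare error_handler.xlsx path, and the 'active' sheet are placeholders. The pattern carries over, though, because a lock obtained from multiprocessing.Manager can be pickled and handed to pool workers through functools.partial, just like the asker already passes book_call_mprocess.

import datetime
from functools import partial
from multiprocessing import Pool, Manager

from openpyxl import Workbook, load_workbook

WORKBOOK = 'error_handler.xlsx'   # the asker's path is excel/error_handler.xlsx

def print_to_excel(lock, workbook, worksheet, data_list):
    ts = str(datetime.datetime.now()).split('.')[0]
    with lock:                                    # only one process touches the file at a time
        wb = load_workbook(workbook)
        ws = wb.active if worksheet == 'active' else wb[worksheet]
        ws.append([ts] + data_list)
        wb.save(workbook)

def worker(lock, item):
    # stand-in for book_call_mprocess: every error row goes through the lock
    print_to_excel(lock, WORKBOOK, 'active', ['item', item])
    return item

if __name__ == '__main__':
    Workbook().save(WORKBOOK)                     # create the workbook once, before the pool starts
    manager = Manager()                           # keep a reference so the manager process stays alive
    lock = manager.Lock()                         # proxy object, picklable, safe to pass to workers
    p = Pool(4)
    p.map(partial(worker, lock), range(10))
    p.close()
    p.join()

Creating the workbook once in the parent process, before the pool starts, also removes the "file does not exist yet" half of the race; the lock then only has to serialize the load/append/save cycle. The tempfile route is even simpler: tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) gives each process its own file name, at the cost of having to merge the per-process logs afterwards.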