Dask multiprocessing fails with embarrassingly parallel for loop including call to MongoDB when number of iterations is high enough

I am trying to run a kind of parallel simulation in a Python for loop using Dask multiprocessing. The parallelization works fine when the number of iterations is fairly low, but fails when it grows. The problem occurs on Win7 (4 cores, 10 GB RAM), Win10 (8 cores, 8 GB RAM), and an Azure VM running Windows Server 2016 (16 cores, 32 GB RAM). The slowest machine, the Win7 one, gets through the most iterations before failing. The issue can be mitigated by adding a long enough sleep at the end of every function involved, but the required sleep results in very poor performance, comparable to running sequentially.

I hope someone can help me out here. Thanks in advance for your comments and answers!

The following simple code contains the relevant stages of the for loop and reproduces the error.

import json
import numpy as np
import pandas as pd
from pymongo import MongoClient

# Create random DataFrame
df = pd.DataFrame(np.random.randint(0,100,size=(100,11)), columns=list('ABCDEFGHIJK'))

# Save to Mongo
client = MongoClient()
db = client.errordemo
res = db.errordemo.insert_many(json.loads(df.to_json(orient='records')))
db.client.close()


class ToBeRunParallel:

    def __init__(self):
        pass

    def functionToBeRunParallel(self, i):

        # Read data from mongo
        with MongoClient() as client:
            db = client.errordemo
            dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))

        # Randomize data (rand is a user-defined helper, not shown in the post)
        dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)

        # Sum rows
        dataSum = dataRand.sum(axis=1)

        # Select randomly one of the resulting values and return
        return dataSum.sample().values[0]
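The helper `rand` used above is not defined anywhere in the post; a minimal, hypothetical sketch of the volatility-style perturbation it might perform (column-wise, since `DataFrame.apply` passes each column as a Series):

```python
import numpy as np
import pandas as pd

def rand(col, volatility=0.1):
    # Hypothetical helper: scale each value by a random factor drawn
    # uniformly from [1 - volatility, 1 + volatility].
    factors = np.random.uniform(1 - volatility, 1 + volatility, size=len(col))
    return col * factors

df = pd.DataFrame({'A': [100.0, 200.0], 'B': [50.0, 75.0]})
perturbed = df.apply(rand, volatility=0.1)  # same shape, values within +/-10%
```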

Call the function functionToBeRunParallel either in the console or in Jupyter (both fail). 'errordemo' is a local module containing the class ToBeRunParallel. When running on the Azure VM, the code succeeds with 500 loops and fails with 5,000.

import errordemo
from dask import delayed, compute, multiprocessing

# Determine how many times to loop
rng = range(15000)

# Define empty result lists
resList = []

# Create instance
err = errordemo.ToBeRunParallel()

# Loop in parallel using Dask
for i in rng:
    sampleValue = delayed(err.functionToBeRunParallel)(i)
    resList.append(sampleValue)

# Compute in parallel 
result = compute(*resList, get=multiprocessing.get)

The error stack in Jupyter is as follows:

---------------------------------------------------------------------------
AutoReconnect                             Traceback (most recent call last)
<ipython-input-3-9f535dd4c621> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', '# Determine how many times to loop\nrng = range(50000)\n\n# Define empty result lists\nresList = []\n\n# Create instance\nerr = errordemo.ToBeRunParallel()\n\n# Loop in parallel using Dask\nfor i in rng:\n    sampleValue = delayed(err.functionToBeRunParallel)(i)\n    resList.append(sampleValue)\n    \n# Compute in parallel \nresult = compute(*resList, get=dask.multiprocessing.get)')

C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
   2113             magic_arg_s = self.var_expand(line, stack_depth)
   2114             with self.builtin_trap:
-> 2115                 result = fn(magic_arg_s, cell)
   2116             return result
   2117 

<decorator-gen-60> in time(self, line, cell, local_ns)

C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    189 
    190         if callable(arg):

C:\ProgramData\Anaconda3\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
   1178         else:
   1179             st = clock2()
-> 1180             exec(code, glob, local_ns)
   1181             end = clock2()
   1182             out = None

<timed exec> in <module>()

C:\ProgramData\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    200     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    201     keys = [var._keys() for var in variables]
--> 202     results = get(dsk, keys, **kwargs)
    203 
    204     results_iter = iter(results)

C:\ProgramData\Anaconda3\lib\site-packages\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
     85         result = get_async(pool.apply_async, len(pool._pool), dsk3, keys,
     86                            get_id=_process_get_id,
---> 87                            dumps=dumps, loads=loads, **kwargs)
     88     finally:
     89         if cleanup:

C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
    498                     _execute_task(task, data)  # Re-execute locally
    499                 else:
--> 500                     raise(remote_exception(res, tb))
    501             state['cache'][key] = res
    502             finish_task(dsk, key, state, results, keyorder.get)

AutoReconnect: localhost:27017: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

Traceback
---------
  File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 266, in execute_task
    result = _execute_task(task, data)
  File "C:\ProgramData\Anaconda3\lib\site-packages\dask\async.py", line 247, in _execute_task
    return func(*args2)
  File "C:\Git_repository\footie\Pipeline\errordemo.py", line 20, in functionToBeRunParallel
    dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))
  File "C:\ProgramData\Anaconda3\lib\site-packages\pandas\core\frame.py", line 981, in from_records
    first_row = next(data)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1090, in next
    if len(self.__data) or self._refresh():
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 1012, in _refresh
    self.__read_concern))
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\cursor.py", line 850, in __send_message
    **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 844, in _send_message_with_response
    exhaust)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\mongo_client.py", line 855, in _reset_on_error
    return func(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 99, in send_message_with_response
    with self.get_socket(all_credentials, exhaust) as sock_info:
  File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
    return next(self.gen)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\server.py", line 163, in get_socket
    with self.pool.get_socket(all_credentials, checkout) as sock_info:
  File "C:\ProgramData\Anaconda3\lib\contextlib.py", line 82, in __enter__
    return next(self.gen)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 582, in get_socket
    sock_info = self._get_socket_no_auth()
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 618, in _get_socket_no_auth
    sock_info, from_pool = self.connect(), False
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 555, in connect
    _raise_connection_failure(self.address, error)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pymongo\pool.py", line 65, in _raise_connection_failure
    raise AutoReconnect(msg)

UPDATE:

Following this post, I created a decorator to catch AutoReconnect exceptions, shown below. Together with the parameters passed to MongoClient, the loop now works, but it is still slow, taking twice as long as it should. (Timed on the Azure VM):


500 iterations: 3.74 s
50,000 iterations: 12 min 12 s
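The "twice the time it should take" observation can be checked directly from those timings:

```python
# Per-iteration cost from the reported Azure VM timings
t_small = 3.74             # seconds for 500 iterations
t_large = 12 * 60 + 12     # 12 min 12 s = 732 s for 50,000 iterations

per_iter_small = t_small / 500      # ~7.5 ms per iteration
per_iter_large = t_large / 50000    # ~14.6 ms per iteration, roughly 2x slower
```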

import random
from time import sleep

from pymongo import errors

def safe_mongocall(call):
    def _safe_mongocall(*args, **kwargs):
        # Retry up to 5 times on AutoReconnect, backing off briefly each time
        for i in range(5):
            try:
                return call(*args, **kwargs)
            except errors.AutoReconnect:
                sleep(random.random() / 100)
        print('Error: Failed operation!')
    return _safe_mongocall

@safe_mongocall
def functionToBeRunParallel(self, i):

    # Read data from mongo
    with MongoClient(connect=False, maxPoolSize=None, maxIdleTimeMS=100) as client:
        db = client.errordemo
        dataFromMongo = pd.DataFrame.from_records(db.errordemo.find({}, {'_id': 0}))

    # Randomize data
    dataRand = dataFromMongo.apply(pd.to_numeric).apply(rand, volatility=0.1)

    # Sum rows
    dataSum = dataRand.sum(axis=1)

    # Select randomly one of the resulting values and return
    return dataSum.sample().values[0]
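A further mitigation worth considering (not used in the post) is to open one MongoClient per worker process and reuse it across tasks, instead of opening and closing a client on every call — each short-lived connection leaves a socket behind in TIME_WAIT. The resource-caching pattern can be sketched generically, with a stand-in `factory` rather than pymongo:

```python
_client = None  # one cached instance per worker process

def get_client(factory):
    """Create the resource on first use in this process, then reuse it."""
    global _client
    if _client is None:
        _client = factory()
    return _client

# Usage sketch: each task calls get_client(...) instead of constructing
# a new client, so one socket serves all tasks in the process.
calls = []
first = get_client(lambda: calls.append(1) or object())
second = get_client(lambda: calls.append(1) or object())  # factory not called again
```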

The actual problem is TCP/IP port exhaustion, so the solution is to avoid exhausting the ports. Following this article by Microsoft, I added the following registry keys and values under HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters:

MaxUserPort: 65534
TcpTimedWaitDelay: 30
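A back-of-envelope calculation (assuming the legacy Windows defaults of a 1025-5000 dynamic port range and a 240 s TIME_WAIT; actual defaults vary by Windows version) shows why these two values matter:

```python
# Sustainable rate of new outbound connections ~ available ports / TIME_WAIT.
# Assumed legacy defaults: ports 1025-5000, TcpTimedWaitDelay = 240 s.
default_rate = (5000 - 1025) / 240   # ~16.6 new connections per second
tuned_rate = (65534 - 1025) / 30     # ~2150 per second after the registry change
```

With tens of thousands of iterations each opening a fresh MongoDB connection, the default rate is exhausted quickly, which matches the WinError 10048 in the traceback.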