Dask 因 Tornado 错误 'too many files open' 而失败

Dask failing with/due to Tornado error 'too many files open'

我正在运行从 Anaconda 启动的 Jupyter notebook。尝试初始化分布式 Dask 环境时,抛出以下 Tornado 包错误:

tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
    yield w._start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 157, in _start
    response = yield self.instantiate()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 226, in instantiate
    self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 351, in start
    self.init_result_q = init_q = mp_context.Queue()
  File "/anaconda3/lib/python3.7/multiprocessing/context.py", line 102, in Queue
    return Queue(maxsize, ctx=self.get_context())
  File "/anaconda3/lib/python3.7/multiprocessing/queues.py", line 41, in __init__
    self._reader, self._writer = connection.Pipe(duplex=False)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 517, in Pipe
    fd1, fd2 = os.pipe()
OSError: [Errno 24] Too many open files
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
    yield w._start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 143, in _start
    listen_args=self.listen_args)
  File "/anaconda3/lib/python3.7/site-packages/distributed/core.py", line 272, in listen
    self.listener.start()
  File "/anaconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 396, in start
    backlog=backlog)
  File "/anaconda3/lib/python3.7/site-packages/tornado/netutil.py", line 134, in bind_sockets
    sock = socket.socket(af, socktype, proto)
  File "/anaconda3/lib/python3.7/socket.py", line 151, in __init__
    _socket.socket.__init__(self, family, type, proto, fileno)
OSError: [Errno 24] Too many open files
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<timed exec> in <module>

/anaconda3/lib/python3.7/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, **kwargs)
    634             ext(self)
    635 
--> 636         self.start(timeout=timeout)
    637 
    638         from distributed.recreate_exceptions import ReplayExceptionClient

/anaconda3/lib/python3.7/site-packages/distributed/client.py in start(self, **kwargs)
    757             self._started = self._start(**kwargs)
    758         else:
--> 759             sync(self.loop, self._start, **kwargs)
    760 
    761     def __await__(self):

/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

/anaconda3/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
    820                 self.cluster = LocalCluster(loop=self.loop, asynchronous=True,
    821                                             **self._startup_kwargs)
--> 822                 yield self.cluster
    823             except (OSError, socket.error) as e:
    824                 if e.errno != errno.EADDRINUSE:

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/asyncio/tasks.py in _wrap_awaitable(awaitable)
    601     that will later be wrapped in a Task by ensure_future().
    602     """
--> 603     return (yield from awaitable.__await__())
    604 
    605 

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py in _start(self, ip, n_workers)
    189         self.scheduler.start(scheduler_address)
    190 
--> 191         yield [self._start_worker(**self.worker_kwargs) for i in range(n_workers)]
    192 
    193         self.status = 'running'

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in callback(f)
    881             for f in children:
    882                 try:
--> 883                     result_list.append(f.result())
    884                 except Exception as e:
    885                     if future.done():

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py in _start_worker(self, death_timeout, **kwargs)
    206               death_timeout=death_timeout,
    207               silence_logs=self.silence_logs, **kwargs)
--> 208         yield w._start()
    209 
    210         self.workers.append(w)

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/nanny.py in _start(self, addr_or_port)
    155 
    156         logger.info('        Start Nanny at: %r', self.address)
--> 157         response = yield self.instantiate()
    158         if response == 'running':
    159             assert self.worker_address

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/anaconda3/lib/python3.7/site-packages/distributed/nanny.py in instantiate(self, comm)
    224                 result = yield gen.with_timeout(
    225                         timedelta(seconds=self.death_timeout),
--> 226                         self.process.start()
    227                 )
    228             except gen.TimeoutError:

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/anaconda3/lib/python3.7/site-packages/tornado/gen.py in wrapper(*args, **kwargs)
    324                 try:
    325                     orig_stack_contexts = stack_context._state.contexts
--> 326                     yielded = next(result)
    327                     if stack_context._state.contexts is not orig_stack_contexts:
    328                         yielded = _create_future()

/anaconda3/lib/python3.7/site-packages/distributed/nanny.py in start(self)
    350 
    351         self.init_result_q = init_q = mp_context.Queue()
--> 352         self.child_stop_q = mp_context.Queue()
    353         uid = uuid.uuid4().hex
    354 

/anaconda3/lib/python3.7/multiprocessing/context.py in Queue(self, maxsize)
    100         '''Returns a queue object'''
    101         from .queues import Queue
--> 102         return Queue(maxsize, ctx=self.get_context())
    103 
    104     def JoinableQueue(self, maxsize=0):

/anaconda3/lib/python3.7/multiprocessing/queues.py in __init__(self, maxsize, ctx)
     39             from .synchronize import SEM_VALUE_MAX as maxsize
     40         self._maxsize = maxsize
---> 41         self._reader, self._writer = connection.Pipe(duplex=False)
     42         self._rlock = ctx.Lock()
     43         self._opid = os.getpid()

/anaconda3/lib/python3.7/multiprocessing/connection.py in Pipe(duplex)
    515             c2 = Connection(s2.detach())
    516         else:
--> 517             fd1, fd2 = os.pipe()
    518             c1 = Connection(fd1, writable=False)
    519             c2 = Connection(fd2, readable=False)

OSError: [Errno 24] Too many open files

[此处](https://github.com/dask/distributed/issues/1941) 提到的 Tornado 似乎是问题所在。我的 Anaconda 环境使用 Tornado 5.1.1、Python 3.7.3 和 Dask 1.25.1。

这是正在运行的代码:

%%time
import pandas as pd
import dask.dataframe as dd
import dask.distributed as dist

client = dist.Client()

几周前,我还能够运行一些小型的分布式 Dask 示例,而且在不创建 Client 的情况下也能成功运行 Dask。如果问题出在 Tornado 上,是否有解决方法?

如果有人想知道 MacOS 进程文件限制 -

我找到了解决方法。在 MacOS 上,使用命令行更改文件描述符限制似乎只对从该终端启动的进程生效。此外,如果重新启动,限制会被重置为原始值(我的机器上默认为 256)。要永久设置该限制,必须在 /Library/LaunchDaemons 中创建文件 'limit.maxfiles.plist' 并重新启动,这个方法是我在网上找到的。这可以解决 'too many files' 错误,但可能只是推迟了 Tornado 的问题。

我昨天(5/20/2019)在 master 分支上发现了同样的问题,并发现了这个:https://github.com/dask/distributed/issues/733。对我来说,我只是查看了 dask_scheduler 的底层脚本并将其复制到 Pycharm:

    # Launch the dask scheduler programmatically (e.g. from PyCharm)
    # instead of via the `dask-scheduler` console script.
    from distributed.cli.dask_scheduler import go

    # Guard the entry point so the module can also be imported safely.
    # NOTE: the original snippet had broken indentation (the `if` was
    # indented under nothing), which is a SyntaxError/IndentationError.
    if __name__ == '__main__':
        go()

可以启动,很稳定,我已经从命令行为它附加了一个 worker。