uWSGI and joblib Semaphore: Joblib will operate in serial mode

I'm running joblib inside a Flask application that lives in a Docker container together with uWSGI (started with threads enabled), which in turn is launched by supervisord.

On startup, the web server shows the following error:

unable to load configuration from from multiprocessing.semaphore_tracker import main;main(15)
/usr/local/lib/python3.5/dist-packages/sklearn/externals/joblib/_multiprocessing_helpers.py:38: UserWarning:

[Errno 32] Broken pipe.  joblib will operate in serial mode

Any idea how to fix this and make joblib run in parallel? Thanks!


The following packages are installed in the Docker container:

pytest==4.0.1
pytest-cov==2.6.0
flake8==3.6.0
Cython==0.29.3
numpy==1.16.1
pandas==0.24.0
scikit-learn==0.20.2
fancyimpute==0.4.2
scikit-garden==0.1.3
category_encoders==1.3.0
boto3==1.9.86
joblib==0.13.1
dash==0.37.0
dash-renderer==0.18.0
dash-core-components==0.43.1
dash-table==3.4.0
dash-html-components==0.13.5
dash-auth==1.3.2
Flask-Caching==1.4.0
plotly==3.6.1
APScheduler==3.5.3

Edit

The problem comes from uWSGI, nginx, or supervisord. Missing permissions on /dev/shm are not the issue, because I can create semaphores just fine if I run the Flask server directly. Below are the configuration files of the three services. Disclaimer: I'm a web-server noob, and the configs were born from copying and pasting from different blogs just to get things working :-D
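As a sanity check, a minimal snippet like the following (generic, not part of the app itself) can be run both directly and from within a uWSGI worker to see whether a semaphore can be created at all:

```python
# Minimal semaphore smoke test: this succeeds only if the process can
# create POSIX semaphores (on Linux this requires write access to /dev/shm).
import multiprocessing

sem = multiprocessing.Semaphore()
print("semaphore created:", sem is not None)
```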

So here is my uwsgi configuration:

[uwsgi]
module = prism_dash_frontend.__main__
callable = server

uid = nginx
gid = nginx

plugins = python3

socket = /tmp/uwsgi.sock
chown-socket = nginx:nginx
chmod-socket = 664

# set cheaper algorithm to use, if not set default will be used
cheaper-algo = spare

# minimum number of workers to keep at all times
cheaper = 3

# number of workers to spawn at startup
cheaper-initial = 5

# maximum number of workers that can be spawned
workers = 5

# how many workers should be spawned at a time
cheaper-step = 1
processes = 5

die-on-term = true
enable-threads = true

The nginx configuration:

# based on default config of nginx 1.12.1
# Define the user that will own and run the Nginx server
user nginx;
# Define the number of worker processes; recommended value is the number of
# cores that are being used by your server
# auto will default to number of vcpus/cores
worker_processes auto;

# altering default pid file location
pid /tmp/nginx.pid;

# turn off daemon mode to be watched by supervisord
daemon off;

# Enables the use of JIT for regular expressions to speed-up their processing.
pcre_jit on;

# Define the location on the file system of the error log, plus the minimum
# severity to log messages for
error_log /var/log/nginx/error.log warn;

# events block defines the parameters that affect connection processing.
events {
    # Define the maximum number of simultaneous connections that can be opened by a worker process
    worker_connections  1024;
}


# http block defines the parameters for how NGINX should handle HTTP web traffic
http {
    # Include the file defining the list of file types that are supported by NGINX
    include /etc/nginx/mime.types;
    # Define the default file type that is returned to the user
    default_type text/html;

    # Don't tell nginx version to clients.
    server_tokens off;

    # Specifies the maximum accepted body size of a client request, as
    # indicated by the request header Content-Length. If the stated content
    # length is greater than this size, then the client receives the HTTP
    # error code 413. Set to 0 to disable.
    client_max_body_size 0;

    # Define the format of log messages.
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                        '$status $body_bytes_sent "$http_referer" '
                        '"$http_user_agent" "$http_x_forwarded_for"';

    # Define the location of the log of access attempts to NGINX
    access_log /var/log/nginx/access.log  main;

    # Define the parameters to optimize the delivery of static content
    sendfile       on;
    tcp_nopush     on;
    tcp_nodelay    on;

    # Define the timeout value for keep-alive connections with the client
    keepalive_timeout  65;

    # Define the usage of the gzip compression algorithm to reduce the amount of data to transmit
    #gzip  on;

    # Include additional parameters for virtual host(s)/server(s)
    include /etc/nginx/conf.d/*.conf;
}

The supervisord configuration:

[supervisord]
nodaemon=true

[program:uwsgi]
command=/usr/bin/uwsgi --ini /etc/uwsgi/uwsgi.ini
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[program:nginx]
command=/usr/sbin/nginx
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

Second edit

After moving from Python 3.5 to 3.7.2, the nature of the error changed slightly:

unable to load configuration from from multiprocessing.semaphore_tracker import main;main(15)
/usr/local/lib/python3.7/multiprocessing/semaphore_tracker.py:55: UserWarning:

semaphore_tracker: process died unexpectedly, relaunching.  Some semaphores might leak.

unable to load configuration from from multiprocessing.semaphore_tracker import main;main(15)

Help is greatly appreciated; this is currently a major blocker for me :-/


Third edit:

HERE on my github account is a minimal, complete, and verifiable example.

You can run it via make build followed by make run.

It will show the following log message:

unable to load configuration from from multiprocessing.semaphore_tracker import main;main(14)

and crash with the following error once you visit http://127.0.0.1:8080/:

exception calling callback for <Future at 0x7fbc520c7eb8 state=finished raised TerminatedWorkerError>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
    callback(self)
  File "/usr/local/lib/python3.7/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "/usr/local/lib/python3.7/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "/usr/local/lib/python3.7/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "/usr/local/lib/python3.7/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/usr/local/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 510, in apply_async
    future = self._workers.submit(SafeFunction(func))
  File "/usr/local/lib/python3.7/site-packages/joblib/externals/loky/reusable_executor.py", line 151, in submit
    fn, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/joblib/externals/loky/process_executor.py", line 1022, in submit
    raise self._flags.broken
joblib.externals.loky.process_executor.TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker. The exit codes of the workers are {EXIT(1), EXIT(1), EXIT(1), EXIT(1)}

Your image doesn't seem to have semaphores enabled: Joblib checks multiprocessing.Semaphore(), and only root has read/write permission on the shared memory in /dev/shm. Have a look at this question and this answer.

This is what it looks like in one of my containers:

$ ls -ld /dev/shm
drwxrwxrwt 2 root root 40 Feb 19 15:23 /dev/shm

If you are not running as root, you should change the permissions on /dev/shm. To set the correct permissions, modify /etc/fstab in your Docker image:

none /dev/shm tmpfs rw,nosuid,nodev,noexec 0 0
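Whether the permissions actually took effect can also be checked from Python (a generic check, mirroring the ls -ld output above):

```python
# Print the permission bits of /dev/shm; a working setup typically shows
# 'drwxrwxrwt' (world-writable directory with the sticky bit set).
import os
import stat

if os.path.exists("/dev/shm"):
    mode = stat.filemode(os.stat("/dev/shm").st_mode)
    print(mode)
else:
    mode = None
    print("/dev/shm not present on this system")
```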

Well, I did find an answer to my question. It solves the problem of running a joblib-dependent library inside Docker with supervisord and nginx. However, it is not very satisfying. Therefore, I won't accept my own answer, but I'm posting it here in case someone else runs into the same problem and needs an okay-ish fix.

The solution was to replace uWSGI with gunicorn. Well, at least now I know whose fault it is. I would still appreciate an answer that solves the problem while using uWSGI instead of gunicorn.

This was quite a rabbit hole.

The joblib issue page on GitHub has similar posts about joblib failing with uWSGI. Most of them concern the older multiprocessing backend, though; the newer loky backend was supposed to solve those issues.

There is a PR for the multiprocessing backend that addressed this problem with uwsgi:

from math import sqrt
import joblib

joblib.Parallel(n_jobs=4, backend="multiprocessing")(
    joblib.delayed(sqrt)(i ** 2) for i in range(10))

But it sometimes fails randomly and falls back to the same problem that the above PR tried to solve.

Digging further showed that the current default backend, loky, parallelizes over processes by default (docs). Those processes have no shared memory access, however, and therefore need serialized, queue-based channels for communication. This is probably the reason why uWSGI fails while gunicorn works.

So I tried switching to threads instead of processes:

from math import sqrt
import joblib

joblib.Parallel(n_jobs=4, prefer="threads")(
    joblib.delayed(sqrt)(i ** 2) for i in range(10))

And it works :)
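For reference, this is roughly how the fix looks wired into a Flask view. This is only a sketch: the route name and the computation are illustrative, not code from the actual app.

```python
# Hypothetical minimal Flask view applying the thread-based joblib fix.
from math import sqrt

import joblib
from flask import Flask, jsonify

server = Flask(__name__)

@server.route("/roots")
def roots():
    # prefer="threads" keeps the work inside the current process, so no
    # worker processes (and no semaphores) are needed under uWSGI.
    results = joblib.Parallel(n_jobs=4, prefer="threads")(
        joblib.delayed(sqrt)(i ** 2) for i in range(10)
    )
    return jsonify(results)
```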