WSL:ERROR/MainProcess] 消费者:无法连接到 amqp://guest:**@127.0.0.1:5672//:套接字已关闭

WSL: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: Socket closed

我无法使用 celery 和 WSL 打开套接字。

查看以下信息:

software -> celery:3.1.26.post2 (Cipater) kombu:3.0.37 py:3.6.7
            billiard:3.3.0.23 py-amqp:1.4.9
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:disabled
BROKER_URL: 'amqp://guest:********@localhost:5672//'

我正在使用 pipenv。文件:

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
django = "*"
django-allauth = "*"
django-crispy-forms = "*"
django-debug-toolbar = "==1.10."
numpy = "==1.15.3"
colorama = "==0.4.0"
dateparser = "==0.7.0"
django-extensions = "*"
python-binance = "*"
misaka = "*"
django-celery = "*"
celery = "*"

[requires]
python_version = "3.6"

重现步骤

我在WSL:

  1. sudo apt-get install rabbitmq-server

  2. sudo service rabbitmq-server restart

  3. chmod -R 777 ./ ##否则我没有权限

其他信息

tasks.py:

from celery import Celery

# app = Celery('tasks', broker='amqp://jm-user1:sample@localhost/jm-vhost')
# app = Celery('tasks', broker='amqp://guest@localhost//')
app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

rabbitmqctl 状态:

[{pid,1716},
 {running_applications,
     [{rabbit,"RabbitMQ","3.6.10"},
      {ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
      {ssl,"Erlang/OTP SSL application","8.2.3"},
      {public_key,"Public key infrastructure","1.5.2"},
      {asn1,"The Erlang ASN1 compiler version 5.0.4","5.0.4"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.6.10"},
      {xmerl,"XML parser","1.3.16"},
      {crypto,"CRYPTO","4.2"},
      {os_mon,"CPO  CXC 138 46","2.4.4"},
      {compiler,"ERTS  CXC 138 10","7.1.4"},
      {mnesia,"MNESIA  CXC 138 12","4.15.3"},
      {syntax_tools,"Syntax tools","2.1.4"},
      {sasl,"SASL  CXC 138 11","3.1.1"},
      {stdlib,"ERTS  CXC 138 10","3.4.3"},
      {kernel,"ERTS  CXC 138 10","5.4.1"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang/OTP 20 [erts-9.2] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:192] [kernel-poll:true]\n"},
 {memory,
     [{total,55943096},
      {connection_readers,0},
      {connection_writers,0},
      {connection_channels,0},
      {connection_other,0},
      {queue_procs,2744},
      {queue_slave_procs,0},
      {plugins,0},
      {other_proc,19080304},
      {mnesia,65712},
      {metrics,184888},
      {mgmt_db,0},
      {msg_index,42728},
      {other_ets,1769840},
      {binary,62120},
      {code,21390833},
      {atom,891849},
      {other_system,12634158}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,6791299072},
 {disk_free_limit,50000000},
 {disk_free,100481589248},
 {file_descriptors,
     [{total_limit,924},{total_used,2},{sockets_limit,829},{sockets_used,0}]},
 {processes,[{limit,1048576},{used,165}]},
 {run_queue,0},
 {uptime,4073},
 {kernel,{net_ticktime,60}}]

输出:

当 运行: celery -A tasks worker --loglevel=info 我得到以下输出:

 -------------- celery@Alvaro-Laptop v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Linux-4.4.0-17763-Microsoft-x86_64-with-Ubuntu-18.04-bionic
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fd7952bcf60
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2019-01-23 08:38:30,538: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: Socket closed.
Trying again in 2.00 seconds...

[2019-01-23 08:38:32,543: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: Socket closed.
Trying again in 4.00 seconds...

如何打开套接字以允许通信?

听起来你还没有安装和启动rabbitmq。我发现的最简单的方法是使用 docker,但如果您在 WSL 下使用 ubuntu,则可以使用 apt-get 在 WSL 中安装它,方法是遵循 these instructions.

我遇到了完全相同的问题,当我在 Medium 上关注 "Django, Scheduled Tasks & Queues (Part 2)" 文章时,我也有一个 运行 rabbitMQ 服务器,但是我的 CELERY_BROKER_URL 在设置中是 amqp://username:password@192.168.0.38//,事实证明这是不正确的。我使用这个 instructions 重新配置了我的 rabbitMQ 服务器。希望对你也有帮助)

我能够使用 Redis 而不是 Rabbitmq 配置所有内容:

sudo apt-get install redis-server
sudo service redis-server restart
pip install celery
chmod -R 777 ./

将文件 tasks.py:

放置在您想要执行 worker 的任何文件夹中
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def add(x, y):
    return x + y

然后执行以下:

celery -A tasks worker --loglevel=info

套接字现已打开!

检查 rabbitmq 是否启动并且 运行。启用管理控制台:

sudo rabbitmq-plugins enable rabbitmq_management

然后使用 guest/guest 作为凭据访问 http://localhost:15672。查找概述页面>端口和上下文。

如果 AMQP 绑定到 IPv6 (::),则可能是问题所在。打开 rabbitmq 服务器配置:

sudo vi /etc/rabbitmq/rabbitmq-env.conf 

并注释掉

# By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
# available. Set this if you only want to bind to one network interface or#
# address family.
NODE_IP_ADDRESS=127.0.0.1

然后重启服务:

sudo service rabbitmq-server restart

并再次检查与 RabbitMQ 的连接