Python stomp.py connection gets disconnected and listener stops working

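I am writing a Python script that uses the stomp.py library to connect to and subscribe to an ActiveMQ message queue.

My code is very similar to the example in the documentation under "Dealing with disconnects", with the addition of a timer placed in a loop to keep the listener running long term.

The listener class works: it receives and processes messages. After a few minutes, however, the connection is dropped and the listener stops picking up messages.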

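Problem:

The on_disconnected method is being called, and it runs the connect_and_subscribe() method, but the listener seems to stop working after that happens. Perhaps the listener needs to be re-initialized? When the script is run again, the listener is re-created and starts picking up messages again, but re-running the script periodically is not practical.

Question 1: How can I set this up to reconnect and re-create the listener automatically?

Question 2: Is there a better way to initialize a long-running listener than the timeout loop?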

import os, time, datetime, stomp

_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
_queue = os.getenv('QUEUE_NAME')
# The subscription id is unique to the subscription; in this case there is only one subscription per connection
sub_id = 1

def connect_and_subscribe(conn):
  conn.connect(_user, _password, wait=True)
  conn.subscribe(destination=_queue, id=sub_id, ack='client-individual')
  print('connect_and_subscribe connecting {} to with connection id {}'.format(_queue, sub_id), flush=True)

class MqListener(stomp.ConnectionListener):
  def __init__(self, conn):
    self.conn = conn
    self.sub_id = sub_id
    print('MqListener init')

  def on_error(self, frame):
    print('received an error "%s"' % frame.body)

  def on_message(self, headers, body):
    print('received a message headers "%s"' % headers)
    print('message body "%s"' % body)
    time.sleep(1)
    print('processed message')
    print('Acknowledging')
    self.conn.ack(headers['message-id'], self.sub_id)

  def on_disconnected(self):
    print('disconnected! reconnecting...')
    connect_and_subscribe(self.conn)

def initialize_mqlistener():
  conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
  conn.set_listener('', MqListener(conn))
  connect_and_subscribe(conn)
  # https://github.com/jasonrbriggs/stomp.py/issues/206
  while conn.is_connected():
    time.sleep(2)
  conn.disconnect()

if __name__ == '__main__':
  initialize_mqlistener()

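I was able to solve this by refactoring the retry loop and the on_error handler. In addition, I installed and configured supervisor in the Docker container to run and manage the listener process. That way, if the listener program stops, the supervisor process manager restarts it automatically.

Updated Python stomp listener script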

init_listener.py

import os, json, time, datetime, stomp

_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
# The listener will listen for messages that are relevant to this specific worker
# Queue name must match the 'worker_type' in job tracker file
_queue = os.getenv('QUEUE_NAME')
# The subscription id is unique to the subscription; in this case there is only one subscription per connection
_sub_id = 1
_reconnect_attempts = 0
_max_attempts = 1000

def connect_and_subscribe(conn):
  global _reconnect_attempts
  _reconnect_attempts = _reconnect_attempts + 1
  if _reconnect_attempts <= _max_attempts:
    try:
      conn.connect(_user, _password, wait=True)
      print('connect_and_subscribe connecting {} to with connection id {} reconnect attempts: {}'.format(_queue, _sub_id, _reconnect_attempts), flush=True)
    except Exception as e:
      print('Exception on disconnect. reconnecting...')
      print(e)
      connect_and_subscribe(conn)
    else:
      conn.subscribe(destination=_queue, id=_sub_id, ack='client-individual')
      _reconnect_attempts = 0
  else:
    print('Maximum reconnect attempts reached for this connection. reconnect attempts: {}'.format(_reconnect_attempts), flush=True)

class MqListener(stomp.ConnectionListener):
  def __init__(self, conn):
    self.conn = conn
    self._sub_id = _sub_id
    print('MqListener init')

  def on_error(self, headers, body):
    print('received an error "%s"' % body)

  def on_message(self, headers, body):
    print('received a message headers "%s"' % headers)
    print('message body "%s"' % body)

    message_id = headers.get('message-id')
    message_data = json.loads(body)
    task_name = message_data.get('task_name')
    prev_status = message_data.get('previous_step_status')

    if prev_status == "success":
      print('CALLING DO TASK')
      resp = True
    else:
      print('CALLING REVERT TASK')
      resp = True
    if resp:
      print('Ack message_id {}'.format(message_id))
      self.conn.ack(message_id, self._sub_id)
    else:
      print('NON Ack message_id {}'.format(message_id))
      self.conn.nack(message_id, self._sub_id)
    print('processed message message_id {}'.format(message_id))

  def on_disconnected(self):
    print('disconnected! reconnecting...')
    connect_and_subscribe(self.conn)

def initialize_mqlistener():
  conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
  conn.set_listener('', MqListener(conn))
  connect_and_subscribe(conn)
  # https://github.com/jasonrbriggs/stomp.py/issues/206
  while True:
    time.sleep(2)
    if not conn.is_connected():
      print('Disconnected in loop, reconnecting')
      connect_and_subscribe(conn)

if __name__ == '__main__':
  initialize_mqlistener()
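
One caveat about the script above: connect_and_subscribe() retries by calling itself immediately, so a broker that stays down for a while is retried with no pause, and with _max_attempts = 1000 the call stack can get close to Python's default recursion limit (1000 frames). A possible variation, shown here only as a sketch and not as part of the original fix, is an iterative retry with an increasing delay. The helper name connect_with_backoff and its parameters are hypothetical; the connect argument stands in for whatever call opens the connection.

```python
import time


def connect_with_backoff(connect, max_attempts=5, base_delay=1.0,
                         max_delay=30.0, sleep=time.sleep):
    """Call connect() until it succeeds or max_attempts is exhausted.

    Returns True on success and False once every attempt has failed.
    The delay doubles after each failure, capped at max_delay.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            connect()
            return True
        except Exception as exc:
            print('connect attempt {} of {} failed: {}'.format(
                attempt, max_attempts, exc), flush=True)
            if attempt < max_attempts:
                # Wait before the next attempt instead of retrying at once.
                sleep(delay)
                delay = min(delay * 2, max_delay)
    return False
```

In the listener script this could be called as connect_with_backoff(lambda: conn.connect(_user, _password, wait=True)), followed by conn.subscribe(...) when it returns True. The sleep parameter exists only so the delay can be replaced in tests.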

Supervisor installation and configuration

Dockerfile

Some details removed for brevity

# Install supervisor
RUN apt-get update && apt-get install -y supervisor

# Add the supervisor configuration file
ADD supervisord.conf /etc/supervisor/conf.d/supervisord.conf

# Start supervisor with the configuration file
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]

supervisord.conf

[supervisord]
nodaemon=true
logfile=/home/exampleuser/logs/supervisord.log

[program:mqutils]
command=python3 init_listener.py
directory=/home/exampleuser/mqutils
user=exampleuser
autostart=true
autorestart=true
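
With autorestart=true, supervisor restarts the program whenever the process exits, so the restart only helps if the listener script actually terminates once it can no longer recover. In init_listener.py the main loop keeps running after the maximum number of reconnect attempts has been reached, so the process may never exit. The following is a minimal sketch of a polling loop that returns an exit code instead. The function name watch_connection and its parameters are hypothetical: is_connected and reconnect are assumed to be callables wrapping conn.is_connected() and a reconnect routine that returns True or False.

```python
import time


def watch_connection(is_connected, reconnect, poll_interval=2.0,
                     sleep=time.sleep):
    """Poll the connection and return an exit code when it cannot be restored.

    is_connected: callable returning True while the connection is up.
    reconnect:    callable returning True if the connection was re-established.
    """
    while True:
        sleep(poll_interval)
        if is_connected():
            continue
        print('Disconnected in loop, reconnecting', flush=True)
        if not reconnect():
            # Give up so that a process manager can start a fresh process.
            return 1
```

The script's entry point could then end with sys.exit(watch_connection(conn.is_connected, my_reconnect)), where my_reconnect is a placeholder for the reconnect logic, leaving the restart to supervisor.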