Python3 鼠兔 channel.basic_consume() 导致 MySQL 连接过多

Python3 pika channel.basic_consume() causing MySQL too many connections

我曾使用 pika 连接到 RabbitMQ 并使用消息,一旦我在 ubuntu prod 环境中启动脚本,它就会按预期工作但正在打开 mysql 连接并且从不关闭它们并最终在 mysql 服务器上连接过多。

将不胜感激对下面代码的任何建议,以及不明白哪里出了问题。提前致谢。

流程如下

  1. Python3
  2. 开始鼠兔
  3. 订阅频道并等待消息
  4. 在回调中,我进行各种验证并在 MySql
  5. 中保存或更新数据
  6. 显示问题的结果是问题末尾来自 ubuntu htop 的屏幕截图,显示 MySql 上的新连接并继续将它们添加到顶部

鼠兔版本 = 0.13.0

对于MySql我使用pymysql.

鼠兔脚本

def main():
    credentials = pika.PlainCredentials(tunnel['queue']['name'], tunnel['queue']['password'])

    while True:
        try:
            cp = pika.ConnectionParameters(
                host=tunnel['queue']['host'],
                port=tunnel['queue']['port'],
                credentials=credentials,
                ssl=tunnel['queue']['ssl'],
                heartbeat=600,
                blocked_connection_timeout=300
            )

            connection = pika.BlockingConnection(cp)
            channel = connection.channel()

            def callback(ch, method, properties, body):
                if 'messageType' in properties.headers:
                    message_type = properties.headers['messageType']

                    if events.get(message_type):
                        result = Descriptors._reflection.ParseMessage(events[message_type]['decode'], body)
                        if result:
                            result = protobuf_to_dict(result)
                            model.write_response(external_response=result, message_type=message_type)
                    else:
                        app_log.warning('Message type not in allowed list = ' + str(message_type))
                        app_log.warning('continue listening...')

            channel.basic_consume(callback, queue=tunnel['queue']['name'], no_ack=True)
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
                connection.close()
                break
        except pika.connection.exceptions.ConnectionClosed as e:
            app_log.error('ConnectionClosed :: %s' % str(e))
            continue
        except pika.connection.exceptions.AMQPChannelError as e:
            app_log.error('AMQPChannelError :: %s' % str(e))
            continue
        except Exception as e:
            app_log.error('Connection was closed, retrying... %s' % str(e))
            continue


if __name__ == '__main__':
    main()

在脚本中我有一个模型在数据库中进行插入或更新,代码如下

def write_response(self, external_response, message_type):
    table_name = events[message_type]['table_name']
    original_response = external_response[events[message_type]['response']]
    if isinstance(original_response, list):
        external_response = []
        for o in original_response:
            record = self.map_keys(o, message_type, events[message_type].get('values_fix', {}))
            external_response.append(self.validate_fields(record))
    else:
        external_response = self.map_keys(original_response, message_type, events[message_type].get('values_fix', {}))
        external_response = self.validate_fields(external_response)

    if not self.mysql.open:
        self.mysql.ping(reconnect=True)

    with self.mysql.cursor() as cursor:
        if isinstance(original_response, list):
            for e in external_response:
                id_name = events[message_type]['id_name']
                filters = {id_name: e[id_name]}
                self.event(
                    cursor=cursor,
                    table_name=table_name,
                    filters=filters,
                    external_response=e,
                    message_type=message_type,
                    event_id=e[id_name],
                    original_response=e  # not required here
                )
        else:
            id_name = events[message_type]['id_name']
            filters = {id_name: external_response[id_name]}
            self.event(
                cursor=cursor,
                table_name=table_name,
                filters=filters,
                external_response=external_response,
                message_type=message_type,
                event_id=external_response[id_name],
                original_response=original_response
            )
    cursor.close()
    self.mysql.close()

    return

在 ubuntu 我使用 systemd 来 运行 脚本并重新启动以防出现问题,下面是 systemd 文件

[Unit]
Description=Pika Script
Requires=stunnel4.service
Requires=mysql.service
Requires=mongod.service

[Service]
User=user
Group=group
WorkingDirectory=/home/pika_script
ExecStart=/home/user/venv/bin/python pika_script.py
Restart=always

[Install]
WantedBy=multi-user.target

图片来自 ubuntu htop,MySql 如何不断添加列表并且从不关闭它

错误

tornado_mysql.err.OperationalError: (1040, 'Too many connections')

根据 this 的回答,如果你有 MySQL 5.7 和 5.8 :

It is worth knowing that if you run out of usable disc space on your server partition or drive, that this will also cause MySQL to return this error. If you're sure it's not the actual number of users connected then the next step is to check that you have free space on your MySQL server drive/partition.

来自同一个话题。您可以检查并增加 MySQL 连接数。

我找到了问题,发布如果对其他人有帮助。

问题是 mysqld 进入了无限循环,试图为特定数据库创建索引,在发现哪个数据库正在尝试创建索引但从未成功并且一次又一次地尝试之后。

解决方法是删除数据库并重新创建,mysqld 进程恢复正常。以及创建索引的无限循环也消失了。

我想说增加连接可能会温和地解决你的问题。

首先找出应用程序在完成任务后没有关闭连接的原因。

2nd 数据库上的任何慢 queries/calls 并修复它们(如果有)。

第三次考虑在数据库上 queries/calls 不慢,并且应用程序在立即完成任务后关闭 connection/thread,然后考虑在 mysql 上玩“wait_timeout”边.