如何正确处理与多个生产者的 AMQP 连接和 api

How to properly handle with AMQP connections to and api with multiple producers

我正在开发一个 api,它使用 RabbitMQ 主题从事件架构与其他服务通信。 来自我的 API 的几条路线将发布事件,我希望在我的 API 中始终有一个实时连接。这样,在每个新请求中,我只创建一个新通道,并且只保留一个连接(在阅读了 amqp 0-9-2 连接的成本后,我决定这样做)。

现在我有这样的东西:

class Singleton:
    def __init__(self, target):
        self.target = target

    def __call__(self, *args, **kwargs) -> Any:
        try:
            return self._instance
        except AttributeError:
            self._instance = self.target(*args, **kwargs)
            return self._instance


@Singleton
class RabbitConnection(pika.BlockingConnection):
    def __init__(self):
        ssl_options = None

        if settings.RABBIT_SSL:
            context = ssl.create_default_context()
            ssl_options = pika.SSLOptions(context)

        credentials = pika.credentials.PlainCredentials(
            username=settings.RABBIT_USER,
            password=str(settings.RABBIT_PASSWORD),
        )
        parameters = pika.ConnectionParameters(
            host=settings.RABBIT_SERVER,
            port=settings.RABBIT_PORT,
            virtual_host="/",
            credentials=credentials,
            ssl_options=ssl_options,
            heartbeat=0
        )
        super().__init__(parameters=parameters)


class RabbitChannelProvider:
    _channel = None

    def __init__(self):
        self._connection = RabbitConnection()

    def __enter__(self) -> BlockingChannel:
        if not self._channel:
            self._channel = self._connection.channel()
            self._channel.exchange_declare(
                exchange=settings.RABBIT_EXCHANGE,
                exchange_type=ExchangeType.topic,
                passive=False,
                durable=True,
                auto_delete=False,
            )

        return self._channel

    def __exit__(self, exc_type, exc_value, tb) -> None:
        self._channel.close()
        self._channel = None


class MessagePublisher(SingletonCreateMixin, PublisherMessageBackend):
    id = "publisher_rabbitmq"

    def publish(self, routing_key: str, body: Any) -> None:
        try:
            message = build_message(body=body)
            logger.info(
                event="message_broker",
                event_type=LogEventType.SUCCESS,
                location=LogLocation.BACKEND,
                body=body,
                message="Sending message",
            )
            with RabbitChannelProvider() as channel:
                channel.basic_publish(
                    exchange=settings.RABBIT_EXCHANGE,
                    routing_key=routing_key,
                    body=message,
                    properties=pika.BasicProperties(
                        content_type="application/json"
                    ),
                )
        except Exception as err:
            logger.error(
                event="message_broker",
                event_type=LogEventType.ERROR,
                location=LogLocation.BACKEND,
                body=body,
                error=err,
            )
            raise MessageBrokerException(message=err)

这是在api进程中只保持一个连接的正确方法吗?我这样做对吗?

形成官方pika documentation

Is Pika thread safe?

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.

因此您的解决方案可以使用单线程