如何正确处理与多个生产者的 AMQP 连接和 api
How to properly handle with AMQP connections to and api with multiple producers
我正在开发一个 api,它使用 RabbitMQ 主题从事件架构与其他服务通信。
来自我的 API 的几条路线将发布事件,我希望在我的 API 中始终有一个实时连接。这样,在每个新请求中,我只创建一个新通道,并且只保留一个连接(在阅读了 amqp 0-9-2 连接的成本后,我决定这样做)。
现在我有这样的东西:
class Singleton:
def __init__(self, target):
self.target = target
def __call__(self, *args, **kwargs) -> Any:
try:
return self._instance
except AttributeError:
self._instance = self.target(*args, **kwargs)
return self._instance
@Singleton
class RabbitConnection(pika.BlockingConnection):
def __init__(self):
ssl_options = None
if settings.RABBIT_SSL:
context = ssl.create_default_context()
ssl_options = pika.SSLOptions(context)
credentials = pika.credentials.PlainCredentials(
username=settings.RABBIT_USER,
password=str(settings.RABBIT_PASSWORD),
)
parameters = pika.ConnectionParameters(
host=settings.RABBIT_SERVER,
port=settings.RABBIT_PORT,
virtual_host="/",
credentials=credentials,
ssl_options=ssl_options,
heartbeat=0
)
super().__init__(parameters=parameters)
class RabbitChannelProvider:
_channel = None
def __init__(self):
self._connection = RabbitConnection()
def __enter__(self) -> BlockingChannel:
if not self._channel:
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=settings.RABBIT_EXCHANGE,
exchange_type=ExchangeType.topic,
passive=False,
durable=True,
auto_delete=False,
)
return self._channel
def __exit__(self, exc_type, exc_value, tb) -> None:
self._channel.close()
self._channel = None
class MessagePublisher(SingletonCreateMixin, PublisherMessageBackend):
id = "publisher_rabbitmq"
def publish(self, routing_key: str, body: Any) -> None:
try:
message = build_message(body=body)
logger.info(
event="message_broker",
event_type=LogEventType.SUCCESS,
location=LogLocation.BACKEND,
body=body,
message="Sending message",
)
with RabbitChannelProvider() as channel:
channel.basic_publish(
exchange=settings.RABBIT_EXCHANGE,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(
content_type="application/json"
),
)
except Exception as err:
logger.error(
event="message_broker",
event_type=LogEventType.ERROR,
location=LogLocation.BACKEND,
body=body,
error=err,
)
raise MessageBrokerException(message=err)
这是在api进程中只保持一个连接的正确方法吗?我这样做对吗?
Is Pika thread safe?
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.
因此您的解决方案可以使用单线程
我正在开发一个 api,它使用 RabbitMQ 主题从事件架构与其他服务通信。 来自我的 API 的几条路线将发布事件,我希望在我的 API 中始终有一个实时连接。这样,在每个新请求中,我只创建一个新通道,并且只保留一个连接(在阅读了 amqp 0-9-2 连接的成本后,我决定这样做)。
现在我有这样的东西:
class Singleton:
def __init__(self, target):
self.target = target
def __call__(self, *args, **kwargs) -> Any:
try:
return self._instance
except AttributeError:
self._instance = self.target(*args, **kwargs)
return self._instance
@Singleton
class RabbitConnection(pika.BlockingConnection):
def __init__(self):
ssl_options = None
if settings.RABBIT_SSL:
context = ssl.create_default_context()
ssl_options = pika.SSLOptions(context)
credentials = pika.credentials.PlainCredentials(
username=settings.RABBIT_USER,
password=str(settings.RABBIT_PASSWORD),
)
parameters = pika.ConnectionParameters(
host=settings.RABBIT_SERVER,
port=settings.RABBIT_PORT,
virtual_host="/",
credentials=credentials,
ssl_options=ssl_options,
heartbeat=0
)
super().__init__(parameters=parameters)
class RabbitChannelProvider:
_channel = None
def __init__(self):
self._connection = RabbitConnection()
def __enter__(self) -> BlockingChannel:
if not self._channel:
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=settings.RABBIT_EXCHANGE,
exchange_type=ExchangeType.topic,
passive=False,
durable=True,
auto_delete=False,
)
return self._channel
def __exit__(self, exc_type, exc_value, tb) -> None:
self._channel.close()
self._channel = None
class MessagePublisher(SingletonCreateMixin, PublisherMessageBackend):
id = "publisher_rabbitmq"
def publish(self, routing_key: str, body: Any) -> None:
try:
message = build_message(body=body)
logger.info(
event="message_broker",
event_type=LogEventType.SUCCESS,
location=LogLocation.BACKEND,
body=body,
message="Sending message",
)
with RabbitChannelProvider() as channel:
channel.basic_publish(
exchange=settings.RABBIT_EXCHANGE,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(
content_type="application/json"
),
)
except Exception as err:
logger.error(
event="message_broker",
event_type=LogEventType.ERROR,
location=LogLocation.BACKEND,
body=body,
error=err,
)
raise MessageBrokerException(message=err)
这是在api进程中只保持一个连接的正确方法吗?我这样做对吗?
Is Pika thread safe?
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.
因此您的解决方案可以使用单线程