如何在 rabbitmq(rascal.js) 上管理每个请求的发布连接

How to manage publish connection per request on rabbitmq(rascal.js)

我正在使用 Rascal.Js(它使用 amqplib)作为我在 node.js 应用程序上使用 rabbitMq 的消息传递逻辑。

我在项目启动时使用了类似于他们示例的东西,它创建了一个永久实例并“注册”了我的所有订阅者并在消息到达队列时(在后台)重定向消息。

我的问题出在出版商身上。有来自外部的 http 请求应该触发我的发布者。用户单击各种创建按钮,这会导致特定的操作流程。在某些时候它达到了我需要使用发布者的程度。

在这里我不确定正确的方法。每次需要发布消息都需要打开一个新的连接吗?结束后关闭它?或者,也许我应该以一种为所有发布者保持相同连接打开的方式来实现它? (我实际上不太确定如何以可以从我的应用程序的其他部分访问的方式创建它)。

目前我正在使用以下内容:

async publishMessage(publisherName, message) {
        const dynamicSettings = setupDynamicVariablesFromConfigFiles(minimalPublishSettings);
        const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(dynamicSettings.rascal));

        broker.on('error', async function(err) {
            loggerUtil.writeToLog('error', 'publishMessage() broker_error_event: ' + publisherName + err + err.stack);
            await broker.shutdown();
        })
   
        const publication = await broker.publish(publisherName, message);
        try {
            publication.on('error', async function(err) {
                loggerUtil.writeToLog('error', 'publishMessage() publish_error_event: ' + err + err.stack);
                await broker.shutdown();
            }).on("success", async (messageId) => {
                await broker.shutdown();
            }).on("return", async (message) => {
                loggerUtil.writeToLog('error', 'publishMessage() publish_return_event: ' + err + err.stack);
                await broker.shutdown();
            })
        }
        catch(err) {
            loggerUtil.writeToLog('error', 'Something went wrong ' + err + err.stack);
            await broker.shutdown();
        }

    }

当我需要发布消息时,我会在应用程序的不同部分使用此功能。 我想只为所有端点添加 broker.shutdown() 但在错误发生后的某个时候,我得到一个关于关闭已经关闭的连接的异常,这让我担心关闭方法(这可能不是一个好人)。我认为这与此有关- 我尝试这样做(注释代码)但我认为它在某些情况下效果不佳。如果一切正常,它就会“成功”,然后我就可以关闭它了。 但是有一次我遇到了错误而不是成功,当我尝试使用 broker.shutdown() 它给了我另一个使应用程序崩溃的异常。我认为这与此有关- https://github.com/squaremo/amqp.node/issues/111

我不确定处理此问题的最安全方法是什么?

编辑:

实际上,现在我想起来了,这个异常可能也与我试图关闭 catch{} 区域中的代理有关。我会继续调查。

Rascal 设计为在应用程序启动时启动一次,而不是根据 HTTP 请求创建。如果您以这种方式使用它,您的应用程序将非常慢,并且根据您需要处理的并发请求数量,很容易超过您可以与代理建立的最大连接数。此外,您将获得 none Rascal 提供的好处,例如失败的连接恢复。

如果您可以预先确定您需要发布到的队列或交换器,然后在应用程序启动时配置 Rascal(在您的 http 服务器之前),并在请求之间共享发布者。如果您在收到 http 请求之前无法确定队列或交换器,那么 Rascal 不是一个合适的选择。相反,您最好直接使用 amqplib,但仍应建立共享连接和通道。不过,您必须手动处理连接和通道错误,否则它们会使您的应用程序崩溃。