通过 NESTJs 微服务问题处理兔子消息
Handling of rabbit messages via NESTJs microservice issue
我目前遇到无法解决的问题。这只是在极端情况下,当服务器由于某种原因离线时,消息会累积(100k 或更多),然后它们需要同时处理。尽管我计划永远不会发生这种情况,但我还是希望有一个备份计划,并在这个问题上有更多的控制权。
我是 运行 针对 RabbitMQ 代理的 NestJS 微服务,用于获取从 IOT 设备到达的消息并将它们插入 MySQL 数据库。
每条消息都有一些 conversion/translation 操作需要在插入之前完成。此转换基于针对同一 SQL 服务器上的 table 执行的单行查询。
顺序如下:
- 已读消息;
- select 来自数据库的 1 行(table 有几千行);
- 向数据库中插入 1 行;
现在,我遇到了这个错误:
(node:1129233) UnhandledPromiseRejectionWarning: SequelizeConnectionAcquireTimeoutError: Operation timeout
at ConnectionManager.getConnection (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/dialects/abstract/connection-manager.js:288:48)
at runNextTicks (internal/process/task_queues.js:60:5)
at listOnTimeout (internal/timers.js:526:9)
at processTimers (internal/timers.js:500:7)
at /home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/sequelize.js:613:26
at MySQLQueryInterface.select (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/dialects/abstract/query-interface.js:953:12)
at Function.findAll (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/model.js:1752:21)
at Function.findOne (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/model.js:1916:12)
node_modules/source-map-support/source-map-support.js:516
(node:1129233) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1349)
我认为 promise 拒绝在 sequelize 模块中。
这是我的续集配置:
useFactory: async (ConfigService: ConfigService) => ({
dialect: 'mysql',
host: 'someserver',
port: 3306,
username: 'dede',
password: 'dudu!',
database: 'dada',
autoLoadModels: true,
pool: { max: 5, min: 0, adquire: 1800000, idle: 5000 },
synchronize: true,
logQueryParameters: true,
这是我的消息服务的一部分:
@RabbitRPC({
exchange: 'BACKEND_MAINEXCHANGE',
routingKey: 'Facility_DeviceReadings',
queue: 'Facility_DeviceReadings',
})
public async rpcHandlerDeviceReadings(mensagem: ReadingDevicesPerFacility) {
const schemavalid = mensagem;
this.mylogger.log( 'Received message from BACKEND_MAINEXCHANGE - listening to the queue Facility_DeviceReadings : ' +
' was registered locally on ' +
schemavalid.DateTimeRegistered,
MessagingService.name,
'rpcHandlerDeviceReadings',
);
if (schemavalid) {
try {
let finalschema = new CreateReadingDevicesDto();
if (element.Slot > 0) {
const result = this.readingTransService
.findOneByPlcId(element.deviceId, element.Slot)
.then((message) => {
if (!message) {
throw new NotFoundException('Message with ID not found');
} else {
finalschema.deviceId = message.deviceId;
finalschema.Slot = message.Slot2;
if (this.isNumeric(element.ReadingValue)) {
finalschema.ReadingValue = element.ReadingValue;
finalschema.DateTimeRegistered =
schemavalid.DateTimeRegistered;
this.readingDeviceService
.create(finalschema)
.then((message) => {
this.mylogger.debug(
'Saved',
MessagingService.name,
'rpcHandlerDeviceReadings',
);
return 42;
});
} else {
this.mylogger.error(
'error',
MessagingService.name,
'rpcHandlerDeviceReadings',
);
}
return message;
}
});
问题似乎是这个 RPC 在 SQL 之前一直在反对 rabbit 和 reading/consuming 消息(每毫秒 8 个)作为重放的机会,迫使 sequelize 进入它不能的状态处理并因此抛出上述错误。
我试过调整 sequelize 配置但没有好的结果。
有什么方法可以强制 RPC 在处理完前一条消息后只处理下一条消息?
如果有人能引导我朝着正确的方向前进,我会很高兴,因为这最终可能会成为一个重大问题。
提前感谢您提供的任何意见。
在我看来你的 Sequelize connection pool options 需要一些调整。
你有
pool: { max: 5, min: 0, adquire: 1800000, idle: 5000 }
adquire
不是东西。也许 acquire
?半小时(180 万毫秒)是等待连接的非常长的时间。缩短它? acquire: 300000
给你五分钟。像您这样的大型生产应用程序可能应该始终保持一两个连接打开。将 min
增加到 1 或 2。
适度的最大连接数是好的,只要每个操作从池中获取一个连接、使用它并释放它。如果您的操作获取一个连接然后等待外部的东西,您将需要更多的连接。
如果可以让您的程序一次读取一大堆消息(至少 10 条),然后使用 bulkCreate() 将它们一次性放入您的数据库中,您将加快速度向上。很多。那是因为插入很便宜,但是这些插入之后的提交操作并不那么便宜。因此,在单个事务中执行多个插入,然后一次提交它们,可以使事情变得更快。阅读关于自动提交的更多信息。
编写您的服务以快速处理大量消息积压将不太可能出现您向我们展示的错误。
编辑 要使用.bulkCreate()
,您需要积累多条传入消息。尝试这样的事情。
创建一个包含您收到的 CreateReadingDevicesDto
消息的数组。 let incomingMessages = []
不是使用 .create()
在您完成接收和验证后将每条新消息放入您的数据库,而是将其放入您的数组。 incomingMessages.push(finalschema)
.
设置一个Javascript interval从数组中取出数据并用.bulkCreate()
将它放入你的数据库中。这将每 500 毫秒执行一次。
setInterval(
function (this) {
if (incomingMessages.length > 0) {
/* create all the items in the array */
this.readingDeviceService
.bulkCreate(incomingMessages)
/* empty out the array */
incomingMessages = []
}, 500, this);
以 0 到 500 毫秒的额外延迟为代价,这会分批处理您的消息并让您更快地处理积压。
我没有对此进行调试,它可能比您在生产代码中想要的要粗糙一些。但是我用过类似的技巧,效果不错。
我目前遇到无法解决的问题。这只是在极端情况下,当服务器由于某种原因离线时,消息会累积(100k 或更多),然后它们需要同时处理。尽管我计划永远不会发生这种情况,但我还是希望有一个备份计划,并在这个问题上有更多的控制权。 我是 运行 针对 RabbitMQ 代理的 NestJS 微服务,用于获取从 IOT 设备到达的消息并将它们插入 MySQL 数据库。 每条消息都有一些 conversion/translation 操作需要在插入之前完成。此转换基于针对同一 SQL 服务器上的 table 执行的单行查询。
顺序如下:
- 已读消息;
- select 来自数据库的 1 行(table 有几千行);
- 向数据库中插入 1 行;
现在,我遇到了这个错误:
(node:1129233) UnhandledPromiseRejectionWarning: SequelizeConnectionAcquireTimeoutError: Operation timeout
at ConnectionManager.getConnection (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/dialects/abstract/connection-manager.js:288:48)
at runNextTicks (internal/process/task_queues.js:60:5)
at listOnTimeout (internal/timers.js:526:9)
at processTimers (internal/timers.js:500:7)
at /home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/sequelize.js:613:26
at MySQLQueryInterface.select (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/dialects/abstract/query-interface.js:953:12)
at Function.findAll (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/model.js:1752:21)
at Function.findOne (/home/nunovivas/NestJSProjects/integrador/node_modules/sequelize/lib/model.js:1916:12)
node_modules/source-map-support/source-map-support.js:516
(node:1129233) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1349)
我认为 promise 拒绝在 sequelize 模块中。
这是我的续集配置:
useFactory: async (ConfigService: ConfigService) => ({
dialect: 'mysql',
host: 'someserver',
port: 3306,
username: 'dede',
password: 'dudu!',
database: 'dada',
autoLoadModels: true,
pool: { max: 5, min: 0, adquire: 1800000, idle: 5000 },
synchronize: true,
logQueryParameters: true,
这是我的消息服务的一部分:
@RabbitRPC({
exchange: 'BACKEND_MAINEXCHANGE',
routingKey: 'Facility_DeviceReadings',
queue: 'Facility_DeviceReadings',
})
public async rpcHandlerDeviceReadings(mensagem: ReadingDevicesPerFacility) {
const schemavalid = mensagem;
this.mylogger.log( 'Received message from BACKEND_MAINEXCHANGE - listening to the queue Facility_DeviceReadings : ' +
' was registered locally on ' +
schemavalid.DateTimeRegistered,
MessagingService.name,
'rpcHandlerDeviceReadings',
);
if (schemavalid) {
try {
let finalschema = new CreateReadingDevicesDto();
if (element.Slot > 0) {
const result = this.readingTransService
.findOneByPlcId(element.deviceId, element.Slot)
.then((message) => {
if (!message) {
throw new NotFoundException('Message with ID not found');
} else {
finalschema.deviceId = message.deviceId;
finalschema.Slot = message.Slot2;
if (this.isNumeric(element.ReadingValue)) {
finalschema.ReadingValue = element.ReadingValue;
finalschema.DateTimeRegistered =
schemavalid.DateTimeRegistered;
this.readingDeviceService
.create(finalschema)
.then((message) => {
this.mylogger.debug(
'Saved',
MessagingService.name,
'rpcHandlerDeviceReadings',
);
return 42;
});
} else {
this.mylogger.error(
'error',
MessagingService.name,
'rpcHandlerDeviceReadings',
);
}
return message;
}
});
问题似乎是这个 RPC 在 SQL 之前一直在反对 rabbit 和 reading/consuming 消息(每毫秒 8 个)作为重放的机会,迫使 sequelize 进入它不能的状态处理并因此抛出上述错误。 我试过调整 sequelize 配置但没有好的结果。 有什么方法可以强制 RPC 在处理完前一条消息后只处理下一条消息? 如果有人能引导我朝着正确的方向前进,我会很高兴,因为这最终可能会成为一个重大问题。
提前感谢您提供的任何意见。
在我看来你的 Sequelize connection pool options 需要一些调整。
你有
pool: { max: 5, min: 0, adquire: 1800000, idle: 5000 }
adquire
不是东西。也许 acquire
?半小时(180 万毫秒)是等待连接的非常长的时间。缩短它? acquire: 300000
给你五分钟。像您这样的大型生产应用程序可能应该始终保持一两个连接打开。将 min
增加到 1 或 2。
适度的最大连接数是好的,只要每个操作从池中获取一个连接、使用它并释放它。如果您的操作获取一个连接然后等待外部的东西,您将需要更多的连接。
如果可以让您的程序一次读取一大堆消息(至少 10 条),然后使用 bulkCreate() 将它们一次性放入您的数据库中,您将加快速度向上。很多。那是因为插入很便宜,但是这些插入之后的提交操作并不那么便宜。因此,在单个事务中执行多个插入,然后一次提交它们,可以使事情变得更快。阅读关于自动提交的更多信息。
编写您的服务以快速处理大量消息积压将不太可能出现您向我们展示的错误。
编辑 要使用.bulkCreate()
,您需要积累多条传入消息。尝试这样的事情。
创建一个包含您收到的
CreateReadingDevicesDto
消息的数组。let incomingMessages = []
不是使用
.create()
在您完成接收和验证后将每条新消息放入您的数据库,而是将其放入您的数组。incomingMessages.push(finalschema)
.设置一个Javascript interval从数组中取出数据并用
.bulkCreate()
将它放入你的数据库中。这将每 500 毫秒执行一次。setInterval( function (this) { if (incomingMessages.length > 0) { /* create all the items in the array */ this.readingDeviceService .bulkCreate(incomingMessages) /* empty out the array */ incomingMessages = [] }, 500, this);
以 0 到 500 毫秒的额外延迟为代价,这会分批处理您的消息并让您更快地处理积压。
我没有对此进行调试,它可能比您在生产代码中想要的要粗糙一些。但是我用过类似的技巧,效果不错。