如何在将记录异步放入运动流时确保顺序?
How to ensure ordering while putting records in kinesis stream asynchronously?
我正在编写一个应用程序,它读取 MySQL bin 日志并将更改推送到 Kinesis 流中。我的用例需要 kinesis 流中的 mysql 事件的完美排序,我正在为其使用 putrecord 操作而不是 putrecords 并且还包括 'SequenceNumberForOrdering' 键。但是仍然存在一个失败点,即重试逻辑。作为一个 async 函数(使用 aws 的 js sdk),我如何确保在对 kinesis 的写入操作失败时的顺序。
阻塞写入(阻塞事件循环直到接收到放置记录的回调)是不是太糟糕的解决方案?或者有更好的方法吗?
如果你想要完美的排序,那么你需要确保在插入下一个事件之前插入每个事件,所以是的,你必须等到一个放置请求完成才能执行下一个。问题是您是否真的需要对所有事件进行完美排序,或者您是否需要在某些子集中进行完美排序?因为您使用的是关系数据库,所以您不太可能在同一 table 中的行之间建立关系。您更有可能在 table 之间的行之间建立关系,因此您可以使用一些技巧来利用批量放置请求。
批量放置请求的问题是它在请求中是无序的。因为 bin 日志为您提供了更改后行的完整图像,您实际上只关心 bin 日志中每个主键的最新条目,所以您可以做的是从bin log,要按时间排序,按主键分组,然后每个主键组的最新记录只从binlog记录中取after_values
像。然后,您可以安全地对这些记录中的每一个使用批量放置请求,并确保您不会意外地将给定键的陈旧记录放在该键的最新记录之前的流中。
这不足以满足所有情况,但在许多 CDC (https://en.wikipedia.org/wiki/Change_data_capture) 设置中,这足以将数据准确复制到其他系统中。
假设您的 bin 日志中有以下记录(格式取自 https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/):
{"table": "Users", "row": {"values": {"id": 1, "Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Foo User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 3, "Name": "User C", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
在此示例中,主键 id
标识了三行。插入id=1
行然后更新两次,插入id=2
行,插入id=3
行。您需要分别处理每种类型的事件(写入、更新、删除),并且只收集每个 id 的最新状态。因此,对于写入,您将为该行取 values
,对于更新,您将为该行取 after_values
,对于 deletes
,您将该行放入一个批处理中的删除。在此示例中,唯一重要的三个条目是:
{"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 3, "Name": "User B", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
这是因为它们是每个 id 的最新版本。您可以对包含这三个写入的批次使用批量放置,而不必担心它们在大多数情况下会乱序,除非您在单个 table 或其他一些非常具体的要求中的条目之间存在相互依赖关系。
如果您有删除,只需将它们放在单独的批量删除中,在批量放置记录后执行。在过去,我通过执行这种压缩和批处理过程看到了非常好的吞吐量改进。但是同样,如果您实际上需要读取 every 事件,而不只是将最新数据复制到其他各种存储,那么这可能行不通。
我能够通过使用内部 FIFO 队列实现完美排序。我将每个事件推送到一个 FIFO 队列中,该队列由递归函数读取,该函数将事件推送到 Kinesis 流中(一次一个)。我还在每次成功的 putRecord 操作时将 bin 日志偏移量存储在外部存储器(在我的例子中是 redis)中,如果对 kinesis 的任何写入失败,我可以重新启动服务器并从上次成功的偏移量值开始再次读取。
我们将不胜感激对此解决方案或其他解决方案的任何建议。
这是我从 fifo 队列读取的递归函数的代码片段。
const fetchAndPutEvent = () => {
let currentEvent = eventQueue.shift(); // dequeue from the fifo queue
if (currentEvent) {
currentEvent = JSON.parse(currentEvent);
// put in the kinesis stream with sequence number of last putRecord operation to achieve ordering of events
return kinesis.putRecord(currentEvent, sequenceNumber, (err, result) => {
if (err) {
// in case of error while putting in kinesis stream kill the server and replay from the last successful offset
logger.fatal('Error in putting kinesis record', err);
return setTimeout(() => {
process.exit(0);
}, 10000);
}
try {
//store the binlog offset and kinesis sequence number in an external memory
sequenceNumber = result.SequenceNumber;
let offsetObject = {
binlogName: currentEvent.currentBinlogName,
binlogPos: currentEvent.currentBinlogPos,
sequenceNumber: sequenceNumber
};
redisClient.hmset(redisKey, offsetObject);
}
catch (ex) {
logger.fatal('Exception in putting kinesis record', ex);
setTimeout(function() {
process.exit(0);
}, 10000);
}
return setImmediate(function() {
return fetchAndPutEvent();
});
});
}
else {
// in case of empty queue just recursively call the function again
return setImmediate(function() {
return fetchAndPutEvent();
});
}
};
与其在向流中添加记录时尝试强制排序,不如在读取记录时对记录进行排序。在您的用例中,每个二进制日志条目都有一个唯一的文件序列、起始位置和结束位置。因此,订购它们并找出任何差距是微不足道的。
如果你在阅读时确实发现了空白,消费者将不得不等到他们被填满。但是,假设没有灾难性故障,流中的所有记录都应该彼此靠近,因此缓冲量应该是最小的。
通过在生产者端强制执行排序,您将整体吞吐量限制为写入单个记录的速度。如果您能跟上实际的数据库更改,那没关系。但是,如果您不能跟上,即使消费者可能负载很轻,管道中的延迟也会越来越大。
此外,您只能在单个分片内强制执行订单,因此如果您的生产者需要摄取超过 1 个 MB/second(或 > 1,000 records/second),您就不走运了(并且根据我的经验,达到 1,000 records/second 的唯一方法是通过 PutRecords
;如果您一次只写一条记录,您将获得大约 20-30 requests/second ).
我正在编写一个应用程序,它读取 MySQL bin 日志并将更改推送到 Kinesis 流中。我的用例需要 kinesis 流中的 mysql 事件的完美排序,我正在为其使用 putrecord 操作而不是 putrecords 并且还包括 'SequenceNumberForOrdering' 键。但是仍然存在一个失败点,即重试逻辑。作为一个 async 函数(使用 aws 的 js sdk),我如何确保在对 kinesis 的写入操作失败时的顺序。
阻塞写入(阻塞事件循环直到接收到放置记录的回调)是不是太糟糕的解决方案?或者有更好的方法吗?
如果你想要完美的排序,那么你需要确保在插入下一个事件之前插入每个事件,所以是的,你必须等到一个放置请求完成才能执行下一个。问题是您是否真的需要对所有事件进行完美排序,或者您是否需要在某些子集中进行完美排序?因为您使用的是关系数据库,所以您不太可能在同一 table 中的行之间建立关系。您更有可能在 table 之间的行之间建立关系,因此您可以使用一些技巧来利用批量放置请求。
批量放置请求的问题是它在请求中是无序的。因为 bin 日志为您提供了更改后行的完整图像,您实际上只关心 bin 日志中每个主键的最新条目,所以您可以做的是从bin log,要按时间排序,按主键分组,然后每个主键组的最新记录只从binlog记录中取after_values
像。然后,您可以安全地对这些记录中的每一个使用批量放置请求,并确保您不会意外地将给定键的陈旧记录放在该键的最新记录之前的流中。
这不足以满足所有情况,但在许多 CDC (https://en.wikipedia.org/wiki/Change_data_capture) 设置中,这足以将数据准确复制到其他系统中。
假设您的 bin 日志中有以下记录(格式取自 https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/):
{"table": "Users", "row": {"values": {"id": 1, "Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Foo User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 3, "Name": "User C", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
在此示例中,主键 id
标识了三行。插入id=1
行然后更新两次,插入id=2
行,插入id=3
行。您需要分别处理每种类型的事件(写入、更新、删除),并且只收集每个 id 的最新状态。因此,对于写入,您将为该行取 values
,对于更新,您将为该行取 after_values
,对于 deletes
,您将该行放入一个批处理中的删除。在此示例中,唯一重要的三个条目是:
{"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"id": 3, "Name": "User B", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
这是因为它们是每个 id 的最新版本。您可以对包含这三个写入的批次使用批量放置,而不必担心它们在大多数情况下会乱序,除非您在单个 table 或其他一些非常具体的要求中的条目之间存在相互依赖关系。
如果您有删除,只需将它们放在单独的批量删除中,在批量放置记录后执行。在过去,我通过执行这种压缩和批处理过程看到了非常好的吞吐量改进。但是同样,如果您实际上需要读取 every 事件,而不只是将最新数据复制到其他各种存储,那么这可能行不通。
我能够通过使用内部 FIFO 队列实现完美排序。我将每个事件推送到一个 FIFO 队列中,该队列由递归函数读取,该函数将事件推送到 Kinesis 流中(一次一个)。我还在每次成功的 putRecord 操作时将 bin 日志偏移量存储在外部存储器(在我的例子中是 redis)中,如果对 kinesis 的任何写入失败,我可以重新启动服务器并从上次成功的偏移量值开始再次读取。
我们将不胜感激对此解决方案或其他解决方案的任何建议。
这是我从 fifo 队列读取的递归函数的代码片段。
const fetchAndPutEvent = () => {
let currentEvent = eventQueue.shift(); // dequeue from the fifo queue
if (currentEvent) {
currentEvent = JSON.parse(currentEvent);
// put in the kinesis stream with sequence number of last putRecord operation to achieve ordering of events
return kinesis.putRecord(currentEvent, sequenceNumber, (err, result) => {
if (err) {
// in case of error while putting in kinesis stream kill the server and replay from the last successful offset
logger.fatal('Error in putting kinesis record', err);
return setTimeout(() => {
process.exit(0);
}, 10000);
}
try {
//store the binlog offset and kinesis sequence number in an external memory
sequenceNumber = result.SequenceNumber;
let offsetObject = {
binlogName: currentEvent.currentBinlogName,
binlogPos: currentEvent.currentBinlogPos,
sequenceNumber: sequenceNumber
};
redisClient.hmset(redisKey, offsetObject);
}
catch (ex) {
logger.fatal('Exception in putting kinesis record', ex);
setTimeout(function() {
process.exit(0);
}, 10000);
}
return setImmediate(function() {
return fetchAndPutEvent();
});
});
}
else {
// in case of empty queue just recursively call the function again
return setImmediate(function() {
return fetchAndPutEvent();
});
}
};
与其在向流中添加记录时尝试强制排序,不如在读取记录时对记录进行排序。在您的用例中,每个二进制日志条目都有一个唯一的文件序列、起始位置和结束位置。因此,订购它们并找出任何差距是微不足道的。
如果你在阅读时确实发现了空白,消费者将不得不等到他们被填满。但是,假设没有灾难性故障,流中的所有记录都应该彼此靠近,因此缓冲量应该是最小的。
通过在生产者端强制执行排序,您将整体吞吐量限制为写入单个记录的速度。如果您能跟上实际的数据库更改,那没关系。但是,如果您不能跟上,即使消费者可能负载很轻,管道中的延迟也会越来越大。
此外,您只能在单个分片内强制执行订单,因此如果您的生产者需要摄取超过 1 个 MB/second(或 > 1,000 records/second),您就不走运了(并且根据我的经验,达到 1,000 records/second 的唯一方法是通过 PutRecords
;如果您一次只写一条记录,您将获得大约 20-30 requests/second ).