当 AWS KCL processRecords 失败时,如何 "mark" 重新处理记录?

When AWS KCL processRecords is failed, how to "mark" that the records should be reprocessed?

我正在使用 AWS DynamoStream,他的 API 基于 AWS KCL。

在某些情况下,我收到了我未能处理的记录,并且我希望这些记录稍后可用,以便对它们进行重新处理。例如,我正在尝试将它们保存到远程数据库,有时我遇到网络问题。

我的问题是:

  1. 我能否以某种方式使用检查指针来表明我没有处理记录?
  2. 我应该避免执行 Checkpointer.checkpoint() 吗?下次调用processRecords时还用它会不会有影响?
  3. 我是否可以为此目的使用任何例外?

KCL 不提供这种内置的重新驱动机制 - 一旦 processRecords returns(无论它抛出异常还是 returned 成功),它认为这些记录已处理并移动上,即使它在内部失败了。

如果您想稍后重新处理一些记录,您需要捕获这些记录并将它们存储在其他地方以供稍后重新处理尝试(明显的警告是它们不会按照其余部分的顺序进行处理)流)。

最简单的解决方案是让您的记录处理器逻辑识别失败的记录(在 returning 到 KCL 之前)并将它们发送到 SQS 队列。然后,记录不会丢失,您可以在闲暇时处理它们(或由另一个使用 SQS 队列的进程处理,可能使用 DLQ 机制来处理重复的失败/放弃场景)。

回答您的具体问题:

  1. 没有。检查点只是说 "I've got this far, don't look at things before the checkpoint"
  2. 将检查点视为全局状态。一旦设置好,它就包含了之前的所有内容。您也不需要在每次调用 processRecords 时都检查点 - 您可以每 X 秒或每 Y 条记录等执行一次。
  3. 不是在 KCL 级别 - 您可以在内部使用特殊的异常类型,并在您 return 到 Kinesis 之前在 processRecords 的外部级别捕获它。或者您可以只捕获所有异常 - 这取决于您以及您希望重新驱动逻辑的具体程度。