Flink 处理中来自 Kinesis Shard 的记录顺序

Order of records from Kinesis Shard in Flink processing

我无法理解在使用 Flink 从 Kinesis 流中使用记录时如何保留事件的顺序。我们的设置如下所示:

在 Flink 中,我们使用 Table API 来使用 Kinesis 流,进行一些处理并将事件写入(自定义)同步 HTTP 接收器。期望的结果是每个分片处理子任务将事件一个接一个地写入接收器,等待接收器 return 后再写入下一个事件。为了测试这一点,我们让接收器函数在 returning 之前随机执行 Thread.sleep() 几秒钟。查看日志输出,我们现在可以看到:

13:00:06.120 c.s.d.a.p.p.f.sinks.HttpSinkFunction - BLOCKING 802719369 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}
13:00:06.476 c.s.d.a.p.p.f.sinks.HttpSinkFunction - 1973378384 {"userId":"6383449","eventTime":"2022-02-15T11:59:37.792Z","shardId":"shardId-000000000005"}

第一行来自一个阻塞接收器,第二行来自非阻塞接收器。这两个事件都来自同一个用户(=同一个分片,请参阅 JSON 对象中的 shardId)并且彼此之间的处理时间相差几毫秒,即使第一个接收器在写入日志行后会休眠 10 秒.这也意味着结果将乱序到达 HTTP 端点。

我已经研究了有关并行性和背压的 Flink 文档,但我仍然不确定如何实现所需的行为。是否可以一次将输出写入每个分片的一个接收器函数,以便在接收器响应缓慢时延迟分片的完整处理?

更新:有关设置的更多信息

首先,我们定义一个输入 table(使用 Kinesis 连接器)和一个输出 table(使用我们的自定义 http 连接器)。然后我们创建一个语句集,向其中添加几个 insert SQLs 并执行该语句集。代码看起来很像这样(extractionSql 是一个查询字符串列表,见下​​文):

StatementSet statementSet = tableEnv.createStatementSet();
for (String extractionSql : extractionSqls) {
    statementSet.addInsertSql(extractionSql);
}
statementSet.execute();

插入 SQL 看起来非常相似,基本上只是从输入事件中提取属性,还涉及一个 window 函数(翻滚 window)。示例 SQL 如下所示:

INSERT INTO output_table
SELECT userId, 'replace', eventTime, MAP['heroLevel',payload['newLevel']], shardId
FROM input_table
WHERE `eventType` = 'LEVELUP'

想法是,每当类型为 'LEVELUP' 的事件到达时,我们要向我们的 API 发送一个 http 请求。由于稍后的处理方式,我们需要确保单个用户的事件按顺序同步发送。

在 Flink 仪表板中,生成的图表如下所示:

根据您的要求,我能看到的唯一方法是将每个用户的所有结果放在一起,以便它们由相同的接收器实例写入。

也许可以将其重写为按时间戳排序的 user-id 上的一个大型连接(或并集)。或者您可以将 SQL 查询的结果转换为您通过 user-id 键入的数据流,然后在您的自定义接收器中实施一些缓冲和排序。