Kafka 将单个日志事件行聚合为一个组合日志事件
Kafka aggregate single log event lines to a combined log event
我正在使用 Kafka 处理日志事件。对于简单的连接器和流转换,我具有 Kafka Connect 和 Kafka Streams 的基本知识。
现在我有一个具有以下结构的日志文件:
timestamp event_id event
一个日志事件有多个日志行,由 event_id 连接(例如邮件日志)
示例:
1234 1 START
1235 1 INFO1
1236 1 INFO2
1237 1 END
而且一般有多个事件:
示例:
1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END
时间 window(开始和结束之间)最长可达 5 分钟。
因此我想要一个像
这样的话题
event_id combined_log
示例:
1 START,INFO1,INFO2,END
2 START,INFO2,END
实现这一目标的正确工具是什么?我试图用 Kafka Streams 解决它,但我能弄清楚如何..
在您的用例中,您实际上是在根据消息有效负载重建会话或事务。目前还没有对此类功能的内置、即用型支持。但是,您可以使用 Kafka 的 Streams API 的 Processor API 部分来自己实现此功能。您可以编写使用状态存储的自定义处理器来跟踪对于给定键,session/transaction 何时开始、添加和结束。
邮件列表中的一些用户一直在执行 IIRC,但我不知道可以向您指出的现有代码示例。
需要注意的是正确处理乱序数据。在上面的示例中,您按正确顺序列出了所有输入数据:
1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END
但实际上,messages/records 可能会乱序到达,就像这样(我只显示带有键 1
的消息以简化示例):
1234 1 START
1237 1 END
1236 1 INFO2
1235 1 INFO1
即使发生这种情况,我知道在您的用例中您仍然希望将此数据解释为:START -> INFO1 -> INFO2 -> END
而不是 START -> END
(ignoring/dropping INFO1
和INFO2
= 数据丢失)或 START -> END -> INFO2 -> INFO1
(顺序不正确,可能也违反了您的语义约束)。
我正在使用 Kafka 处理日志事件。对于简单的连接器和流转换,我具有 Kafka Connect 和 Kafka Streams 的基本知识。
现在我有一个具有以下结构的日志文件:
timestamp event_id event
一个日志事件有多个日志行,由 event_id 连接(例如邮件日志)
示例:
1234 1 START
1235 1 INFO1
1236 1 INFO2
1237 1 END
而且一般有多个事件:
示例:
1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END
时间 window(开始和结束之间)最长可达 5 分钟。
因此我想要一个像
这样的话题event_id combined_log
示例:
1 START,INFO1,INFO2,END
2 START,INFO2,END
实现这一目标的正确工具是什么?我试图用 Kafka Streams 解决它,但我能弄清楚如何..
在您的用例中,您实际上是在根据消息有效负载重建会话或事务。目前还没有对此类功能的内置、即用型支持。但是,您可以使用 Kafka 的 Streams API 的 Processor API 部分来自己实现此功能。您可以编写使用状态存储的自定义处理器来跟踪对于给定键,session/transaction 何时开始、添加和结束。
邮件列表中的一些用户一直在执行 IIRC,但我不知道可以向您指出的现有代码示例。
需要注意的是正确处理乱序数据。在上面的示例中,您按正确顺序列出了所有输入数据:
1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END
但实际上,messages/records 可能会乱序到达,就像这样(我只显示带有键 1
的消息以简化示例):
1234 1 START
1237 1 END
1236 1 INFO2
1235 1 INFO1
即使发生这种情况,我知道在您的用例中您仍然希望将此数据解释为:START -> INFO1 -> INFO2 -> END
而不是 START -> END
(ignoring/dropping INFO1
和INFO2
= 数据丢失)或 START -> END -> INFO2 -> INFO1
(顺序不正确,可能也违反了您的语义约束)。