从多租户 Kafka 主题处理 Apache Beam 中的乱序事件窗口
Handling Out-Of-Order Event Windowing in Apache Beam from a Multitenant Kafka Topic
我一直在思考如何在 Beam 中解决给定的问题,并认为我应该向更多的观众寻求一些建议。目前一切似乎都在稀疏地工作,我很好奇是否有人可以提供一个共鸣板来查看这个工作流程是否有意义。
主要的高级目标是从 Kafka 中读取记录,这些记录可能是无序的,需要根据在记录中找到的另一个 属性 在事件时间 windowed,并最终发出那些 windows 的内容并将它们写到 GCS。
当前管道大致如下所示:
val partitionedEvents = pipeline
.apply("Read Events from Kafka",
KafkaIO
.read<String, Log>()
.withBootstrapServers(options.brokerUrl)
.withTopic(options.incomingEventsTopic)
.withKeyDeserializer(StringDeserializer::class.java)
.withValueDeserializerAndCoder(
SpecificAvroDeserializer<Log>()::class.java,
AvroCoder.of(Log::class.java)
)
.withReadCommitted()
.commitOffsetsInFinalize()
// Set the watermark to use a specific field for event time
.withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
.withConsumerConfigUpdates(
ImmutableMap.of<String, Any?>(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
"schema.registry.url", options.schemaRegistryUrl
)
).withoutMetadata()
)
.apply("Logging Incoming Logs", ParDo.of(Events.log()))
.apply("Rekey Logs by Tenant", ParDo.of(Events.key()))
.apply("Partition Logs by Source",
// This is a custom function that will partition incoming records by a specific
// datasource field
Partition.of(dataSources.size, Events.partition<KV<String, Log>>(dataSources))
)
dataSources.forEach { dataSource ->
// Store a reference to the data source name to avoid serialization issues
val sourceName = dataSource.name
val tempDirectory = Directories.resolveTemporaryDirectory(options.output)
// Grab all of the events for this specific partition and apply the source-specific windowing
// strategies
partitionedEvents[dataSource.partition]
.apply(
"Building Windows for $sourceName",
SourceSpecificWindow.of<KV<String, Log>>(dataSource)
)
.apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
.apply("Log Events After Windowing for $sourceName", ParDo.of(Events.logAfterWindowing()))
.apply(
"Writing Windowed Logs to Files for $sourceName",
FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
.withNumShards(1)
.by { row -> "${row.key}/${sourceName}" }
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }), TextIO.sink())
.to(options.output)
.withNaming { partition -> Files.name(partition)}
.withTempDirectory(tempDirectory)
)
}
在更简单的项目符号形式中,它可能看起来像这样:
- 从单个 Kafka 主题读取记录
- 按租户键入所有记录
- 通过另一个事件正确分割流
- 遍历上一步中的已知分区
- 为每个分区应用自定义 windowing 规则(与数据源相关,自定义 window 规则)
- 按键(租户)windowed 项目
- 通过 FileIO 将租户密钥对分组写入 GCP
问题是传入的 Kafka 主题包含跨多个租户的乱序数据(例如,tenant1 的事件现在可能正在流式传输,但几分钟后您就会得到它们对于同一分区中的 tenant2,等等)。这会导致水印及时来回反弹,因为不能保证每条传入记录持续增加,这听起来像是个问题,但我不确定。显然,当数据流过时,有些文件根本就没有发出。
自定义 windowing 函数非常简单,旨在在允许的延迟和 windowing 持续时间结束后发出单个 window:
object SourceSpecificWindow {
fun <T> of(dataSource: DataSource): Window<T> {
return Window.into<T>(FixedWindows.of(dataSource.windowDuration()))
.triggering(Never.ever())
.withAllowedLateness(dataSource.allowedLateness(), Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes()
}
}
然而,这似乎不一致,因为我们会看到日志记录在 window 关闭后出现,但不一定是文件被写入 GCS。
这种方法有没有明显错误或不正确的地方?由于数据在来源中可能出现乱序(即现在、2 小时前、5 分钟后)并涵盖多个租户的数据,但目的是尝试确保保持最新状态的一个租户不会不要淹没过去可能来的租户。
我们是否可能需要另一个 Beam 应用程序或其他东西来将这个单一的事件流“拆分”成每个独立处理的子流(以便每个水印自己处理)?那是 SplittableDoFn
进来的地方吗?由于我在 SparkRunner
上 运行,这似乎不支持这一点 - 但它似乎是一个有效的用例。
任何建议将不胜感激,甚至只是另一双眼睛。我很乐意提供任何其他详细信息。
环境
- 目前运行对阵SparkRunner
虽然这可能不是最有帮助的回复,但我会坦诚告知最终结果。最终,这个特定用例所需的逻辑远远超出了 Apache Beam 中那些内置功能的范围,主要是在 windowing/governance 左右的时间范围内。
最终的解决方案是将首选的流技术从 Apache Beam 切换到 Apache Flink,正如您想象的那样,这是一个很大的飞跃。 Flink 以状态为中心的特性使我们能够更轻松地处理我们的用例,围绕窗口定义自定义驱逐标准(和排序),同时在其上失去一层抽象。
我一直在思考如何在 Beam 中解决给定的问题,并认为我应该向更多的观众寻求一些建议。目前一切似乎都在稀疏地工作,我很好奇是否有人可以提供一个共鸣板来查看这个工作流程是否有意义。
主要的高级目标是从 Kafka 中读取记录,这些记录可能是无序的,需要根据在记录中找到的另一个 属性 在事件时间 windowed,并最终发出那些 windows 的内容并将它们写到 GCS。
当前管道大致如下所示:
val partitionedEvents = pipeline
.apply("Read Events from Kafka",
KafkaIO
.read<String, Log>()
.withBootstrapServers(options.brokerUrl)
.withTopic(options.incomingEventsTopic)
.withKeyDeserializer(StringDeserializer::class.java)
.withValueDeserializerAndCoder(
SpecificAvroDeserializer<Log>()::class.java,
AvroCoder.of(Log::class.java)
)
.withReadCommitted()
.commitOffsetsInFinalize()
// Set the watermark to use a specific field for event time
.withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
.withConsumerConfigUpdates(
ImmutableMap.of<String, Any?>(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
"schema.registry.url", options.schemaRegistryUrl
)
).withoutMetadata()
)
.apply("Logging Incoming Logs", ParDo.of(Events.log()))
.apply("Rekey Logs by Tenant", ParDo.of(Events.key()))
.apply("Partition Logs by Source",
// This is a custom function that will partition incoming records by a specific
// datasource field
Partition.of(dataSources.size, Events.partition<KV<String, Log>>(dataSources))
)
dataSources.forEach { dataSource ->
// Store a reference to the data source name to avoid serialization issues
val sourceName = dataSource.name
val tempDirectory = Directories.resolveTemporaryDirectory(options.output)
// Grab all of the events for this specific partition and apply the source-specific windowing
// strategies
partitionedEvents[dataSource.partition]
.apply(
"Building Windows for $sourceName",
SourceSpecificWindow.of<KV<String, Log>>(dataSource)
)
.apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
.apply("Log Events After Windowing for $sourceName", ParDo.of(Events.logAfterWindowing()))
.apply(
"Writing Windowed Logs to Files for $sourceName",
FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
.withNumShards(1)
.by { row -> "${row.key}/${sourceName}" }
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }), TextIO.sink())
.to(options.output)
.withNaming { partition -> Files.name(partition)}
.withTempDirectory(tempDirectory)
)
}
在更简单的项目符号形式中,它可能看起来像这样:
- 从单个 Kafka 主题读取记录
- 按租户键入所有记录
- 通过另一个事件正确分割流
- 遍历上一步中的已知分区
- 为每个分区应用自定义 windowing 规则(与数据源相关,自定义 window 规则)
- 按键(租户)windowed 项目
- 通过 FileIO 将租户密钥对分组写入 GCP
问题是传入的 Kafka 主题包含跨多个租户的乱序数据(例如,tenant1 的事件现在可能正在流式传输,但几分钟后您就会得到它们对于同一分区中的 tenant2,等等)。这会导致水印及时来回反弹,因为不能保证每条传入记录持续增加,这听起来像是个问题,但我不确定。显然,当数据流过时,有些文件根本就没有发出。
自定义 windowing 函数非常简单,旨在在允许的延迟和 windowing 持续时间结束后发出单个 window:
object SourceSpecificWindow {
fun <T> of(dataSource: DataSource): Window<T> {
return Window.into<T>(FixedWindows.of(dataSource.windowDuration()))
.triggering(Never.ever())
.withAllowedLateness(dataSource.allowedLateness(), Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes()
}
}
然而,这似乎不一致,因为我们会看到日志记录在 window 关闭后出现,但不一定是文件被写入 GCS。
这种方法有没有明显错误或不正确的地方?由于数据在来源中可能出现乱序(即现在、2 小时前、5 分钟后)并涵盖多个租户的数据,但目的是尝试确保保持最新状态的一个租户不会不要淹没过去可能来的租户。
我们是否可能需要另一个 Beam 应用程序或其他东西来将这个单一的事件流“拆分”成每个独立处理的子流(以便每个水印自己处理)?那是 SplittableDoFn
进来的地方吗?由于我在 SparkRunner
上 运行,这似乎不支持这一点 - 但它似乎是一个有效的用例。
任何建议将不胜感激,甚至只是另一双眼睛。我很乐意提供任何其他详细信息。
环境
- 目前运行对阵SparkRunner
虽然这可能不是最有帮助的回复,但我会坦诚告知最终结果。最终,这个特定用例所需的逻辑远远超出了 Apache Beam 中那些内置功能的范围,主要是在 windowing/governance 左右的时间范围内。
最终的解决方案是将首选的流技术从 Apache Beam 切换到 Apache Flink,正如您想象的那样,这是一个很大的飞跃。 Flink 以状态为中心的特性使我们能够更轻松地处理我们的用例,围绕窗口定义自定义驱逐标准(和排序),同时在其上失去一层抽象。