Kafka Streams:混搭 PAPI 和 DSL KTable 不共同分区
Kafka Streams: mix-and-match PAPI and DSL KTable not co-partitioning
我有一个混搭的 Scala 拓扑结构,其中主要工作器是一个 PAPI 处理器,其他部分通过 DSL 连接。
EventsProcessor:
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)
整个主题的数据(包括原始 eventsTopic
)通过一个分区,我们称之为 DoubleKey
,它有两个字段。
访客通过接收器发送到 visitorsTopic
:
.addSink(VISITOR_SINK_NAME, visitorTopicName,
DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
在 DSL 中,我针对这个主题创建了一个 KV KTable:
val visitorTable = builder.table(
visitorTopicName,
Consumed.`with`(DoubleKey.getKafkaSerde(),
Visitor.getKafkaSerde()),
Materialized.as(visitorStoreName))
我稍后连接到 EventProcessor
:
topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
一切都是共同分区的(通过 DoubleKey)。 visitorSinkPartitioner
执行典型的模运算:
Math.abs(partitionKey.hashCode % numPartitions)
在 PAPI 处理器 EventsProcessor 中,我查询此 table 以查看是否已经存在访客。
然而,在我的测试中(使用 EmbeddedKafka,但这应该不会有什么不同),如果我 运行 它们只有一个分区,一切都很好(EventsProcessor 在两个事件上检查 KTable在同一个 DoubleKey
和第二个事件上 - 有一些延迟 - 它可以在商店中看到存在的 Visitor
),但是如果我 运行 它具有更高的数字,EventProcessor 永远不会看到 Store 中的值。
但是,如果我通过 API(迭代 store.all()
)检查商店,记录就在那里。所以我知道它必须去不同的分区。
由于 KTable 应该处理其分区上的数据,并且所有内容都发送到同一个分区(使用调用相同代码的显式分区程序),KTable 应该在同一个分区上获取数据。
我的假设是否正确?可能发生了什么?
KafkaStreams 1.0.0,Scala 2.12.4。
PS。当然,在 PAPI 上执行 put
s 通过 PAPI 而不是 StreamsBuilder.table()
创建商店是可行的,因为那肯定会使用与代码相同的分区运行s,但那是不可能的。
是的,假设是正确的。
万一对大家有帮助:
我在将 Partitioner 传递给 Scala EmbeddedKafka 库时遇到了问题。在其中一个测试套件中,它没有正确完成。
现在,按照重构的健康实践,我在这个拓扑的所有套件中都使用了这个方法。
def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) :
EmbeddedKafkaConfig = {
val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
classOf[DoubleKeyPartitioner].getCanonicalName)
EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort,
customProducerProperties = producerProperties)
}
我有一个混搭的 Scala 拓扑结构,其中主要工作器是一个 PAPI 处理器,其他部分通过 DSL 连接。
EventsProcessor:
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)
整个主题的数据(包括原始 eventsTopic
)通过一个分区,我们称之为 DoubleKey
,它有两个字段。
访客通过接收器发送到 visitorsTopic
:
.addSink(VISITOR_SINK_NAME, visitorTopicName,
DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
在 DSL 中,我针对这个主题创建了一个 KV KTable:
val visitorTable = builder.table(
visitorTopicName,
Consumed.`with`(DoubleKey.getKafkaSerde(),
Visitor.getKafkaSerde()),
Materialized.as(visitorStoreName))
我稍后连接到 EventProcessor
:
topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
一切都是共同分区的(通过 DoubleKey)。 visitorSinkPartitioner
执行典型的模运算:
Math.abs(partitionKey.hashCode % numPartitions)
在 PAPI 处理器 EventsProcessor 中,我查询此 table 以查看是否已经存在访客。
然而,在我的测试中(使用 EmbeddedKafka,但这应该不会有什么不同),如果我 运行 它们只有一个分区,一切都很好(EventsProcessor 在两个事件上检查 KTable在同一个 DoubleKey
和第二个事件上 - 有一些延迟 - 它可以在商店中看到存在的 Visitor
),但是如果我 运行 它具有更高的数字,EventProcessor 永远不会看到 Store 中的值。
但是,如果我通过 API(迭代 store.all()
)检查商店,记录就在那里。所以我知道它必须去不同的分区。
由于 KTable 应该处理其分区上的数据,并且所有内容都发送到同一个分区(使用调用相同代码的显式分区程序),KTable 应该在同一个分区上获取数据。
我的假设是否正确?可能发生了什么?
KafkaStreams 1.0.0,Scala 2.12.4。
PS。当然,在 PAPI 上执行 put
s 通过 PAPI 而不是 StreamsBuilder.table()
创建商店是可行的,因为那肯定会使用与代码相同的分区运行s,但那是不可能的。
是的,假设是正确的。
万一对大家有帮助:
我在将 Partitioner 传递给 Scala EmbeddedKafka 库时遇到了问题。在其中一个测试套件中,它没有正确完成。 现在,按照重构的健康实践,我在这个拓扑的所有套件中都使用了这个方法。
def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) :
EmbeddedKafkaConfig = {
val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
classOf[DoubleKeyPartitioner].getCanonicalName)
EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort,
customProducerProperties = producerProperties)
}