Kafka Streams:混搭 PAPI 和 DSL KTable 不共同分区

Kafka Streams: mix-and-match PAPI and DSL KTable not co-partitioning

我有一个混搭的 Scala 拓扑结构,其中主要工作器是一个 PAPI 处理器,其他部分通过 DSL 连接。

EventsProcessor: 
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)

整个主题的数据(包括原始 eventsTopic)通过一个分区,我们称之为 DoubleKey,它有两个字段。 访客通过接收器发送到 visitorsTopic

.addSink(VISITOR_SINK_NAME, visitorTopicName,
    DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)

在 DSL 中,我针对这个主题创建了一个 KV KTable:

val visitorTable = builder.table(
  visitorTopicName,
  Consumed.`with`(DoubleKey.getKafkaSerde(),
  Visitor.getKafkaSerde()),
  Materialized.as(visitorStoreName))

我稍后连接到 EventProcessor

topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)

一切都是共同分区的(通过 DoubleKey)。 visitorSinkPartitioner 执行典型的模运算:

Math.abs(partitionKey.hashCode % numPartitions)

在 PAPI 处理器 EventsProcessor 中,我查询此 table 以查看是否已经存在访客。

然而,在我的测试中(使用 EmbeddedKafka,但这应该不会有什么不同),如果我 运行 它们只有一个分区,一切都很好(EventsProcessor 在两个事件上检查 KTable在同一个 DoubleKey 和第二个事件上 - 有一些延迟 - 它可以在商店中看到存在的 Visitor),但是如果我 运行 它具有更高的数字,EventProcessor 永远不会看到 Store 中的值。

但是,如果我通过 API(迭代 store.all())检查商店,记录就在那里。所以我知道它必须去不同的分区。

由于 KTable 应该处理其分区上的数据,并且所有内容都发送到同一个分区(使用调用相同代码的显式分区程序),KTable 应该在同一个分区上获取数据。

我的假设是否正确?可能发生了什么?

KafkaStreams 1.0.0,Scala 2.12.4。

PS。当然,在 PAPI 上执行 puts 通过 PAPI 而不是 StreamsBuilder.table() 创建商店是可行的,因为那肯定会使用与代码相同的分区运行s,但那是不可能的。

是的,假设是正确的。

万一对大家有帮助:

我在将 Partitioner 传递给 Scala EmbeddedKafka 库时遇到了问题。在其中一个测试套件中,它没有正确完成。 现在,按照重构的健康实践,我在这个拓扑的所有套件中都使用了这个方法。

def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) : 
    EmbeddedKafkaConfig = {
    val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
        classOf[DoubleKeyPartitioner].getCanonicalName)
    EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, 
        customProducerProperties = producerProperties)
}