在 Processor Api 中,如果 addSink 函数中没有指定分区器,是否会应用 `DefaultStreamPartitioner`?
In Processor Api, will `DefaultStreamPartitioner` be applied when partitioner not specified in `addSink` function ?
当我使用处理器 API 编写应用程序时,如果我使用 addSink
将日志推送到下游主题,日志不会按它们的键进行分区。
一步步调试,DefaultStreamPartitioner
没有执行,源码中没有找到sink streamPartitioner的注释。
那么DefaultStreamPartitioner
在addSink
函数中没有指定partitioner时会应用吗?
即:
topology
.addSource("Source", inputTopic)
.addProcessor("LogToEvents", () -> new LogToEventProcessor(), "Source")
.addSink("Events-Sink", outputTopic, "LogToEvents");
在你的情况下 DefaultStreamPartitioner
将不会被执行,而是使用 DefaultPartitioner
partition(..)
方法。
如果未指定分区程序但存在键,则将使用键的哈希值选择分区(该逻辑由 DefaultPartitioner
实现)。如果键和分区都不存在,将以循环方式分配分区。
在 class KafkaProducer
你可以找到:
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
默认值为org.apache.kafka.clients.producer.internals.DefaultPartitioner
甚至 DefaultStreamPartitioner
class 在 DefaultPartitioner
实例上调用 partition(..)
方法,所以它是 StreamPartitioner
的基本实现,它只是代理调用 DefaultPartitioner
.
当我使用处理器 API 编写应用程序时,如果我使用 addSink
将日志推送到下游主题,日志不会按它们的键进行分区。
一步步调试,DefaultStreamPartitioner
没有执行,源码中没有找到sink streamPartitioner的注释。
那么DefaultStreamPartitioner
在addSink
函数中没有指定partitioner时会应用吗?
即:
topology
.addSource("Source", inputTopic)
.addProcessor("LogToEvents", () -> new LogToEventProcessor(), "Source")
.addSink("Events-Sink", outputTopic, "LogToEvents");
在你的情况下 DefaultStreamPartitioner
将不会被执行,而是使用 DefaultPartitioner
partition(..)
方法。
如果未指定分区程序但存在键,则将使用键的哈希值选择分区(该逻辑由 DefaultPartitioner
实现)。如果键和分区都不存在,将以循环方式分配分区。
在 class KafkaProducer
你可以找到:
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
默认值为org.apache.kafka.clients.producer.internals.DefaultPartitioner
甚至 DefaultStreamPartitioner
class 在 DefaultPartitioner
实例上调用 partition(..)
方法,所以它是 StreamPartitioner
的基本实现,它只是代理调用 DefaultPartitioner
.