Kafka Stream ProcessorContext 的 ProcessorRecordContext 为空
Kafka Stream ProcessorContext has null ProcessorRecordContext
我正在使用 Kafka Streams 2.3.0 订阅一个有 60 个分区的主题。我正在使用自定义处理器,我看到在内部 KafkaConsumer 轮询 return 之后多次调用 init,其中 ProcessorContext.ProcessorRecordContext 为空。
如示例中所指定,自定义进程存储此上下文。
然后当调用 process 方法时,我们正在使用此上下文,这会导致错误,因为记录上下文为空。在处理方法调用之前,正确的记录上下文由调用者 ProcessorContext 实现创建,并将调用转发给自定义处理器,但自定义处理器持有的上下文未设置。
init 方法传入未设置记录上下文的 ProcessContext 的原因可能是什么?
感谢您的指点。
这是我正在使用的部分代码。
// Topology snippet
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bservers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mygroupId");
Topology builder = new Topology();
builder.addSource(SOURCE_NAME, topic).addProcessor("FilterAndPublish", () -> _processor, SOURCE_NAME);
streams = new KafkaStreams(builder, props);
streams.start();
这是处理器片段
class CustProcessor extends AbstractProcessor<byte[], byte[]> {
private EventFilter _filter = new EventFilter();
@Autowired
private Publisher _publisher;
@Override
public void process(byte[] key, byte[] value) {
try {
long offset = context().offset();
Event evt = _filter.filter(key, value, offset);
if (evt != null) {
_publisher.index(evt);
}
} catch (Exception e) {
// TODO
}
}
注意同一代码适用于具有单个分区的主题。
问题在于您如何添加 Processor
:
addProcessor("FilterAndPublish", () -> _processor, SOURCE_NAME)
第二个参数是 Supplier
,供应商每次调用时都需要 return 一个新的 object/instance。但是,您的代码 return 每次调用都使用相同的对象,因此违反了 "supplier pattern" 的约定,这会导致您获得异常。
您需要将代码更改为:
addProcessor("FilterAndPublish", () -> new MyProcessor(), SOURCE_NAME)
我正在使用 Kafka Streams 2.3.0 订阅一个有 60 个分区的主题。我正在使用自定义处理器,我看到在内部 KafkaConsumer 轮询 return 之后多次调用 init,其中 ProcessorContext.ProcessorRecordContext 为空。
如示例中所指定,自定义进程存储此上下文。
然后当调用 process 方法时,我们正在使用此上下文,这会导致错误,因为记录上下文为空。在处理方法调用之前,正确的记录上下文由调用者 ProcessorContext 实现创建,并将调用转发给自定义处理器,但自定义处理器持有的上下文未设置。
init 方法传入未设置记录上下文的 ProcessContext 的原因可能是什么?
感谢您的指点。
这是我正在使用的部分代码。
// Topology snippet
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bservers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mygroupId");
Topology builder = new Topology();
builder.addSource(SOURCE_NAME, topic).addProcessor("FilterAndPublish", () -> _processor, SOURCE_NAME);
streams = new KafkaStreams(builder, props);
streams.start();
这是处理器片段
class CustProcessor extends AbstractProcessor<byte[], byte[]> {
private EventFilter _filter = new EventFilter();
@Autowired
private Publisher _publisher;
@Override
public void process(byte[] key, byte[] value) {
try {
long offset = context().offset();
Event evt = _filter.filter(key, value, offset);
if (evt != null) {
_publisher.index(evt);
}
} catch (Exception e) {
// TODO
}
}
注意同一代码适用于具有单个分区的主题。
问题在于您如何添加 Processor
:
addProcessor("FilterAndPublish", () -> _processor, SOURCE_NAME)
第二个参数是 Supplier
,供应商每次调用时都需要 return 一个新的 object/instance。但是,您的代码 return 每次调用都使用相同的对象,因此违反了 "supplier pattern" 的约定,这会导致您获得异常。
您需要将代码更改为:
addProcessor("FilterAndPublish", () -> new MyProcessor(), SOURCE_NAME)