Spring Kafka - 添加的 Store 无法从流进程访问
Spring Kafka - Added Store cannot access from stream process
我在使用 Spring Kafka 时遇到一个问题,即它无法从进程事件访问状态存储,我将该特定存储添加到 topology/streams。
方法一:
@Component
@RequiredArgsConstructor
@EnableKafkaStreams
@Order(2)
public class TimelineVersionUpdatedStream implements EventStream {
private static final Logger logger =
LoggerFactory.getLogger(TimelineVersionUpdatedStream.class);
@Autowired
private StreamConfiguration configuration;
@Autowired
private TimeLineChangesCaptureService timeLineChangesCaptureService;
@Autowired
public void TimelineVersionUpdatedProccess(StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<TimelineVersionUpdated> timelineVersionUpdatedSpecificAvroSerde = new SpecificAvroSerde<>();
timelineVersionUpdatedSpecificAvroSerde.configure(getSerdeConfig(), false);
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(getSerdeConfig(), false);
KeyValueStoreBuilder paymentStoreBuilder = new KeyValueStoreBuilder(
Stores.persistentKeyValueStore("demo-store-2"),
stringSerde,
paymentChangedSpecificAvroSerde,
new SystemTime());
KStream<String, TimelineVersionUpdated> stream = builder.stream(
Topics.MOS_BUDGET_TIMELINE_VERSION,
Consumed.with(
stringSerde,
timelineVersionUpdatedSpecificAvroSerde
));
StreamsBuilder stateStore = builder.addStateStore(paymentStoreBuilder);
stream.process(new ProcessorSupplier<>() {
@Override
public Processor<String, TimelineVersionUpdated> get() {
return new Processor<>() {
private ProcessorContext context;
private KeyValueStore<String, PaymentChanged> store;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
this.store = context.getStateStore("demo-store-2");
}
@Override
public void process(String s, TimelineVersionUpdated timelineVersionUpdated) {
logger.info("TimelineVersionUpdatedStream.TimelineVersionUpdatedProccess record key {} value{}", s, timelineVersionUpdated.toString());
if (timelineVersionUpdated == null) {
return;
}
timeLineChangesCaptureService.captureTimeLineChanges(timelineVersionUpdated, store);
}
@Override
public void close() {
}
};
}
});
Topology topology = builder.build();
logger.info("{}", topology.describe().toString());
}
当我 运行 上面的代码时,我得到以下异常:
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-PROCESSOR-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:127) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:879) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:234) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:494) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.2.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-PROCESSOR-0000000001 has no access to StateStore demo-store-2 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:149) ~[kafka-streams-2.7.2.jar:na]
at au.com.mybudget.mos.mostimelinekafkaetl.transport.stream.TimelineVersionUpdatedStream.init(TimelineVersionUpdatedStream.java:92) ~[classes/:na]
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.init(ProcessorAdapter.java:57) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$init[=12=](ProcessorNode.java:120) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:117) ~[kafka-streams-2.7.2.jar:na]
... 7 common frames omitted
然后我尝试添加如下商店:
方法二:
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()), false);
return factoryBean -> {
try {
final StreamsBuilder streamsBuilder = factoryBean.getObject();
streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store-demo-3"),
stringSerde,
paymentChangedSpecificAvroSerde
));
} catch (Exception e) {
logger.error("StreamsBuilderFactoryBeanCustomizer exception:{}", e.getMessage());
}
};
}
然后尝试从进程访问该存储,但最终得到相同的异常。
请帮助理解问题。
将状态存储添加到 Topology
只是第一步,但不会使其可用:为了允许 Processor
使用状态存储,您必须连接两者。
最简单的方法是在添加Processor
时传入state store名称:
stream.process(..., "storeName");
我在使用 Spring Kafka 时遇到一个问题,即它无法从进程事件访问状态存储,我将该特定存储添加到 topology/streams。
方法一:
@Component
@RequiredArgsConstructor
@EnableKafkaStreams
@Order(2)
public class TimelineVersionUpdatedStream implements EventStream {
private static final Logger logger =
LoggerFactory.getLogger(TimelineVersionUpdatedStream.class);
@Autowired
private StreamConfiguration configuration;
@Autowired
private TimeLineChangesCaptureService timeLineChangesCaptureService;
@Autowired
public void TimelineVersionUpdatedProccess(StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<TimelineVersionUpdated> timelineVersionUpdatedSpecificAvroSerde = new SpecificAvroSerde<>();
timelineVersionUpdatedSpecificAvroSerde.configure(getSerdeConfig(), false);
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(getSerdeConfig(), false);
KeyValueStoreBuilder paymentStoreBuilder = new KeyValueStoreBuilder(
Stores.persistentKeyValueStore("demo-store-2"),
stringSerde,
paymentChangedSpecificAvroSerde,
new SystemTime());
KStream<String, TimelineVersionUpdated> stream = builder.stream(
Topics.MOS_BUDGET_TIMELINE_VERSION,
Consumed.with(
stringSerde,
timelineVersionUpdatedSpecificAvroSerde
));
StreamsBuilder stateStore = builder.addStateStore(paymentStoreBuilder);
stream.process(new ProcessorSupplier<>() {
@Override
public Processor<String, TimelineVersionUpdated> get() {
return new Processor<>() {
private ProcessorContext context;
private KeyValueStore<String, PaymentChanged> store;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
this.store = context.getStateStore("demo-store-2");
}
@Override
public void process(String s, TimelineVersionUpdated timelineVersionUpdated) {
logger.info("TimelineVersionUpdatedStream.TimelineVersionUpdatedProccess record key {} value{}", s, timelineVersionUpdated.toString());
if (timelineVersionUpdated == null) {
return;
}
timeLineChangesCaptureService.captureTimeLineChanges(timelineVersionUpdated, store);
}
@Override
public void close() {
}
};
}
});
Topology topology = builder.build();
logger.info("{}", topology.describe().toString());
}
当我 运行 上面的代码时,我得到以下异常:
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-PROCESSOR-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:127) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:879) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:234) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:494) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.2.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-PROCESSOR-0000000001 has no access to StateStore demo-store-2 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:149) ~[kafka-streams-2.7.2.jar:na]
at au.com.mybudget.mos.mostimelinekafkaetl.transport.stream.TimelineVersionUpdatedStream.init(TimelineVersionUpdatedStream.java:92) ~[classes/:na]
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.init(ProcessorAdapter.java:57) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$init[=12=](ProcessorNode.java:120) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:117) ~[kafka-streams-2.7.2.jar:na]
... 7 common frames omitted
然后我尝试添加如下商店: 方法二:
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()), false);
return factoryBean -> {
try {
final StreamsBuilder streamsBuilder = factoryBean.getObject();
streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store-demo-3"),
stringSerde,
paymentChangedSpecificAvroSerde
));
} catch (Exception e) {
logger.error("StreamsBuilderFactoryBeanCustomizer exception:{}", e.getMessage());
}
};
}
然后尝试从进程访问该存储,但最终得到相同的异常。
请帮助理解问题。
将状态存储添加到 Topology
只是第一步,但不会使其可用:为了允许 Processor
使用状态存储,您必须连接两者。
最简单的方法是在添加Processor
时传入state store名称:
stream.process(..., "storeName");