Spring Kafka - Added store cannot be accessed from stream processor

I am running into a problem with Spring Kafka: my processor cannot access a state store, even though I added that store to the topology/streams.

Approach 1:

@Component
@RequiredArgsConstructor
@EnableKafkaStreams
@Order(2)
public class TimelineVersionUpdatedStream implements EventStream {

    private static final Logger logger =
            LoggerFactory.getLogger(TimelineVersionUpdatedStream.class);

    @Autowired
    private StreamConfiguration configuration;
    @Autowired
    private TimeLineChangesCaptureService timeLineChangesCaptureService;

    @Autowired
    public void TimelineVersionUpdatedProccess(StreamsBuilder builder) {
        final Serde<String> stringSerde = Serdes.String();
        final SpecificAvroSerde<TimelineVersionUpdated> timelineVersionUpdatedSpecificAvroSerde = new SpecificAvroSerde<>();
        timelineVersionUpdatedSpecificAvroSerde.configure(getSerdeConfig(), false);

        final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
        paymentChangedSpecificAvroSerde.configure(getSerdeConfig(), false);

        KeyValueStoreBuilder paymentStoreBuilder = new KeyValueStoreBuilder(
                Stores.persistentKeyValueStore("demo-store-2"),
                stringSerde,
                paymentChangedSpecificAvroSerde,
                new SystemTime());



        KStream<String, TimelineVersionUpdated> stream = builder.stream(
                Topics.MOS_BUDGET_TIMELINE_VERSION,
                Consumed.with(
                        stringSerde,
                        timelineVersionUpdatedSpecificAvroSerde
                ));

        StreamsBuilder stateStore = builder.addStateStore(paymentStoreBuilder);

        stream.process(new ProcessorSupplier<>() {
            @Override
            public Processor<String, TimelineVersionUpdated> get() {
                return new Processor<>() {

                    private ProcessorContext context;
                    private KeyValueStore<String, PaymentChanged> store;

                    @Override
                    public void init(ProcessorContext processorContext) {
                        this.context = processorContext;
                        this.store = context.getStateStore("demo-store-2");
                    }

                    @Override
                    public void process(String s, TimelineVersionUpdated timelineVersionUpdated) {
                        logger.info("TimelineVersionUpdatedStream.TimelineVersionUpdatedProccess record key {} value{}", s, timelineVersionUpdated.toString());
                        if (timelineVersionUpdated == null) {
                            return;
                        }
                        timeLineChangesCaptureService.captureTimeLineChanges(timelineVersionUpdated, store);
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        });


        Topology topology = builder.build();
        logger.info("{}", topology.describe().toString());
    }

When I run the code above, I get the following exception:

org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-PROCESSOR-0000000001
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:127) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:879) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:234) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:494) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.2.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-PROCESSOR-0000000001 has no access to StateStore demo-store-2 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:149) ~[kafka-streams-2.7.2.jar:na]
    at au.com.mybudget.mos.mostimelinekafkaetl.transport.stream.TimelineVersionUpdatedStream.init(TimelineVersionUpdatedStream.java:92) ~[classes/:na]
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.init(ProcessorAdapter.java:57) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$init$0(ProcessorNode.java:120) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:117) ~[kafka-streams-2.7.2.jar:na]
    ... 7 common frames omitted

Then I tried adding the store as follows:

Approach 2:

    @Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() {
        final Serde<String> stringSerde = Serdes.String();
        final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
        paymentChangedSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()), false);

        return factoryBean -> {
            try {
                final StreamsBuilder streamsBuilder = factoryBean.getObject();
                streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("store-demo-3"),
                        stringSerde,
                        paymentChangedSpecificAvroSerde
                ));
            } catch (Exception e) {
                logger.error("StreamsBuilderFactoryBeanCustomizer exception:{}", e.getMessage());
            }
        };
    }

I then tried to access the store from the processor, but ended up with the same exception.

Please help me understand the problem.

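Adding a state store to the Topology is only the first step; it does not make the store usable. To let a Processor use a state store, you must connect the two.

The simplest way is to pass the state store name when you add the Processor: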

stream.process(..., "storeName");
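For a fuller picture, here is a minimal sketch of the same idea, assuming the kafka-streams 2.7.x API that appears in the stack trace (the older org.apache.kafka.streams.processor.Processor interface). The class name ConnectedStoreSketch, the LatestValueProcessor class, the topic name "input-topic", and the use of plain String values instead of the Avro types are all hypothetical simplifications and not part of the original code.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ConnectedStoreSketch {

    // The same name must be used in three places: the store builder,
    // the call to process(), and the lookup in init().
    static final String STORE_NAME = "demo-store-2";
    // Hypothetical topic name, used only for illustration.
    static final String INPUT_TOPIC = "input-topic";

    // Hypothetical processor that remembers the latest value per key.
    static class LatestValueProcessor implements Processor<String, String> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            // Succeeds only if STORE_NAME was passed to process() below.
            store = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(String key, String value) {
            if (value == null) {
                return; // check for null before touching the value
            }
            store.put(key, value);
        }

        @Override
        public void close() {
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE_NAME),
                        Serdes.String(),
                        Serdes.String());

        // Step 1: register the store with the topology.
        builder.addStateStore(storeBuilder);

        KStream<String, String> stream = builder.stream(
                INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        // Step 2: connect the store to the processor by passing its name.
        ProcessorSupplier<String, String> supplier = LatestValueProcessor::new;
        stream.process(supplier, STORE_NAME);

        return builder.build();
    }

    public static void main(String[] args) {
        // Building and describing the topology does not need a running broker.
        System.out.println(buildTopology().describe());
    }
}
```

Printing topology.describe(), as Approach 1 already does, is a quick way to check the wiring: the processor node should list the store once the two are connected. The name has to match exactly, so if the store is registered as "store-demo-3" as in Approach 2, that is the name to pass to process() and to getStateStore(). As the exception message itself notes, another option is to implement the stores() method on the supplier so that it provides the StoreBuilder directly.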