有没有办法直接从处理器内部将数据发送到 Kafka 主题?

Is there a way to send data to a Kafka topic directly from within Processor?

我正在尝试借助 Kafka Streams 实现以下逻辑:

  1. 听取主题中的一些参考数据,例如。 ref-data-topic 并从中创建全局 StateStore

  2. 收听来自另一个主题 data-topic 的消息,这些消息必须根据参考数据进行验证并且 发送到 successerrors 主题.

这是示例伪代码:

class SomeProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> refDataStore;

    @Override
    public void init(final ProcessorContext context) {
        refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
    }

    @Override
    public void process(String key String value) {
        Object refData = refDataStore.get("some_key");

        // business logic here

        if(ok) {
           sendValueToTopic("success");
        } else {
           sendValueToTopic("errors");
        }
    }
}

或者实现这种期望行为的规范方法是什么?

就像我现在想到的一个替代方案是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,让客户处理例如validationStatus 在收到的消息中。

虽然,我真的很想有一个包含两个主题的解决方案,因为例如在这种情况下,我可以使用 Kafka Connect,link success topic 直接使用一些数据存储并处理 error topic 有点不同。同样,在只有一个主题的方法中,我不知道如何实现这个 "store_only_successfully_validated_entities" 用例。

有什么想法和建议吗?

如果您使用处理器 API,您可以按名称将数据转发到不同的处理器:

class SomeProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> refDataStore;
    private ProcessorContext processorContext;

    @Override
    public void init(final ProcessorContext context) {
        refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
        processorContext = context;
    }

    @Override
    public void process(String key String value) {
        Object refData = refDataStore.get("some_key");

        // business logic here

        if(ok) {
           processorContext.forward(key, value, To.child("success"));
        } else {
           processorContext.forward(key, value, To.child("error"));
        }
    }
}

插入拓扑时,会添加两个接收器节点,名称为 "success""error",分别写入成功和错误主题。

或者您 forward 将数据发送到单个接收器节点并使用 TopicNameExtractor 添加接收器而不是硬编码的主题名称。 (需要 2.0 版。)

如果你使用DSL,你可以使用KStream#branch()拆分一个流,通过KStream#to(...)将不同的数据堆到不同的主题(或者你通过KStream#to(TopicNameExtractor)使用动态路由--要求版本 2.0)