从转换中的数据更新全局存储

Updating global store from data within transform

我目前有一个简单的拓扑:

    KStream<String, Event> eventsStream = builder.stream(sourceTopic);
    eventsStream.transformValues(processorSupplier, "nameCache")
        .to(destinationTopic);

我的事件有时有 key/value 对,而其他时候只有密钥。我希望能够为那些缺少值的事件添加值。我在本地状态存储中运行良好,但是当我添加更多任务时,有时 key/value 事件和值事件位于不同的线程中,因此它们没有正确更新。

我想为此使用全局状态存储,但我很难弄清楚当新的 key/value 对出现时如何更新全局存储。我创建了一个全局状态存储使用以下代码:

    builder.addGlobalStore(stateStore, "global_store", Consumed.with(Serdes.String(), Serdes.String()), new ProcessorSupplier<String, String>() {
      @Override
      public Processor<String, String> get() {
        return new Processor<String, String>() {
          private ProcessorContext context;
          @Override
          public void init(final ProcessorContext processorContext) {
            this.context = processorContext;
          }

          @Override
          public void process(final String key, final String value) {
            context.forward(key, value);
          }

          @Override
          public void close() {
          }
        };
      }
    });

据我所知,它正在运行,但由于主题中没有数据,我不确定。

所以我的问题是如何从 transformValues 内部更新全局存储? store.put() 失败并出现全局存储为只读错误。

我找到了 ,但接受的答案只是说要更新基础主题,但我不知道该怎么做,因为该主题不在我的信息流中。

---已编辑---

我根据接受的答案中的 #1 更新了代码。我看到 global_store 中出现了新的 key/value 对。但是 globalStore 似乎没有看到新密钥。如果我重新启动应用程序,它会用主题中的数据填充缓存,但新键在我 stop/start 应用程序之后才可见。

我在全局存储处理器中向 process(String, String) 添加了日志记录,它显示了正在处理的新密钥。有什么想法吗?

  1. 您只能在 transformValues 中获得对 Global state store 的 real-only 访问权限,如果您想更新 global state store,是的,您必须将更新发送到 Global state store 的底层输入主题,并且您的状态将在使用此更新消息时更新该值。这背后的原因是,全局状态存储在所有应用程序实例上填充,并使用此输入主题进行容错。您可以通过分支拓扑来做到这一点:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
//processing message as normal
eventsStream.transformValues(processorSupplier, "nameCache")
        .to(destinationTopic);

//this transform to the updated message to global state
eventsStream.transform(updateGlobalStateProcessorSupplier, "nameCache")
        .to("global_store");
  1. 使用低级别 API 到 construct your Topology manually,因此您可以使用 ProcessorContext.forward 将消息转发到 destinationTopic 主题和 global_state 主题使用接收器处理器名称的接收器处理器节点。