从转换中的数据更新全局存储
Updating global store from data within transform
我目前有一个简单的拓扑:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);
我的事件有时有 key/value 对,而其他时候只有密钥。我希望能够为那些缺少值的事件添加值。我在本地状态存储中运行良好,但是当我添加更多任务时,有时 key/value 事件和值事件位于不同的线程中,因此它们没有正确更新。
我想为此使用全局状态存储,但我很难弄清楚当新的 key/value 对出现时如何更新全局存储。我创建了一个全局状态存储使用以下代码:
builder.addGlobalStore(stateStore, "global_store", Consumed.with(Serdes.String(), Serdes.String()), new ProcessorSupplier<String, String>() {
@Override
public Processor<String, String> get() {
return new Processor<String, String>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(final String key, final String value) {
context.forward(key, value);
}
@Override
public void close() {
}
};
}
});
据我所知,它正在运行,但由于主题中没有数据,我不确定。
所以我的问题是如何从 transformValues 内部更新全局存储? store.put()
失败并出现全局存储为只读错误。
我找到了 ,但接受的答案只是说要更新基础主题,但我不知道该怎么做,因为该主题不在我的信息流中。
---已编辑---
我根据接受的答案中的 #1 更新了代码。我看到 global_store
中出现了新的 key/value 对。但是 globalStore 似乎没有看到新密钥。如果我重新启动应用程序,它会用主题中的数据填充缓存,但新键在我 stop/start 应用程序之后才可见。
我在全局存储处理器中向 process(String, String)
添加了日志记录,它显示了正在处理的新密钥。有什么想法吗?
- 您只能在 transformValues 中获得对 Global state store 的 real-only 访问权限,如果您想更新 global state store,是的,您必须将更新发送到 Global state store 的底层输入主题,并且您的状态将在使用此更新消息时更新该值。这背后的原因是,全局状态存储在所有应用程序实例上填充,并使用此输入主题进行容错。您可以通过分支拓扑来做到这一点:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
//processing message as normal
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);
//this transform to the updated message to global state
eventsStream.transform(updateGlobalStateProcessorSupplier, "nameCache")
.to("global_store");
- 使用低级别 API 到 construct your Topology manually,因此您可以使用
ProcessorContext.forward
将消息转发到 destinationTopic
主题和 global_state
主题使用接收器处理器名称的接收器处理器节点。
我目前有一个简单的拓扑:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);
我的事件有时有 key/value 对,而其他时候只有密钥。我希望能够为那些缺少值的事件添加值。我在本地状态存储中运行良好,但是当我添加更多任务时,有时 key/value 事件和值事件位于不同的线程中,因此它们没有正确更新。
我想为此使用全局状态存储,但我很难弄清楚当新的 key/value 对出现时如何更新全局存储。我创建了一个全局状态存储使用以下代码:
builder.addGlobalStore(stateStore, "global_store", Consumed.with(Serdes.String(), Serdes.String()), new ProcessorSupplier<String, String>() {
@Override
public Processor<String, String> get() {
return new Processor<String, String>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(final String key, final String value) {
context.forward(key, value);
}
@Override
public void close() {
}
};
}
});
据我所知,它正在运行,但由于主题中没有数据,我不确定。
所以我的问题是如何从 transformValues 内部更新全局存储? store.put()
失败并出现全局存储为只读错误。
我找到了
---已编辑---
我根据接受的答案中的 #1 更新了代码。我看到 global_store
中出现了新的 key/value 对。但是 globalStore 似乎没有看到新密钥。如果我重新启动应用程序,它会用主题中的数据填充缓存,但新键在我 stop/start 应用程序之后才可见。
我在全局存储处理器中向 process(String, String)
添加了日志记录,它显示了正在处理的新密钥。有什么想法吗?
- 您只能在 transformValues 中获得对 Global state store 的 real-only 访问权限,如果您想更新 global state store,是的,您必须将更新发送到 Global state store 的底层输入主题,并且您的状态将在使用此更新消息时更新该值。这背后的原因是,全局状态存储在所有应用程序实例上填充,并使用此输入主题进行容错。您可以通过分支拓扑来做到这一点:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
//processing message as normal
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);
//this transform to the updated message to global state
eventsStream.transform(updateGlobalStateProcessorSupplier, "nameCache")
.to("global_store");
- 使用低级别 API 到 construct your Topology manually,因此您可以使用
ProcessorContext.forward
将消息转发到destinationTopic
主题和global_state
主题使用接收器处理器名称的接收器处理器节点。