KeyValueStore.get() returns 不一致的结果
KeyValueStore.get() returns inconsistent results
stateStore.get()
returns 不一致的结果,从 transform()
到 KStream
使用时。它 returns 为 null,即使相应的键值已 put()
进入商店。
有人可以解释 KeyValueStore<>
的这种行为吗?
@Component
public class StreamProcessor {
@StreamListener
public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
KStream<String, JsonNode> joinedEvents = inputStream
.selectKey((key, value) -> computeKey(value))
.transform(
() -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
"join_store"
);
joinedEvents
.foreach((key, value) -> System.out.format("%s,joined=%b\n",key, value.has("right")));
}
private JsonNode join(JsonNode left, JsonNode right) {
((ObjectNode) left).set("right", right);
return left;
}
}
public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
private KeyValueStore<String, JsonNode> stateStore;
private ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
private String storeName;
public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
this.storeName = storeName;
this.valueJoiner = valueJoiner;
}
@Override
public void init(ProcessorContext context) {
this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
}
@Override
public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
JsonNode oldValue = stateStore.get(key);
if (oldValue != null) { //this condition rarely holds true
stateStore.delete(key);
System.out.format("%s,joined\n", key);
return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
}
stateStore.put(key, value);
return null;
}
}
似乎消息正在消失的原因(假设标点符号没有删除它们)是因为您使用了 KStream::selectKey(...),它更改了密钥,但没有重新分区
您可能会在错误的分区中查找密钥。
看下面的场景:
- 消息 1:
k1
、v1
(partition0
)
- 消息 2:
k2
、v2
(partition1
)
假设消息被放在不同的分区(因为key)
selectKey后:k1 -> k
,k2 -> k
- 消息 1:
k
、v1
- 消息 2:
k
、v2
操作selectKey
是无状态的,因此消息不会发送到下游(主题)并且不会发生重新分区。
对于第一条消息:值被放入商店(partition0)中的键 - k
当第二条消息到达时:对于 key - k 没有消息,因为它是不同的分区 (partition1)
stateStore.get()
returns 不一致的结果,从 transform()
到 KStream
使用时。它 returns 为 null,即使相应的键值已 put()
进入商店。
有人可以解释 KeyValueStore<>
的这种行为吗?
@Component
public class StreamProcessor {
@StreamListener
public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
KStream<String, JsonNode> joinedEvents = inputStream
.selectKey((key, value) -> computeKey(value))
.transform(
() -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
"join_store"
);
joinedEvents
.foreach((key, value) -> System.out.format("%s,joined=%b\n",key, value.has("right")));
}
private JsonNode join(JsonNode left, JsonNode right) {
((ObjectNode) left).set("right", right);
return left;
}
}
public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
private KeyValueStore<String, JsonNode> stateStore;
private ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
private String storeName;
public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
this.storeName = storeName;
this.valueJoiner = valueJoiner;
}
@Override
public void init(ProcessorContext context) {
this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
}
@Override
public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
JsonNode oldValue = stateStore.get(key);
if (oldValue != null) { //this condition rarely holds true
stateStore.delete(key);
System.out.format("%s,joined\n", key);
return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
}
stateStore.put(key, value);
return null;
}
}
似乎消息正在消失的原因(假设标点符号没有删除它们)是因为您使用了 KStream::selectKey(...),它更改了密钥,但没有重新分区 您可能会在错误的分区中查找密钥。
看下面的场景:
- 消息 1:
k1
、v1
(partition0
) - 消息 2:
k2
、v2
(partition1
)
假设消息被放在不同的分区(因为key)
selectKey后:k1 -> k
,k2 -> k
- 消息 1:
k
、v1
- 消息 2:
k
、v2
操作selectKey
是无状态的,因此消息不会发送到下游(主题)并且不会发生重新分区。
对于第一条消息:值被放入商店(partition0)中的键 - k
当第二条消息到达时:对于 key - k 没有消息,因为它是不同的分区 (partition1)