KeyValueStore.get() returns 不一致的结果

KeyValueStore.get() returns inconsistent results

stateStore.get() returns 不一致的结果,从 transform()KStream 使用时。它 returns 为 null,即使相应的键值已 put() 进入商店。

有人可以解释 KeyValueStore<> 的这种行为吗?

@Component
public class StreamProcessor {

    @StreamListener
    public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
        KStream<String, JsonNode> joinedEvents = inputStream
           .selectKey((key, value) -> computeKey(value))
           .transform(
               () -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
               "join_store"
            );
        
        joinedEvents
               .foreach((key, value) -> System.out.format("%s,joined=%b\n",key, value.has("right")));
    }

    private JsonNode join(JsonNode left, JsonNode right) {
        ((ObjectNode) left).set("right", right);
        return left;
    }
}

public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
  private KeyValueStore<String, JsonNode> stateStore;
  private ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
  private String storeName;

  public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
    this.storeName = storeName;
    this.valueJoiner = valueJoiner;
  }

  @Override
  public void init(ProcessorContext context) {
     this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
  }

  @Override
  public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
    JsonNode oldValue = stateStore.get(key);
    if (oldValue != null) { //this condition rarely holds true
        stateStore.delete(key);
        System.out.format("%s,joined\n", key);
        return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
    }
    stateStore.put(key, value);
    return null;
  }
}

似乎消息正在消失的原因(假设标点符号没有删除它们)是因为您使用了 KStream::selectKey(...),它更改了密钥,但没有重新分区 您可能会在错误的分区中查找密钥。

看下面的场景:

  • 消息 1:k1v1 (partition0)
  • 消息 2:k2v2 (partition1)

假设消息被放在不同的分区(因为key) selectKey后:k1 -> k,k2 -> k

  • 消息 1:kv1
  • 消息 2:kv2

操作selectKey 是无状态的,因此消息不会发送到下游(主题)并且不会发生重新分区。 对于第一条消息:值被放入商店(partition0)中的键 - k 当第二条消息到达时:对于 key - k 没有消息,因为它是不同的分区 (partition1)