Mutiny:处理多个 Unis 并使用来自 "outside" 的价值

Mutiny: Process multiple Unis and use value from "outside"

我有以下方法:

private boolean isAllProcessedForChannel(String channel) {
    KafkaConsumer<Object, Object> consumer = clientService.getConsumer(channel);

    Uni<Map<TopicPartition, Long>> positionsUni = consumer.getPositions();
    Map<TopicPartition, Long> positions = positionsUni.await().indefinitely();
    for (Entry<TopicPartition, Long> entry : positions.entrySet()) {
      Long position = entry.getValue();
      TopicPartition partition = entry.getKey();

      Uni<Map<TopicPartition, OffsetAndMetadata>> committedUni = consumer.committed(partition);
      OffsetAndMetadata offsetAndMetadata = committedUni.await().indefinitely().get(partition);
      if (offsetAndMetadata != null) {
        long offset = offsetAndMetadata.offset();
        log.info("Offset/Position ({}): {}/{}", partition.partition(), offset, position);
        if (offset != position) {
          return false;
        }
      }
    }

    return true;
}

最后我需要得到一个 Uni,它指示是否有一个分区尚未处理。我目前的解决方案是 await 在 2 个职位上。我不知道如何“真正地”做出反应。

对于第一个 Uni 映射中的每个条目,我需要调用一个本身 return 是 Uni 的方法。然后我需要将条目(第一个 Uni)的值与第二个 Uni 的结果进行比较。最后,我需要检查所有比较结果是否为真,并且 return 这是一个单一的 Uni。

有没有人提示如何实现这一目标?还是这太复杂了,我应该坚持我的“同步”方式?

您可以先将 positions 条目转换为 Multi,然后,对于每个条目,得到一个 Uni<Boolean>,当 position 与 offset 不同时,它会发出 false。最终你合并结果并只取第一个false

private Uni<Boolean> isAllProcessedForChannel(String channel) {
    return consumer.getPositions()
            .onItem().transformToMulti(positions -> Multi.createFrom().iterable(positions.entrySet()))
            .onItem().transformToUniAndMerge(entry -> {
                Long position = entry.getValue();
                TopicPartition partition = entry.getKey();

                return consumer.committed(partition).onItem().transform(committed -> {
                    OffsetAndMetadata offsetAndMetadata = committed.get(partition);
                    if (offsetAndMetadata != null) {
                        long offset = offsetAndMetadata.offset();
                        if (offset != position) {
                            return false;
                        }
                    }
                    return true;
                });
            })
            .filter(Boolean.FALSE::equals)
            .toUni();
}