Mutiny:处理多个 Unis 并使用来自 "outside" 的价值
Mutiny: Process multiple Unis and use value from "outside"
我有以下方法:
private boolean isAllProcessedForChannel(String channel) {
KafkaConsumer<Object, Object> consumer = clientService.getConsumer(channel);
Uni<Map<TopicPartition, Long>> positionsUni = consumer.getPositions();
Map<TopicPartition, Long> positions = positionsUni.await().indefinitely();
for (Entry<TopicPartition, Long> entry : positions.entrySet()) {
Long position = entry.getValue();
TopicPartition partition = entry.getKey();
Uni<Map<TopicPartition, OffsetAndMetadata>> committedUni = consumer.committed(partition);
OffsetAndMetadata offsetAndMetadata = committedUni.await().indefinitely().get(partition);
if (offsetAndMetadata != null) {
long offset = offsetAndMetadata.offset();
log.info("Offset/Position ({}): {}/{}", partition.partition(), offset, position);
if (offset != position) {
return false;
}
}
}
return true;
}
最后我需要得到一个 Uni,它指示是否有一个分区尚未处理。我目前的解决方案是 await
在 2 个职位上。我不知道如何“真正地”做出反应。
对于第一个 Uni 映射中的每个条目,我需要调用一个本身 return 是 Uni 的方法。然后我需要将条目(第一个 Uni)的值与第二个 Uni 的结果进行比较。最后,我需要检查所有比较结果是否为真,并且 return 这是一个单一的 Uni。
有没有人提示如何实现这一目标?还是这太复杂了,我应该坚持我的“同步”方式?
您可以先将 positions 条目转换为 Multi
,然后,对于每个条目,得到一个 Uni<Boolean>
,当 position 与 offset 不同时,它会发出 false
。最终你合并结果并只取第一个false
:
private Uni<Boolean> isAllProcessedForChannel(String channel) {
return consumer.getPositions()
.onItem().transformToMulti(positions -> Multi.createFrom().iterable(positions.entrySet()))
.onItem().transformToUniAndMerge(entry -> {
Long position = entry.getValue();
TopicPartition partition = entry.getKey();
return consumer.committed(partition).onItem().transform(committed -> {
OffsetAndMetadata offsetAndMetadata = committed.get(partition);
if (offsetAndMetadata != null) {
long offset = offsetAndMetadata.offset();
if (offset != position) {
return false;
}
}
return true;
});
})
.filter(Boolean.FALSE::equals)
.toUni();
}
我有以下方法:
private boolean isAllProcessedForChannel(String channel) {
KafkaConsumer<Object, Object> consumer = clientService.getConsumer(channel);
Uni<Map<TopicPartition, Long>> positionsUni = consumer.getPositions();
Map<TopicPartition, Long> positions = positionsUni.await().indefinitely();
for (Entry<TopicPartition, Long> entry : positions.entrySet()) {
Long position = entry.getValue();
TopicPartition partition = entry.getKey();
Uni<Map<TopicPartition, OffsetAndMetadata>> committedUni = consumer.committed(partition);
OffsetAndMetadata offsetAndMetadata = committedUni.await().indefinitely().get(partition);
if (offsetAndMetadata != null) {
long offset = offsetAndMetadata.offset();
log.info("Offset/Position ({}): {}/{}", partition.partition(), offset, position);
if (offset != position) {
return false;
}
}
}
return true;
}
最后我需要得到一个 Uni,它指示是否有一个分区尚未处理。我目前的解决方案是 await
在 2 个职位上。我不知道如何“真正地”做出反应。
对于第一个 Uni 映射中的每个条目,我需要调用一个本身 return 是 Uni 的方法。然后我需要将条目(第一个 Uni)的值与第二个 Uni 的结果进行比较。最后,我需要检查所有比较结果是否为真,并且 return 这是一个单一的 Uni。
有没有人提示如何实现这一目标?还是这太复杂了,我应该坚持我的“同步”方式?
您可以先将 positions 条目转换为 Multi
,然后,对于每个条目,得到一个 Uni<Boolean>
,当 position 与 offset 不同时,它会发出 false
。最终你合并结果并只取第一个false
:
private Uni<Boolean> isAllProcessedForChannel(String channel) {
return consumer.getPositions()
.onItem().transformToMulti(positions -> Multi.createFrom().iterable(positions.entrySet()))
.onItem().transformToUniAndMerge(entry -> {
Long position = entry.getValue();
TopicPartition partition = entry.getKey();
return consumer.committed(partition).onItem().transform(committed -> {
OffsetAndMetadata offsetAndMetadata = committed.get(partition);
if (offsetAndMetadata != null) {
long offset = offsetAndMetadata.offset();
if (offset != position) {
return false;
}
}
return true;
});
})
.filter(Boolean.FALSE::equals)
.toUni();
}