kafka-streams:如果缺少单个元素,则重试完整的消息连接

kafka-streams: retry complete message join if single element is missing

我正在使用 kafka-streams DSL 通过 KStream-KTable 左连接执行消息丰富。除了一个微妙的问题外,一切都很顺利。

在当前架构中,我们在主题(placements、KStream)中接收到一些消息,这些消息需要使用压缩主题(descriptions、KTable)中的数据进行丰富。消息类似于:

{
  "order_id": 123456789,
  "user_id": 987654,
  "placed_at": "2020-07-20T11:31:00",
  "amount": 5.79,
  "items" : [
     {"item_id": 13579, "quantity": 1, "price": 1.23},
     {"item_id": 24680, "quantity": 1, "price": 4.56}
  ]
}

我目前的做法是从placements获取传入消息,将其拆分为N条消息(N是items数组的长度),在[=17上执行左连接=] 添加项目描述,然后将结果流分组到 order_id(丰富的拆分消息的键)上以重建完整消息。

问题是描述可能会延迟几秒钟到达,因此在极少数情况下,我会收到一条重建消息,其中的项目少于原始未丰富的项目。

我在 custom join example 中看到了该方法。这对我来说非常好,但不幸的是它并不完全适合。事实上,就我而言,如果缺少单个项目的描述,则应该延迟完整的消息。目前我无法弄清楚如何在这种情况下进行。欢迎任何建议。

经过仔细分析custom join example,解决方案是稍微改变一下它的逻辑。

示例摘录如下:

private static final class StreamTableJoinStreamSideLogic
      implements TransformerSupplier<String, Double, KeyValue<String, Pair<Double, Long>>> {

/* ... */

private KeyValue<String, Pair<Double, Long>> sendFullJoinRecordOrWaitForTableSide(final String key,
                                                                                          final Double value,
                                                                                          final long streamRecordTimestamp) {
          final ValueAndTimestamp<Long> tableValue = tableStore.get(key);
          if (tableValue != null &&
              withinAcceptableBounds(Instant.ofEpochMilli(tableValue.timestamp()), Instant.ofEpochMilli(streamRecordTimestamp))) {
            final KeyValue<String, Pair<Double, Long>> joinRecord = KeyValue.pair(key, new Pair<>(value, tableValue.value()));
            LOG.info("Table data available for key {}, sending fully populated join message {}", key, joinRecord);
            return joinRecord;
          } else {
            LOG.info("Table data unavailable for key {}, sending the join result as null", key);
            return KeyValue.pair(key, new Pair<>(value, null));
          }
        }

/* ... */

}

特别是,需要修改方法 sendFullJoinRecordOrWaitForTableSide(),以便以全或 none 的方式将相同的逻辑应用于 items