kafka-streams:如果缺少单个元素,则重试完整的消息连接
kafka-streams: retry complete message join if single element is missing
我正在使用 kafka-streams DSL 通过 KStream
-KTable
左连接执行消息丰富。除了一个微妙的问题外,一切都很顺利。
在当前架构中,我们在主题(placements
、KStream)中接收到一些消息,这些消息需要使用压缩主题(descriptions
、KTable)中的数据进行丰富。消息类似于:
{
"order_id": 123456789,
"user_id": 987654,
"placed_at": "2020-07-20T11:31:00",
"amount": 5.79,
"items" : [
{"item_id": 13579, "quantity": 1, "price": 1.23},
{"item_id": 24680, "quantity": 1, "price": 4.56}
]
}
我目前的做法是从placements
获取传入消息,将其拆分为N条消息(N是items
数组的长度),在[=17上执行左连接=] 添加项目描述,然后将结果流分组到 order_id
(丰富的拆分消息的键)上以重建完整消息。
问题是描述可能会延迟几秒钟到达,因此在极少数情况下,我会收到一条重建消息,其中的项目少于原始未丰富的项目。
我在 custom join example 中看到了该方法。这对我来说非常好,但不幸的是它并不完全适合。事实上,就我而言,如果缺少单个项目的描述,则应该延迟完整的消息。目前我无法弄清楚如何在这种情况下进行。欢迎任何建议。
经过仔细分析custom join example,解决方案是稍微改变一下它的逻辑。
示例摘录如下:
private static final class StreamTableJoinStreamSideLogic
implements TransformerSupplier<String, Double, KeyValue<String, Pair<Double, Long>>> {
/* ... */
private KeyValue<String, Pair<Double, Long>> sendFullJoinRecordOrWaitForTableSide(final String key,
final Double value,
final long streamRecordTimestamp) {
final ValueAndTimestamp<Long> tableValue = tableStore.get(key);
if (tableValue != null &&
withinAcceptableBounds(Instant.ofEpochMilli(tableValue.timestamp()), Instant.ofEpochMilli(streamRecordTimestamp))) {
final KeyValue<String, Pair<Double, Long>> joinRecord = KeyValue.pair(key, new Pair<>(value, tableValue.value()));
LOG.info("Table data available for key {}, sending fully populated join message {}", key, joinRecord);
return joinRecord;
} else {
LOG.info("Table data unavailable for key {}, sending the join result as null", key);
return KeyValue.pair(key, new Pair<>(value, null));
}
}
/* ... */
}
特别是,需要修改方法 sendFullJoinRecordOrWaitForTableSide()
,以便以全或 none 的方式将相同的逻辑应用于 items
。
我正在使用 kafka-streams DSL 通过 KStream
-KTable
左连接执行消息丰富。除了一个微妙的问题外,一切都很顺利。
在当前架构中,我们在主题(placements
、KStream)中接收到一些消息,这些消息需要使用压缩主题(descriptions
、KTable)中的数据进行丰富。消息类似于:
{
"order_id": 123456789,
"user_id": 987654,
"placed_at": "2020-07-20T11:31:00",
"amount": 5.79,
"items" : [
{"item_id": 13579, "quantity": 1, "price": 1.23},
{"item_id": 24680, "quantity": 1, "price": 4.56}
]
}
我目前的做法是从placements
获取传入消息,将其拆分为N条消息(N是items
数组的长度),在[=17上执行左连接=] 添加项目描述,然后将结果流分组到 order_id
(丰富的拆分消息的键)上以重建完整消息。
问题是描述可能会延迟几秒钟到达,因此在极少数情况下,我会收到一条重建消息,其中的项目少于原始未丰富的项目。
我在 custom join example 中看到了该方法。这对我来说非常好,但不幸的是它并不完全适合。事实上,就我而言,如果缺少单个项目的描述,则应该延迟完整的消息。目前我无法弄清楚如何在这种情况下进行。欢迎任何建议。
经过仔细分析custom join example,解决方案是稍微改变一下它的逻辑。
示例摘录如下:
private static final class StreamTableJoinStreamSideLogic
implements TransformerSupplier<String, Double, KeyValue<String, Pair<Double, Long>>> {
/* ... */
private KeyValue<String, Pair<Double, Long>> sendFullJoinRecordOrWaitForTableSide(final String key,
final Double value,
final long streamRecordTimestamp) {
final ValueAndTimestamp<Long> tableValue = tableStore.get(key);
if (tableValue != null &&
withinAcceptableBounds(Instant.ofEpochMilli(tableValue.timestamp()), Instant.ofEpochMilli(streamRecordTimestamp))) {
final KeyValue<String, Pair<Double, Long>> joinRecord = KeyValue.pair(key, new Pair<>(value, tableValue.value()));
LOG.info("Table data available for key {}, sending fully populated join message {}", key, joinRecord);
return joinRecord;
} else {
LOG.info("Table data unavailable for key {}, sending the join result as null", key);
return KeyValue.pair(key, new Pair<>(value, null));
}
}
/* ... */
}
特别是,需要修改方法 sendFullJoinRecordOrWaitForTableSide()
,以便以全或 none 的方式将相同的逻辑应用于 items
。