如何为kafka流使用同一主题的多个转换器?
How to use multiple transformers using the same topic for kafka streams?
我需要使用多个转换器在 kafka 上解析复杂的消息。每个转换器解析消息的一部分,并通过在消息上填充一些属性来编辑消息。最后,完全解析的消息使用 Kafka 消费者存储在数据库中。目前,我正在这样做:
streamsBuilder.stream(Topic.A, someConsumer)
\ filters messages that have unparsed parts of type X
.filter(filterX)
\ transformer that edits the message and produces new Topic.E messages
.transform(ParseXandProduceE::new)
.to(Topic.A, someProducer)
streamsBuilder.stream(Topic.A, someConsumer)
\ filters messages that have unparsed parts of type Y
.filter(filterY)
\ transformer that edits the message and produces new Topic.F messages
.transform(ParseYandProduceF::new)
.to(Topic.A, someProducer)
变形金刚看起来像:
class ParseXandProduceE implements Transformer<...> {
@Override
public KeyValue<String, Message> transform (String key, Message message) {
message.x = parse(message.rawX);
context.forward(newKey, message.x, Topic.E);
return KeyValue.pair(key, message);
}
}
然而,这很麻烦,相同的消息在这些流中流动多次。
此外,还有一个消费者将 topic.A
的消息存储在数据库中。消息当前存储多次,在每次转换之前和每次转换之后。每条消息需要存储一次。
以下可能有效,但似乎不利,因为过滤器+转换的每个块都可以干净地放在其自己的单独 class:
streamsBuilder.stream(Topic.A, someConsumer)
\ transformer that filters and edits the message and produces new Topic.E + Topic.F messages
.transform(someTransformer)
.to(Topic.B, someProducer)
并让持久性消费者收听 Topic.B
。
是后者提出的解决方案,还是有其他方法可以达到相同的结果?也许有源和汇的完整拓扑配置?如果是这样,这种情况会是什么样子?
使用单个变压器似乎是最简单的解决方案。因为您有两个独立的过滤器,所以如果您尝试链接各个运算符,程序将变得更加复杂。如果您知道每条消息只会通过一个过滤器,而不会同时通过两个过滤器,则可以使用 branch()
:
KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});
subStream[0].transform(ParseXandProduceE::new)
.merge(subStream[1].transform(ParseYandProduceF::new)
.to(...)
请注意,上述解决方案仅在两个转换器都不需要转换任何消息时才有效(branch()
将每条消息放入第一个匹配谓词的分支,但绝不会放入多个分支)。因此,如果一条消息可以通过两个过滤器,您需要做一些更复杂的事情:
KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});
KStream passedX = subStreams[0];
KStream transformedXE = passedX.transform(ParseXandProduceE::new);
// a message that passed filterX may also pass filterY,
// and thus we merge those message back to the "y-stream"
// (of course, those messages would already be transformed by `ParseXandProduceE`)
KStream passedY = subStream[1].merge(transformedXE.filter(filterY);
// the result contains all message that only pass filterX and got transformed,
// plus all messages that passed filterY (and maybe also filterX) and got transformed
KStream result = transformedXE.filterNot(filterY)
.merge(passedY.transform(ParseYandProduceF::new)
result.to(...)
我需要使用多个转换器在 kafka 上解析复杂的消息。每个转换器解析消息的一部分,并通过在消息上填充一些属性来编辑消息。最后,完全解析的消息使用 Kafka 消费者存储在数据库中。目前,我正在这样做:
streamsBuilder.stream(Topic.A, someConsumer)
\ filters messages that have unparsed parts of type X
.filter(filterX)
\ transformer that edits the message and produces new Topic.E messages
.transform(ParseXandProduceE::new)
.to(Topic.A, someProducer)
streamsBuilder.stream(Topic.A, someConsumer)
\ filters messages that have unparsed parts of type Y
.filter(filterY)
\ transformer that edits the message and produces new Topic.F messages
.transform(ParseYandProduceF::new)
.to(Topic.A, someProducer)
变形金刚看起来像:
class ParseXandProduceE implements Transformer<...> {
@Override
public KeyValue<String, Message> transform (String key, Message message) {
message.x = parse(message.rawX);
context.forward(newKey, message.x, Topic.E);
return KeyValue.pair(key, message);
}
}
然而,这很麻烦,相同的消息在这些流中流动多次。
此外,还有一个消费者将 topic.A
的消息存储在数据库中。消息当前存储多次,在每次转换之前和每次转换之后。每条消息需要存储一次。
以下可能有效,但似乎不利,因为过滤器+转换的每个块都可以干净地放在其自己的单独 class:
streamsBuilder.stream(Topic.A, someConsumer)
\ transformer that filters and edits the message and produces new Topic.E + Topic.F messages
.transform(someTransformer)
.to(Topic.B, someProducer)
并让持久性消费者收听 Topic.B
。
是后者提出的解决方案,还是有其他方法可以达到相同的结果?也许有源和汇的完整拓扑配置?如果是这样,这种情况会是什么样子?
使用单个变压器似乎是最简单的解决方案。因为您有两个独立的过滤器,所以如果您尝试链接各个运算符,程序将变得更加复杂。如果您知道每条消息只会通过一个过滤器,而不会同时通过两个过滤器,则可以使用 branch()
:
KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});
subStream[0].transform(ParseXandProduceE::new)
.merge(subStream[1].transform(ParseYandProduceF::new)
.to(...)
请注意,上述解决方案仅在两个转换器都不需要转换任何消息时才有效(branch()
将每条消息放入第一个匹配谓词的分支,但绝不会放入多个分支)。因此,如果一条消息可以通过两个过滤器,您需要做一些更复杂的事情:
KStream[] subStreams = stream.branch(new Predicates[]{filterX,filterY});
KStream passedX = subStreams[0];
KStream transformedXE = passedX.transform(ParseXandProduceE::new);
// a message that passed filterX may also pass filterY,
// and thus we merge those message back to the "y-stream"
// (of course, those messages would already be transformed by `ParseXandProduceE`)
KStream passedY = subStream[1].merge(transformedXE.filter(filterY);
// the result contains all message that only pass filterX and got transformed,
// plus all messages that passed filterY (and maybe also filterX) and got transformed
KStream result = transformedXE.filterNot(filterY)
.merge(passedY.transform(ParseYandProduceF::new)
result.to(...)