Kafka Streams:一条记录到多条记录
Kafka Streams: one record to multiple records
给定:我在 Kafka 中有两个主题,假设是主题 A 和主题 B。Kafka Stream 从主题 A 中读取一条记录,对其进行处理并生成与消费记录相对应的多条记录(假设是 recordA 和 recordB)。现在,问题是如何使用 Kafka Streams 实现此目的。
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
这里读取的记录是Message;处理后 returns 一个消息列表。如何将此列表划分为两个生产者流?任何帮助将不胜感激。
你的密钥(类型)是什么?我猜它不是 String
。执行 mapValues
后,您将得到这个 - KStream<K,List<Message>>
。如果 K
不是 String
那么 someFunction()
可以是一个 map
它将把 K
转换成 String
(如果是,你已经有了结果)并保持 List<Message>
(值)不变,因为这是您预期的最终结果
我不确定我是否理解正确的问题,我也不理解@Abhishek 的回答:(
如果您有一个输入流,并且您希望每个输入记录获得零个、一个或多个输出记录,您可以应用 flatMap()
或 flatMapValues()
(取决于您是否想要是否修改密钥)。
你也在问“我怎样才能把这个列表分成两个生产者流?”如果你想把一个流分成多个,你可以使用 branch()
.
更多细节,我参考文档:
https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations
给定:我在 Kafka 中有两个主题,假设是主题 A 和主题 B。Kafka Stream 从主题 A 中读取一条记录,对其进行处理并生成与消费记录相对应的多条记录(假设是 recordA 和 recordB)。现在,问题是如何使用 Kafka Streams 实现此目的。
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
这里读取的记录是Message;处理后 returns 一个消息列表。如何将此列表划分为两个生产者流?任何帮助将不胜感激。
你的密钥(类型)是什么?我猜它不是 String
。执行 mapValues
后,您将得到这个 - KStream<K,List<Message>>
。如果 K
不是 String
那么 someFunction()
可以是一个 map
它将把 K
转换成 String
(如果是,你已经有了结果)并保持 List<Message>
(值)不变,因为这是您预期的最终结果
我不确定我是否理解正确的问题,我也不理解@Abhishek 的回答:(
如果您有一个输入流,并且您希望每个输入记录获得零个、一个或多个输出记录,您可以应用 flatMap()
或 flatMapValues()
(取决于您是否想要是否修改密钥)。
你也在问“我怎样才能把这个列表分成两个生产者流?”如果你想把一个流分成多个,你可以使用 branch()
.
更多细节,我参考文档: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations