如何从 Transformer 输出多条记录?
How to output multiple records from Transformer?
给定:具有 KStream::transform
的 DSL 拓扑。作为 Transformer::transform
执行的一部分,从输入消息 (KeyValue<String, Message>
) 生成多条消息。
我可能可以 return KeyValue<String, List<Message>>
来自 Transformer::transform
的对象,并将 flatMapValues
应用为拓扑中的下一个处理器以使列表变平。但是我想知道是否可以将 ProcessorContext::forward
用于同一目标,即
public KeyValue<String, Message> transform(String key, Message message) {
Iterable<Message> messages = generateMultipleFromOne(message);
messages.forEach(m->context.forward(key, m));
return null;
}
来自javadoc of Transformer.transform(K key, V value):
If more than one output record should be forwarded downstream,
ProcessorContext.forward(Object, Object)
and
ProcessorContext.forward(Object, Object, To)
can be used.
Note that returning a new KeyValue is merely for convenience. The same can be achieved by using ProcessorContext.forward(Object, Object)
and returning null.
给定:具有 KStream::transform
的 DSL 拓扑。作为 Transformer::transform
执行的一部分,从输入消息 (KeyValue<String, Message>
) 生成多条消息。
我可能可以 return KeyValue<String, List<Message>>
来自 Transformer::transform
的对象,并将 flatMapValues
应用为拓扑中的下一个处理器以使列表变平。但是我想知道是否可以将 ProcessorContext::forward
用于同一目标,即
public KeyValue<String, Message> transform(String key, Message message) {
Iterable<Message> messages = generateMultipleFromOne(message);
messages.forEach(m->context.forward(key, m));
return null;
}
来自javadoc of Transformer.transform(K key, V value):
If more than one output record should be forwarded downstream,
ProcessorContext.forward(Object, Object)
andProcessorContext.forward(Object, Object, To)
can be used.Note that returning a new KeyValue is merely for convenience. The same can be achieved by using
ProcessorContext.forward(Object, Object)
and returning null.