如何一次将记录转发给多个 Kafka Stream 子处理器?
How to forward records to multiple Kafka Stream child Processors at once?
在 Kafka Stream API 中,是否可以一次将多个记录转发给不同的子处理器?例如,假设我们有一个名为 Processor-Parent 的父处理器和两个子处理器,Child-1,Child-2。
当 Processor-Parent 收到要处理的记录时,我想执行以下操作。
new_record = create_new_record(current_record)
context.forward(new_record, To(Child-1))
context.forward(old_record, To(Child-2))
像这样转发记录是个好习惯吗?
这不是最佳做法。相反,创建具有一个父处理器和多个子处理器的拓扑。
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(),SOURCE)
.addProcessor("child2", () -> new child2(),SOURCE);
通过这种方式,kafka 流确保到达源的每条消息都到达两个子处理器。
这取决于您的要求:
如果您的逻辑很直接,您甚至可以使用 Kafka Streams DSL。
如果 稍微 更复杂并且您需要处理器 API,但您希望将相同的记录传递给两个处理器,您可以这样做就像@Sameer Killamsetty 提到的那样。
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(), SOURCE)
.addProcessor("child2", () -> new child2(), SOURCE);
- 如果它更复杂并且取决于处理器中的某些逻辑,您可以将消息传递给不同的处理器节点。
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
.addProcessor("child1", () -> new child1(), "InputProcessor")
.addProcessor("child2", () -> new child2(), "InputProcessor");
public class InputProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
try {
context().forward(key, Integer.parseInt(value), To.child("child1"));
context().forward(key, value, To.child("child2"));
}
catch (NumberFormatException nfe) {
context().forward(key, value, To.child("child2"));
}
}
}
在 Kafka Stream API 中,是否可以一次将多个记录转发给不同的子处理器?例如,假设我们有一个名为 Processor-Parent 的父处理器和两个子处理器,Child-1,Child-2。
当 Processor-Parent 收到要处理的记录时,我想执行以下操作。
new_record = create_new_record(current_record)
context.forward(new_record, To(Child-1))
context.forward(old_record, To(Child-2))
像这样转发记录是个好习惯吗?
这不是最佳做法。相反,创建具有一个父处理器和多个子处理器的拓扑。
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(),SOURCE)
.addProcessor("child2", () -> new child2(),SOURCE);
通过这种方式,kafka 流确保到达源的每条消息都到达两个子处理器。
这取决于您的要求:
如果您的逻辑很直接,您甚至可以使用 Kafka Streams DSL。
如果 稍微 更复杂并且您需要处理器 API,但您希望将相同的记录传递给两个处理器,您可以这样做就像@Sameer Killamsetty 提到的那样。
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(), SOURCE)
.addProcessor("child2", () -> new child2(), SOURCE);
- 如果它更复杂并且取决于处理器中的某些逻辑,您可以将消息传递给不同的处理器节点。
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
.addProcessor("child1", () -> new child1(), "InputProcessor")
.addProcessor("child2", () -> new child2(), "InputProcessor");
public class InputProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
try {
context().forward(key, Integer.parseInt(value), To.child("child1"));
context().forward(key, value, To.child("child2"));
}
catch (NumberFormatException nfe) {
context().forward(key, value, To.child("child2"));
}
}
}