Kafka 流处理器 API context.forward
Kafka Streams processor API context.forward
对于传入的记录,我需要验证值并根据结果 object 我需要将错误转发到不同的主题,如果验证成功,则使用 context.forward() 转发相同的内容。
可以使用此 link
中提供的 DSL 来完成
我没有找到在处理器 API 中执行此操作的明确方法。
ValidateProcessor.java
@Override
public void process(String key, String value) {
Object result = //validation logic
if(result.isSuccessful()) {
context().forward(key, value);
}else {
context.forward("error",Object)
}
}
现在调用者又需要检查并根据需要区分sink topic的key。
我正在使用 processorAPI,因为我需要使用 headers.
编辑:
branch(new predicate{
business logic
if(condition)
return true
else
return false;
当条件为假时如何推送到不同的流。当前正在创建另一个谓词,该谓词收集所有其他不满足链中上述谓词的记录。
有没有办法在同一个谓词中做?
当您指定 Topology
时,您为所有节点分配名称并连接它们:
Topology topology = new Topology();
topology.addSource("source", ...);
topology.addProcessor("X", ..., "source"); // connect source->X
topology.addSink("Y", ..., "X"); // connect X->Y
topology.addSink("Z", ..., "X"); // connect X->Z
如果处理器“X”连接到下游处理器“Y”和“Z”,您可以使用节点名称将记录发送到“Y”或“Z”。如果您不指定名称,记录将发送到 所有 下游(“子”)处理器。
// this is `process()` of "X"
public void process(String key, String value) {
context.forward(newKey, newValue); // send to both Y and Z
context.forward(newKey, newValue, To.child("Y")); // send it only to Y
context.forward(newKey, newValue, To.child("Z")); // send it only to Z
}
对于传入的记录,我需要验证值并根据结果 object 我需要将错误转发到不同的主题,如果验证成功,则使用 context.forward() 转发相同的内容。 可以使用此 link
中提供的 DSL 来完成我没有找到在处理器 API 中执行此操作的明确方法。
ValidateProcessor.java
@Override
public void process(String key, String value) {
Object result = //validation logic
if(result.isSuccessful()) {
context().forward(key, value);
}else {
context.forward("error",Object)
}
}
现在调用者又需要检查并根据需要区分sink topic的key。 我正在使用 processorAPI,因为我需要使用 headers.
编辑:
branch(new predicate{
business logic
if(condition)
return true
else
return false;
当条件为假时如何推送到不同的流。当前正在创建另一个谓词,该谓词收集所有其他不满足链中上述谓词的记录。 有没有办法在同一个谓词中做?
当您指定 Topology
时,您为所有节点分配名称并连接它们:
Topology topology = new Topology();
topology.addSource("source", ...);
topology.addProcessor("X", ..., "source"); // connect source->X
topology.addSink("Y", ..., "X"); // connect X->Y
topology.addSink("Z", ..., "X"); // connect X->Z
如果处理器“X”连接到下游处理器“Y”和“Z”,您可以使用节点名称将记录发送到“Y”或“Z”。如果您不指定名称,记录将发送到 所有 下游(“子”)处理器。
// this is `process()` of "X"
public void process(String key, String value) {
context.forward(newKey, newValue); // send to both Y and Z
context.forward(newKey, newValue, To.child("Y")); // send it only to Y
context.forward(newKey, newValue, To.child("Z")); // send it only to Z
}