有没有办法直接从处理器内部将数据发送到 Kafka 主题?
Is there a way to send data to a Kafka topic directly from within Processor?
我正在尝试借助 Kafka Streams 实现以下逻辑:
听取主题中的一些参考数据,例如。 ref-data-topic
并从中创建全局 StateStore
。
收听来自另一个主题 data-topic
的消息,这些消息必须根据参考数据进行验证并且 发送到 success
或 errors
主题.
这是示例伪代码:
class SomeProcessor implements Processor<String, String> {
private KeyValueStore<String, String> refDataStore;
@Override
public void init(final ProcessorContext context) {
refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
}
@Override
public void process(String key String value) {
Object refData = refDataStore.get("some_key");
// business logic here
if(ok) {
sendValueToTopic("success");
} else {
sendValueToTopic("errors");
}
}
}
或者实现这种期望行为的规范方法是什么?
就像我现在想到的一个替代方案是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,让客户处理例如validationStatus
在收到的消息中。
虽然,我真的很想有一个包含两个主题的解决方案,因为例如在这种情况下,我可以使用 Kafka Connect,link success topic
直接使用一些数据存储并处理 error topic
有点不同。同样,在只有一个主题的方法中,我不知道如何实现这个 "store_only_successfully_validated_entities" 用例。
有什么想法和建议吗?
如果您使用处理器 API,您可以按名称将数据转发到不同的处理器:
class SomeProcessor implements Processor<String, String> {
private KeyValueStore<String, String> refDataStore;
private ProcessorContext processorContext;
@Override
public void init(final ProcessorContext context) {
refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
processorContext = context;
}
@Override
public void process(String key String value) {
Object refData = refDataStore.get("some_key");
// business logic here
if(ok) {
processorContext.forward(key, value, To.child("success"));
} else {
processorContext.forward(key, value, To.child("error"));
}
}
}
插入拓扑时,会添加两个接收器节点,名称为 "success"
和 "error"
,分别写入成功和错误主题。
或者您 forward
将数据发送到单个接收器节点并使用 TopicNameExtractor
添加接收器而不是硬编码的主题名称。 (需要 2.0 版。)
如果你使用DSL,你可以使用KStream#branch()
拆分一个流,通过KStream#to(...)
将不同的数据堆到不同的主题(或者你通过KStream#to(TopicNameExtractor)
使用动态路由--要求版本 2.0)
我正在尝试借助 Kafka Streams 实现以下逻辑:
听取主题中的一些参考数据,例如。
ref-data-topic
并从中创建全局StateStore
。收听来自另一个主题
data-topic
的消息,这些消息必须根据参考数据进行验证并且 发送到success
或errors
主题.
这是示例伪代码:
class SomeProcessor implements Processor<String, String> {
private KeyValueStore<String, String> refDataStore;
@Override
public void init(final ProcessorContext context) {
refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
}
@Override
public void process(String key String value) {
Object refData = refDataStore.get("some_key");
// business logic here
if(ok) {
sendValueToTopic("success");
} else {
sendValueToTopic("errors");
}
}
}
或者实现这种期望行为的规范方法是什么?
就像我现在想到的一个替代方案是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,让客户处理例如validationStatus
在收到的消息中。
虽然,我真的很想有一个包含两个主题的解决方案,因为例如在这种情况下,我可以使用 Kafka Connect,link success topic
直接使用一些数据存储并处理 error topic
有点不同。同样,在只有一个主题的方法中,我不知道如何实现这个 "store_only_successfully_validated_entities" 用例。
有什么想法和建议吗?
如果您使用处理器 API,您可以按名称将数据转发到不同的处理器:
class SomeProcessor implements Processor<String, String> {
private KeyValueStore<String, String> refDataStore;
private ProcessorContext processorContext;
@Override
public void init(final ProcessorContext context) {
refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
processorContext = context;
}
@Override
public void process(String key String value) {
Object refData = refDataStore.get("some_key");
// business logic here
if(ok) {
processorContext.forward(key, value, To.child("success"));
} else {
processorContext.forward(key, value, To.child("error"));
}
}
}
插入拓扑时,会添加两个接收器节点,名称为 "success"
和 "error"
,分别写入成功和错误主题。
或者您 forward
将数据发送到单个接收器节点并使用 TopicNameExtractor
添加接收器而不是硬编码的主题名称。 (需要 2.0 版。)
如果你使用DSL,你可以使用KStream#branch()
拆分一个流,通过KStream#to(...)
将不同的数据堆到不同的主题(或者你通过KStream#to(TopicNameExtractor)
使用动态路由--要求版本 2.0)