kafka 流中的处理器节点
Processor node in kafka streams
我正在研究 processor node
中的 kafka 流。对于一个简单的代码,我像下面这样写只是为了过滤 UserID
,这是在 kafka 流中做 processor node
的正确方法吗?
但是,下面的代码无法编译,抛出错误:The method filter(Predicate<? super Object,? super Object>) in the type KStream<Object,Object> is not applicable for the arguments (new Predicate<String,String>(){})
KStreamBuilder builder = new KStreamBuilder();
builder.stream(topic)
.filter(new Predicate <String, String>() {
//@Override
public boolean test(String key, String value) {
Hashtable<Object, Object> message;
// put you processor logic here
return message.get("UserID").equals("1");
}
})
.to(streamouttopic);
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
有人可以指导我吗?
可能您正在使用另一个包中的 Predicate
class。您需要使用
import org.apache.kafka.streams.kstream.Predicate;
builder.stream(topic)
returns KStream<Object,Object>
类型,因为您没有指定泛型类型。 <Object,Object>
与 <String,String>
不兼容。
如果您知道实际类型是 KStream<String,String>
您可以按如下方式指定类型:
builder.<Sting,String>stream(topic)
.filter(...)
回答您有关 "processor nodes" 的问题:是的,添加 filter()
将在内部添加一个处理器节点。请注意,在 DSL 级别,您通常不需要考虑处理器。
如果您想明确使用处理器,可以使用处理器 API 而不是 DSL。查看 WordCount 示例:https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
请注意,使用 DSL,代码将在内部转换为处理器拓扑,这是 Kafka Streams 的运行时模型。
我正在研究 processor node
中的 kafka 流。对于一个简单的代码,我像下面这样写只是为了过滤 UserID
,这是在 kafka 流中做 processor node
的正确方法吗?
但是,下面的代码无法编译,抛出错误:The method filter(Predicate<? super Object,? super Object>) in the type KStream<Object,Object> is not applicable for the arguments (new Predicate<String,String>(){})
KStreamBuilder builder = new KStreamBuilder();
builder.stream(topic)
.filter(new Predicate <String, String>() {
//@Override
public boolean test(String key, String value) {
Hashtable<Object, Object> message;
// put you processor logic here
return message.get("UserID").equals("1");
}
})
.to(streamouttopic);
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
有人可以指导我吗?
可能您正在使用另一个包中的 Predicate
class。您需要使用
import org.apache.kafka.streams.kstream.Predicate;
builder.stream(topic)
returns KStream<Object,Object>
类型,因为您没有指定泛型类型。 <Object,Object>
与 <String,String>
不兼容。
如果您知道实际类型是 KStream<String,String>
您可以按如下方式指定类型:
builder.<Sting,String>stream(topic)
.filter(...)
回答您有关 "processor nodes" 的问题:是的,添加 filter()
将在内部添加一个处理器节点。请注意,在 DSL 级别,您通常不需要考虑处理器。
如果您想明确使用处理器,可以使用处理器 API 而不是 DSL。查看 WordCount 示例:https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
请注意,使用 DSL,代码将在内部转换为处理器拓扑,这是 Kafka Streams 的运行时模型。