kafka KStream - 进行 n 秒计数的拓扑
kafka KStream - topology to take n-second counts
我有一个 JSON 对象流,我在几个值的散列上键入这些对象。我希望以 n 秒(10?60?)间隔按键计数,并使用这些值进行一些模式分析。
我的拓扑:K->aggregateByKey(n seconds)->process()
在 process - init()
步骤中,我调用了 ProcessorContent.schedule(60 * 1000L)
,希望调用 .punctuate()
。从这里开始,我将遍历内部散列中的值并采取相应的行动。
我看到值来自聚合步骤并命中 process()
函数,但 .punctuate()
从未被调用。
代码:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
KStream<String, String> mapped = opxLines.map(new ReMapper());
KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
new AggregateInit(),
new OpxAggregate(),
TimeWindows.of("opx_aggregate", 60000));
ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
@Override
public Processor<Windowed<String>, String> get() {
return new AggProcessor();
}
});
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
AggregateInit() returns 空。
我想我可以用一个简单的计时器来完成 .punctuate()
的等效操作,但我想知道为什么这段代码没有按我希望的方式工作。
我认为这与kafka集群设置不当有关。将 文件描述符计数 更改为比默认值 (1024 -> 65535) 高得多的值后,这似乎符合规范。
我有一个 JSON 对象流,我在几个值的散列上键入这些对象。我希望以 n 秒(10?60?)间隔按键计数,并使用这些值进行一些模式分析。
我的拓扑:K->aggregateByKey(n seconds)->process()
在 process - init()
步骤中,我调用了 ProcessorContent.schedule(60 * 1000L)
,希望调用 .punctuate()
。从这里开始,我将遍历内部散列中的值并采取相应的行动。
我看到值来自聚合步骤并命中 process()
函数,但 .punctuate()
从未被调用。
代码:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
KStream<String, String> mapped = opxLines.map(new ReMapper());
KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
new AggregateInit(),
new OpxAggregate(),
TimeWindows.of("opx_aggregate", 60000));
ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
@Override
public Processor<Windowed<String>, String> get() {
return new AggProcessor();
}
});
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
AggregateInit() returns 空。
我想我可以用一个简单的计时器来完成 .punctuate()
的等效操作,但我想知道为什么这段代码没有按我希望的方式工作。
我认为这与kafka集群设置不当有关。将 文件描述符计数 更改为比默认值 (1024 -> 65535) 高得多的值后,这似乎符合规范。