如何获取 Kafka 流聚合任务的结果并将数据发送到另一个服务?
how to get result of Kafka streams aggregate task and send the data to another service?
我使用Kafka streams来处理实时数据,需要对一个窗口时间的数据做一些聚合操作
我有两个关于聚合操作的问题。
- 如何获取聚合数据?我需要将它发送到第三服务。
- 聚合操作后,我无法向第三个服务发送消息,代码没有运行。
这是我的代码:
stream = builder.stream("topic");
windowedKStream = stream.map(XXXXX).groupByKey().windowedBy("5mins");
ktable = windowedKStream.aggregate(()->"", new Aggregator(K,V,result));
// my data is stored in 'result' variable, but I can't get it at the end of the 5 mins window.
// I need to send the 'result' to a 3rd service. But I don't know where to temporarily store it and then how to get it.
// below is the code the call a 3rd service, but the code can't be executed(reachable).
// I think it should be executed every 5 mins when thewindows is over. But it isn't.
result = httpclient.execute('result');
我想可能想做这样的事情:
ktable.toStream().foreach((k,v) -> httpclient.execute(v));
每次更新KTable
(禁用缓存),更新记录将发送到下游,并执行foreach
,v
为当前聚合结果。
我使用Kafka streams来处理实时数据,需要对一个窗口时间的数据做一些聚合操作
我有两个关于聚合操作的问题。
- 如何获取聚合数据?我需要将它发送到第三服务。
- 聚合操作后,我无法向第三个服务发送消息,代码没有运行。
这是我的代码:
stream = builder.stream("topic");
windowedKStream = stream.map(XXXXX).groupByKey().windowedBy("5mins");
ktable = windowedKStream.aggregate(()->"", new Aggregator(K,V,result));
// my data is stored in 'result' variable, but I can't get it at the end of the 5 mins window.
// I need to send the 'result' to a 3rd service. But I don't know where to temporarily store it and then how to get it.
// below is the code the call a 3rd service, but the code can't be executed(reachable).
// I think it should be executed every 5 mins when thewindows is over. But it isn't.
result = httpclient.execute('result');
我想可能想做这样的事情:
ktable.toStream().foreach((k,v) -> httpclient.execute(v));
每次更新KTable
(禁用缓存),更新记录将发送到下游,并执行foreach
,v
为当前聚合结果。