使用kafka检测值的变化
Use kafka to detect changes on values
我有一个流式应用程序,它不断接收坐标流以及一些还包含位串的自定义元数据。使用生产者 API 将此流生成到 kafka 主题上。现在另一个应用程序需要处理此流 [Streams API] 并存储位串中的特定位并在该位更改时生成警报
下面是需要处理的连续消息流
{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0
现在我想将这些警报写入另一个 kafka 主题,例如
{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}
我可以使用类似
的方式将当前位保存在状态存储中
streamsBuilder
.stream("my-topic")
.mapValues((key, value) -> value.getStatusBit())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));
但我无法理解如何从现有位中检测到此更改。我是否需要以某种方式在处理器拓扑结构中查询状态存储?如是?如何?如果不?还有什么可以做的?
我可以尝试的任何suggestions/ideas(可能与我的想法完全不同)也很感激。我是 Kafka 的新手,对事件驱动流的思考让我难以理解。
提前致谢。
我不确定这是最好的方法,但在类似的任务中,我使用了一个中间实体来捕获状态变化。在您的情况下,它将类似于
streamsBuilder.stream("my-topic").groupByKey()
.aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
public DeviceState apply(String key, Device newValue, DeviceState state) {
if(!newValue.getStatusBit().equals(state.getStatusBit())){
state.setChanged(true);
}
state.setStatusBit(newValue.getStatusBit());
state.setDeviceId(newValue.getDeviceId());
state.setKey(key);
return state;
}
}, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();
在生成的主题中,您将进行更改。也可以在DeviceState中添加一些属性先初始化,具体取决于是否发送事件,第一条设备记录何时到达等
我有一个流式应用程序,它不断接收坐标流以及一些还包含位串的自定义元数据。使用生产者 API 将此流生成到 kafka 主题上。现在另一个应用程序需要处理此流 [Streams API] 并存储位串中的特定位并在该位更改时生成警报
下面是需要处理的连续消息流
{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0
现在我想将这些警报写入另一个 kafka 主题,例如
{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}
我可以使用类似
的方式将当前位保存在状态存储中streamsBuilder
.stream("my-topic")
.mapValues((key, value) -> value.getStatusBit())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));
但我无法理解如何从现有位中检测到此更改。我是否需要以某种方式在处理器拓扑结构中查询状态存储?如是?如何?如果不?还有什么可以做的?
我可以尝试的任何suggestions/ideas(可能与我的想法完全不同)也很感激。我是 Kafka 的新手,对事件驱动流的思考让我难以理解。
提前致谢。
我不确定这是最好的方法,但在类似的任务中,我使用了一个中间实体来捕获状态变化。在您的情况下,它将类似于
streamsBuilder.stream("my-topic").groupByKey()
.aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
public DeviceState apply(String key, Device newValue, DeviceState state) {
if(!newValue.getStatusBit().equals(state.getStatusBit())){
state.setChanged(true);
}
state.setStatusBit(newValue.getStatusBit());
state.setDeviceId(newValue.getDeviceId());
state.setKey(key);
return state;
}
}, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();
在生成的主题中,您将进行更改。也可以在DeviceState中添加一些属性先初始化,具体取决于是否发送事件,第一条设备记录何时到达等