如何在 KTable 中动态查找?
How to lookup dynamically in a KTable?
我目前正在开发一个 Kafka Streams 应用程序,它使用我们数据库中的数据来丰富传入的事件。丰富数据存储在 Debezium 不断更新的主题中。
一些充实很容易实现,因为它们只是来自事件 ID 的 equi-join/left-join。
但其他丰富需要从传入事件时间戳计算一个值:
假设我的传入事件主题具有以下架构:
user_id: Long
timestamp: Instant
然后我需要将此事件映射到以下输出:
user_id: Long
has_planned_meetings_in_the_future: Boolean
会议table存储在一个单独的主题中,记录结构如下:
user_id: Long
meeting_date: Instant
因此,对于每个事件,如果它们是该特定用户的记录并且会议日期大于当前时间戳,我将需要在会议主题中查找。
怎么做?
一种可行的方法是在您的应用程序中使用会议主题并将会议存储在状态存储中。
然后您可以使用您描述的条件高效地查询状态存储。
这里有一个存储会议的简单例子:
public class MyMeetingsProcessor implements Processor<Object, Meeting> {
private String meetingsKeyStore = "meetings-key-store";
private KeyValueStore<Object, Meeting> meetings;
public void init(ProcessorContext context) {
meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
}
public void process(Object key, Meeting value) {
meetings.put(key, value);
}
}
要在消费事件时查询状态存储,您可以这样做:
public class MyEventsProcessor implements Processor<Object, Meeting> {
private String meetingsKeyStore = "meetings-key-store";
private KeyValueStore<Object, Meeting> meetings;
public void init(ProcessorContext context) {
meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
}
public void process(Object key, Event value) {
Meeting meeting = meetings.get(key);
if (meeting != null) {
// do something fun
}
}
}
我目前正在开发一个 Kafka Streams 应用程序,它使用我们数据库中的数据来丰富传入的事件。丰富数据存储在 Debezium 不断更新的主题中。 一些充实很容易实现,因为它们只是来自事件 ID 的 equi-join/left-join。 但其他丰富需要从传入事件时间戳计算一个值:
假设我的传入事件主题具有以下架构:
user_id: Long
timestamp: Instant
然后我需要将此事件映射到以下输出:
user_id: Long
has_planned_meetings_in_the_future: Boolean
会议table存储在一个单独的主题中,记录结构如下:
user_id: Long
meeting_date: Instant
因此,对于每个事件,如果它们是该特定用户的记录并且会议日期大于当前时间戳,我将需要在会议主题中查找。
怎么做?
一种可行的方法是在您的应用程序中使用会议主题并将会议存储在状态存储中。
然后您可以使用您描述的条件高效地查询状态存储。
这里有一个存储会议的简单例子:
public class MyMeetingsProcessor implements Processor<Object, Meeting> {
private String meetingsKeyStore = "meetings-key-store";
private KeyValueStore<Object, Meeting> meetings;
public void init(ProcessorContext context) {
meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
}
public void process(Object key, Meeting value) {
meetings.put(key, value);
}
}
要在消费事件时查询状态存储,您可以这样做:
public class MyEventsProcessor implements Processor<Object, Meeting> {
private String meetingsKeyStore = "meetings-key-store";
private KeyValueStore<Object, Meeting> meetings;
public void init(ProcessorContext context) {
meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
}
public void process(Object key, Event value) {
Meeting meeting = meetings.get(key);
if (meeting != null) {
// do something fun
}
}
}