在一段时间内使用kafka-streams处理和检查事件
Process and check event using kafka-streams during some period
我有一个 KStream eventsStream,它从主题 "events" 获取数据。
有两种类型的事件,它们的键:
1. {user_id = X, event_id = 1} {..value, include time_event...}
2. {user_id = X, event_id = 2} {..value, include time_event...}
如果在 10 分钟内没有用户提供 event_id = 2
的事件,我需要将 event_id = 1
的事件迁移到主题 "results"。
例如,
1. 第一种情况:我们得到数据 {user_id = 100, event_id = 1} {.. time_event = xxxx ...}
并且在 10 分钟内 {user_id = 100, event_id = 2} {.. time_event = xxxx + 10 minutes...}
没有任何事件,所以我们将它写入 results-topic
2.第二种情况:我们得到数据{user_id = 100, event_id = 1} {.. time_event = xxxx ...}
和10分钟内的一个事件{user_id = 100, event_id = 2} {.. time_event = xxxx + 5 minutes...}
,所以我们不会把它写到results-topic
如何使用 kafka-streams 在 java 代码中实现这种行为?
我的代码:
public class 结果流 {
public static KafkaStreams newStream() {
Properties properties = Config.getProperties("ResultStream");
Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, String>> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("inmemory"),
stringSerde,
stringSerde
);
builder.addStateStore(store);
KStream<String, String> resourceEventStream = builder.stream(EVENTS.topicName(), Consumed.with(stringSerde, stringSerde));
resourceEventStream.print(Printed.toSysOut());
resourceEventStream.process(() -> new CashProcessor("inmemory"), "inmemory");
resourceEventStream.process(() -> new FilterProcessor("inmemory", resourceEventStream), "inmemory");
Topology topology = builder.build();
return new KafkaStreams(topology, properties);
}
}
public class FilterProcessor 实现 Processor {
private ProcessorContext context;
private String eventStoreName;
private KeyValueStore<String, String> eventStore;
private KStream<String, String> stream;
public FilterProcessor(String eventStoreName, KStream<String, String> stream) {
this.eventStoreName = eventStoreName;
this.stream = stream;
}
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
eventStore = (KeyValueStore) processorContext.getStateStore(eventStoreName);
}
@Override
public void process(Object key, Object value) {
this.context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
System.out.println("Scheduler is working");
stream.filter((k, v) -> {
JsonObject events = new Gson().fromJson(k, JsonObject.class);
if (***condition***) {
return true;
}
return false;
}).to("results");
});
}
@Override
public void close() {
}
}
CashProcessor 的作用只是将事件放入本地存储,如果给定一个 event_id = 2 的同一用户,则用户删除 event_id = 1 的记录。
FilterProcess 应每分钟使用本地存储过滤事件。但我无法正确调用此处理(实际上我就是这样做的)...
我真的需要帮助。
为什么要将 KStream
传递给处理器?这不是 DSL 的工作方式。
当您已经通过 resourceEventStream.process()
“连接”您的处理器时,您的 FilterProcessor#process(key, value)
方法将自动为流中的每条记录调用——但是,KStream#process()
是一个终端操作,因此不允许您向下游发送任何数据。相反,您可能想要使用 transform()
(这与 process()
基本相同加上输出 KStream
)。
要在标点符号中实际向下游转发数据,您应该使用通过 init()
方法提供的 ProcessorContext
使用 context.forward()
。
我有一个 KStream eventsStream,它从主题 "events" 获取数据。
有两种类型的事件,它们的键:
1. {user_id = X, event_id = 1} {..value, include time_event...}
2. {user_id = X, event_id = 2} {..value, include time_event...}
如果在 10 分钟内没有用户提供 event_id = 2
的事件,我需要将 event_id = 1
的事件迁移到主题 "results"。
例如,
1. 第一种情况:我们得到数据 {user_id = 100, event_id = 1} {.. time_event = xxxx ...}
并且在 10 分钟内 {user_id = 100, event_id = 2} {.. time_event = xxxx + 10 minutes...}
没有任何事件,所以我们将它写入 results-topic
2.第二种情况:我们得到数据{user_id = 100, event_id = 1} {.. time_event = xxxx ...}
和10分钟内的一个事件{user_id = 100, event_id = 2} {.. time_event = xxxx + 5 minutes...}
,所以我们不会把它写到results-topic
如何使用 kafka-streams 在 java 代码中实现这种行为?
我的代码:
public class 结果流 {
public static KafkaStreams newStream() {
Properties properties = Config.getProperties("ResultStream");
Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, String>> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("inmemory"),
stringSerde,
stringSerde
);
builder.addStateStore(store);
KStream<String, String> resourceEventStream = builder.stream(EVENTS.topicName(), Consumed.with(stringSerde, stringSerde));
resourceEventStream.print(Printed.toSysOut());
resourceEventStream.process(() -> new CashProcessor("inmemory"), "inmemory");
resourceEventStream.process(() -> new FilterProcessor("inmemory", resourceEventStream), "inmemory");
Topology topology = builder.build();
return new KafkaStreams(topology, properties);
}
}
public class FilterProcessor 实现 Processor {
private ProcessorContext context;
private String eventStoreName;
private KeyValueStore<String, String> eventStore;
private KStream<String, String> stream;
public FilterProcessor(String eventStoreName, KStream<String, String> stream) {
this.eventStoreName = eventStoreName;
this.stream = stream;
}
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
eventStore = (KeyValueStore) processorContext.getStateStore(eventStoreName);
}
@Override
public void process(Object key, Object value) {
this.context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
System.out.println("Scheduler is working");
stream.filter((k, v) -> {
JsonObject events = new Gson().fromJson(k, JsonObject.class);
if (***condition***) {
return true;
}
return false;
}).to("results");
});
}
@Override
public void close() {
}
}
CashProcessor 的作用只是将事件放入本地存储,如果给定一个 event_id = 2 的同一用户,则用户删除 event_id = 1 的记录。
FilterProcess 应每分钟使用本地存储过滤事件。但我无法正确调用此处理(实际上我就是这样做的)...
我真的需要帮助。
为什么要将 KStream
传递给处理器?这不是 DSL 的工作方式。
当您已经通过 resourceEventStream.process()
“连接”您的处理器时,您的 FilterProcessor#process(key, value)
方法将自动为流中的每条记录调用——但是,KStream#process()
是一个终端操作,因此不允许您向下游发送任何数据。相反,您可能想要使用 transform()
(这与 process()
基本相同加上输出 KStream
)。
要在标点符号中实际向下游转发数据,您应该使用通过 init()
方法提供的 ProcessorContext
使用 context.forward()
。