具有虚拟输入主题的消息生成的挂钟时间调度
Wall clock time scheduling for message generation with dummy input topic
我对我们现在使用的基于状态存储的消息生成调度模式有疑问:
我们正在根据使用 DSL 构建的“正常”处理流中的处理状态写入状态存储。我们将 transform()
与挂钟时间安排挂钩,以每 n 秒查看一次状态存储。
根据状态,我们 forward()
一条新消息下游进行处理(我们进行延迟重试的方式)。 transform()
需要一个以虚拟主题作为输入的输入流,永远不会看到数据。
没有虚拟输入主题可以完成吗?
其他人是怎么做到的?
Matthias 回答后的附加信息:
schedule()
用作基于状态存储中数据的定时查找的数据生成器。这用于重试副作用。
看来你想分享一些代码,用于常规处理有副作用和基于标点符号的处理没有副作用。否则,您不会对这两种情况都使用 MyTransformer
?
因此,我想知道您是否可以一次完成所有事情 Transformer
而不是三个。
MyTransformer<K,V,R> implement Transformer<K,V,R> {
public void init(ProcessorContext context) {
context.schedule(..., new MyPunctuator());
}
public R transform(K key, V value) {
// for every record from the source topic do everything
doSharedStuff();
doStuffWithSideEffect();
}
private doSharedStuff() {...}
private doStuffWithSideEffect() {...}
private class MyPunctuator implements Punctuator {
public void punctuate(long timestamp) {
for(KeyValue kv : ...) { // whatever k/v-pair to want to "forward"
// for every record you want to emit delayed, do only some part
doSharedStuff();
}
}
}
}
我对我们现在使用的基于状态存储的消息生成调度模式有疑问:
我们正在根据使用 DSL 构建的“正常”处理流中的处理状态写入状态存储。我们将 transform()
与挂钟时间安排挂钩,以每 n 秒查看一次状态存储。
根据状态,我们 forward()
一条新消息下游进行处理(我们进行延迟重试的方式)。 transform()
需要一个以虚拟主题作为输入的输入流,永远不会看到数据。
没有虚拟输入主题可以完成吗?
其他人是怎么做到的?
Matthias 回答后的附加信息:
schedule()
用作基于状态存储中数据的定时查找的数据生成器。这用于重试副作用。
看来你想分享一些代码,用于常规处理有副作用和基于标点符号的处理没有副作用。否则,您不会对这两种情况都使用 MyTransformer
?
因此,我想知道您是否可以一次完成所有事情 Transformer
而不是三个。
MyTransformer<K,V,R> implement Transformer<K,V,R> {
public void init(ProcessorContext context) {
context.schedule(..., new MyPunctuator());
}
public R transform(K key, V value) {
// for every record from the source topic do everything
doSharedStuff();
doStuffWithSideEffect();
}
private doSharedStuff() {...}
private doStuffWithSideEffect() {...}
private class MyPunctuator implements Punctuator {
public void punctuate(long timestamp) {
for(KeyValue kv : ...) { // whatever k/v-pair to want to "forward"
// for every record you want to emit delayed, do only some part
doSharedStuff();
}
}
}
}