触发后取消 Kafka Streams 上的标点符号
Cancel Punctuator on Kafka Streams after is triggered
我在转换器上创建了一个预定标点符号,并定期将其安排到 运行(使用 kafka v2.1.0)。每次我接受一个特定的密钥时,我都会像这样创建一个新密钥
scheduled = context.schedule (Duration.ofMillis(scheduleTime),
PunctuationType.WALL_CLOCK_TIME,new CustomPunctuator(context, customStateStoreName));
我的问题是,我不断创建所有这些标点符号 运行,但我找不到取消它们的方法。我在互联网上找到了一个片段可以使用
private Cancellable scheduled;
@Override
public void init(PorcessorContext processContext) {
this.context = processorContext;
scheduled = context.schedule(TimeUnit.SECONDS.toMillis(5), PunctuationType.WALL_CLOCK_TIME,
this::punctuateCancel);
}
private void punctuateCancel(long timestamp) {
scheduled.cancel();
}
但不幸的是,这似乎只取消了最新创建的标点符号。
我正在编辑我的 post 只是为了进一步了解我的方法以及这与 wardzinia 发表的评论有何关系。所以我的方法非常相似,只是使用一个 Map,因为每个事件键我只需要激活一个标点符号,所以在我的 Transformer class 中,我启动
private Map<String,Cancellable> scheduled = new HashMap<>();
在我的转换方法中,我确实执行了下面的代码
{
final Cancellable cancelSched = scheduled.get(recordKey);
// Every time I get a new event I cancel my previous Punctuator
// and schedule a new one ( context.schedule a few lines later)
if(cancelSched != null)
cancelSched.cancel();
// This is supposed to work like a closure by capturing the currentCancellable which in the next statement
// is moved to the scheduled map. Scheduled map at any point will have the only active Punctuator for a
// specific String as it is constantly renewed
// Note: Previous registered punctuators have already been cancelled as it can be seen by the previous
// statement (cancelSched.cancel();)
Cancellable currentCancellable = context.schedule(Duration.ofMillis(scheduleTime), PunctuationType.WALL_CLOCK_TIME,
new CustomPunctuator(context, recordKey ,()-> scheduled ));
// Update Active Punctuators for a specific key.
scheduled.put(recordKey,currentCancellable);
}
然后我在 Punctuator punctuate 方法上使用注册的回调来取消最后一个活动的 Punctuator
在它开始之后。好像行得通(虽然不确定)但是感觉很"hacky"而不是那种解决方案
这当然是可取的。
那么如何在触发后取消标点符号。有办法解决这个问题吗?
我认为你可以做的一件事是:
class CustomPunctuator implements Punctuator {
final Cancellable schedule;
public void punctuate(final long timestamp) {
// business logic
if (/* do cancel */) {
schedule.cancel()
}
}
}
// registering a punctuation
{
final CustomPunctuator punctuation = new CustomPunctuator();
final Cancellable currentCancellable = context.schedule(
Duration.ofMillis(scheduleTime),
PunctuationType.WALL_CLOCK_TIME,
punctuation);
punctuation.schedule = currentCancellable;
}
这样,您就不需要维护 HashMap
并为每个 CustomPunctuator
实例提供一种取消自身的方法。
我遇到了同样的情况,只是为了对 scala 感兴趣的人,我将其处理为
val punctuation = new myPunctuation()
val scheduled:Cancellable=context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, punctuation)
punctuation.schedule=scheduled
class
class myPunctuation() extends Punctuator{
var schedule: Cancellable = _
override def punctuate(timestamp: Long): Unit = {
println("hello")
schedule.cancel()
}
}
很有魅力
我在转换器上创建了一个预定标点符号,并定期将其安排到 运行(使用 kafka v2.1.0)。每次我接受一个特定的密钥时,我都会像这样创建一个新密钥
scheduled = context.schedule (Duration.ofMillis(scheduleTime),
PunctuationType.WALL_CLOCK_TIME,new CustomPunctuator(context, customStateStoreName));
我的问题是,我不断创建所有这些标点符号 运行,但我找不到取消它们的方法。我在互联网上找到了一个片段可以使用
private Cancellable scheduled;
@Override
public void init(PorcessorContext processContext) {
this.context = processorContext;
scheduled = context.schedule(TimeUnit.SECONDS.toMillis(5), PunctuationType.WALL_CLOCK_TIME,
this::punctuateCancel);
}
private void punctuateCancel(long timestamp) {
scheduled.cancel();
}
但不幸的是,这似乎只取消了最新创建的标点符号。
我正在编辑我的 post 只是为了进一步了解我的方法以及这与 wardzinia 发表的评论有何关系。所以我的方法非常相似,只是使用一个 Map,因为每个事件键我只需要激活一个标点符号,所以在我的 Transformer class 中,我启动
private Map<String,Cancellable> scheduled = new HashMap<>();
在我的转换方法中,我确实执行了下面的代码
{
final Cancellable cancelSched = scheduled.get(recordKey);
// Every time I get a new event I cancel my previous Punctuator
// and schedule a new one ( context.schedule a few lines later)
if(cancelSched != null)
cancelSched.cancel();
// This is supposed to work like a closure by capturing the currentCancellable which in the next statement
// is moved to the scheduled map. Scheduled map at any point will have the only active Punctuator for a
// specific String as it is constantly renewed
// Note: Previous registered punctuators have already been cancelled as it can be seen by the previous
// statement (cancelSched.cancel();)
Cancellable currentCancellable = context.schedule(Duration.ofMillis(scheduleTime), PunctuationType.WALL_CLOCK_TIME,
new CustomPunctuator(context, recordKey ,()-> scheduled ));
// Update Active Punctuators for a specific key.
scheduled.put(recordKey,currentCancellable);
}
然后我在 Punctuator punctuate 方法上使用注册的回调来取消最后一个活动的 Punctuator 在它开始之后。好像行得通(虽然不确定)但是感觉很"hacky"而不是那种解决方案 这当然是可取的。
那么如何在触发后取消标点符号。有办法解决这个问题吗?
我认为你可以做的一件事是:
class CustomPunctuator implements Punctuator {
final Cancellable schedule;
public void punctuate(final long timestamp) {
// business logic
if (/* do cancel */) {
schedule.cancel()
}
}
}
// registering a punctuation
{
final CustomPunctuator punctuation = new CustomPunctuator();
final Cancellable currentCancellable = context.schedule(
Duration.ofMillis(scheduleTime),
PunctuationType.WALL_CLOCK_TIME,
punctuation);
punctuation.schedule = currentCancellable;
}
这样,您就不需要维护 HashMap
并为每个 CustomPunctuator
实例提供一种取消自身的方法。
我遇到了同样的情况,只是为了对 scala 感兴趣的人,我将其处理为
val punctuation = new myPunctuation()
val scheduled:Cancellable=context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, punctuation)
punctuation.schedule=scheduled
class
class myPunctuation() extends Punctuator{
var schedule: Cancellable = _
override def punctuate(timestamp: Long): Unit = {
println("hello")
schedule.cancel()
}
}
很有魅力