Kafka Streams:提交不会发生
Kafka Streams : Commit doesn't happen
我是 Kafka Streams 的新手,我正在尝试在超时情况下试验 kafka 流的行为。
这是我使用处理器 API 测试的场景:
我的 kafka 流应用程序从一个 kafka 主题(字符串键,字符串消息)中消费并写入一个 kafka 主题(字符串键,字符串消息)
我已将消费者配置参数 max.poll.interval.ms 设置为 60000 毫秒。
我的处理方法是这样的:
public void process(String key, String value) {
System.out.println("the key is : " +key);
LocalDateTime start= LocalDateTime.now();
System.out.println("startTime:" + dtf.format(start));
if ( key.startsWith("12345678"){
Thread.sleep(80000);
}
System.out.println("done sleeping");
LocalDateTime end=LocalDateTime.now();
System.out.println("endTime:" + dtf.format(end));
System.out.println("Offset*****"+context.offset()+"
partitionId****"+context.partition()+"taskId*****"+context.taskId()+
"javaThreadId*******"+ Thread.currentThread().getId()+ "
value****"+value);
}
所有其他配置均设置为默认值。
我正在尝试查看如果处理时间超过 max.poll.interval.ms 时应用程序的行为方式。
事情是这样的:
在第一次尝试时,它开始使用来自 kafka 主题的消息,并在调用 process() 时开始休眠。
60000 毫秒后,它再次调用 process 方法,没有抛出任何异常,但此时它在 20000 毫秒打印后退出睡眠,"done sleeping" 并将消息发布到输出主题。在此之后,它再次开始使用来自相同偏移量的相同消息而不提交。这是循环发生的。
示例输出:
关键是:12345678
开始时间:2018/07/09 07:34:25
关键是:12345678
开始时间:2018/07/09 07:35:27
睡觉了
结束时间:2018/07/09 07:35:45
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value*****abc
关键是:12345678
开始时间:2018/07/09 07:36:27
睡觉了
结束时间:2018/07/09 07:36:47
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******14 value*****abc
关键是:12345678
开始时间:2018/07/09 07:37:27
睡觉了
结束时间:2018/07/09 07:37:47
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value*****abc
- 我试过显式调用 context#commit() 但它也不起作用。我在这里错过了什么? kafka streams会记住之前的处理状态吗?如果不是,为什么它在第一次尝试后 20000 毫秒后说 "done sleeping"(mall.poll.interval.ms- 60000,处理时间(睡眠)设置为 80000 毫秒)?
附加信息:
我的输入和输出主题各有 2 个分区,我已将 StreamsConfig num.streams.threads 配置为 2。
我有一个 3 节点的 kafka 集群 - Kafka 和 Kafka Streams 版本 1.1.0
我在任何地方都不使用标点符号方法或任何复杂的处理。
提前致谢。
不是 100% 确定,但是请注意:如果你调用 context#commit()
你只 "request" 一个提交并且 Kafka Streams 尝试尽快提交——但是在 context#commit()
返回之后,提交还没有发生...
另请注意,如果您的超时时间为 60.000
并且您休眠了 80.000
,则您的应用程序应该从消费者组中删除,因此不允许再提交。对于这种情况,日志中应该有 WARN 日志消息。
希望对您有所帮助。
我是 Kafka Streams 的新手,我正在尝试在超时情况下试验 kafka 流的行为。
这是我使用处理器 API 测试的场景:
我的 kafka 流应用程序从一个 kafka 主题(字符串键,字符串消息)中消费并写入一个 kafka 主题(字符串键,字符串消息)
我已将消费者配置参数 max.poll.interval.ms 设置为 60000 毫秒。
我的处理方法是这样的:
public void process(String key, String value) { System.out.println("the key is : " +key); LocalDateTime start= LocalDateTime.now(); System.out.println("startTime:" + dtf.format(start)); if ( key.startsWith("12345678"){ Thread.sleep(80000); } System.out.println("done sleeping"); LocalDateTime end=LocalDateTime.now(); System.out.println("endTime:" + dtf.format(end)); System.out.println("Offset*****"+context.offset()+" partitionId****"+context.partition()+"taskId*****"+context.taskId()+ "javaThreadId*******"+ Thread.currentThread().getId()+ " value****"+value); }
所有其他配置均设置为默认值。
我正在尝试查看如果处理时间超过 max.poll.interval.ms 时应用程序的行为方式。
事情是这样的: 在第一次尝试时,它开始使用来自 kafka 主题的消息,并在调用 process() 时开始休眠。 60000 毫秒后,它再次调用 process 方法,没有抛出任何异常,但此时它在 20000 毫秒打印后退出睡眠,"done sleeping" 并将消息发布到输出主题。在此之后,它再次开始使用来自相同偏移量的相同消息而不提交。这是循环发生的。
示例输出:
关键是:12345678
开始时间:2018/07/09 07:34:25
关键是:12345678
开始时间:2018/07/09 07:35:27
睡觉了
结束时间:2018/07/09 07:35:45
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value*****abc
关键是:12345678
开始时间:2018/07/09 07:36:27
睡觉了
结束时间:2018/07/09 07:36:47
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******14 value*****abc
关键是:12345678
开始时间:2018/07/09 07:37:27
睡觉了
结束时间:2018/07/09 07:37:47
Offset*****224 partitionId****0taskId*****0_0javaThreadId*******12 value*****abc
- 我试过显式调用 context#commit() 但它也不起作用。我在这里错过了什么? kafka streams会记住之前的处理状态吗?如果不是,为什么它在第一次尝试后 20000 毫秒后说 "done sleeping"(mall.poll.interval.ms- 60000,处理时间(睡眠)设置为 80000 毫秒)?
附加信息:
我的输入和输出主题各有 2 个分区,我已将 StreamsConfig num.streams.threads 配置为 2。
我有一个 3 节点的 kafka 集群 - Kafka 和 Kafka Streams 版本 1.1.0
我在任何地方都不使用标点符号方法或任何复杂的处理。
提前致谢。
不是 100% 确定,但是请注意:如果你调用 context#commit()
你只 "request" 一个提交并且 Kafka Streams 尝试尽快提交——但是在 context#commit()
返回之后,提交还没有发生...
另请注意,如果您的超时时间为 60.000
并且您休眠了 80.000
,则您的应用程序应该从消费者组中删除,因此不允许再提交。对于这种情况,日志中应该有 WARN 日志消息。
希望对您有所帮助。