Kafka 消费者 - 如何识别偏移量 skipping/missing 偏移量?

Kafka consumer - how to recognized offset skipping/missing offsets?

设置:
我们有一个 Debezium/Kafka Connect 设置与一个 Debezium Oracle 生产者和一个 Confluend JDBC consumer/sink.

起始位置/背景/问题:
由于高流量,我们已将 log.retention.minutes 减少到 1h,这在 99% 的时间里都是合适的。 但是在极少数情况下,其中一个 kafka 消费者会变慢并且无法再跟上。在这种情况下,消息将在消费者接收和处理之前在 Kafka 中删除(由于上述保留期)。 在默认配置中,消费者将跳过丢失的记录,选择最早的可用偏移量。这会导致目标端的不一致。

问题:
如何处理这些情况(如果提高 log.retention.minutes 不是一个选项)?
注意:如果消费者在找不到给定偏移量的消息时抛出 exception/stop/etc,我们会很好。

我们到目前为止所做的努力...
我们尝试将消费者的 auto.offset.reset 设置为 none,并希望消费者在找不到偏移量时停止。从理论上讲,这应该有效。实际上,当消费者被实例化时,它会立即抛出异常,因为没有 first/initial 偏移量。

最后的想法 那么我们可以使用另一个配置参数吗? (比如“如果偏移量为 missing/skipped 但不是在第一次启动时抛出异常”?)或者是否有我们可以监控的 JMX 指标以防消费者跳过消息?

setting auto.offset.reset to none for the consumer and expected the consumer to stop in case it can't find an offset

这就是它要做的,是的。

In practice it immediately throws an exception when the consumer gets instantiated because there's no first/initial offset

您需要首先实际初始化组,然后将其查找到最早的偏移量。例如。 kafka-consumer-offsets --reset-offsets --to-earliest --group connect-<name>

Something like "throw exception if offset is missing/skipped, but not on first start"?)

auto.offset.reset“第一次”和“第二次”开始之间没有任何区别。但是,您可以使用 consumer.override.auto.offset.reset=earliest 创建连接器,然后等待它成为 运行,然后使用 PUT /config 调用将其设置回 none。然后在它再次停止时重复 运行。

JMX metric we could monitor in case a consumer is skipping messages

据我所知没有;指标主要是报告处理的字节数。您必须另外跟踪您希望它读取多少字节。

您需要其他监控解决方案来检测代理上正在删除的日志段,并跟踪这些偏移范围与您的消费者当前正在读取的偏移量的比较。