Kafka Streams:用于聚合的自定义 TimestampExtractor
Kafka Streams: Custom TimestampExtractor for aggregation
我正在构建一个非常简单的 KafkaStreams 演示应用程序来测试用例。
我无法升级我正在使用的 Kafka 代理(当前版本为 0.10.0),并且有几条消息是由 0.10.0 之前的生产者编写的,所以我使用的是自定义 TimestampExtractor ,我将其作为默认值添加到我的主要 class:
开头的配置中
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);
当使用我的源主题时,这工作得很好。但是当使用聚合运算符时,我 运行 进入异常 因为从内部聚合主题消费时使用 TimestampExtractor
的 FailOnInvalidTimestamp
实现而不是自定义实现。
Streams 应用程序的代码如下所示:
...
KStream<String, MyValueClass> clickStream = streamsBuilder
.stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));
KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
.map(((key, value) -> new KeyValue<>(value.getId(), value)))
.groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
.count();
...
我遇到的异常如下:
Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
现在的问题是:有什么方法可以让 Kafka Streams 在读取内部聚合主题时使用自定义 TimestampExtractor
(最好同时仍然使用 Streams DSL)?
您无法更改时间戳提取器(从 v1.0.0
开始)。出于正确性原因,这是不允许的。
但是我真的很想知道,时间戳为-1的记录是如何首先写入该主题的。 Kafka Streams 使用您的自定义提取器在写入记录时提供的时间戳。另请注意,KafkaProducer
不允许写入具有负时间戳的记录。
因此,我能想到的唯一解释是其他生产者确实写入了重新分区主题——这是不允许的……只有 Kafka Streams 应该写入重新分区主题。
我想,您需要删除此主题并让 Kafka Streams 重新创建它以恢复到干净状态。
来自其他答案的discussion/comment:
You need 0.10+ format to work with Kafka Streams. If you upgrade your brokers and keep 0.9 format or older, Kafka Streams might not work as expected.
这是众所周知的问题:-)。我在项目中的老客户遇到了同样的问题,这些老客户仍在使用像 0.9 这样的旧 Kafka 客户端,并且在与一些“未认证”的 .NET 客户端通信时也是如此。
所以我专门写了class:
public class MyTimestampExtractor implements TimestampExtractor {
private static final Logger LOG = LogManager.getLogger( MyTimestampExtractor.class );
@Override
public long extract ( ConsumerRecord<Object, Object> consumerRecord, long previousTimestamp ) {
final long timestamp = consumerRecord.timestamp();
if ( timestamp < 0 ) {
final String msg = consumerRecord.toString().trim();
LOG.warn( "Record has wrong Kafka timestamp: {}. It will be patched with local timestamp. Details: {}", timestamp, msg );
return System.currentTimeMillis();
}
return timestamp;
}
}
当有很多消息时,您可以跳过记录,因为它可能会泛滥。
阅读 Matthias 的回答后,我仔细检查了所有内容,问题的原因是 Kafka Broker 和 Kafka Streams 应用程序之间的版本不兼容。 我愚蠢到将 Kafka Streams 1.0.0 与 0.10.1.1 Broker 一起使用,这在 Kafka Wiki 中明确说明为不兼容 here。
编辑(感谢 Matthias):问题的真正原因是我们的 0.10.1.x 代理使用的日志格式仍然是 0.9.0.x,这是不兼容的与卡夫卡流。
我正在构建一个非常简单的 KafkaStreams 演示应用程序来测试用例。
我无法升级我正在使用的 Kafka 代理(当前版本为 0.10.0),并且有几条消息是由 0.10.0 之前的生产者编写的,所以我使用的是自定义 TimestampExtractor ,我将其作为默认值添加到我的主要 class:
开头的配置中config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);
当使用我的源主题时,这工作得很好。但是当使用聚合运算符时,我 运行 进入异常 因为从内部聚合主题消费时使用 TimestampExtractor
的 FailOnInvalidTimestamp
实现而不是自定义实现。
Streams 应用程序的代码如下所示:
...
KStream<String, MyValueClass> clickStream = streamsBuilder
.stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));
KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
.map(((key, value) -> new KeyValue<>(value.getId(), value)))
.groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
.count();
...
我遇到的异常如下:
Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
现在的问题是:有什么方法可以让 Kafka Streams 在读取内部聚合主题时使用自定义 TimestampExtractor
(最好同时仍然使用 Streams DSL)?
您无法更改时间戳提取器(从 v1.0.0
开始)。出于正确性原因,这是不允许的。
但是我真的很想知道,时间戳为-1的记录是如何首先写入该主题的。 Kafka Streams 使用您的自定义提取器在写入记录时提供的时间戳。另请注意,KafkaProducer
不允许写入具有负时间戳的记录。
因此,我能想到的唯一解释是其他生产者确实写入了重新分区主题——这是不允许的……只有 Kafka Streams 应该写入重新分区主题。
我想,您需要删除此主题并让 Kafka Streams 重新创建它以恢复到干净状态。
来自其他答案的discussion/comment:
You need 0.10+ format to work with Kafka Streams. If you upgrade your brokers and keep 0.9 format or older, Kafka Streams might not work as expected.
这是众所周知的问题:-)。我在项目中的老客户遇到了同样的问题,这些老客户仍在使用像 0.9 这样的旧 Kafka 客户端,并且在与一些“未认证”的 .NET 客户端通信时也是如此。
所以我专门写了class:
public class MyTimestampExtractor implements TimestampExtractor {
private static final Logger LOG = LogManager.getLogger( MyTimestampExtractor.class );
@Override
public long extract ( ConsumerRecord<Object, Object> consumerRecord, long previousTimestamp ) {
final long timestamp = consumerRecord.timestamp();
if ( timestamp < 0 ) {
final String msg = consumerRecord.toString().trim();
LOG.warn( "Record has wrong Kafka timestamp: {}. It will be patched with local timestamp. Details: {}", timestamp, msg );
return System.currentTimeMillis();
}
return timestamp;
}
}
当有很多消息时,您可以跳过记录,因为它可能会泛滥。
阅读 Matthias 的回答后,我仔细检查了所有内容,问题的原因是 Kafka Broker 和 Kafka Streams 应用程序之间的版本不兼容。 我愚蠢到将 Kafka Streams 1.0.0 与 0.10.1.1 Broker 一起使用,这在 Kafka Wiki 中明确说明为不兼容 here。
编辑(感谢 Matthias):问题的真正原因是我们的 0.10.1.x 代理使用的日志格式仍然是 0.9.0.x,这是不兼容的与卡夫卡流。