Kafka 保留策略未按预期工作
Kafka retention policy doesn't work as expected
我想为我们的一些用例实施数据重放,为此,我需要使用 Kafka 保留策略(我正在使用连接,我需要 window 时间是准确的)。
P.S。我正在使用 Kafka 版本 0.10.1.1
我正在像这样向主题发送数据:
kafkaProducer.send(
new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r)
);
然后我这样创建主题:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms=172800000 kafka-topics --zookeeper localhost --alter --topic myTopic --config segment.ms=172800000
所以根据上面的设置,我应该把我的主题的保留时间设置为48小时。
我扩展 TimestampExtractor
以记录每条消息的实际时间。
public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class);
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord) {
LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp()));
return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis();
}
}
为了测试,我已经向我的主题发送了 4 条消息,我收到了这 4 条日志消息。
2017-02-28 10:23:39 INFO
ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP :
1488295086292 Human readble -Tue Feb 28 10:18:06 EST 2017
2017-02-28
10:24:01 INFO ConsumerRecordOrWallclockTimestampExtractor:21 -
TIMESTAMP : 1483272000000 Human readble -Sun Jan 01 07:00:00 EST 2017
2017-02-28 10:26:11 INFO
ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP :
1485820800000 Human readble -Mon Jan 30 19:00:00 EST 2017
2017-02-28
10:27:22 INFO ConsumerRecordOrWallclockTimestampExtractor:21 -
TIMESTAMP : 1488295604411 Human readble -Tue Feb 28 10:26:44 EST 2017
因此,根据 Kafka's retention policy,我希望在 5 分钟后看到我的两条消息得到 purged/deleted(第二条和第三条消息是在 1 月 1 日和 1 月 30 日收到的)。但是我尝试使用我的主题一个小时,每次我使用我的主题时,我都会收到所有 4 条消息。
kafka-avro-console-consumer --zookeeper localhost:2181
--from-beginning --topic myTopic
我的 Kafka 配置是这样的:
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
我是做错了什么还是漏掉了什么?
Kafka 通过删除日志段来实现其保留策略。 Kafka 永远不会删除活动段,该段将在其中附加发送到分区的新消息。 Kafka 只删除旧的段。当新消息发送到分区时,Kafka 将活动段滚动到旧段,并且
- 包含新消息的活动段的大小将超过
log.segment.bytes
,或者
- 活动段中第一条消息的时间戳早于
log.roll.ms
(默认为 7 天)
因此在您的示例中,您必须在 2017 年美国东部时间 2 月 28 日星期二 10:18:06 之后等待 7 天,发送一条新消息,然后所有 4 条初始消息将被删除。
我想为我们的一些用例实施数据重放,为此,我需要使用 Kafka 保留策略(我正在使用连接,我需要 window 时间是准确的)。 P.S。我正在使用 Kafka 版本 0.10.1.1
我正在像这样向主题发送数据:
kafkaProducer.send(
new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r)
);
然后我这样创建主题:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms=172800000 kafka-topics --zookeeper localhost --alter --topic myTopic --config segment.ms=172800000
所以根据上面的设置,我应该把我的主题的保留时间设置为48小时。
我扩展 TimestampExtractor
以记录每条消息的实际时间。
public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class);
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord) {
LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp()));
return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis();
}
}
为了测试,我已经向我的主题发送了 4 条消息,我收到了这 4 条日志消息。
2017-02-28 10:23:39 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP : 1488295086292 Human readble -Tue Feb 28 10:18:06 EST 2017
2017-02-28 10:24:01 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP : 1483272000000 Human readble -Sun Jan 01 07:00:00 EST 2017
2017-02-28 10:26:11 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP : 1485820800000 Human readble -Mon Jan 30 19:00:00 EST 2017
2017-02-28 10:27:22 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP : 1488295604411 Human readble -Tue Feb 28 10:26:44 EST 2017
因此,根据 Kafka's retention policy,我希望在 5 分钟后看到我的两条消息得到 purged/deleted(第二条和第三条消息是在 1 月 1 日和 1 月 30 日收到的)。但是我尝试使用我的主题一个小时,每次我使用我的主题时,我都会收到所有 4 条消息。
kafka-avro-console-consumer --zookeeper localhost:2181 --from-beginning --topic myTopic
我的 Kafka 配置是这样的:
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
我是做错了什么还是漏掉了什么?
Kafka 通过删除日志段来实现其保留策略。 Kafka 永远不会删除活动段,该段将在其中附加发送到分区的新消息。 Kafka 只删除旧的段。当新消息发送到分区时,Kafka 将活动段滚动到旧段,并且
- 包含新消息的活动段的大小将超过
log.segment.bytes
,或者 - 活动段中第一条消息的时间戳早于
log.roll.ms
(默认为 7 天)
因此在您的示例中,您必须在 2017 年美国东部时间 2 月 28 日星期二 10:18:06 之后等待 7 天,发送一条新消息,然后所有 4 条初始消息将被删除。