Kafka 的 retention.ms 没有被 Kafka 0.10.2 强制执行?
Kafka's retention.ms is not being enforced with Kafka 0.10.2?
#!/bin/zsh
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16')
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092')
topics=('t1' 't2' 't1_failed' 't2_failed')
NORMAL=$(tput sgr0)
GREEN=$(tput setaf 2; tput bold)
YELLOW=$(tput setaf 3)
RED=$(tput setaf 1)
function red() {
echo -e "$RED$*$NORMAL"
}
function green() {
echo -e "$GREEN$*$NORMAL"
}
function yellow() {
echo -e "$YELLOW$*$NORMAL"
}
for topic in $topics; do
yellow "Cleaning up messages in topic @ " $topic
yellow "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --config retention.ms=100
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
done
red "Waiting 120 seconds for messages to expire"
sleep 120
for topic in $topics; do
green "Restoring config of topic @ " $topic
green "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --delete-config retention.ms
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
$KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kafka_servers --topic $topic
done
当我 运行 这个脚本时 - 我可以看到 config.retention.ms 已更改为 100 毫秒,但在延迟 120 秒后 - 我仍然在所有 kafka 主题中看到相同的消息。
那么如何清除消息?
谢谢,
德米特里
您必须等待 log.retention.check.interval.ms
,默认为 5 分钟。
它比公认的答案要多一些。 Kafka 将消息存储在文件系统上的日志文件中。这些文件有翻转(按时间或大小配置)。一旦文件不再是当前文件,Kafka 将不再附加到该文件。
现在是有趣的部分:Kafka 不会使单个消息过期。一旦该文件中消息的最高时间戳早于 retention.ms
,它将(对于非压缩主题)删除整个日志文件。保留时间告诉您消息可用至少这么久,但它可能可用的时间更长(取决于滚动配置和消息量)。
在旧的 Kafka 版本中,这不是基于消息时间戳而是基于对日志文件的写入访问。感谢@dawsaw 指出这一点。
#!/bin/zsh
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16')
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092')
topics=('t1' 't2' 't1_failed' 't2_failed')
NORMAL=$(tput sgr0)
GREEN=$(tput setaf 2; tput bold)
YELLOW=$(tput setaf 3)
RED=$(tput setaf 1)
function red() {
echo -e "$RED$*$NORMAL"
}
function green() {
echo -e "$GREEN$*$NORMAL"
}
function yellow() {
echo -e "$YELLOW$*$NORMAL"
}
for topic in $topics; do
yellow "Cleaning up messages in topic @ " $topic
yellow "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --config retention.ms=100
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
done
red "Waiting 120 seconds for messages to expire"
sleep 120
for topic in $topics; do
green "Restoring config of topic @ " $topic
green "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --delete-config retention.ms
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
$KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kafka_servers --topic $topic
done
当我 运行 这个脚本时 - 我可以看到 config.retention.ms 已更改为 100 毫秒,但在延迟 120 秒后 - 我仍然在所有 kafka 主题中看到相同的消息。
那么如何清除消息?
谢谢, 德米特里
您必须等待 log.retention.check.interval.ms
,默认为 5 分钟。
它比公认的答案要多一些。 Kafka 将消息存储在文件系统上的日志文件中。这些文件有翻转(按时间或大小配置)。一旦文件不再是当前文件,Kafka 将不再附加到该文件。
现在是有趣的部分:Kafka 不会使单个消息过期。一旦该文件中消息的最高时间戳早于 retention.ms
,它将(对于非压缩主题)删除整个日志文件。保留时间告诉您消息可用至少这么久,但它可能可用的时间更长(取决于滚动配置和消息量)。
在旧的 Kafka 版本中,这不是基于消息时间戳而是基于对日志文件的写入访问。感谢@dawsaw 指出这一点。