Kafka state store standby task takes a long time when it becomes active
We are facing an issue with a Kafka state store that has a standby replica.
We have two topologies (streams): one without any internal state store and one with an internal state store.
The problem is with the topology that has the state store, so I will only provide information about that topology here:
The active and standby tasks for this stream run in two separate Kubernetes pods on two different nodes.
The active task consumes some Kafka messages, processes them, and writes the results to the state store.
The pod holding the active task is killed.
The standby task is assigned to the changelog topics.
However, the consumer seeks to the earliest offset. Since the changelog topics contain thousands of messages, this takes a long time to complete.
I expected the latest offset from the previous active task to be available to the standby (now active) task, so that it would not have to replay all the events.
Please correct me if I am wrong; I am running out of options and do not know where to check further.
I am using kafka-streams version 2.5.0.
Configuration:
These are the configuration values for the topology:
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// ONE_THOUSAND and THREE are local constants (1000 ms commit interval, 3 stream threads).
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "myService");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, ONE_THOUSAND);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // one standby replica per task
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000);
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, THREE);
Topology:
topology.addSource(AGENT_ADMIN_SOURCE, Serdes.String().deserializer(), serde.deserializer(), inboundAdminTopic)
        .addSource(MS_SOURCE_MATCH, Serdes.String().deserializer(), kafkaAvroDeserializer(), inboundMatchTopic)
        .addProcessor(MsProcessor.PROCESSOR_NAME,
                () -> new MsProcessor(msActionManager(), stateStoreDAO()),
                AGENT_ADMIN_SOURCE, MS_SOURCE_MATCH)
        .addStateStore(agentAdminStateStore(), MsProcessor.PROCESSOR_NAME)       // agent-state-store
        .addStateStore(resourceMatchingStateStore(), MsProcessor.PROCESSOR_NAME) // service-matches-state-store
        .addSink(...)
        .addSink(...);
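The store builders agentAdminStateStore() and resourceMatchingStateStore() are not shown above; for context, here is a minimal sketch of what one of them might look like, assuming an in-memory key-value store and String serdes (both are assumptions, the real implementation is not part of the question):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical builder for the "agent-state-store"; an in-memory store and
// String serdes are assumed here purely to keep the sketch self-contained.
StoreBuilder<KeyValueStore<String, String>> agentAdminStateStore =
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("agent-state-store"),
                Serdes.String(),
                Serdes.String());

With APPLICATION_ID_CONFIG set to "myService", the changelog topics for these stores are named myService-agent-state-store-changelog and myService-service-matches-state-store-changelog, which matches the partitions that appear in the logs below.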
// When this pod is deployed, it gets the standby task:
{"@timestamp":"2020-08-31T10:50:17.253+00:00","@version":"1","message":"stream-thread [myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1] partition assignment took 10 ms.\n\tcurrently assigned active tasks: []\n\tcurrently assigned standby tasks: [0_0]\n\trevoked active tasks: []\n\trevoked standby tasks: []\n","logger_name":"org.apache.kafka.streams.processor.internals.StreamThread","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}
The other pod is killed. The standby task should read from the latest offset in the changelog topics, but it starts from the earliest:
{"@timestamp":"2020-08-31T10:50:17.302+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): myService-agent-state-store-changelog-0,myService-service-matches-state-store-changelog-0 ", "logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}
{"@timestamp":"2020-08-31T10:50:17.306+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition myService-agent-state-store-changelog-0","logger_name":"org.apache.kafka.clients.consumer.internals.SubscriptionState","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}
{"@timestamp":"2020-08-31T10:50:17.306+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition myService-service-matches-state-store-changelog-0","logger_name":"org.apache.kafka.clients.consumer.internals.SubscriptionState","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}
Why does it seek to the earliest offset rather than the latest known offset?
Also, why do I receive hundreds of messages like this one:
{"@timestamp":"2020-08-31T11:50:39.849+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): myService-agent-state-store-changelog-0, myService-service-matches-state-store-changelog-0 ", "logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}
Your expectation is correct; what you are observing is a bug that was fixed in the 2.6.0 release: https://issues.apache.org/jira/browse/KAFKA-9501
The default behavior of Kafka Streams during a rebalance was to close all tasks and reopen them later. For standby tasks with in-memory stores, however, this means the state is lost when the standby task is closed in order to be reopened as an active task. Persistent stores do not have this problem because, well, their state is persisted to disk.
Starting with 2.6.0 the logic has changed: a standby task is transitioned to an active task without being closed, so the state of in-memory stores is no longer lost.
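If upgrading to 2.6.0 or later is not immediately possible, one possible mitigation (not part of the fix itself, so treat it as a suggestion) is to back the stores with persistent RocksDB stores instead of in-memory ones, since their on-disk contents survive the close/reopen cycle during a rebalance. A minimal sketch, again assuming String serdes and the store name from the question:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Persistent (RocksDB-backed) store: its on-disk contents survive the task
// being closed during a rebalance, so promoting the standby to active does
// not require replaying the whole changelog.
StoreBuilder<KeyValueStore<String, String>> agentAdminStateStore =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("agent-state-store"),
                Serdes.String(),
                Serdes.String());

Note that in Kubernetes this only helps if the Streams state directory (state.dir) lives on a volume that survives pod restarts; otherwise the local RocksDB files disappear together with the pod and a full changelog restore is still needed.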