Kafka state store standby task takes a long time when it becomes active

We are facing an issue with a Kafka state store that has a standby replica.

We have two topologies (streams): one without any internal state store, and one with an internal state store.

The problem is with the topology that has the state store, so I will only provide information about that topology.

The active and standby tasks of this stream run in two separate Kubernetes pods on two nodes.

The active task consumes some Kafka messages, processes them, and writes them to the state store. Then the pod holding the active task is killed. The standby task is assigned the changelog topic partitions, but the restore consumer seeks to the earliest offset. Since the changelog topic has thousands of messages, this takes a long time to complete.

I had expected that the latest offset of the previous active task would be available to the standby (now active) task, so that it would not need to replay all the events.

Please correct me if I'm wrong, as I am running out of options and don't know where to look next.

I am using kafka-streams version 2.5.0.

Configuration:

These are the configuration values for the topology:

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "myService");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, ONE_THOUSAND);
    properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
    properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
    properties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000);
    properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, THREE); 

The topology:

    topology.addSource(AGENT_ADMIN_SOURCE, Serdes.String().deserializer(), serde.deserializer(), inboundAdminTopic)
            .addSource(MS_SOURCE_MATCH, Serdes.String().deserializer(), kafkaAvroDeserializer(), inboundMatchTopic)
            .addProcessor(MsProcessor.PROCESSOR_NAME, 
                    () -> new MsProcessor(msActionManager(), stateStoreDAO()), 
                    AGENT_ADMIN_SOURCE, MS_SOURCE_MATCH)
            .addStateStore(agentAdminStateStore(), MsProcessor.PROCESSOR_NAME)   //agent-state-store
            .addStateStore(resourceMatchingStateStore(), MsProcessor.PROCESSOR_NAME)   //service-matches-state-store
            .addSink(...)
            .addSink(...);
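
The store-builder methods referenced above (`agentAdminStateStore()`, `resourceMatchingStateStore()`) are not shown in the question. As context, here is a minimal sketch of what they might look like, assuming in-memory key-value stores (consistent with the behavior explained in the answer below) and String serdes; the store names are inferred from the changelog topics in the logs, everything else is an assumption:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical sketch: serde types, visibility, and class name are assumptions.
public class StateStores {

    // In-memory store: state lives only on the heap of the JVM that owns the task.
    static StoreBuilder<KeyValueStore<String, String>> agentAdminStateStore() {
        return Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("agent-state-store"),
                Serdes.String(),
                Serdes.String());
    }

    static StoreBuilder<KeyValueStore<String, String>> resourceMatchingStateStore() {
        return Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("service-matches-state-store"),
                Serdes.String(),
                Serdes.String());
    }
}
```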
        

When the service is deployed, this pod is assigned the standby task:

{"@timestamp":"2020-08-31T10:50:17.253+00:00","@version":"1","message":"stream-thread [myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1] partition assignment took 10 ms.\n\tcurrently assigned active tasks: []\n\tcurrently assigned standby tasks: [0_0]\n\trevoked active tasks: []\n\trevoked standby tasks: []\n","logger_name":"org.apache.kafka.streams.processor.internals.StreamThread","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}

Then the other pod is killed. The standby task should read from the latest offset of the changelog topic, but it seeks to the earliest:

{"@timestamp":"2020-08-31T10:50:17.302+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): myService-agent-state-store-changelog-0,myService-service-matches-state-store-changelog-0 ", "logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}

{"@timestamp":"2020-08-31T10:50:17.306+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition myService-agent-state-store-changelog-0","logger_name":"org.apache.kafka.clients.consumer.internals.SubscriptionState","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}

{"@timestamp":"2020-08-31T10:50:17.306+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Seeking to EARLIEST offset of partition myService-service-matches-state-store-changelog-0","logger_name":"org.apache.kafka.clients.consumer.internals.SubscriptionState","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}

Why does it seek to the earliest offset instead of the latest known offset?

Also, why am I receiving hundreds of messages like this one:

{"@timestamp":"2020-08-31T11:50:39.849+00:00","@version":"1","message":"[Consumer clientId=myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1-restore-consumer, groupId=null] Subscribed to partition(s): myService-agent-state-store-changelog-0, myService-service-matches-state-store-changelog-0 ", "logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"myService-418e5529-d591-46b3-b1f4-1da74e3926f4-StreamThread-1","level":"INFO","level_value":20000}

Your expectation is correct; what you are observing is a bug that was fixed in version 2.6.0: https://issues.apache.org/jira/browse/KAFKA-9501

Kafka Streams' default behavior during a rebalance is to close all tasks and reopen them later. For StandbyTasks with in-memory stores, however, this means that when the standby task is closed so it can be reopened as an active task, its state is lost. For persistent stores there is no such problem because, well, the state is persisted to disk.

As of 2.6.0 the logic has changed: standby tasks are transitioned into active tasks without being closed, so the state of in-memory stores is no longer lost.
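
As a sketch (not part of the fix itself): until an upgrade to 2.6.0 is possible, switching the stores from in-memory to persistent avoids the full changelog replay, because a persistent store is written to local disk under `state.dir` and Kafka Streams checkpoints the restored offset, so a reopened task resumes from the checkpoint rather than from the earliest offset. Assuming String serdes:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Persistent variant of the store: backed by RocksDB on local disk, with a
// per-task checkpoint file, so state survives the close/reopen during rebalance.
StoreBuilder<KeyValueStore<String, String>> agentAdminStateStore =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("agent-state-store"),
                Serdes.String(),
                Serdes.String());
```

Note that on Kubernetes this only helps if the state directory (`state.dir`) sits on a volume that survives pod restarts; on ephemeral storage a freshly scheduled pod still has to restore from the changelog.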