Kafka Streams 应用程序死于 "StreamsException: Could not create internal topics."
Kafka Streams Application died with "StreamsException: Could not create internal topics."
我正在评估 Kafka 流并制作了一个简单的应用程序并在 运行 过夜。我 运行 它在 2 个实例上,每个实例有 1 个流线程。我有一个 2 代理 Kafka 集群。
StreamsConfig:
private Map<String, Object> settings() {
Map<String, Object> settings = new HashMap<>();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "fare_tracker");
settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverAddress + ":" + serverPort);
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
settings.put(StreamsConfig.STATE_DIR_CONFIG, directoryName);
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, AvroTimeStampExtractor.class);
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
settings.put("schema.registry.url", "http://zookeeper1:8081");
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
settings.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "snappy");
settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 3);
settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 500);
return settings;
}
它在开始大约 12 小时后死亡,堆栈跟踪如下:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:81)
at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:628)
at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:382)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:501)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access00(AbstractCoordinator.java:89)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:451)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:433)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:186)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
我找到了一些相关的 WARN 和 ERROR 日志。
{
"@timestamp": "2017-06-07T05:44:26.996+05:30",
"@version": 1,
"message": "Got error produce response with correlation id 198191 on topic-partition fare_tracker-small_window-changelog-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION",
"logger_name": "org.apache.kafka.clients.producer.internals.Sender",
"thread_name": "kafka-producer-network-thread | fare_tracker-9e0a04f4-c1cc-4b61-8ca5-8bf25f18549f-StreamThread-1-producer",
"level": "WARN",
"level_value": 30000
}
^这似乎是一个普遍的网络问题,我为生产者配置了 3 次重试。
Application1 终止并显示以下日志:
{
"@timestamp": "2017-06-07T06:20:35.122+05:30",
"@version": 1,
"message": "stream-thread [StreamThread-1] Failed to commit StreamTask 2_61 state: ",
"logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000,
"stack_trace": org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
at org.apache.kafka.streams.processor.internals.StreamTask.run(StreamTask.java:79)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
}
{
"@timestamp": "2017-06-07T06:20:35.236+05:30",
"@version": 1,
"message": "Bootstrap broker kafka2:9092 disconnected",
"logger_name": "org.apache.kafka.clients.NetworkClient",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
{
"@timestamp": "2017-06-07T06:20:36.100+05:30",
"@version": 1,
"message": "Could not create internal topics: Found only 1 brokers, but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
"logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
{
"@timestamp": "2017-06-07T06:20:36.914+05:30",
"@version": 1,
"message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
"logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
应用程序 2 终止并显示以下日志:
{
"@timestamp": "2017-06-07T06:20:06.254+05:30",
"@version": 1,
"message": "Could not create internal topics: Found only 1 brokers, but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
"logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
{
"@timestamp": "2017-06-07T06:20:07.041+05:30",
"@version": 1,
"message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
"logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
我检查了我的其他应用程序,虽然它们 运行 都很好,但我在与上面大致相同的时间多次看到以下日志。
{
"@timestamp": "2017-06-07T06:10:34.962+05:30",
"@version": 1,
"message": "Publishing to kafka failed ",
"thread_name": "kafka-producer-network-thread | producer-1",
"level": "ERROR",
"level_value": 40000,
"stack_trace": org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
at org.springframework.kafka.core.KafkaTemplate.onCompletion(KafkaTemplate.java:255)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
}
您说您有 2 个 Kafka 代理,但其中一个错误消息包含以下信息:
Could not create internal topics: Found only 1 brokers, but replication factor is 2.
您的应用程序和 Kafka 代理之间(可能还有 Kafka 代理本身之间)似乎存在网络连接问题。如果此类网络问题持续较长时间,最终尝试与 Kafka 代理通信的应用程序迟早会失败(取决于它们的设置)。
我正在评估 Kafka 流并制作了一个简单的应用程序并在 运行 过夜。我 运行 它在 2 个实例上,每个实例有 1 个流线程。我有一个 2 代理 Kafka 集群。
StreamsConfig:
private Map<String, Object> settings() {
Map<String, Object> settings = new HashMap<>();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "fare_tracker");
settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverAddress + ":" + serverPort);
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
settings.put(StreamsConfig.STATE_DIR_CONFIG, directoryName);
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, AvroTimeStampExtractor.class);
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
settings.put("schema.registry.url", "http://zookeeper1:8081");
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
settings.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "snappy");
settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 3);
settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 500);
return settings;
}
它在开始大约 12 小时后死亡,堆栈跟踪如下:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:81)
at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:628)
at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:382)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:501)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access00(AbstractCoordinator.java:89)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:451)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:433)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:186)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
我找到了一些相关的 WARN 和 ERROR 日志。
{
"@timestamp": "2017-06-07T05:44:26.996+05:30",
"@version": 1,
"message": "Got error produce response with correlation id 198191 on topic-partition fare_tracker-small_window-changelog-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION",
"logger_name": "org.apache.kafka.clients.producer.internals.Sender",
"thread_name": "kafka-producer-network-thread | fare_tracker-9e0a04f4-c1cc-4b61-8ca5-8bf25f18549f-StreamThread-1-producer",
"level": "WARN",
"level_value": 30000
}
^这似乎是一个普遍的网络问题,我为生产者配置了 3 次重试。
Application1 终止并显示以下日志:
{
"@timestamp": "2017-06-07T06:20:35.122+05:30",
"@version": 1,
"message": "stream-thread [StreamThread-1] Failed to commit StreamTask 2_61 state: ",
"logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000,
"stack_trace": org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
at org.apache.kafka.streams.processor.internals.StreamTask.run(StreamTask.java:79)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
}
{
"@timestamp": "2017-06-07T06:20:35.236+05:30",
"@version": 1,
"message": "Bootstrap broker kafka2:9092 disconnected",
"logger_name": "org.apache.kafka.clients.NetworkClient",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
{
"@timestamp": "2017-06-07T06:20:36.100+05:30",
"@version": 1,
"message": "Could not create internal topics: Found only 1 brokers, but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
"logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
{
"@timestamp": "2017-06-07T06:20:36.914+05:30",
"@version": 1,
"message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
"logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
应用程序 2 终止并显示以下日志:
{
"@timestamp": "2017-06-07T06:20:06.254+05:30",
"@version": 1,
"message": "Could not create internal topics: Found only 1 brokers, but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
"logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
{
"@timestamp": "2017-06-07T06:20:07.041+05:30",
"@version": 1,
"message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
"logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name": "StreamThread-1",
"level": "WARN",
"level_value": 30000
}
我检查了我的其他应用程序,虽然它们 运行 都很好,但我在与上面大致相同的时间多次看到以下日志。
{
"@timestamp": "2017-06-07T06:10:34.962+05:30",
"@version": 1,
"message": "Publishing to kafka failed ",
"thread_name": "kafka-producer-network-thread | producer-1",
"level": "ERROR",
"level_value": 40000,
"stack_trace": org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
at org.springframework.kafka.core.KafkaTemplate.onCompletion(KafkaTemplate.java:255)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
}
您说您有 2 个 Kafka 代理,但其中一个错误消息包含以下信息:
Could not create internal topics: Found only 1 brokers, but replication factor is 2.
您的应用程序和 Kafka 代理之间(可能还有 Kafka 代理本身之间)似乎存在网络连接问题。如果此类网络问题持续较长时间,最终尝试与 Kafka 代理通信的应用程序迟早会失败(取决于它们的设置)。