Kafka 生产者超时异常
Kafka Producer TimeOutException
我是 运行 正在将数据写入 Kafka 主题的 Samza 流作业。 Kafka 是 运行 一个 3 节点集群。 Samza 作业部署在 yarn 上。我们在容器日志中看到很多这样的异常:
INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
at org.apache.samza.system.kafka.KafkaSystemProducer$$anon.onCompletion(KafkaSystemProducer.scala:181)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time
这3种异常情况比较多
59088 org.apache.kafka.common.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time
61015 org.apache.kafka.common.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time
62275 org.apache.kafka.common.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append
请帮助我了解这里的问题是什么。每当它发生时,Samza 容器就会重新启动。
该错误表明某些记录放入队列的速度快于它们从客户端发送的速度。
当您的 Producer 发送消息时,它们存储在缓冲区中(在发送到目标代理之前)并且记录被分组到批中以增加吞吐量。将新记录添加到批次时,必须在可配置的时间 window 内发送,该时间由 request.timeout.ms
控制(默认设置为 30 秒)。如果批处理在队列中的时间较长,则会抛出 TimeoutException
,然后批处理记录将从队列中删除,并且不会传递给代理。
增加 request.timeout.ms
的值应该可以解决问题。
如果这不起作用,您也可以尝试减少 batch.size
以便更频繁地发送批次(但这次将包含更少的消息)并确保 linger.ms
设置为0(默认值)。
请注意,您需要在更改任何配置参数后重新启动您的 kafka 代理。
如果您仍然收到错误消息,我认为您的网络出现了问题。您是否启用了 SSL?
我是 运行 正在将数据写入 Kafka 主题的 Samza 流作业。 Kafka 是 运行 一个 3 节点集群。 Samza 作业部署在 yarn 上。我们在容器日志中看到很多这样的异常:
INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
at org.apache.samza.system.kafka.KafkaSystemProducer$$anon.onCompletion(KafkaSystemProducer.scala:181)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time
这3种异常情况比较多
59088 org.apache.kafka.common.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time
61015 org.apache.kafka.common.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time
62275 org.apache.kafka.common.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append
请帮助我了解这里的问题是什么。每当它发生时,Samza 容器就会重新启动。
该错误表明某些记录放入队列的速度快于它们从客户端发送的速度。
当您的 Producer 发送消息时,它们存储在缓冲区中(在发送到目标代理之前)并且记录被分组到批中以增加吞吐量。将新记录添加到批次时,必须在可配置的时间 window 内发送,该时间由 request.timeout.ms
控制(默认设置为 30 秒)。如果批处理在队列中的时间较长,则会抛出 TimeoutException
,然后批处理记录将从队列中删除,并且不会传递给代理。
增加 request.timeout.ms
的值应该可以解决问题。
如果这不起作用,您也可以尝试减少 batch.size
以便更频繁地发送批次(但这次将包含更少的消息)并确保 linger.ms
设置为0(默认值)。
请注意,您需要在更改任何配置参数后重新启动您的 kafka 代理。
如果您仍然收到错误消息,我认为您的网络出现了问题。您是否启用了 SSL?