Apache Kafka 客户端什么时候抛出 "Batch Expired" 异常?
When does the Apache Kafka client throw a "Batch Expired" exception?
使用 Apache Kafka Java 客户端 (0.9),我尝试使用 Kafka Producer class 向代理发送一长串记录。
异步send method returns immediately for a while, then starts blocking on each call for a short time period. After around thirty seconds, the client starts throwing exceptions (TimeoutException),带消息"Batch expired".
什么情况下会抛出这个异常?
此异常表明您正在以比发送记录更快的速度排队记录。
当你调用时send method, the ProducerRecord will be stored in an internal buffer for sending to the broker. The method returns immediately once the ProducerRecord已经被缓冲,不管它是否已经发送。
记录分为 批次 以发送给代理,以减少每条消息的传输监听并提高吞吐量。
一旦一条记录被添加到一个批次中,发送该批次就有一个时间限制,以确保它在指定的时间内被发送。这由生产者配置参数 request.timeout.ms 控制,默认为三十秒。
如果批次排队的时间超过超时限制,将抛出异常。该批次中的记录将从发送队列中删除。
增加超时限制,使用配置参数,将允许客户端在到期前将批次排队更长的时间。
控制发送到代理之前的时间的参数是linger.ms
。它的默认值为 0(无延迟)。
我在完全不同的上下文中遇到了这个异常。
我已经设置了一个迷你集群,其中包含一个 zookeeper 虚拟机、一个代理虚拟机和一个 producer/consumer 虚拟机。我在服务器 (9092) 和动物园管理员 (2181) 上打开了所有必要的端口,然后尝试从 consumer/publisher 虚拟机向代理发布一条消息。我得到了 OP 提到的异常,但由于到目前为止我只发布了一条消息(或者至少我尝试过),解决方案不能增加超时或批处理大小。所以我搜索并找到了这个邮件列表,它描述了我在尝试从 consumer/producer vm (ClosedChannelException) 中使用消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config
这个邮件列表的最后post实际上描述了如何解决这个问题。
长话短说,如果您同时遇到 ChannelClosedException
和 Batch Expired
异常,您可能必须将 server.config
文件中的这一行更改为以下内容并重新启动代理:
advertised.host.name=<broker public IP address>
如果未设置,则回退到 host.name
属性(可能两者都未设置),然后回退到 [=15] 的规范主机名=] Java class,这当然是不正确的,因此混淆了远程节点。
当您创建消费者时,将 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 true。
我正在使用 Kafka Java 客户端版本 0.11.0.0。我也开始看到相同的模式无法始终如一地生成大消息。它通过了很少的消息,而对于其他一些消息则失败了。 (尽管通过和失败的消息大小相同)。在我的例子中,每条消息大小约为 60KB,远高于 Kafka 默认的 batch.size
16kB,而且我的 linger.ms
设置为默认值为 0。此错误是由于 Producer 客户端在收到来自 server.Basically 的成功响应之前超时而引发的,在我的代码中,此调用超时:kafkaProd.send(pr).get()
。要解决此问题,我必须将 Producer 客户端的默认 request.timeout.ms
增加到 60000
在 docker-compose 中与 Kafka 运行 有类似的问题。
我的 docker-compose.yml 设置为
KAFKA_ADVERTISED_HOST_NAME: kafka
ports:
- 9092:9092
但是当我试图从外面用骆驼发送消息时docker
to("kafka:test?brokers=localhost:9092")
我收到一个 TimeoutException。我通过添加
解决了它
127.0.0.1 kafka
到Windows\System32\drivers\etc\hosts文件然后把我的骆驼url改成
to("kafka:test?brokers=kafka:9092")
我解决了。
我的Kafka部署在Docker容器中,容器的网络模式是bridge,主机和容器使用端口映射,我把Kafka服务器的默认端口改成了9102。
server.properties中解决问题的配置项是这两个:
听众
advertised.listeners
我尝试了几种组合:
成功:
listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102
服务器无法启动:
listeners=PLAINTEXT://192.168.0.136:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102
超时错误:
listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://:9102
使用 Apache Kafka Java 客户端 (0.9),我尝试使用 Kafka Producer class 向代理发送一长串记录。
异步send method returns immediately for a while, then starts blocking on each call for a short time period. After around thirty seconds, the client starts throwing exceptions (TimeoutException),带消息"Batch expired".
什么情况下会抛出这个异常?
此异常表明您正在以比发送记录更快的速度排队记录。
当你调用时send method, the ProducerRecord will be stored in an internal buffer for sending to the broker. The method returns immediately once the ProducerRecord已经被缓冲,不管它是否已经发送。
记录分为 批次 以发送给代理,以减少每条消息的传输监听并提高吞吐量。
一旦一条记录被添加到一个批次中,发送该批次就有一个时间限制,以确保它在指定的时间内被发送。这由生产者配置参数 request.timeout.ms 控制,默认为三十秒。
如果批次排队的时间超过超时限制,将抛出异常。该批次中的记录将从发送队列中删除。
增加超时限制,使用配置参数,将允许客户端在到期前将批次排队更长的时间。
控制发送到代理之前的时间的参数是linger.ms
。它的默认值为 0(无延迟)。
我在完全不同的上下文中遇到了这个异常。
我已经设置了一个迷你集群,其中包含一个 zookeeper 虚拟机、一个代理虚拟机和一个 producer/consumer 虚拟机。我在服务器 (9092) 和动物园管理员 (2181) 上打开了所有必要的端口,然后尝试从 consumer/publisher 虚拟机向代理发布一条消息。我得到了 OP 提到的异常,但由于到目前为止我只发布了一条消息(或者至少我尝试过),解决方案不能增加超时或批处理大小。所以我搜索并找到了这个邮件列表,它描述了我在尝试从 consumer/producer vm (ClosedChannelException) 中使用消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config 这个邮件列表的最后post实际上描述了如何解决这个问题。
长话短说,如果您同时遇到 ChannelClosedException
和 Batch Expired
异常,您可能必须将 server.config
文件中的这一行更改为以下内容并重新启动代理:
advertised.host.name=<broker public IP address>
如果未设置,则回退到 host.name
属性(可能两者都未设置),然后回退到 [=15] 的规范主机名=] Java class,这当然是不正确的,因此混淆了远程节点。
当您创建消费者时,将 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 true。
我正在使用 Kafka Java 客户端版本 0.11.0.0。我也开始看到相同的模式无法始终如一地生成大消息。它通过了很少的消息,而对于其他一些消息则失败了。 (尽管通过和失败的消息大小相同)。在我的例子中,每条消息大小约为 60KB,远高于 Kafka 默认的 batch.size
16kB,而且我的 linger.ms
设置为默认值为 0。此错误是由于 Producer 客户端在收到来自 server.Basically 的成功响应之前超时而引发的,在我的代码中,此调用超时:kafkaProd.send(pr).get()
。要解决此问题,我必须将 Producer 客户端的默认 request.timeout.ms
增加到 60000
在 docker-compose 中与 Kafka 运行 有类似的问题。 我的 docker-compose.yml 设置为
KAFKA_ADVERTISED_HOST_NAME: kafka
ports:
- 9092:9092
但是当我试图从外面用骆驼发送消息时docker
to("kafka:test?brokers=localhost:9092")
我收到一个 TimeoutException。我通过添加
解决了它127.0.0.1 kafka
到Windows\System32\drivers\etc\hosts文件然后把我的骆驼url改成
to("kafka:test?brokers=kafka:9092")
我解决了。
我的Kafka部署在Docker容器中,容器的网络模式是bridge,主机和容器使用端口映射,我把Kafka服务器的默认端口改成了9102。
server.properties中解决问题的配置项是这两个: 听众 advertised.listeners
我尝试了几种组合:
成功:
listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102
服务器无法启动:
listeners=PLAINTEXT://192.168.0.136:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102
超时错误:
listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://:9102