Kafka Producer 导致 org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
Kafka Producer causes org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
我是 运行 Kubernetes 上的 3-zookeeper-cluster 和 3-kafka-cluster。
卡夫卡好像是运行.
但是,如果我向某个主题生成一些消息并检查该主题,则根本没有消息。
这是我的经纪人说的。那说一些无效的接收或其他东西,有趣的是试图使主题运作良好但生产。
我还可以观看我早期在 Topics-ui 上制作的主题或模式,这是代理的 GUI 工具。
Schema-registry、Connect、Rest 的日志很好,所以代理似乎 运行 很好。
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:863)
at kafka.network.Processor.run(SocketServer.scala:762)
at java.lang.Thread.run(Thread.java:748)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:863)
at kafka.network.Processor.run(SocketServer.scala:762)
at java.lang.Thread.run(Thread.java:748)
这是我使用 terraform 的代理配置
Statefulset
port {
container_port = 9092
}
env {
name = "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_DEFAULT_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"
value = "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
}
env {
name = "KAFKA_ZOOKEEPER_CONNECT"
value = "lucent-zookeeper-0.zookeeper-service.default:2181,lucent-zookeeper-1.zookeeper-service.default:2181,lucent-zookeeper-2.zookeeper-service.default:2181"
}
env {
name = "POD_IP"
value_from {
field_ref {
field_path = "status.podIP"
}
}
}
env {
name = "HOST_IP"
value_from {
field_ref {
field_path = "status.hostIP"
}
}
}
env {
name = "POD_NAME"
value_from {
field_ref {
field_path = "metadata.name"
}
}
}
env {
name = "POD_NAMESPACE"
value_from {
field_ref {
field_path = "metadata.namespace"
}
}
}
command = [
"sh",
"-exec",
"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$${POD_NAME}.kafka-service.$${POD_NAMESPACE}:9092 && export KAFKA_BROKER_ID=$${HOSTNAME##*-} && exec /etc/confluent/docker/run"
]
服务
resource "kubernetes_service" "kafka-service" {
metadata {
name = "kafka-service"
labels = {
app = "broker"
}
}
spec {
selector = {
app = "broker"
}
port {
port = 9092
}
cluster_ip = "None"
}
尝试生成的代码
kafka-console-producer --broker-list kafka-service:9092 --topic test
我最初的猜测是您尝试接收的请求可能太大了。最大大小是 socket.request.max.bytes
的默认大小,即 100MB。因此,如果您有一条大于 100MB 的消息,请尝试在 server.properties
下增加此变量的值。
如果您尝试连接到非 SSL 侦听器,就好像它是 SSL 一样,也会发生这种情况。
验证 9092 是代理上的 SSL 侦听器端口。资料来源:https://github.com/edenhill/librdkafka/issues/1680#issuecomment-364883669
我是 运行 Kubernetes 上的 3-zookeeper-cluster 和 3-kafka-cluster。
卡夫卡好像是运行.
但是,如果我向某个主题生成一些消息并检查该主题,则根本没有消息。
这是我的经纪人说的。那说一些无效的接收或其他东西,有趣的是试图使主题运作良好但生产。
我还可以观看我早期在 Topics-ui 上制作的主题或模式,这是代理的 GUI 工具。
Schema-registry、Connect、Rest 的日志很好,所以代理似乎 运行 很好。
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:863)
at kafka.network.Processor.run(SocketServer.scala:762)
at java.lang.Thread.run(Thread.java:748)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:863)
at kafka.network.Processor.run(SocketServer.scala:762)
at java.lang.Thread.run(Thread.java:748)
这是我使用 terraform 的代理配置 Statefulset
port {
container_port = 9092
}
env {
name = "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_DEFAULT_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"
value = "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
}
env {
name = "KAFKA_ZOOKEEPER_CONNECT"
value = "lucent-zookeeper-0.zookeeper-service.default:2181,lucent-zookeeper-1.zookeeper-service.default:2181,lucent-zookeeper-2.zookeeper-service.default:2181"
}
env {
name = "POD_IP"
value_from {
field_ref {
field_path = "status.podIP"
}
}
}
env {
name = "HOST_IP"
value_from {
field_ref {
field_path = "status.hostIP"
}
}
}
env {
name = "POD_NAME"
value_from {
field_ref {
field_path = "metadata.name"
}
}
}
env {
name = "POD_NAMESPACE"
value_from {
field_ref {
field_path = "metadata.namespace"
}
}
}
command = [
"sh",
"-exec",
"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$${POD_NAME}.kafka-service.$${POD_NAMESPACE}:9092 && export KAFKA_BROKER_ID=$${HOSTNAME##*-} && exec /etc/confluent/docker/run"
]
服务
resource "kubernetes_service" "kafka-service" {
metadata {
name = "kafka-service"
labels = {
app = "broker"
}
}
spec {
selector = {
app = "broker"
}
port {
port = 9092
}
cluster_ip = "None"
}
尝试生成的代码
kafka-console-producer --broker-list kafka-service:9092 --topic test
我最初的猜测是您尝试接收的请求可能太大了。最大大小是 socket.request.max.bytes
的默认大小,即 100MB。因此,如果您有一条大于 100MB 的消息,请尝试在 server.properties
下增加此变量的值。
如果您尝试连接到非 SSL 侦听器,就好像它是 SSL 一样,也会发生这种情况。 验证 9092 是代理上的 SSL 侦听器端口。资料来源:https://github.com/edenhill/librdkafka/issues/1680#issuecomment-364883669