卡夫卡集群设置超时读取套接字错误
Getting Timed out reading socket error for kafka cluster setup
我想为三个具有相同队列的类似应用程序设置一个 kafka 集群类型设置,例如 AppA -> {TopicX, TopicY, TopicZ}, AppB -> {TopicX, TopicZ}, AppC -> {TopicX, TopicY} .生产者和消费者将特定于应用程序。
我设置了 kafka 集群,其中三个代理在具有不同端口的三个不同配置文件中具有分区 1、2、3。然后启动kafka服务器(集群)
我正在使用 http://github.com/nmred/kafka-php
的 kafka php 包装器
所以我为 App A 使用了生产者代码,例如
$producer->setRequireAck(-1);
$producer->setMessages("TopicX", 0, array(json_encode($this->data)));
$producer->send();
并且为 App B 使用了生产者代码,例如
$producer->setRequireAck(-1);
$producer->setMessages("TopicX", 1, array(json_encode($this->data)));
$producer->send();
等等。
然后我为
等三个应用程序制作了我的消费者脚本
$queues = array("TopicX", "TopicY", "TopicZ");
while(true) {
foreach($queues as $queue) {
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');
$consumer->setPartition($queue, 0);
$result = $consumer->fetch();
}
}
但是当我尝试为任何应用程序执行消费者脚本时,我收到类似
的错误
"Timed out reading socket while reading 750437 bytes with 750323 bytes to go"
我只是不知道如何解决这个问题 我尝试修改一些 kafka 配置参数,例如
zookeeper.connection.timeout.ms=24000 # Initially 6000
replica.socket.timeout.ms=15000 # Not exists in default file
但这没有用。
您实际上是在破坏 Kafka 消费者,方法是将其声明到您的 foreach 循环中,将其移出循环,使用该循环设置分区,然后获取结果并区分来源:
$queues = array("TopicX", "TopicY", "TopicZ");
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');
foreach($queues as $queue) {
$consumer->setPartition($queue, 0);
}
while(true) {
$result = $consumer->fetch();
foreach ($result as $topicName => $topic) {
foreach ($topic as $partId => $partition) {
var_dump($partition->getHighOffset());
foreach ($partition as $message) {
var_dump((string)$message);
}
}
}
}
查看 kafka php 项目的 github README 以查看示例。
我想为三个具有相同队列的类似应用程序设置一个 kafka 集群类型设置,例如 AppA -> {TopicX, TopicY, TopicZ}, AppB -> {TopicX, TopicZ}, AppC -> {TopicX, TopicY} .生产者和消费者将特定于应用程序。 我设置了 kafka 集群,其中三个代理在具有不同端口的三个不同配置文件中具有分区 1、2、3。然后启动kafka服务器(集群)
我正在使用 http://github.com/nmred/kafka-php
的 kafka php 包装器所以我为 App A 使用了生产者代码,例如
$producer->setRequireAck(-1);
$producer->setMessages("TopicX", 0, array(json_encode($this->data)));
$producer->send();
并且为 App B 使用了生产者代码,例如
$producer->setRequireAck(-1);
$producer->setMessages("TopicX", 1, array(json_encode($this->data)));
$producer->send();
等等。
然后我为
等三个应用程序制作了我的消费者脚本 $queues = array("TopicX", "TopicY", "TopicZ");
while(true) {
foreach($queues as $queue) {
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');
$consumer->setPartition($queue, 0);
$result = $consumer->fetch();
}
}
但是当我尝试为任何应用程序执行消费者脚本时,我收到类似
的错误"Timed out reading socket while reading 750437 bytes with 750323 bytes to go"
我只是不知道如何解决这个问题 我尝试修改一些 kafka 配置参数,例如
zookeeper.connection.timeout.ms=24000 # Initially 6000
replica.socket.timeout.ms=15000 # Not exists in default file
但这没有用。
您实际上是在破坏 Kafka 消费者,方法是将其声明到您的 foreach 循环中,将其移出循环,使用该循环设置分区,然后获取结果并区分来源:
$queues = array("TopicX", "TopicY", "TopicZ");
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');
foreach($queues as $queue) {
$consumer->setPartition($queue, 0);
}
while(true) {
$result = $consumer->fetch();
foreach ($result as $topicName => $topic) {
foreach ($topic as $partId => $partition) {
var_dump($partition->getHighOffset());
foreach ($partition as $message) {
var_dump((string)$message);
}
}
}
}
查看 kafka php 项目的 github README 以查看示例。