PHPRdfKafka 而不是创建新主题如何生成现有主题
PHPRdfKafka instead of creating new topic how to produce into existing topic
一个简单的问题,我正在尝试使用 php-rdkafka ( https://github.com/arnaud-lb/php-rdkafka ) 进入 Kafka。
我浏览了文档,但找不到生成现有主题的语法,除非语法 newTopic 将插入到我怀疑的现有主题中。我不断收到 Java 错误抛出并且我无法很好地调试 Java 错误。我正在向那些一直使用该框架的人寻求帮助,它的语法是否正确?请指教
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
//If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test"); // Is this a correct syntax to consume existing topic?
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
根据与开发商的讨论,
以下语法可用于创建新主题并将数据添加到 following/existing 主题
$topic = $producer->newTopic("test"); /
一个简单的问题,我正在尝试使用 php-rdkafka ( https://github.com/arnaud-lb/php-rdkafka ) 进入 Kafka。
我浏览了文档,但找不到生成现有主题的语法,除非语法 newTopic 将插入到我怀疑的现有主题中。我不断收到 Java 错误抛出并且我无法很好地调试 Java 错误。我正在向那些一直使用该框架的人寻求帮助,它的语法是否正确?请指教
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
//If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test"); // Is this a correct syntax to consume existing topic?
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
根据与开发商的讨论,
以下语法可用于创建新主题并将数据添加到 following/existing 主题
$topic = $producer->newTopic("test"); /