为什么我的 ProducerBuilder class 在 .Net Core 控制台中的消息没有被 Kafka 消费者 shell 消费?
Why is my ProducerBuilder class's message in .Net Core console not consumed by Kafka consumer shell?
我已经在 Ubuntu 上安装了 Kafka,并成功地测试了一个仅使用 shell 个文件的简单场景:
- 使用默认配置启动了 zookeeper
- 使用默认配置启动节点或代理
- 创建了一个主题,为其命名,具有单个分区和复制因子 1,并将其与 zookeeper 默认地址相关联
- 打开生产者和消费者:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname
其次是:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopicname --from-beginning
很好,一切正常。现在,我想关闭生产者并在 asp.net 核心应用程序中编写生产者代码。
using System;
using System.ComponentModel;
using System.Net;
using Confluent.Kafka;
namespace KafkaTraining
{
class Program
{
static void Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
}
Console.ReadLine();
}
}
}
在这里找到灵感:
// https://docs.confluent.io/current/clients/dotnet.html#
所以,上面的C#代码应该等同于shell命令,对吧,再加上我用C#写的应该被消费者消费的消息(消费者还在终端打开window 感谢 shell 命令)。
如果我之前发布的 shell 命令有效,即 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname
以及与 localhost:9092(服务地址)和主题名称相关的附加信息,为什么不C# 程序成功地替代了它,它应该产生消息“一条日志消息”并且终端应该使用它,但是没有。我该如何调试它?
PS。我已经从这个地址 Linux Ubuntu 安装了 Kafka:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz
而在我的 Asp.Net Core 3.1 控制台应用程序中,我已经安装了 1.5.0 版本,不确定它是否起作用,但不知道如何开始调试......
感谢任何指点。
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<StartupObject>KafkaDemo.Program</StartupObject>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.0" />
</ItemGroup>
</Project>
您的消息未生成,因为您在传递消息之前处置了生成器。 Produce
方法异步发送消息并且不等待响应 - 它 returns 立即。为确保它已发送,您可以使用 Flush()
- 在发送所有 in-flight 消息之前阻塞,或者您可以使用 await ProduceAsync()
.
试试这个:
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
producer.Flush();
}
我已经在 Ubuntu 上安装了 Kafka,并成功地测试了一个仅使用 shell 个文件的简单场景:
- 使用默认配置启动了 zookeeper
- 使用默认配置启动节点或代理
- 创建了一个主题,为其命名,具有单个分区和复制因子 1,并将其与 zookeeper 默认地址相关联
- 打开生产者和消费者:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname
其次是:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopicname --from-beginning
很好,一切正常。现在,我想关闭生产者并在 asp.net 核心应用程序中编写生产者代码。
using System;
using System.ComponentModel;
using System.Net;
using Confluent.Kafka;
namespace KafkaTraining
{
class Program
{
static void Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
}
Console.ReadLine();
}
}
}
在这里找到灵感: // https://docs.confluent.io/current/clients/dotnet.html#
所以,上面的C#代码应该等同于shell命令,对吧,再加上我用C#写的应该被消费者消费的消息(消费者还在终端打开window 感谢 shell 命令)。
如果我之前发布的 shell 命令有效,即 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname
以及与 localhost:9092(服务地址)和主题名称相关的附加信息,为什么不C# 程序成功地替代了它,它应该产生消息“一条日志消息”并且终端应该使用它,但是没有。我该如何调试它?
PS。我已经从这个地址 Linux Ubuntu 安装了 Kafka:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz 而在我的 Asp.Net Core 3.1 控制台应用程序中,我已经安装了 1.5.0 版本,不确定它是否起作用,但不知道如何开始调试...... 感谢任何指点。
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<StartupObject>KafkaDemo.Program</StartupObject>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.0" />
</ItemGroup>
</Project>
您的消息未生成,因为您在传递消息之前处置了生成器。 Produce
方法异步发送消息并且不等待响应 - 它 returns 立即。为确保它已发送,您可以使用 Flush()
- 在发送所有 in-flight 消息之前阻塞,或者您可以使用 await ProduceAsync()
.
试试这个:
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
producer.Flush();
}