.Net Core Kafka ProduceAsync 未完成
.Net Core Kafka ProduceAsync doesn't complete
我正在尝试进行概念验证,简单的 Kafka 生产者和消费者。我使用的平台是 .Net Core 2.0,由 Visual Studio 2017 构建。我使用的 Nuget 包是 Confluent.Kafka
我通过研究找到了一些最成功的代码,但现在在实现 .ProduceAsync() 方法时似乎遇到了麻烦。没有出现错误代码,程序似乎继续 运行,但不执行该方法。
这是我的制作人:
class Producer
{
static void Main(string[] args)
{
Console.WriteLine("PRODUCER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the producer configuration
var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };
// Create the producer
using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
{
Console.WriteLine("Producer Created!");
// Send 10 messages to the topic
for (int i = 0; i < 10; i++)
{
var message = $"Event {i}";
Console.WriteLine($"Begin Event {i}");
var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
Console.WriteLine("Press any key to continue...");
Console.ReadLine();
}
}
}
}
这是我的消费者:
class Consumer
{
static void Main(string[] args)
{
Console.WriteLine("CONSUMER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the consumer configuration
var consumerConfig = new Dictionary<string, object>
{
{ "group.id", "myconsumer" },
{ "bootstrap.servers", kafkaEndpoint },
};
// Create the consumer
using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
// Subscribe to the OnMessage event
consumer.OnMessage += (obj, msg) =>
{
Console.WriteLine($"Received: {msg.Value}");
};
// Subscribe to the Kafka topic
consumer.Subscribe(new List<string>() { kafkaTopic });
// Handle Cancel Keypress
var cancelled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
// Poll for messages
while (!cancelled)
{
consumer.Poll(3000);
//consumer.Poll();
}
}
}
}
当我 运行 代码时,消费者处于空闲状态,因为它似乎没有生成任何消息,但这是我为生产者提供的控制台输出:
PRODUCER!!!
Producer Created!
Begin Event 0
(就是这样。此时控制台也处于空闲状态。)
问题是:如何解决这个问题,以便实际生成消息,以便消费者可以接收它们?
您可以通过将 producer 中的行更改为:
来强制获取结果
var result = producer.ProduceAsync(kafkaTopic, null, message).Result;
检查结果内容以找出问题所在。
给予:
producer.Flush(TimeSpan.FromSeconds(10);
一试。这里也有很多很好的例子:https://github.com/confluentinc/confluent-kafka-dotnet
我正在尝试进行概念验证,简单的 Kafka 生产者和消费者。我使用的平台是 .Net Core 2.0,由 Visual Studio 2017 构建。我使用的 Nuget 包是 Confluent.Kafka
我通过研究找到了一些最成功的代码,但现在在实现 .ProduceAsync() 方法时似乎遇到了麻烦。没有出现错误代码,程序似乎继续 运行,但不执行该方法。
这是我的制作人:
class Producer
{
static void Main(string[] args)
{
Console.WriteLine("PRODUCER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the producer configuration
var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };
// Create the producer
using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
{
Console.WriteLine("Producer Created!");
// Send 10 messages to the topic
for (int i = 0; i < 10; i++)
{
var message = $"Event {i}";
Console.WriteLine($"Begin Event {i}");
var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
Console.WriteLine("Press any key to continue...");
Console.ReadLine();
}
}
}
}
这是我的消费者:
class Consumer
{
static void Main(string[] args)
{
Console.WriteLine("CONSUMER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the consumer configuration
var consumerConfig = new Dictionary<string, object>
{
{ "group.id", "myconsumer" },
{ "bootstrap.servers", kafkaEndpoint },
};
// Create the consumer
using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
// Subscribe to the OnMessage event
consumer.OnMessage += (obj, msg) =>
{
Console.WriteLine($"Received: {msg.Value}");
};
// Subscribe to the Kafka topic
consumer.Subscribe(new List<string>() { kafkaTopic });
// Handle Cancel Keypress
var cancelled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
// Poll for messages
while (!cancelled)
{
consumer.Poll(3000);
//consumer.Poll();
}
}
}
}
当我 运行 代码时,消费者处于空闲状态,因为它似乎没有生成任何消息,但这是我为生产者提供的控制台输出:
PRODUCER!!!
Producer Created!
Begin Event 0
(就是这样。此时控制台也处于空闲状态。)
问题是:如何解决这个问题,以便实际生成消息,以便消费者可以接收它们?
您可以通过将 producer 中的行更改为:
来强制获取结果var result = producer.ProduceAsync(kafkaTopic, null, message).Result;
检查结果内容以找出问题所在。
给予:
producer.Flush(TimeSpan.FromSeconds(10);
一试。这里也有很多很好的例子:https://github.com/confluentinc/confluent-kafka-dotnet