当消费者 运行 时,在 Confluent.Kafka 中使用主题消息
Use a message for a topic in Confluent.Kafka when consumer run
我在 .netCore 项目中使用 Confluent.Kafka(1.4.4) 作为消息代理。在项目启动时,我只将“bootstrapservers”设置为 appSetting.json 文件中的特定服务器,必要时我会在 API 中生成消息,并在相关 class 中使用下面的代码:
public async Task WriteMessage<T>(string topicName, T message)
{
using (var p = new ProducerBuilder<Null, string>(_producerConfig).Build())
{
try
{
var serializedMessage= JsonConvert.SerializeObject(message);
var dr = await p.ProduceAsync(topicName, new Message<Null, string> { Value = serializedMessage });
logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
logger.LogInformation($"Delivery failed: {e.Error.Reason}");
}
}
}
我还在消费者解决方案中添加了以下代码:
public async Task Run()
{
using (var consumerBuilder = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build())
{
consumerBuilder.Subscribe(new List<string>() { "ActiveMemberCardForPanClubEvent", "CreatePanClubEvent", "RemovePanClubEvent"
});
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var consumer = consumerBuilder.Consume(cts.Token);
if (consumer.Message != null)
{
using (LogContext.PushProperty("RequestId", Guid.NewGuid()))
{
//Do something
logger.LogInformation($"Consumed message '{consumer.Message.Value}' at: '{consumer.TopicPartitionOffset}'.");
await DoJob(consumer.Topic, consumer.Message.Value);
consumer.Topic.Remove(0, consumer.Topic.Length);
}
}
else
{
logger.LogInformation($"message is null for topic '{consumer.Topic}'and partition : '{consumer.TopicPartitionOffset}' .");
consumer.Topic.Remove(0, consumer.Topic.Length);
}
}
catch (ConsumeException e)
{
logger.LogInformation($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumerBuilder.Close();
}
}
}
我生成了一条消息,当消费者项目是 运行 时,一切都很顺利,消息正在消费者解决方案中读取。
当消费者项目不是 运行 并且我在 API 中与消息生产者在 API 中排队消息时会出现问题。在 运行ning 消费者之后,该主题没有任何有效消息正在生成。
我熟悉并有使用消息代理的经验,我知道通过发送一条消息,它会一直在总线上,直到它被使用,但我不明白为什么它不能在这个项目中与 Kafka 一起工作。
“auto.offset.reset”消费者 属性 的默认设置是“最新”。
这意味着(在尚未写入偏移量的情况下)如果您向某个主题写入消息然后启动消费者,它将跳过在消费者启动之前写入的任何消息。这可能就是您的消费者看不到生产者排队的消息的原因。
解决方案是将“auto.offset.reset”设置为“earliest”,这意味着消费者将从该主题的最早偏移量开始。
https://docs.confluent.io/current/installation/configuration/consumer-configs.html#auto.offset.reset
我在 .netCore 项目中使用 Confluent.Kafka(1.4.4) 作为消息代理。在项目启动时,我只将“bootstrapservers”设置为 appSetting.json 文件中的特定服务器,必要时我会在 API 中生成消息,并在相关 class 中使用下面的代码:
public async Task WriteMessage<T>(string topicName, T message)
{
using (var p = new ProducerBuilder<Null, string>(_producerConfig).Build())
{
try
{
var serializedMessage= JsonConvert.SerializeObject(message);
var dr = await p.ProduceAsync(topicName, new Message<Null, string> { Value = serializedMessage });
logger.LogInformation($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
logger.LogInformation($"Delivery failed: {e.Error.Reason}");
}
}
}
我还在消费者解决方案中添加了以下代码:
public async Task Run()
{
using (var consumerBuilder = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build())
{
consumerBuilder.Subscribe(new List<string>() { "ActiveMemberCardForPanClubEvent", "CreatePanClubEvent", "RemovePanClubEvent"
});
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var consumer = consumerBuilder.Consume(cts.Token);
if (consumer.Message != null)
{
using (LogContext.PushProperty("RequestId", Guid.NewGuid()))
{
//Do something
logger.LogInformation($"Consumed message '{consumer.Message.Value}' at: '{consumer.TopicPartitionOffset}'.");
await DoJob(consumer.Topic, consumer.Message.Value);
consumer.Topic.Remove(0, consumer.Topic.Length);
}
}
else
{
logger.LogInformation($"message is null for topic '{consumer.Topic}'and partition : '{consumer.TopicPartitionOffset}' .");
consumer.Topic.Remove(0, consumer.Topic.Length);
}
}
catch (ConsumeException e)
{
logger.LogInformation($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumerBuilder.Close();
}
}
}
我生成了一条消息,当消费者项目是 运行 时,一切都很顺利,消息正在消费者解决方案中读取。 当消费者项目不是 运行 并且我在 API 中与消息生产者在 API 中排队消息时会出现问题。在 运行ning 消费者之后,该主题没有任何有效消息正在生成。 我熟悉并有使用消息代理的经验,我知道通过发送一条消息,它会一直在总线上,直到它被使用,但我不明白为什么它不能在这个项目中与 Kafka 一起工作。
“auto.offset.reset”消费者 属性 的默认设置是“最新”。
这意味着(在尚未写入偏移量的情况下)如果您向某个主题写入消息然后启动消费者,它将跳过在消费者启动之前写入的任何消息。这可能就是您的消费者看不到生产者排队的消息的原因。
解决方案是将“auto.offset.reset”设置为“earliest”,这意味着消费者将从该主题的最早偏移量开始。
https://docs.confluent.io/current/installation/configuration/consumer-configs.html#auto.offset.reset