配置kafka-net停止发送最新消息
Configure kafka-net to stop sending latest messages
我在带有 kafka-net 插件的 Red Hat VM 上使用 kafka 0.8.1.1。我如何配置我的消费者停止接收来自 kafka 的早期消息?
我的消费代码:
var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092"));
Stopwatch sp = new Stopwatch();
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("Test", router));
ThreadStart start2 = () =>
{
while (true)
{
sp.Start();
foreach (var message in consumer.Consume())
{
if (MessageDecoderReceiver.MessageBase(message.Value) != null)
{
PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString());
}
else
{
Console.WriteLine(message.Value);
}
}
sp.Stop();
}
};
var thread2 = new Thread(start2);
thread2.Start();
Kafka-net 中的消费者当前不会自动跟踪正在消费的偏移量。您将必须手动实施偏移跟踪。
在kafka版本0.8.1中存储偏移量:
var commit = new OffsetCommitRequest
{
ConsumerGroup = consumerGroup,
OffsetCommits = new List<OffsetCommit>
{
new OffsetCommit
{
PartitionId = partitionId,
Topic = IntegrationConfig.IntegrationTopic,
Offset = offset,
Metadata = metadata
}
}
};
var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault();
设置消费者在特定偏移点开始导入:
var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result
.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray();
var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets);
注意上面的代码会将消费者设置为在日志的最后开始消费,实际上只接收新消息。
我在带有 kafka-net 插件的 Red Hat VM 上使用 kafka 0.8.1.1。我如何配置我的消费者停止接收来自 kafka 的早期消息?
我的消费代码:
var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092"));
Stopwatch sp = new Stopwatch();
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("Test", router));
ThreadStart start2 = () =>
{
while (true)
{
sp.Start();
foreach (var message in consumer.Consume())
{
if (MessageDecoderReceiver.MessageBase(message.Value) != null)
{
PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString());
}
else
{
Console.WriteLine(message.Value);
}
}
sp.Stop();
}
};
var thread2 = new Thread(start2);
thread2.Start();
Kafka-net 中的消费者当前不会自动跟踪正在消费的偏移量。您将必须手动实施偏移跟踪。
在kafka版本0.8.1中存储偏移量:
var commit = new OffsetCommitRequest
{
ConsumerGroup = consumerGroup,
OffsetCommits = new List<OffsetCommit>
{
new OffsetCommit
{
PartitionId = partitionId,
Topic = IntegrationConfig.IntegrationTopic,
Offset = offset,
Metadata = metadata
}
}
};
var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault();
设置消费者在特定偏移点开始导入:
var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result
.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray();
var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets);
注意上面的代码会将消费者设置为在日志的最后开始消费,实际上只接收新消息。