Kafka:使用手动批处理使用分区 - 消息被跳过
Kafka: Consume partition with manual batching - Messages are being skipped
我正在使用 Confluent Kafka .NET 为分区主题创建消费者。
由于 Confluent Kafka .NET 不支持批量消费,我构建了一个消费消息的函数,直到达到批量大小。此函数的想法是仅使用来自同一分区的消息构建批处理,这就是为什么一旦我使用了具有不同分区的结果并且 return 我能够使用的任何数量的消息,我就停止构建批处理到目前为止。
目标或 Objective: 我希望能够处理我在批处理中 return 编辑的消息,并提交这些消息的偏移量只要。即:
Message Consumed From Partition
Offset
Stored in Batch
0
0
Yes
0
1
Yes
2
0
No
根据上面的 table,我想处理从分区 0 收到的两条消息。来自分区 2 的消息将被忽略,并且(希望)稍后在对 ConsumeBatch 的另一个调用中获取.
为了提交,我只需调用同步 Commit
函数,将我处理的最新消息的偏移量作为参数传递。在这种情况下,我将传递上面 table 中显示的批处理的第二条消息的偏移量(分区 0 - 偏移量 1)。
问题:
问题是,由于某种原因,当我构建如上所示的批处理时,由于验证而决定不处理的消息将被永远忽略。即:分区 2 的消息 0 将永远不会被消费者再次拾取。
正如您在下面的消费者配置中看到的,我已将 EnableAutoCommit 和 EnableAutoOffsetStore 都设置为 false。我认为这足以让消费者不对偏移量做任何事情并且能够在另一个 Consume
调用中获取被忽略的消息,但事实并非如此。无论我的配置如何,偏移量都会以某种方式增加到每个分区的最新消费消息。
任何人都可以告诉我我在这里缺少什么来实现所需的行为吗?
建批功能的简化版:
public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();
int latestPartition = -1; // The partition from where we consumed the last message
for (int i = 0; i < batchSize; i++)
{
var result = _consumer.Consume(100);
if (result != null)
{
if (latestPartition == -1 || result.Partition.Value == latestPartition)
{
consumedMessages.Add(result);
latestPartition = result.Partition.Value;
}
else
break;
}
else
break;
}
return consumedMessages;
}
ConsumerConfig 用于实例化我的消费者客户端:
_consumerConfig = new ConsumerConfig
{
BootstrapServers = _bootstrapServers,
EnableAutoCommit = false,
AutoCommitIntervalMs = 0,
GroupId = "WorkerConsumers",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoOffsetStore = false,
};
附加信息:
正在测试:
- 1 个主题,有 6 个分区,复制因子为 2
- 3 位经纪人
- 1 个属于消费者组的单线程消费者客户端
- wsl2 的本地环境 Windows 10
关键是使用 Seek
函数将分区的偏移量重置为特定位置,以便可以将被忽略的消息作为另一批次的一部分再次拾取。
在上面的相同函数中:
public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();
int latestPartition = -1; // The partition from where we consumed the last message
for (int i = 0; i < batchSize; i++)
{
var result = _consumer.Consume(100);
if (result != null)
{
if (latestPartition == -1 || result.Partition.Value == latestPartition)
{
consumedMessages.Add(result);
latestPartition = result.Partition.Value;
}
else
{
// This call will guarantee that this message that will not be included in the current batch, will be included in another batch later
_consumer.Seek(result.TopicPartitionOffset); // IMPORTANT LINE!!!!!!!
break;
}
}
else
break;
}
return consumedMessages;
}
我想一般来说,如果你想在不以任何方式改变偏移量的情况下使用消息(有点偷看主题分区),你可以调用 Consume
然后使用 Seek(result.TopicPartitionOffset)
来设置该主题分区的偏移量回到使用消息之前的位置。
我正在使用 Confluent Kafka .NET 为分区主题创建消费者。
由于 Confluent Kafka .NET 不支持批量消费,我构建了一个消费消息的函数,直到达到批量大小。此函数的想法是仅使用来自同一分区的消息构建批处理,这就是为什么一旦我使用了具有不同分区的结果并且 return 我能够使用的任何数量的消息,我就停止构建批处理到目前为止。
目标或 Objective: 我希望能够处理我在批处理中 return 编辑的消息,并提交这些消息的偏移量只要。即:
Message Consumed From Partition | Offset | Stored in Batch |
---|---|---|
0 | 0 | Yes |
0 | 1 | Yes |
2 | 0 | No |
根据上面的 table,我想处理从分区 0 收到的两条消息。来自分区 2 的消息将被忽略,并且(希望)稍后在对 ConsumeBatch 的另一个调用中获取.
为了提交,我只需调用同步 Commit
函数,将我处理的最新消息的偏移量作为参数传递。在这种情况下,我将传递上面 table 中显示的批处理的第二条消息的偏移量(分区 0 - 偏移量 1)。
问题:
问题是,由于某种原因,当我构建如上所示的批处理时,由于验证而决定不处理的消息将被永远忽略。即:分区 2 的消息 0 将永远不会被消费者再次拾取。
正如您在下面的消费者配置中看到的,我已将 EnableAutoCommit 和 EnableAutoOffsetStore 都设置为 false。我认为这足以让消费者不对偏移量做任何事情并且能够在另一个 Consume
调用中获取被忽略的消息,但事实并非如此。无论我的配置如何,偏移量都会以某种方式增加到每个分区的最新消费消息。
任何人都可以告诉我我在这里缺少什么来实现所需的行为吗?
建批功能的简化版:
public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();
int latestPartition = -1; // The partition from where we consumed the last message
for (int i = 0; i < batchSize; i++)
{
var result = _consumer.Consume(100);
if (result != null)
{
if (latestPartition == -1 || result.Partition.Value == latestPartition)
{
consumedMessages.Add(result);
latestPartition = result.Partition.Value;
}
else
break;
}
else
break;
}
return consumedMessages;
}
ConsumerConfig 用于实例化我的消费者客户端:
_consumerConfig = new ConsumerConfig
{
BootstrapServers = _bootstrapServers,
EnableAutoCommit = false,
AutoCommitIntervalMs = 0,
GroupId = "WorkerConsumers",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoOffsetStore = false,
};
附加信息: 正在测试:
- 1 个主题,有 6 个分区,复制因子为 2
- 3 位经纪人
- 1 个属于消费者组的单线程消费者客户端
- wsl2 的本地环境 Windows 10
关键是使用 Seek
函数将分区的偏移量重置为特定位置,以便可以将被忽略的消息作为另一批次的一部分再次拾取。
在上面的相同函数中:
public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();
int latestPartition = -1; // The partition from where we consumed the last message
for (int i = 0; i < batchSize; i++)
{
var result = _consumer.Consume(100);
if (result != null)
{
if (latestPartition == -1 || result.Partition.Value == latestPartition)
{
consumedMessages.Add(result);
latestPartition = result.Partition.Value;
}
else
{
// This call will guarantee that this message that will not be included in the current batch, will be included in another batch later
_consumer.Seek(result.TopicPartitionOffset); // IMPORTANT LINE!!!!!!!
break;
}
}
else
break;
}
return consumedMessages;
}
我想一般来说,如果你想在不以任何方式改变偏移量的情况下使用消息(有点偷看主题分区),你可以调用 Consume
然后使用 Seek(result.TopicPartitionOffset)
来设置该主题分区的偏移量回到使用消息之前的位置。