Samza task not receiving on one partition
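I have a puzzling problem with one of my Samza tasks. It works fine except for the messages on one partition. The topic has 9 partitions. If I send 1000 messages, I only receive about 890 of them.
I've checked with kafka-console-consumer, using partition keys that I know my Samza job is not processing, and the console consumer does see those messages. So I know they are being written to the topic and that an ordinary consumer, at least, can read them.
I enabled debug logging in Samza, and org.apache.samza.checkpoint.kafka.KafkaCheckpointManager
logs a lot of messages like this: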
Adding checkpoint Checkpoint [offsets={SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448}] for taskName Partition 4
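For partition 4 it always says 448. Partition 0 has similar log lines, but where this one says 448, it shows a steadily increasing number.
I'm happy to share any config that would help narrow this down, but right now I'm not even sure what would be useful to share.
I'm running with ThreadJobFactory, along with:
samza-kafka_2.10 version 0.9.1
kafka_2.10 client version 0.8.2.1
kafka broker 0.9.0.0
Update
I looked at the upstream Samza job, which uses the same partition key, and found that it has the same problem on partition 4. Inspecting the Samza checkpoint topic with kafkacat, I can see that the checkpoint for partition 4 is not advancing. First I see: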
{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47135","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62263","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52151","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58081","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47712","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45831","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713
Then a minute later I see:
{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47115","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62252","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52134","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58063","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47696","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45817","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722
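That number never moves past 2556. Yet looking at the actual resource.mutation topic, the latest offset on partition 4 is in the same range as on the other partitions, about 61000 as of now, and still growing.
There are no error or warning messages at all. It just stops consuming partition 4.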
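For anyone who wants to automate the comparison of two kafkacat dumps of the checkpoint topic, here is a minimal Python sketch. It assumes the dump format shown in this post (one JSON object per line, plus kafkacat's "% Reached end of topic" status lines); the function names and the trimmed sample data are only illustrative, not part of Samza or kafkacat.

```python
import json
from typing import Dict, List


def parse_checkpoints(dump: str) -> Dict[int, int]:
    """Map partition number -> checkpointed offset from a kafkacat dump."""
    offsets: Dict[int, int] = {}
    for line in dump.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip blanks and status lines such as "% Reached end of topic ..."
        for entry in json.loads(line).values():
            # Later lines for the same partition overwrite earlier ones.
            offsets[int(entry["partition"])] = int(entry["offset"])
    return offsets


def stalled_partitions(before: Dict[int, int], after: Dict[int, int]) -> List[int]:
    """Partitions whose checkpointed offset is identical in both snapshots."""
    return sorted(p for p in before if p in after and before[p] == after[p])


# Trimmed sample taken from the two dumps in this post (partitions 6, 4 and 0 only).
FIRST = """
{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713
"""
SECOND = """
{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722
"""

STALLED = stalled_partitions(parse_checkpoints(FIRST), parse_checkpoints(SECOND))
print(STALLED)
```

On the sample data only partition 4 is reported. A checkpoint that stays put can also just mean the partition is idle, so it is worth comparing against the topic's latest offset before concluding the consumer is stuck.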
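The problem was that one message was larger than the Kafka consumer's default fetch limit (fetch.message.max.bytes on the consumer side; max.message.bytes is the topic-level limit on what can be written). However, the thread responsible for consuming that partition gives no error message of any kind; it simply hangs on that message. The threads for the other partitions happily carry on.
Once we set systems.kafka.consumer.fetch.message.max.bytes
to a value large enough to consume every message on the partition and restarted the job, it picked up where it left off and everything started working as expected.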
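As a rough illustration, the change in the job's properties file could look like the fragment below. The key is the one named above, with "kafka" being the system name this job uses; the 10 MiB value is only an assumed example and should be sized to the largest message the topic can actually contain.

```properties
# Raise the per-partition fetch size for the Samza Kafka consumer.
# 10485760 bytes (10 MiB) is an illustrative value, not a recommendation.
systems.kafka.consumer.fetch.message.max.bytes=10485760
```

The value needs to be at least as large as the biggest message the broker will accept on that topic, otherwise the same silent stall can come back.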