nifi 作为 kafka 的生产者:读取 Kafka 时数据不是连续的
nifi as a producer to kafka: data is not sequential while reading Kafka
我正在使用 publishKafka_0_10 处理器将流文件从 nifi 发布到 kafka。通过代码从Kafka中读取数据时,数据的顺序是不保持的(按照时间戳排序)。我的数据集是这样的:时间戳、通道、值。
为了调试,我使用 PutSql 将相同的流文件发布到 phoenix,我可以看到在 Phoenix table 中,数据是顺序的(根据时间排序)。如果有人向我解释为什么我无法按顺序从 kafka 读取数据,那就太好了。 kafka中topic只有一个partition。提前致谢。
Kafka 只在一个分区内 gua运行tees 排序。既然你说这是一个分区,那好吧。
My data set is like: timestamp, channel,value.
消息时间戳只是记录元数据,(你自己的时间戳不会被NiFi传入Kafka ProducerRecordclass)。此外,时间戳对排序没有影响。换句话说,如果一个 "late timestamped" 消息在另一个 "earlier" 时间之前提交,那么是的,它按时间顺序是乱序的,但 Kafka 只是看到偏移量已经移动。
why am I not able to read data from kafka sequentially
你是,但是按照消息提交给 Kafka 的顺序。
您的消费者代码应提取记录时间戳并相应地对其重新排序。例如,Kafka Connect 有一个 Record Timestamp extractor,它可以根据这个时间将数据写入分区目录。我假设您的 PutSQL 处理器正在读取顺序排列的 FlowFiles(它们有自己的时间戳,而不是数据中的时间戳,除非您 运行 一个 ModifyAttribute 处理器),而不是使用 ConsumeKafka 处理器?
我正在使用 publishKafka_0_10 处理器将流文件从 nifi 发布到 kafka。通过代码从Kafka中读取数据时,数据的顺序是不保持的(按照时间戳排序)。我的数据集是这样的:时间戳、通道、值。
为了调试,我使用 PutSql 将相同的流文件发布到 phoenix,我可以看到在 Phoenix table 中,数据是顺序的(根据时间排序)。如果有人向我解释为什么我无法按顺序从 kafka 读取数据,那就太好了。 kafka中topic只有一个partition。提前致谢。
Kafka 只在一个分区内 gua运行tees 排序。既然你说这是一个分区,那好吧。
My data set is like: timestamp, channel,value.
消息时间戳只是记录元数据,(你自己的时间戳不会被NiFi传入Kafka ProducerRecordclass)。此外,时间戳对排序没有影响。换句话说,如果一个 "late timestamped" 消息在另一个 "earlier" 时间之前提交,那么是的,它按时间顺序是乱序的,但 Kafka 只是看到偏移量已经移动。
why am I not able to read data from kafka sequentially
你是,但是按照消息提交给 Kafka 的顺序。
您的消费者代码应提取记录时间戳并相应地对其重新排序。例如,Kafka Connect 有一个 Record Timestamp extractor,它可以根据这个时间将数据写入分区目录。我假设您的 PutSQL 处理器正在读取顺序排列的 FlowFiles(它们有自己的时间戳,而不是数据中的时间戳,除非您 运行 一个 ModifyAttribute 处理器),而不是使用 ConsumeKafka 处理器?