Nifi,如何通过 Kafka avro 文件生成每个文件有多个记录
Nifi, how to produce via Kafka avro files with multiple records each file
我创建了一个管道来处理单个 json 文件(一个包含 5890 个元素的向量,每个元素一个记录)并通过 Kafka 以 avro 格式发送它。生产者工作正常,然后当我与消费者一起阅读时,我得到每条记录的流文件(avro 文件)。 5890 个 avro 文件。如何在单个 avro 文件中设置或合并更多记录?
我只是使用 PublishKafkaRecord_0_10 1.5.0(jsonTreeReader 1.5.0 和 AvroRecordSetWriter 1.5.0)和 ConsumeKafka_0_10 1.5.0 .
首先,NiFi 1.5.0 是从 2018 年 1 月开始的。请考虑升级,因为它已经过时了。 NiFi 1.15.3 是今天最新的。
其次,*Kafka_0_10 处理器适用于非常旧的 Kafka 版本 - 您真的在使用 v0.10 的 Kafka 吗?对于更高版本的 Kafka,您有以下处理器:
如果您提供您的输入和期望输出的示例以及您实际想要实现的目标,将会很有用。
如果你想在 NiFi 中使用这些消息,并且你想要一个包含许多消息的 FlowFile,你应该使用 ConsumeKafkaRecord 而不是 ConsumeKafka。这将让您控制每个 'file'.
您希望看到多少条记录
如果您的消费者不是 NiFi,那么要么他们需要在他们的末端合并,要么您需要在生成时将所有记录捆绑到一个更大的消息中。然而,这并不是 Kafka 的真正意义,因为它不适合大型 messages/files.
我创建了一个管道来处理单个 json 文件(一个包含 5890 个元素的向量,每个元素一个记录)并通过 Kafka 以 avro 格式发送它。生产者工作正常,然后当我与消费者一起阅读时,我得到每条记录的流文件(avro 文件)。 5890 个 avro 文件。如何在单个 avro 文件中设置或合并更多记录?
我只是使用 PublishKafkaRecord_0_10 1.5.0(jsonTreeReader 1.5.0 和 AvroRecordSetWriter 1.5.0)和 ConsumeKafka_0_10 1.5.0 .
首先,NiFi 1.5.0 是从 2018 年 1 月开始的。请考虑升级,因为它已经过时了。 NiFi 1.15.3 是今天最新的。
其次,*Kafka_0_10 处理器适用于非常旧的 Kafka 版本 - 您真的在使用 v0.10 的 Kafka 吗?对于更高版本的 Kafka,您有以下处理器:
如果您提供您的输入和期望输出的示例以及您实际想要实现的目标,将会很有用。
如果你想在 NiFi 中使用这些消息,并且你想要一个包含许多消息的 FlowFile,你应该使用 ConsumeKafkaRecord 而不是 ConsumeKafka。这将让您控制每个 'file'.
您希望看到多少条记录如果您的消费者不是 NiFi,那么要么他们需要在他们的末端合并,要么您需要在生成时将所有记录捆绑到一个更大的消息中。然而,这并不是 Kafka 的真正意义,因为它不适合大型 messages/files.