Read/Write 在 Cloudera 数据平台 CDP public 云中使用 Nifi 到 Kafka

Read/Write with Nifi to Kafka in Cloudera Data Platform CDP public cloud

Nifi 和 Kafka 现在都可以在 Cloudera 数据平台、CDP public 云中使用。 Nifi 擅长与一切对话,而 Kafka 是主流消息总线,我只是想知道:

在 CDP Public Cloud

中从 Apache Nifi 向 Kafka Produce/Consume 数据需要最少的步骤是什么

理想情况下,我会寻找适用于任何云的步骤,例如 Amazon AWS 和 Microsoft Azure。

我对遵循最佳实践并使用平台默认配置的答案感到满意,但如果有通用的替代方案,也欢迎使用。

将来会有多种形式可用,现在我假设您的环境包含 1 个带 NiFi 的数据中心和 1 个带 Kafka 的数据中心。 (如果两者都在同一个数据集线器上,答案仍然有效)。

先决条件

  • 带有 NiFi 和 Kafka 的数据中心
  • 访问这些的权限(例如添加处理器、创建 Kafka 主题)
  • 了解您的工作负载用户名(Cdp 管理控制台>单击您的名称(左下方)> 单击配置文件)
  • 您应该在同一位置设置您的工作负载密码

这些步骤允许您在 CDP Public Cloud

中将数据从 NiFi 生成到 Kafka

除非另有说明,否则我将所有内容都保留为默认设置。

在 Kafka Data Hub 集群中:

  1. 收集代理的 FQDN 链接和使用的端口。
  • 如果您有 Streams Messaging Manager:转到代理选项卡以查看 FQDN 和端口已经在一起
  • 如果您无法使用 Streams Messaging Manager:使用 Kafka 转到数据中心的硬件选项卡并获取相关节点的 FQDN。 (目前这些被称为经纪人)。然后在每个后面添加 :portnumber 。默认端口为 9093。
  1. 以这种格式将链接组合在一起:FQDN:port、FQDN:port、FQDN:port 现在看起来应该是这样的:

broker1.abc:9093,broker2.abc:9093,broker3.abc:9093

在 NiFi GUI 中:

  1. 确保您在 NiFi 中有一些数据要生成,例如使用 GenerateFlowFile 处理器
  2. Select写入kafka的相关处理器,例如PublishKafka_2_0,配置如下:
  • 设置
    • 自动终止关系:成功和失败都勾选
  • 属性
    • Kafka Brokers:我们之前创建的组合列表
    • 安全协议:SASL_SSL
    • SASL 机制:PLAIN
    • SSL 上下文服务:默认 NiFi SSL 上下文服务
    • 用户名:您的工作负载用户名(请参阅上面的先决条件)
    • 密码:您的工作负载密码
    • 主题名称:丹尼斯
    • 使用交易:false
    • 最长元数据等待时间:30 秒
  1. 将您的 GenerateFlowFile 处理器连接到 PublishKafka_2_0 处理器并启动流程

这些是最少的步骤,更详细的解释可以在 Cloudera Documentation 中找到。请注意,最好的做法是显式创建主题(此示例利用 Kafka 的功能,在生成时自动创建主题)。

这些步骤允许您在 CDP Public Cloud

中使用来自 Kafka 的 NiFi 使用数据

检查数据是否已写入 Kafka 的好方法正在再次使用它。

在 NiFi GUI 中:

  1. 创建一个Kafka消费处理器,例如ConsumeKafka_2_0,配置其Properties如下:
  • Kafka 代理、安全协议、SASL 机制、SSL 上下文服务、用户名、密码、主题名称:与上面的生产者示例相同
  • 消费者组:1
  • 偏移重置:最早
  1. 创建另一个处理器或一个漏斗以将消息发送到,并启动消费处理器。

就是这样,在 30 秒内您应该会看到您发布到 Kafka 的数据现在再次流入 NiFi。


完全披露:我是 Nifi 背后的驱动力 Cloudera 的员工。