Read/Write 在 Cloudera 数据平台 CDP public 云中使用 Nifi 到 Kafka
Read/Write with Nifi to Kafka in Cloudera Data Platform CDP public cloud
Nifi 和 Kafka 现在都可以在 Cloudera 数据平台、CDP public 云中使用。 Nifi 擅长与一切对话,而 Kafka 是主流消息总线,我只是想知道:
在 CDP Public Cloud
中从 Apache Nifi 向 Kafka Produce/Consume 数据需要最少的步骤是什么
理想情况下,我会寻找适用于任何云的步骤,例如 Amazon AWS 和 Microsoft Azure。
我对遵循最佳实践并使用平台默认配置的答案感到满意,但如果有通用的替代方案,也欢迎使用。
将来会有多种形式可用,现在我假设您的环境包含 1 个带 NiFi 的数据中心和 1 个带 Kafka 的数据中心。 (如果两者都在同一个数据集线器上,答案仍然有效)。
先决条件
- 带有 NiFi 和 Kafka 的数据中心
- 访问这些的权限(例如添加处理器、创建 Kafka 主题)
- 了解您的工作负载用户名(Cdp 管理控制台>单击您的名称(左下方)> 单击配置文件)
- 您应该在同一位置设置您的工作负载密码
这些步骤允许您在 CDP Public Cloud
中将数据从 NiFi 生成到 Kafka
除非另有说明,否则我将所有内容都保留为默认设置。
在 Kafka Data Hub 集群中:
- 收集代理的 FQDN 链接和使用的端口。
- 如果您有 Streams Messaging Manager:转到代理选项卡以查看 FQDN 和端口已经在一起
- 如果您无法使用 Streams Messaging Manager:使用 Kafka 转到数据中心的硬件选项卡并获取相关节点的 FQDN。 (目前这些被称为经纪人)。然后在每个后面添加 :portnumber 。默认端口为 9093。
- 以这种格式将链接组合在一起:FQDN:port、FQDN:port、FQDN:port 现在看起来应该是这样的:
broker1.abc:9093,broker2.abc:9093,broker3.abc:9093
在 NiFi GUI 中:
- 确保您在 NiFi 中有一些数据要生成,例如使用
GenerateFlowFile
处理器
- Select写入kafka的相关处理器,例如
PublishKafka_2_0
,配置如下:
- 设置
- 自动终止关系:成功和失败都勾选
- 属性
- Kafka Brokers:我们之前创建的组合列表
- 安全协议:SASL_SSL
- SASL 机制:PLAIN
- SSL 上下文服务:默认 NiFi SSL 上下文服务
- 用户名:您的工作负载用户名(请参阅上面的先决条件)
- 密码:您的工作负载密码
- 主题名称:丹尼斯
- 使用交易:false
- 最长元数据等待时间:30 秒
- 将您的
GenerateFlowFile
处理器连接到 PublishKafka_2_0
处理器并启动流程
这些是最少的步骤,更详细的解释可以在 Cloudera Documentation 中找到。请注意,最好的做法是显式创建主题(此示例利用 Kafka 的功能,在生成时自动创建主题)。
这些步骤允许您在 CDP Public Cloud
中使用来自 Kafka 的 NiFi 使用数据
检查数据是否已写入 Kafka 的好方法正在再次使用它。
在 NiFi GUI 中:
- 创建一个Kafka消费处理器,例如
ConsumeKafka_2_0
,配置其Properties如下:
- Kafka 代理、安全协议、SASL 机制、SSL 上下文服务、用户名、密码、主题名称:与上面的生产者示例相同
- 消费者组:1
- 偏移重置:最早
- 创建另一个处理器或一个漏斗以将消息发送到,并启动消费处理器。
就是这样,在 30 秒内您应该会看到您发布到 Kafka 的数据现在再次流入 NiFi。
完全披露:我是 Nifi 背后的驱动力 Cloudera 的员工。
Nifi 和 Kafka 现在都可以在 Cloudera 数据平台、CDP public 云中使用。 Nifi 擅长与一切对话,而 Kafka 是主流消息总线,我只是想知道:
在 CDP Public Cloud
中从 Apache Nifi 向 Kafka Produce/Consume 数据需要最少的步骤是什么理想情况下,我会寻找适用于任何云的步骤,例如 Amazon AWS 和 Microsoft Azure。
我对遵循最佳实践并使用平台默认配置的答案感到满意,但如果有通用的替代方案,也欢迎使用。
将来会有多种形式可用,现在我假设您的环境包含 1 个带 NiFi 的数据中心和 1 个带 Kafka 的数据中心。 (如果两者都在同一个数据集线器上,答案仍然有效)。
先决条件
- 带有 NiFi 和 Kafka 的数据中心
- 访问这些的权限(例如添加处理器、创建 Kafka 主题)
- 了解您的工作负载用户名(Cdp 管理控制台>单击您的名称(左下方)> 单击配置文件)
- 您应该在同一位置设置您的工作负载密码
这些步骤允许您在 CDP Public Cloud
中将数据从 NiFi 生成到 Kafka除非另有说明,否则我将所有内容都保留为默认设置。
在 Kafka Data Hub 集群中:
- 收集代理的 FQDN 链接和使用的端口。
- 如果您有 Streams Messaging Manager:转到代理选项卡以查看 FQDN 和端口已经在一起
- 如果您无法使用 Streams Messaging Manager:使用 Kafka 转到数据中心的硬件选项卡并获取相关节点的 FQDN。 (目前这些被称为经纪人)。然后在每个后面添加 :portnumber 。默认端口为 9093。
- 以这种格式将链接组合在一起:FQDN:port、FQDN:port、FQDN:port 现在看起来应该是这样的:
broker1.abc:9093,broker2.abc:9093,broker3.abc:9093
在 NiFi GUI 中:
- 确保您在 NiFi 中有一些数据要生成,例如使用
GenerateFlowFile
处理器 - Select写入kafka的相关处理器,例如
PublishKafka_2_0
,配置如下:
- 设置
- 自动终止关系:成功和失败都勾选
- 属性
- Kafka Brokers:我们之前创建的组合列表
- 安全协议:SASL_SSL
- SASL 机制:PLAIN
- SSL 上下文服务:默认 NiFi SSL 上下文服务
- 用户名:您的工作负载用户名(请参阅上面的先决条件)
- 密码:您的工作负载密码
- 主题名称:丹尼斯
- 使用交易:false
- 最长元数据等待时间:30 秒
- 将您的
GenerateFlowFile
处理器连接到PublishKafka_2_0
处理器并启动流程
这些是最少的步骤,更详细的解释可以在 Cloudera Documentation 中找到。请注意,最好的做法是显式创建主题(此示例利用 Kafka 的功能,在生成时自动创建主题)。
这些步骤允许您在 CDP Public Cloud
中使用来自 Kafka 的 NiFi 使用数据检查数据是否已写入 Kafka 的好方法正在再次使用它。
在 NiFi GUI 中:
- 创建一个Kafka消费处理器,例如
ConsumeKafka_2_0
,配置其Properties如下:
- Kafka 代理、安全协议、SASL 机制、SSL 上下文服务、用户名、密码、主题名称:与上面的生产者示例相同
- 消费者组:1
- 偏移重置:最早
- 创建另一个处理器或一个漏斗以将消息发送到,并启动消费处理器。
就是这样,在 30 秒内您应该会看到您发布到 Kafka 的数据现在再次流入 NiFi。
完全披露:我是 Nifi 背后的驱动力 Cloudera 的员工。