如何使用 QueryCassandra 和 ExecutePySpark Nifi 处理器将我的 cassandra 数据传输到 pyspark?
How do I transfer my cassandra data to pyspark using QueryCassandra and ExecutePySpark Nifi Processors?
我只是使用 querycassandra 处理器查询 cassandra table 但我不明白的是如何将我的 Json 输出文件作为输入文件传递到 ExecutePyspark 处理器,然后我需要将我的 Spark 输出数据传递给 Hive。请帮我解决这个问题,谢谢。
我的查询 Cassandra 属性:
Pyspark 属性:
考虑如下使用 4 个处理器的流程:
QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark
步骤 1:QueryCassandra
处理器:在 Cassandra 上执行 CQL 并将结果输出到流文件中。
步骤 2:UpdateAttribute
处理器:为 属性 filename
分配一个包含磁盘上临时文件名称的值,该文件将包含查询结果。使用 NiFi expression language 生成文件名,以便每个 运行 都不同。创建一个 属性 result_directory
并为 NiFi 具有写入权限的磁盘上的文件夹分配一个值。
- 属性:
filename
值:cassandra_result_${now():toNumber()}
属性: result_directory
- 值:
/tmp
步骤 3:PutFile
处理器:使用步骤 2 中填充的值 ${result_directory}
配置 Directory
属性。
步骤 4:ExecutePySpark
处理器:通过 PySpark App Args
处理器将文件名及其位置作为参数传递给 PySpark 应用程序 属性.然后,应用程序可以使用代码从磁盘上的文件读取数据、处理它并写入 Hive。
- 属性:
PySpark App Args
- 值:
${result_directory}/${filename}
此外,您可以在步骤 2 (UpdateAttribute) 中配置更多属性,然后在步骤 4 (ExecutePySpark) 中将这些属性作为参数传递,并由 PySpark 应用程序在写入 Hive 时考虑(例如,Hive 数据库和 table姓名).
我只是使用 querycassandra 处理器查询 cassandra table 但我不明白的是如何将我的 Json 输出文件作为输入文件传递到 ExecutePyspark 处理器,然后我需要将我的 Spark 输出数据传递给 Hive。请帮我解决这个问题,谢谢。
我的查询 Cassandra 属性:
Pyspark 属性:
考虑如下使用 4 个处理器的流程:
QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark
步骤 1:QueryCassandra
处理器:在 Cassandra 上执行 CQL 并将结果输出到流文件中。
步骤 2:UpdateAttribute
处理器:为 属性 filename
分配一个包含磁盘上临时文件名称的值,该文件将包含查询结果。使用 NiFi expression language 生成文件名,以便每个 运行 都不同。创建一个 属性 result_directory
并为 NiFi 具有写入权限的磁盘上的文件夹分配一个值。
- 属性:
filename
值:
cassandra_result_${now():toNumber()}
属性:
result_directory
- 值:
/tmp
步骤 3:PutFile
处理器:使用步骤 2 中填充的值 ${result_directory}
配置 Directory
属性。
步骤 4:ExecutePySpark
处理器:通过 PySpark App Args
处理器将文件名及其位置作为参数传递给 PySpark 应用程序 属性.然后,应用程序可以使用代码从磁盘上的文件读取数据、处理它并写入 Hive。
- 属性:
PySpark App Args
- 值:
${result_directory}/${filename}
此外,您可以在步骤 2 (UpdateAttribute) 中配置更多属性,然后在步骤 4 (ExecutePySpark) 中将这些属性作为参数传递,并由 PySpark 应用程序在写入 Hive 时考虑(例如,Hive 数据库和 table姓名).