如何使用 QueryCassandra 和 ExecutePySpark Nifi 处理器将我的 cassandra 数据传输到 pyspark?

How do I transfer my cassandra data to pyspark using QueryCassandra and ExecutePySpark Nifi Processors?

我只是使用 querycassandra 处理器查询 cassandra table 但我不明白的是如何将我的 Json 输出文件作为输入文件传递到 ExecutePyspark 处理器,然后我需要将我的 Spark 输出数据传递给 Hive。请帮我解决这个问题,谢谢。

我的查询 Cassandra 属性:

Pyspark 属性:

考虑如下使用 4 个处理器的流程:

QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark

步骤 1QueryCassandra 处理器:在 Cassandra 上执行 CQL 并将结果输出到流文件中。

步骤 2UpdateAttribute 处理器:为 属性 filename 分配一个包含磁盘上临时文件名称的值,该文件将包含查询结果。使用 NiFi expression language 生成文件名,以便每个 运行 都不同。创建一个 属性 result_directory 并为 NiFi 具有写入权限的磁盘上的文件夹分配一个值。

  • 属性: filename
  • 值:cassandra_result_${now():toNumber()}

  • 属性: result_directory

  • 值:/tmp

步骤 3PutFile 处理器:使用步骤 2 中填充的值 ${result_directory} 配置 Directory 属性。

步骤 4ExecutePySpark 处理器:通过 PySpark App Args 处理器将文件名及其位置作为参数传递给 PySpark 应用程序 属性.然后,应用程序可以使用代码从磁盘上的文件读取数据、处理它并写入 Hive。

  • 属性: PySpark App Args
  • 值:${result_directory}/${filename}

此外,您可以在步骤 2 (UpdateAttribute) 中配置更多属性,然后在步骤 4 (ExecutePySpark) 中将这些属性作为参数传递,并由 PySpark 应用程序在写入 Hive 时考虑(例如,Hive 数据库和 table姓名).