运行 PySpark 中的自定义 Apache Phoenix SQL 查询

Running custom Apache Phoenix SQL query in PySpark

有人可以提供一个使用 pyspark 的示例,说明如何 运行 自定义 Apache Phoenix SQL 查询并将该查询的结果存储在 RDD 或 DF 中。注意:我正在寻找自定义查询,而不是要将整个 table 读入 RDD。

来自 Phoenix 文档,要加载整个 table 我可以使用这个:

table = sqlContext.read \
        .format("org.apache.phoenix.spark") \
        .option("table", "<TABLENAME>") \
        .option("zkUrl", "<hostname>:<port>") \
        .load() 

我想知道使用自定义的相应等价物是什么SQL

sqlResult =  sqlContext.read \
             .format("org.apache.phoenix.spark") \
             .option("sql", "select * from <TABLENAME> where <CONDITION>") \
             .option("zkUrl", "<HOSTNAME>:<PORT>") \
             .load()

谢谢。

此处您需要使用 .sql 来处理自定义查询。这是语法

dataframe = sqlContext.sql("select * from <table> where <condition>")
dataframe.show()

这可以使用 Phoenix 作为 JDBC 数据源来完成,如下所示:

sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE'

df = sqlContext.read.format('jdbc')\
       .options(driver="org.apache.phoenix.jdbc.PhoenixDriver", url='jdbc:phoenix:<HOSTNAME>:<PORT>', dbtable=sql).load()

df.show() 

但是需要注意的是,如果SQL语句中有列别名,那么.show()语句会抛出异常(如果你使用.select( ) 到 select 没有别名的列),这可能是 Phoenix 中的错误。

对于 Spark2,我对 .show() 函数没有问题,我也没有使用 .select() 函数来打印来自 Phoenix 的 DataFrame 的所有值。 所以,确保你的 sql 查询已经在括号内,看我的例子:

 val sql = " (SELECT  P.PERSON_ID as PERSON_ID, P.LAST_NAME as LAST_NAME, C.STATUS as STATUS FROM PERSON P INNER JOIN CLIENT C ON C.CLIENT_ID = P.PERSON_ID) "
          val dft = dfPerson.sparkSession.read.format("jdbc")
            .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
            .option("url", "jdbc:phoenix:<HOSTNAME>:<PORT>")
            .option("useUnicode", "true")
            .option("continueBatchOnError", "true")
            .option("dbtable", sql)
            .load()
dft.show();

它告诉我:

+---------+--------------------+------+
|PERSON_ID|           LAST_NAME|STATUS|
+---------+--------------------+------+
|     1005|             PerDiem|Active|
|     1008|NAMEEEEEEEEEEEEEE...|Active|
|     1009|           Admission|Active|
|     1010|            Facility|Active|
|     1011|                MeUP|Active|
+---------+--------------------+------+