运行 PySpark 中的自定义 Apache Phoenix SQL 查询
Running custom Apache Phoenix SQL query in PySpark
有人可以提供一个使用 pyspark 的示例,说明如何 运行 自定义 Apache Phoenix SQL 查询并将该查询的结果存储在 RDD 或 DF 中。注意:我正在寻找自定义查询,而不是要将整个 table 读入 RDD。
来自 Phoenix 文档,要加载整个 table 我可以使用这个:
table = sqlContext.read \
.format("org.apache.phoenix.spark") \
.option("table", "<TABLENAME>") \
.option("zkUrl", "<hostname>:<port>") \
.load()
我想知道使用自定义的相应等价物是什么SQL
sqlResult = sqlContext.read \
.format("org.apache.phoenix.spark") \
.option("sql", "select * from <TABLENAME> where <CONDITION>") \
.option("zkUrl", "<HOSTNAME>:<PORT>") \
.load()
谢谢。
此处您需要使用 .sql 来处理自定义查询。这是语法
dataframe = sqlContext.sql("select * from <table> where <condition>")
dataframe.show()
这可以使用 Phoenix 作为 JDBC 数据源来完成,如下所示:
sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE'
df = sqlContext.read.format('jdbc')\
.options(driver="org.apache.phoenix.jdbc.PhoenixDriver", url='jdbc:phoenix:<HOSTNAME>:<PORT>', dbtable=sql).load()
df.show()
但是需要注意的是,如果SQL语句中有列别名,那么.show()语句会抛出异常(如果你使用.select( ) 到 select 没有别名的列),这可能是 Phoenix 中的错误。
对于 Spark2,我对 .show() 函数没有问题,我也没有使用 .select() 函数来打印来自 Phoenix 的 DataFrame 的所有值。
所以,确保你的 sql 查询已经在括号内,看我的例子:
val sql = " (SELECT P.PERSON_ID as PERSON_ID, P.LAST_NAME as LAST_NAME, C.STATUS as STATUS FROM PERSON P INNER JOIN CLIENT C ON C.CLIENT_ID = P.PERSON_ID) "
val dft = dfPerson.sparkSession.read.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("url", "jdbc:phoenix:<HOSTNAME>:<PORT>")
.option("useUnicode", "true")
.option("continueBatchOnError", "true")
.option("dbtable", sql)
.load()
dft.show();
它告诉我:
+---------+--------------------+------+
|PERSON_ID| LAST_NAME|STATUS|
+---------+--------------------+------+
| 1005| PerDiem|Active|
| 1008|NAMEEEEEEEEEEEEEE...|Active|
| 1009| Admission|Active|
| 1010| Facility|Active|
| 1011| MeUP|Active|
+---------+--------------------+------+
有人可以提供一个使用 pyspark 的示例,说明如何 运行 自定义 Apache Phoenix SQL 查询并将该查询的结果存储在 RDD 或 DF 中。注意:我正在寻找自定义查询,而不是要将整个 table 读入 RDD。
来自 Phoenix 文档,要加载整个 table 我可以使用这个:
table = sqlContext.read \
.format("org.apache.phoenix.spark") \
.option("table", "<TABLENAME>") \
.option("zkUrl", "<hostname>:<port>") \
.load()
我想知道使用自定义的相应等价物是什么SQL
sqlResult = sqlContext.read \
.format("org.apache.phoenix.spark") \
.option("sql", "select * from <TABLENAME> where <CONDITION>") \
.option("zkUrl", "<HOSTNAME>:<PORT>") \
.load()
谢谢。
此处您需要使用 .sql 来处理自定义查询。这是语法
dataframe = sqlContext.sql("select * from <table> where <condition>")
dataframe.show()
这可以使用 Phoenix 作为 JDBC 数据源来完成,如下所示:
sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE'
df = sqlContext.read.format('jdbc')\
.options(driver="org.apache.phoenix.jdbc.PhoenixDriver", url='jdbc:phoenix:<HOSTNAME>:<PORT>', dbtable=sql).load()
df.show()
但是需要注意的是,如果SQL语句中有列别名,那么.show()语句会抛出异常(如果你使用.select( ) 到 select 没有别名的列),这可能是 Phoenix 中的错误。
对于 Spark2,我对 .show() 函数没有问题,我也没有使用 .select() 函数来打印来自 Phoenix 的 DataFrame 的所有值。 所以,确保你的 sql 查询已经在括号内,看我的例子:
val sql = " (SELECT P.PERSON_ID as PERSON_ID, P.LAST_NAME as LAST_NAME, C.STATUS as STATUS FROM PERSON P INNER JOIN CLIENT C ON C.CLIENT_ID = P.PERSON_ID) "
val dft = dfPerson.sparkSession.read.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("url", "jdbc:phoenix:<HOSTNAME>:<PORT>")
.option("useUnicode", "true")
.option("continueBatchOnError", "true")
.option("dbtable", sql)
.load()
dft.show();
它告诉我:
+---------+--------------------+------+
|PERSON_ID| LAST_NAME|STATUS|
+---------+--------------------+------+
| 1005| PerDiem|Active|
| 1008|NAMEEEEEEEEEEEEEE...|Active|
| 1009| Admission|Active|
| 1010| Facility|Active|
| 1011| MeUP|Active|
+---------+--------------------+------+