PySpark logical conjunction for Vertica SQL

I am on Spark 1.6, retrieving data from my Vertica database so I can process it. The query below runs fine directly against the Vertica DB, but it does not work from PySpark. Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used in a strict SQL sense: it covers only the WHERE clause, and it appears to be limited to logical conjunctions of simple predicates (no IN or OR, I'm afraid). PySpark throws this error:

java.lang.RuntimeException: Option 'dbtable' is not specified
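To make "logical conjunctions of simple predicates" concrete, here is a simplified sketch (an illustration, not Spark's actual source) of how a JDBC reader like Spark 1.6's compiles pushed-down filters into a WHERE clause — every predicate is a plain comparison, and they can only be combined with AND:

```python
# Simplified sketch of predicate pushdown compilation (illustrative only,
# not Spark's real implementation). Only simple comparison predicates are
# representable, and they are always AND-ed together; OR and IN cannot be
# expressed in this form, which is why they are not pushed down.

def compile_filter(col, op, value):
    """Render one simple predicate, e.g. ('bytes_in', '>', 1000) -> 'bytes_in > 1000'."""
    if isinstance(value, str):
        value = "'{}'".format(value)
    return "{} {} {}".format(col, op, value)

def build_where(filters):
    """AND together every compiled predicate; return '' when nothing can be pushed."""
    compiled = [compile_filter(*f) for f in filters]
    return "WHERE " + " AND ".join(compiled) if compiled else ""

print(build_where([("DATE(time_stamp)", ">=", "2019-01-25"),
                   ("DATE(time_stamp)", "<=", "2019-01-29")]))
# WHERE DATE(time_stamp) >= '2019-01-25' AND DATE(time_stamp) <= '2019-01-29'
```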

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
.setAppName("hivereader")
.setMaster("yarn-client")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.io.compression.codec", "snappy")
.set("spark.rdd.compress", "true")
.set("spark.executor.cores" , 7)
.set("spark.sql.inMemoryStorage.compressed", "true")
.set("spark.sql.shuffle.partitions" , 2000)
.set("spark.sql.tungsten.enabled" , 'true')
.set("spark.port.maxRetries" , 200))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

url = "*******"
properties = {"user": "*****", "password": "*******", "driver": "com.vertica.jdbc.Driver" }

df = sqlContext.read.format("JDBC").options(
    url = url,
    query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
    **properties
).load()

df.show()

The problem is that, even though you say this query works on Vertica, the failure is not a Vertica SQL issue at all: the error java.lang.RuntimeException: Option 'dbtable' is not specified comes from Spark, because the JDBC source in Spark 1.6 does not support a query option (that was only added in Spark 2.4). The query you want to run is:

SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION
FROM traffic.stats
WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'

On Spark 2.4 or later, where the query option is supported, your sqlContext.read call looks like this (note the conventional lowercase "jdbc" format name):

df = sqlContext.read.format("jdbc").options(
    url = url,
    query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK, (connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) BETWEEN '2019-01-25' AND '2019-01-29'",
    **properties
).load()

df.show()

Alternatively, and this is what works on Spark 1.6, you can alias the query as a subquery and pass it through dbtable instead of query:

df = sqlContext.read.format("jdbc").options(
    url = url,
    dbtable = "(SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK, (connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) BETWEEN '2019-01-25' AND '2019-01-29') temp",
    **properties
).load()

df.show()
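The dbtable trick generalizes: Spark substitutes the option value into the FROM clause of the query it generates, so any full SELECT has to be parenthesized and given an alias, exactly like a derived table. A small helper (hypothetical, for illustration only, not part of Spark or the Vertica driver) captures the rule:

```python
# Hypothetical helper: wrap a full SQL query so Spark's JDBC `dbtable`
# option accepts it. Spark splices the value of `dbtable` into its
# generated "SELECT ... FROM <dbtable>", so a raw query must be
# parenthesized and aliased like a derived table.
def as_dbtable(query, alias="t"):
    return "({}) {}".format(query.strip().rstrip(";"), alias)

print(as_dbtable("SELECT 1 FROM dual;"))
# (SELECT 1 FROM dual) t
```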