Does spark predicate pushdown work with JDBC?
According to this:
Catalyst applies logical optimizations such as predicate pushdown. The
optimizer can push filter predicates down into the data source,
enabling the physical execution to skip irrelevant data.
Spark supports pushing predicates down to the data source. Is this feature also available / expected for JDBC?
(From inspecting the DB logs I can see it is not the default behavior right now - the full query is passed to the DB, even if it is later limited by Spark filters)
More details
Running Spark 1.5 with PostgreSQL 9.4
Code snippet:
from pyspark import SQLContext, SparkContext, Row, SparkConf
from data_access.data_access_db import REMOTE_CONNECTION
sc = SparkContext()
sqlContext = SQLContext(sc)
url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"
df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)
df.show()
SQL trace:
< 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:37.771 EDT >LOG: execute <unnamed>: SELECT * FROM dummy WHERE 1=0
< 2015-09-15 07:11:37.830 EDT >LOG: execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.adrelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid AND a.attnum = vals.attnum)
< 2015-09-15 07:11:40.936 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:40.964 EDT >LOG: execute <unnamed>: SELECT "id","name" FROM dummy
I would expect the last select to include a LIMIT 1 clause - but it doesn't.
Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used in the strict SQL meaning. It covers only the WHERE clause. Moreover, it looks like it is limited to logical conjunctions (no IN and OR, I am afraid) and simple predicates.
Everything else, like limits, counts, ordering, groups and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count() or sqlContext.sql("SELECT COUNT(*) FROM df") is translated to SELECT 1 FROM df and requires both substantial data transfer and processing using Spark.
Does that mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as the table argument. It is less convenient than predicate pushdown, but otherwise it works pretty well:
n = ... # Number of rows to take
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))
df = sqlContext.read.jdbc(url=url, table=sql)
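The same subquery trick can be used to push other work, such as an aggregate, down to the database (a sketch in the same spirit, not from the original code):

count_sql = "(SELECT COUNT(*) AS cnt FROM dummy) AS tmp"
# The count is computed by PostgreSQL; only a single row is transferred to Spark
cnt = sqlContext.read.jdbc(url=url, table=count_sql).first().cnt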
Note:
This behavior may be improved in the future, once Data Source API v2 is ready: