Joining Two Datasets with Predicate Pushdown
I have a Dataset created from an RDD and am trying to join it with another Dataset that is created from my Phoenix table:
val dfToJoin = sparkSession.createDataset(rddToJoin)
val tableDf = sparkSession
  .read
  .option("table", "table")
  .option("zkURL", "localhost")
  .format("org.apache.phoenix.spark")
  .load()
val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")
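When I run it, the entire database table appears to be loaded in order to perform the join.
Is there a way to do such a join so that the filtering happens in the database rather than in Spark?
Also: dfToJoin is smaller than the table; I don't know whether that matters.
Edit: Basically, I want to join my Phoenix table with a Dataset created in Spark without fetching the whole table into the executors.
Edit 2: Here is the physical plan: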
*Project [FEATURE#21, SEQUENCE_IDENTIFIER#22, TAX_NUMBER#23,
WINDOW_NUMBER#24, uniqueIdentifier#5, readLength#6]
+- *SortMergeJoin [FEATURE#21], [feature#4], Inner
:- *Sort [FEATURE#21 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(FEATURE#21, 200)
: +- *Filter isnotnull(FEATURE#21)
: +- *Scan PhoenixRelation(FEATURES,localhost,false)
[FEATURE#21,SEQUENCE_IDENTIFIER#22,TAX_NUMBER#23,WINDOW_NUMBER#24]
PushedFilters: [IsNotNull(FEATURE)], ReadSchema:
struct<FEATURE:int,SEQUENCE_IDENTIFIER:string,TAX_NUMBER:int,
WINDOW_NUMBER:int>
+- *Sort [feature#4 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(feature#4, 200)
+- *Filter isnotnull(feature#4)
+- *SerializeFromObject [assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).feature AS feature#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).uniqueIdentifier, true) AS uniqueIdentifier#5, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).readLength AS readLength#6]
+- Scan ExternalRDDScan[obj#3]
As you can see, the equality filter is not included in the PushedFilters list, so evidently no predicate pushdown is happening.
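Spark fetches the Phoenix table records to the appropriate executors (not the entire table to one executor).
Since there is no direct filter on the Phoenix table DataFrame, the physical plan shows only *Filter isnotnull(FEATURE#21).
As you mentioned, the Phoenix table returns less data when a filter is applied to it. You can push a filter on the FEATURE column down to the Phoenix table by first looking up the feature IDs in the other dataset: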
import sparkSession.implicits._ // needed for the $"..." column syntax and the Int encoder

// Distributed across the workers
val dfToJoin = sparkSession.createDataset(rddToJoin)

// Collected to the driver (not distributed), so the distinct keys must fit in driver memory
val list_of_feature_ids = dfToJoin
  .select("feature")
  .distinct()
  .map(r => r.getInt(0)) // FEATURE is an int in the plan above, so getString(0) would fail
  .collect
  .toList

// Distributed across the workers; the filter is applied directly to the Phoenix DataFrame
val tableDf = sparkSession
  .read
  .option("table", "table")
  .option("zkURL", "localhost")
  .format("org.apache.phoenix.spark")
  .load()
  .filter($"FEATURE".isin(list_of_feature_ids: _*)) // added filter

// Distributed across the workers
val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")
joinedDf.explain() // check whether PushedFilters now lists a filter on FEATURE
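
The approach above can be wrapped in a small helper. This is only a sketch under stated assumptions: the object name KeyFilter, the function filterByKeys, and the maxKeys guard are hypothetical and are not part of Spark or the Phoenix connector. The guard is there because the distinct keys are collected to the driver, so the technique only makes sense when the smaller dataset has a modest number of distinct join keys. Whether the resulting filter is actually pushed to the database depends on the data source, so it is still worth confirming with explain().

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col

// Hypothetical helper, not part of Spark or phoenix-spark.
object KeyFilter {
  /** Restrict `big` to rows whose `bigKey` value occurs in `small`'s `smallKey` column.
    * The distinct keys are collected to the driver, so `maxKeys` caps how many are allowed.
    */
  def filterByKeys(
      small: Dataset[_],
      big: DataFrame,
      smallKey: String,
      bigKey: String,
      maxKeys: Int = 10000): DataFrame = {
    val keys: Array[Any] = small
      .select(col(smallKey))
      .where(col(smallKey).isNotNull) // null keys can never match in an inner join
      .distinct()
      .limit(maxKeys + 1)             // fetch one extra row so an overflow is detectable
      .collect()
      .map(_.get(0))

    require(keys.length <= maxKeys,
      s"More than $maxKeys distinct keys; collecting them to the driver is not advisable")

    // A plain IN predicate on the source DataFrame, which a data source may push down
    big.filter(col(bigKey).isin(keys: _*))
  }
}
```

With the names from the question, usage would look like `val tableDf = KeyFilter.filterByKeys(dfToJoin, phoenixDf, "feature", "FEATURE")`, where phoenixDf stands for the unfiltered result of `.load()`, followed by the same join as before.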