Spark Hive: Filter rows of one DataFrame by the values of another DataFrame's column
I have the following two DataFrames:
DataFrame "dfPromotion":
date | store
===================
2017-01-01 | 1
2017-01-02 | 1
DataFrame "dfOther":
date | store
===================
2017-01-01 | 1
2017-01-03 | 1
Later I need to union the two DataFrames above. But before that, I have to remove all rows of dfOther whose date value is also contained in dfPromotion.
The result of this filtering step should look like this:
DataFrame "dfPromotion" (this stays always the same, must not be changed in this step!)
date | store
===================
2017-01-01 | 1
2017-01-02 | 1
DataFrame "dfOther" (first row is removed as dfPromotion contains the date 2017-01-01 in the "date" column)
date | store
===================
2017-01-03 | 1
Is there a way to do this in Java? So far I have only found the DataFrame.except method, but that compares all columns of the two DataFrames. I need to filter the second DataFrame by the date column only, because other columns, which may contain different values, could be added later...
Calling dfOther.filter(dfOther.col("date").isin(dfPromotion.col("date"))) throws the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) date#64 missing from date#0,store#13 in operator !Filter date#0 IN (date#64);
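The call fails because Column.isin expects literal values, not a column of another DataFrame. To use isin at all, the promotion dates would first have to be collected to the driver, which is only feasible for small tables; a hypothetical sketch for illustration (the variable promoDates is an assumption, not from the original post):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.not;

// Collect the promotion dates to the driver (small tables only)
List<Object> promoDates = dfPromotion.select("date").collectAsList()
        .stream().map(r -> r.get(0)).collect(Collectors.toList());
// Keep the dfOther rows whose date is NOT among the collected values
Dataset<Row> dfFiltered = dfOther.filter(
        not(dfOther.col("date").isin(promoDates.toArray())));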
You can use the subtract functionality via except: remove the dates that also occur in dfPromotion, then join back to recover the full rows,
dfOther.select("date").except(dfPromotion.select("date")).join(dfOther, "date").show()
Since you mentioned Spark Hive, could you try the following Spark SQL approach?
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
val dfpromotion = sqlContext.sql("select * from dfpromotion");
dfpromotion.show
+----------+-----+
| dt|store|
+----------+-----+
|2017-01-01| 1|
|2017-01-02| 1|
+----------+-----+
val dfother = sqlContext.sql("select * from dfother");
dfother.show
+----------+-----+
| dt|store|
+----------+-----+
|2017-01-01| 1|
|2017-01-03| 1|
+----------+-----+
val dfdiff = sqlContext.sql("select o.dt, o.store from dfpromotion p right outer join dfother o on p.dt = o.dt where p.dt is null");
val dfunion = dfpromotion.union(dfdiff);
scala> dfunion.show
+----------+-----+
| dt|store|
+----------+-----+
|2017-01-01| 1|
|2017-01-02| 1|
|2017-01-03|    1|
+----------+-----+
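For completeness, the same date-based filtering can be expressed without SQL as a left anti join, which keeps exactly those dfOther rows whose date has no match in dfPromotion; a minimal Java sketch, assuming Spark 2.x where the "left_anti" join type is available:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Keep only the dfOther rows whose date does not appear in dfPromotion
Dataset<Row> dfDiff = dfOther.join(
        dfPromotion,
        dfOther.col("date").equalTo(dfPromotion.col("date")),
        "left_anti");
// An anti join returns only the left side's columns, so the schemas line up
Dataset<Row> dfUnion = dfPromotion.union(dfDiff);
dfUnion.show();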
我有以下两个DataFrames
:
DataFrame "dfPromotion":
date | store
===================
2017-01-01 | 1
2017-01-02 | 1
DataFrame "dfOther":
date | store
===================
2017-01-01 | 1
2017-01-03 | 1
稍后我需要 union
以上两个 DataFrames
。但是在我必须删除具有 date
值的 dfOther
的所有行之前,该值也包含在 dfPromotion
.
以下 filtering
步骤的结果应如下所示:
DataFrame "dfPromotion" (this stays always the same, must not be changed in this step!)
date | store
===================
2017-01-01 | 1
2017-01-02 | 1
DataFrame "dfOther" (first row is removed as dfPromotion contains the date 2017-01-01 in the "date" column)
date | store
===================
2017-01-03 | 1
在 Java 中有没有办法做到这一点?在此之前我只找到了 DataFrame.except
方法,但这会检查 DataFrame 的所有列。我需要 仅通过 date
列 过滤第二个 DataFrame,因为稍后可以添加其他列,这些列可能包含不同的值...
调用 dfOther.filter(dfOther.col("date").isin(dfPromotion.col("date")))
抛出以下异常:
Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) date#64 missing from date#0,store#13 in operator !Filter date#0 IN (date#64);
可以使用减法功能,
dfOther.select("date").except(dfPromotion.select("date")).join(dfOther,'date').show()
既然你提到了 Spark Hive,你能试试下面的 spark sql 方法吗?
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
val dfpromotion = sqlContext.sql("select * from dfpromotion");
dfpromotion.show
+----------+-----+
| dt|store|
+----------+-----+
|2017-01-01| 1|
|2017-01-02| 1|
+----------+-----+
val dfother = sqlContext.sql("select * from dfother");
dfother.show
+----------+-----+
| dt|store|
+----------+-----+
|2017-01-01| 1|
|2017-01-03| 1|
+----------+-----+
val dfdiff = sqlContext.sql("select o.dt, o.store from dfpromotion p right outer join dfother o on p.dt = o.dt where p.dt is null");
val dfunion = dfpromotion.union(dfdiff);
scala> dfunion.show
+----------+-----+
| dt|store|
+----------+-----+
|2017-01-01| 1|
|2017-01-02| 1|
|2017-01-03| 1|