Spark DataFrame except() Method Issue
I have a use case where I need to subtract one DataFrame from another, so I used the DataFrame except() method.
This works fine locally on smaller datasets.
But when I run it against data in an AWS S3 bucket, except() does not produce the subtracted rows as expected. Is there anything specific to watch out for in a distributed environment?
Has anyone run into a similar issue?
Here is my sample code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val values = List(
  List("One",   "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "A", "Yes"),
  List("Two",   "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "X", "No"),
  List("Three", "2017-07-09T23:59:59.000", "2017-12-05T23:59:58.000", "M", "Yes"),
  List("Four",  "2017-11-01T23:59:59.000", "2017-12-09T23:59:58.000", "A", "No"),
  List("Five",  "2017-07-09T23:59:59.000", "2017-12-05T23:59:58.000", "",  "No"),
  List("One",   "2017-07-01T23:59:59.000", "2017-11-04T23:59:58.000", "",  "No")
).map(row => (row(0), row(1), row(2), row(3), row(4)))

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

val df = values.toDF("KEY", "ROW_START_DATE", "ROW_END_DATE", "CODE", "Indicator")

// Rows whose validity window covers 2017-10-31 and whose CODE is one of M, A, R, G
val filterCond = col("ROW_START_DATE") <= "2017-10-31T23:59:59.999" &&
  col("ROW_END_DATE") >= "2017-10-31T23:59:59.999" &&
  col("CODE").isin("M", "A", "R", "G")

val Filtered = df.filter(filterCond)
val Excluded = df.except(df.filter(filterCond))
Expected output:
df.show(false)
Filtered.show(false)
Excluded.show(false)
+-----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE |ROW_END_DATE |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A |Yes |
|Two |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X |No |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M |Yes |
|Four |2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A |No |
|Five |2017-07-09T23:59:59.000|2017-12-05T23:59:58.000| |No |
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000| |No |
+-----+-----------------------+-----------------------+----+---------+
+-----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE |ROW_END_DATE |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A |Yes |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M |Yes |
+-----+-----------------------+-----------------------+----+---------+
+----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE |ROW_END_DATE |CODE|Indicator|
+----+-----------------------+-----------------------+----+---------+
|Four|2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A |No |
|Two |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X |No |
|Five|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000| |No |
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000| |No |
+----+-----------------------+-----------------------+----+---------+
But when running against the S3 bucket, I get something like the following:
Filtered.show(false)
+-----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE |ROW_END_DATE |CODE|Indicator|
+-----+-----------------------+-----------------------+----+---------+
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A |Yes |
|Three|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000|M |Yes |
+-----+-----------------------+-----------------------+----+---------+
Excluded.show(false)
+----+-----------------------+-----------------------+----+---------+
|KEY |ROW_START_DATE |ROW_END_DATE |CODE|Indicator|
+----+-----------------------+-----------------------+----+---------+
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|A |Yes |---> wrong
|Four|2017-11-01T23:59:59.000|2017-12-09T23:59:58.000|A |No |
|Two |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000|X |No |
|Five|2017-07-09T23:59:59.000|2017-12-05T23:59:58.000| |No |
|One |2017-07-01T23:59:59.000|2017-11-04T23:59:58.000| |No |
+----+-----------------------+-----------------------+----+---------+
Is there any other way to subtract two Spark DataFrames?
S3 is not quite a filesystem, and that can surface in Spark:
- Try to verify that the data written to S3 is identical to what you get with a file:// destination, because there is a risk that something is being lost on the way (see the sketch after this list).
- Then try putting a Thread.sleep(10000) between writing to S3 and reading it back; that will show whether directory inconsistency is appearing.
- If you are using EMR, try their consistent EMR option.
- And try using the s3a:// connector.
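A minimal sketch of that round-trip check, assuming Parquet output and a writable bucket path such as s3a://my-bucket/tmp/diff-check (the paths and format are placeholders, not from the original post):
val localPath = "file:///tmp/diff-check"
val s3Path    = "s3a://my-bucket/tmp/diff-check"   // placeholder bucket/path

// Write the same result to both destinations.
Excluded.write.mode("overwrite").parquet(localPath)
Excluded.write.mode("overwrite").parquet(s3Path)

// Pause so any eventual-consistency delay on the object store has time to settle.
Thread.sleep(10000)

val fromLocal = spark.read.parquet(localPath)
val fromS3    = spark.read.parquet(s3Path)

// If these differ, rows are being lost or duplicated by the store/listing,
// not by except() itself.
println(s"local count = ${fromLocal.count()}, s3 count = ${fromS3.count()}")
println(s"rows in local but not in s3: ${fromLocal.except(fromS3).count()}")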
If it doesn't work with s3a://, file a SPARK JIRA at issues.apache.org, mention s3a in the text as well, and include this code snippet (which implicitly licenses it to the ASF). Then I can copy it into a test and see whether I can reproduce it, and if so, whether it goes away when I turn on S3Guard in Hadoop 3.1+.
You can use a left_anti join on the two DataFrames, based on the uniqueness of their keys; that will give you the output you expect from the except operation.
val diffdf = df1.join(df2, Seq("uniquekey"), "leftanti")
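Applied to the example above, a rough sketch (the column list comes from the sample DataFrame; it is not part of the answer): since KEY alone is not unique, the anti join would have to use every column. Note that an equality join drops rows with nulls in the join columns, whereas except() treats nulls as equal, so the two are only interchangeable when the join columns are non-null.
val joinCols = Seq("KEY", "ROW_START_DATE", "ROW_END_DATE", "CODE", "Indicator")
// Keep the rows of df that have no exact match in Filtered.
val ExcludedViaJoin = df.join(Filtered, joinCols, "left_anti")
ExcludedViaJoin.show(false)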