在 Spark SQL 中反加入后联合

Anti join followed by union in Spark SQL

我正在 运行ning PySpark 脚本,我在其中执行 2 个数据帧的反连接和联合。但我想在 Spark SQL.

中完成

df_src:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    110|     NM|
|    115|     AB|
+-------+-------+

df_lkp:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    106|     XZ|
+-------+-------+

我们有两个数据框:df_src & df_lkp。我正在从 df_src:

中提取不匹配的记录
df_unmatched = df_src.join(df_lkp, on=column_nm, how='left_anti')

给出的结果是:

df_unmatched

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    115|     AB|
+-------+-------+

但我想使用 Spark SQL 来完成这部分。我创建了临时视图 vw_df_src & vw_df_lkp 并尝试 运行 以下查询,但没有得到结果。

unmatched_sql = "SELECT * from vw_df_src where {0} in (select {0} from vw_df_src minus select {0} from vw_df_lkp)".format('call_id')
df_unmatched = sqlContext.sql(unmatched_sql)

我也在对数据帧进行合并并删除重复项。我正在使用以下代码:

df_src1 = df_lkp.union(df_src)
df_src1.show(10)
df_src2 = df_src1.dropDuplicates(['call_id'])

df_src2:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    100|     QC|
|    115|     AB|
|    106|     XZ|
|    105|     XY|
+-------+-------+

我也希望在 Spark SQL 中完成此操作。

我正在使用以下代码创建临时视图:

df_src = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '[=18=]1').options(header='true',inferSchema='false').load(src_file_nm)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '[=18=]1').options(header='true',inferSchema='false').load(lkp_file)
df_lkp.createOrReplaceTempView('vw_df_lkp')

预设:

df_src = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (110, 'NM'),
     (115, 'AB')],
    ['call_id', 'call_nm']
)
df_lkp = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (105, 'XY'),
     (106, 'XZ')],
    ['call_id', 'call_nm']
)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp.createOrReplaceTempView('vw_df_lkp')

根据你的要求,(anti join + union)可以这样做:

spark.sql(
    """
    select *
    from vw_df_src as a
    anti join vw_df_lkp b on a.call_nm=b.call_nm
    union (select * from vw_df_lkp)
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    110|     NM|
# |    115|     AB|
# |    100|     QC|
# |    105|     XY|
# |    106|     XZ|
# +-------+-------+

不过好像不需要anti join:

spark.sql(
    """
    select * from vw_df_src
    union
    select * from vw_df_lkp
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    100|     QC|
# |    105|     XY|
# |    115|     AB|
# |    110|     NM|
# |    106|     XZ|
# +-------+-------+

反连接

spark.sql(
"""select * from vw_df_src LEFT ANTI JOIN 

vw_df_lkp  ON

vw_df_src.call_nm= vw_df_lkp.call_nm """).show()


+-------+-------+
|call_id|call_nm|
+-------+-------+
|    115|     AB|
|    110|     NM|
+-------+-------+

如果笔记本单元格中的 运行 未首字母缩写为 sql TRY

%sql 
select * from vw_df_src LEFT ANTI JOIN 

vw_df_lkp  ON

vw_df_src.call_nm= vw_df_lkp.call_nm 

联合

pyspark 中,联合 returns 重复,您必须 drop_duplicates() 或使用 distinct()。在sql中,联合消除了重复。因此,将执行以下操作。 Spark 2.0.0 unionall() 重新调整了重复项,union 就是问题

spark.sql(
"""select * from vw_df_src

union

select * from vw_df_lkp""" ).show()