来自两个表的 Spark 结果
Spark Results from two tables
我有两个 files/tables 如下
File1:
101,10,20
102,30,40
103,50,60
和
File2:
101,10,20
104,70,80
103,50,55
比较两个文件后,我需要创建新文件:
File3:
102,30,40,D
104,70,80,I
103,50,55,U
其中 D
是 "Deleted",I
是 "Inserted",U
是 "Updated"。
我尝试过使用 RDD subtract
和 SparkSQL,但是在 Spark 中 1.x 对子查询有限制。
一个可能的想法是使用 keyBy 函数按您想要的键对两个 RDD 进行分组,然后应用不同的操作来计算 D、I, U.
- D : 使用 subtractByKey 函数计算 file1 中而不是 file2[=34 中的元素=];
- I :相同的 subtractByKey 函数计算 file2 中的元素而不是 file1;
- U : 使用 join 函数计算 file1 和 [=18= 之间的公共元素(关键!) ]file2。
请记住,join 函数将与 commun 中的关键元素一起运行。因此,在您的示例中 (101,10,20) 也将作为连接的结果。您必须过滤此结果以仅获取更改的键。
我想您正在寻找类似下面的内容。这里我有两个数据帧 df1 和 df2。 df1 是具有键列 a1 的主数据集,它正在与具有键列 b1 的辅助数据集 df2 进行比较。因此,如果键的字段 a2、a3 和 b2、b3 相同,则这些记录将被忽略。
- 如果键存在于 df1 中而不存在于 df2 中,则记录被标记为 D。
- 如果键存在于 df2 而不是 df1,则记录被标记为 I。
- 如果键在 df1 和 df2 中都存在但值字段不同,则记录被标记为 U。
下面是代码片段。
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F
input1 = [[101,10,20], [102,30,40], [103,50,60]]
input2 = [[101,10,20], [104,70,80], [103,50,55]]
df1 = sc.parallelize(input1).toDF(schema= StructType([StructField("a1", IntegerType(), False),StructField("a2", IntegerType(), False),StructField("a3", IntegerType(), False)]))
df2 = sc.parallelize(input2).toDF(schema=StructType([StructField("b1", IntegerType(), False),StructField("b2", IntegerType(), False),StructField("b3", IntegerType(), False)]))
joindf = df1.join(df2, [df1.a1 == df2.b1], 'outer').filter(((df1.a2 != df2.b2) | (df1.a3 != df2.b3)) | df1.a1.isNull() | df2.b1.isNull())
def check_row(a1, b1):
if not a1:
return 'D'
elif not b1:
return 'I'
else:
return 'U'
flagger = udf(check_row)
joindf.withColumn("flag", flagger(joindf.a1, joindf.b1)).select(F.when(joindf.a1.isNull(), joindf.b1).otherwise(joindf.a1).alias('a1'),F.when(joindf.a2.isNull(), joindf.b2).otherwise(joindf.a2).alias('a2'),F.when(joindf.a3.isNull(), joindf.b3).otherwise(joindf.a3).alias('a3'),'flag').show()
+---+---+---+----+
| a1| a2| a3|flag|
+---+---+---+----+
|103| 50| 60| U|
|102| 30| 40| I|
|104| 70| 80| D|
+---+---+---+----+
或者,如果您更喜欢 spark-sql,请使用以下代码段。
sqlContext.registerDataFrameAsTable(df1, 'df1')
sqlContext.registerDataFrameAsTable(df2, 'df2')
sqlContext.sql("""
SELECT
CASE WHEN a1 IS NULL THEN b1 ELSE a1 END as c1,
CASE WHEN a2 IS NULL THEN b2 ELSE a1 END as c2,
CASE WHEN a3 IS NULL THEN b3 ELSE a1 END as c3,
CASE
WHEN a1 IS NULL THEN 'I'
WHEN b1 is NULL THEN 'D'
ELSE 'U' END as flag
FROM df1 FULL OUTER JOIN df2 ON df1.a1 = df2.b1
WHERE (df1.a2 <> df2.b2 or df1.a3 <> df2.b3) or (df1.a1 is null) or (df2.b1 is null)
""").show()
+---+---+---+----+
| c1| c2| c3|flag|
+---+---+---+----+
|103|103|103| U|
|102|102|102| D|
|104| 70| 80| I|
+---+---+---+----+
我认为我们可能需要稍微更改 spark sql 的代码以包含更新条件。
sqlContext.sql("""
SELECT
CASE when a1 IS NULL then b1 ELSE a1 END as c1,
CASE when a2 IS NULL then b2
when a1 = b1 then b2
else a2 END as c2,
CASE when a3 IS NULL then b3
when a1 = b1 then b3
else a3 END as c3,
CASE
when a1 IS NULL then 'I'
when b1 is NULL then 'D'
ELSE 'U' END as flag
FROM df1 FULL OUTER JOIN df2 ON df1.a1 = df2.b1
WHERE (df1.a2 <> df2.b2 or df1.a3 <> df2.b3) or (df1.a1 is null) or (df2.b1 is null)
""").show()
我有两个 files/tables 如下
File1:
101,10,20
102,30,40
103,50,60
和
File2:
101,10,20
104,70,80
103,50,55
比较两个文件后,我需要创建新文件:
File3:
102,30,40,D
104,70,80,I
103,50,55,U
其中 D
是 "Deleted",I
是 "Inserted",U
是 "Updated"。
我尝试过使用 RDD subtract
和 SparkSQL,但是在 Spark 中 1.x 对子查询有限制。
一个可能的想法是使用 keyBy 函数按您想要的键对两个 RDD 进行分组,然后应用不同的操作来计算 D、I, U.
- D : 使用 subtractByKey 函数计算 file1 中而不是 file2[=34 中的元素=];
- I :相同的 subtractByKey 函数计算 file2 中的元素而不是 file1;
- U : 使用 join 函数计算 file1 和 [=18= 之间的公共元素(关键!) ]file2。
请记住,join 函数将与 commun 中的关键元素一起运行。因此,在您的示例中 (101,10,20) 也将作为连接的结果。您必须过滤此结果以仅获取更改的键。
我想您正在寻找类似下面的内容。这里我有两个数据帧 df1 和 df2。 df1 是具有键列 a1 的主数据集,它正在与具有键列 b1 的辅助数据集 df2 进行比较。因此,如果键的字段 a2、a3 和 b2、b3 相同,则这些记录将被忽略。
- 如果键存在于 df1 中而不存在于 df2 中,则记录被标记为 D。
- 如果键存在于 df2 而不是 df1,则记录被标记为 I。
- 如果键在 df1 和 df2 中都存在但值字段不同,则记录被标记为 U。
下面是代码片段。
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F
input1 = [[101,10,20], [102,30,40], [103,50,60]]
input2 = [[101,10,20], [104,70,80], [103,50,55]]
df1 = sc.parallelize(input1).toDF(schema= StructType([StructField("a1", IntegerType(), False),StructField("a2", IntegerType(), False),StructField("a3", IntegerType(), False)]))
df2 = sc.parallelize(input2).toDF(schema=StructType([StructField("b1", IntegerType(), False),StructField("b2", IntegerType(), False),StructField("b3", IntegerType(), False)]))
joindf = df1.join(df2, [df1.a1 == df2.b1], 'outer').filter(((df1.a2 != df2.b2) | (df1.a3 != df2.b3)) | df1.a1.isNull() | df2.b1.isNull())
def check_row(a1, b1):
if not a1:
return 'D'
elif not b1:
return 'I'
else:
return 'U'
flagger = udf(check_row)
joindf.withColumn("flag", flagger(joindf.a1, joindf.b1)).select(F.when(joindf.a1.isNull(), joindf.b1).otherwise(joindf.a1).alias('a1'),F.when(joindf.a2.isNull(), joindf.b2).otherwise(joindf.a2).alias('a2'),F.when(joindf.a3.isNull(), joindf.b3).otherwise(joindf.a3).alias('a3'),'flag').show()
+---+---+---+----+
| a1| a2| a3|flag|
+---+---+---+----+
|103| 50| 60| U|
|102| 30| 40| I|
|104| 70| 80| D|
+---+---+---+----+
或者,如果您更喜欢 spark-sql,请使用以下代码段。
sqlContext.registerDataFrameAsTable(df1, 'df1')
sqlContext.registerDataFrameAsTable(df2, 'df2')
sqlContext.sql("""
SELECT
CASE WHEN a1 IS NULL THEN b1 ELSE a1 END as c1,
CASE WHEN a2 IS NULL THEN b2 ELSE a1 END as c2,
CASE WHEN a3 IS NULL THEN b3 ELSE a1 END as c3,
CASE
WHEN a1 IS NULL THEN 'I'
WHEN b1 is NULL THEN 'D'
ELSE 'U' END as flag
FROM df1 FULL OUTER JOIN df2 ON df1.a1 = df2.b1
WHERE (df1.a2 <> df2.b2 or df1.a3 <> df2.b3) or (df1.a1 is null) or (df2.b1 is null)
""").show()
+---+---+---+----+
| c1| c2| c3|flag|
+---+---+---+----+
|103|103|103| U|
|102|102|102| D|
|104| 70| 80| I|
+---+---+---+----+
我认为我们可能需要稍微更改 spark sql 的代码以包含更新条件。
sqlContext.sql("""
SELECT
CASE when a1 IS NULL then b1 ELSE a1 END as c1,
CASE when a2 IS NULL then b2
when a1 = b1 then b2
else a2 END as c2,
CASE when a3 IS NULL then b3
when a1 = b1 then b3
else a3 END as c3,
CASE
when a1 IS NULL then 'I'
when b1 is NULL then 'D'
ELSE 'U' END as flag
FROM df1 FULL OUTER JOIN df2 ON df1.a1 = df2.b1
WHERE (df1.a2 <> df2.b2 or df1.a3 <> df2.b3) or (df1.a1 is null) or (df2.b1 is null)
""").show()