如何使用 Python 在 Spark 中执行两个 RDD 表的基本连接?
How do you perform basic joins of two RDD tables in Spark using Python?
您将如何使用 python 在 Spark 中执行基本连接?在 R 中,您可以使用 merg() 来执行此操作。在 spark 上使用 python 的语法是什么:
- 内部联接
- 左外连接
- 交叉连接
有两个表 (RDD),每个表中只有一个列,每个列都有一个公共键。
RDD(1):(key,U)
RDD(2):(key,V)
我认为内部联接是这样的:
rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
是吗?我在互联网上搜索过,找不到一个很好的连接示例。提前致谢。
可以使用 PairRDDFunctions
或 Spark 数据帧来完成。由于数据框操作受益于 Catalyst Optimizer,因此第二种选择值得考虑。
假设您的数据如下所示:
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
使用 PairRDD:
内部联接:
rdd1.join(rdd2)
左外连接:
rdd1.leftOuterJoin(rdd2)
笛卡尔积(不需要RDD[(T, U)]
):
rdd1.cartesian(rdd2)
广播加入(不需要RDD[(T, U)]
):
- 见Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
最后是 cogroup
,它没有直接的 SQL 等价物,但在某些情况下很有用:
cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
使用 Spark 数据帧
您可以使用 SQL DSL 或使用 sqlContext.sql
.
执行原始 SQL
df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
内部联接:
# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
左外连接:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
交叉连接(在 Spark.2.0 中需要显式交叉连接或配置更改 - ):
df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
df1.join(df2)
sqlContext.sql('SELECT * FROM df JOIN df2')
从 1.6(Scala 中的 1.5)开始,这些中的每一个都可以与 broadcast
函数组合:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)
执行广播加入。另见
您将如何使用 python 在 Spark 中执行基本连接?在 R 中,您可以使用 merg() 来执行此操作。在 spark 上使用 python 的语法是什么:
- 内部联接
- 左外连接
- 交叉连接
有两个表 (RDD),每个表中只有一个列,每个列都有一个公共键。
RDD(1):(key,U)
RDD(2):(key,V)
我认为内部联接是这样的:
rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
是吗?我在互联网上搜索过,找不到一个很好的连接示例。提前致谢。
可以使用 PairRDDFunctions
或 Spark 数据帧来完成。由于数据框操作受益于 Catalyst Optimizer,因此第二种选择值得考虑。
假设您的数据如下所示:
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
使用 PairRDD:
内部联接:
rdd1.join(rdd2)
左外连接:
rdd1.leftOuterJoin(rdd2)
笛卡尔积(不需要RDD[(T, U)]
):
rdd1.cartesian(rdd2)
广播加入(不需要RDD[(T, U)]
):
- 见Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
最后是 cogroup
,它没有直接的 SQL 等价物,但在某些情况下很有用:
cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
使用 Spark 数据帧
您可以使用 SQL DSL 或使用 sqlContext.sql
.
df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
内部联接:
# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
左外连接:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
交叉连接(在 Spark.2.0 中需要显式交叉连接或配置更改 -
df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
df1.join(df2)
sqlContext.sql('SELECT * FROM df JOIN df2')
从 1.6(Scala 中的 1.5)开始,这些中的每一个都可以与 broadcast
函数组合:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)
执行广播加入。另见