How to join two DataFrames in Scala and Apache Spark?
I have two DataFrames (Scala, Apache Spark 1.6.1):
1) Matches
MatchID | Player1 | Player2
--------------------------------
1 | John Wayne | John Doe
2 | Ive Fish | San Simon
2) Profiles
Player | BirthYear
--------------------------------
John Wayne | 1986
Ive Fish | 1990
San Simon | 1974
John Doe | 1995
How can I create a new DataFrame with the 'BirthYear' of both players and the difference, like this?
MatchID | Player1 | Player2 | BYear_P1 |BYear_P2 | Diff
-------------------------------------------------------------
1 | John Wayne | John Doe | 1986 | 1995 | 9
2 | Ive Fish | San Simon | 1990 | 1974 | 16
I tried
val df = MatchesDF.join(PersonalDF, MatchesDF("Player1") === PersonalDF("Player"))
and then joined again for the second player:
val resDf = df.join(PersonalDF, df("Player2") === PersonalDF("Player"))
but this is a very time-consuming operation.
Is there another way to do this in Scala and Apache Spark?
This should perform better:
case class Match(matchId: Int, player1: String, player2: String)
case class Player(name: String, birthYear: Int)
val matches = Seq(
Match(1, "John Wayne", "John Doe"),
Match(2, "Ive Fish", "San Simon")
)
val players = Seq(
Player("John Wayne", 1986),
Player("Ive Fish", 1990),
Player("San Simon", 1974),
Player("John Doe", 1995)
)
val matchesDf = sqlContext.createDataFrame(matches)
val playersDf = sqlContext.createDataFrame(players)
matchesDf.registerTempTable("matches")
playersDf.registerTempTable("players")
sqlContext.sql(
"select matchId, player1, player2, p1.birthYear, p2.birthYear, abs(p1.birthYear-p2.birthYear) " +
"from matches m inner join players p1 inner join players p2 " +
"where m.player1 = p1.name and m.player2 = p2.name").show()
+-------+----------+---------+---------+---------+---+
|matchId| player1| player2|birthYear|birthYear|_c5|
+-------+----------+---------+---------+---------+---+
| 1|John Wayne| John Doe| 1986| 1995| 9|
| 2| Ive Fish|San Simon| 1990| 1974| 16|
+-------+----------+---------+---------+---------+---+
I didn't find a way to express a three-table join in the Scala DSL.
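Incidentally, the difference column in the output above comes back unnamed (_c5), and the two birthYear columns share a name. A small variant of the same query with explicit aliases (my sketch, reusing the matches/players temp tables registered above):
sqlContext.sql(
  "select m.matchId, m.player1, m.player2, p1.birthYear as BYear_P1, p2.birthYear as BYear_P2, " +
  "abs(p1.birthYear - p2.birthYear) as Diff " +
  "from matches m inner join players p1 on m.player1 = p1.name " +
  "inner join players p2 on m.player2 = p2.name").show()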
If the join column has the same name in both DataFrames, you can simply join on the column name:
val df = left.join(right, Seq("name"))
display(df) // display() is a Databricks notebook helper; use df.show() elsewhere
Here is a solution using Spark's DataFrame functions:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.abs
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val matches = sqlContext.sparkContext.parallelize(Seq(
  Row(1, "John Wayne", "John Doe"),
  Row(2, "Ive Fish", "San Simon")
))
val players = sqlContext.sparkContext.parallelize(Seq(
  Row("John Wayne", 1986),
  Row("Ive Fish", 1990),
  Row("San Simon", 1974),
  Row("John Doe", 1995)
))
val matchesDf = sqlContext.createDataFrame(matches, StructType(Seq(
  StructField("matchId", IntegerType, nullable = false),
  StructField("player1", StringType, nullable = false),
  StructField("player2", StringType, nullable = false)
))).as('matches)
val playersDf = sqlContext.createDataFrame(players, StructType(Seq(
  StructField("player", StringType, nullable = false),
  StructField("birthYear", IntegerType, nullable = false)
))).as('players)
matchesDf
  .join(playersDf, $"matches.player1" === $"players.player")
  .select($"matches.matchId" as "matchId", $"matches.player1" as "player1", $"matches.player2" as "player2", $"players.birthYear" as "player1BirthYear")
  .join(playersDf, $"player2" === $"players.player")
  .select($"matchId" as "MatchID", $"player1" as "Player1", $"player2" as "Player2", $"player1BirthYear" as "BYear_P1", $"players.birthYear" as "BYear_P2")
  .withColumn("Diff", abs('BYear_P2.minus('BYear_P1)))
  .show()
+-------+----------+---------+--------+--------+----+
|MatchID| Player1| Player2|BYear_P1|BYear_P2|Diff|
+-------+----------+---------+--------+--------+----+
| 1|John Wayne| John Doe| 1986| 1995| 9|
| 2| Ive Fish|San Simon| 1990| 1974| 16|
+-------+----------+---------+--------+--------+----+
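The original poster's concern was the cost of the two joins. Since the profiles table is tiny, another option (a sketch of my own, not from the answers above; p1 and p2 are hypothetical helper names) is to broadcast it so both joins become map-side hash joins, reusing matchesDf and playersDf defined above:
import org.apache.spark.sql.functions.broadcast
// Rename the players columns so each lookup can join on a shared column name.
val p1 = playersDf.withColumnRenamed("player", "player1").withColumnRenamed("birthYear", "BYear_P1")
val p2 = playersDf.withColumnRenamed("player", "player2").withColumnRenamed("birthYear", "BYear_P2")
matchesDf
  .join(broadcast(p1), "player1") // broadcast the small side to avoid a shuffle
  .join(broadcast(p2), "player2")
  .withColumn("Diff", abs($"BYear_P2" - $"BYear_P1"))
  .show()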
In Spark 2.0 and later, Spark provides several join signatures for joining two DataFrames:
join(right: Dataset[_]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
All of these join methods are available on the Dataset class, and they all return a DataFrame (note that DataFrame = Dataset[Row]).
Each of them takes a Dataset[_] as its first argument, which means it also accepts a DataFrame.
To illustrate how the join works, I take emp and dept DataFrames:
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .show(false)
If the column you are joining on has the same name in both DataFrames, you can even omit the join expression.
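For instance, a minimal sketch (my addition; df1 and df2 are hypothetical frames sharing the column name "dept_id", and a SparkSession named spark with spark.implicits._ imported is assumed):
import spark.implicits._
val df1 = Seq((10, "Alice"), (20, "Bob")).toDF("dept_id", "emp_name")
val df2 = Seq((10, "Sales"), (20, "IT")).toDF("dept_id", "dept_name")
df1.join(df2, "dept_id").show()               // usingColumn form: the shared column appears only once
df1.join(df2, Seq("dept_id"), "inner").show() // usingColumns form with an explicit join type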