使用另一个 RDD/df 在 Spark RDD 或数据帧中执行 lookup/translation
Performing lookup/translation in a Spark RDD or data frame using another RDD/df
我很难实现一些看起来应该很容易的东西:
我的目标是在 RDD/dataframe 中使用第二个 RDD/dataframe 作为查找 table 或翻译词典进行翻译。我想在多个栏中进行这些翻译。
解释问题的最简单方法是举例。假设我有以下两个 RDD 作为我的输入:
Route SourceCityID DestinationCityID
A 1 2
B 1 3
C 2 1
和
CityID CityName
1 London
2 Paris
3 Tokyo
我想要的输出 RDD 是:
Route SourceCity DestinationCity
A London Paris
B London Tokyo
C Paris London
我应该如何制作它?
这是 SQL 中的一个简单问题,但我不知道在 Spark 中使用 RDD 的明显解决方案。 join、cogroup 等方法似乎不太适合多列 RDD,并且不允许指定要连接的列.
有什么想法吗? SQL上下文是答案吗?
假设我们有两个包含路线和城市的 RDD:
val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1)))
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo")))
有几种方法可以实现城市查找。假设与包含许多项目的路线相比,城市查找包含的项目很少。在这种情况下,让我们开始收集城市作为由 driver 发送给每个任务的地图。
val citiesByID = citiesByIDRDD.collectAsMap
routes.map{r => (r._1, citiesByID(r._2), citiesByID(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))
为避免向每个任务发送查找 table,但只向工作人员发送一次,您可以扩展现有代码广播查找映射。
val bCitiesByID = sc.broadcast(citiesByID)
routes.map{r => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))
我看不出这里需要数据框,但如果你愿意,你可以:
import sqlContext.implicits._
case class Route(id: String, from: Int, to: Int)
case class City(id: Int, name: String)
val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo"))
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1))
val citiesDf = cities.df
citiesDf.registerTempTable("cities")
val routesDf = routes.df
citiesDf.registerTempTable("routes")
routesDf.show
+---+----+---+
| id|from| to|
+---+----+---+
| A| 1| 2|
| B| 1| 3|
| C| 2| 1|
+---+----+---+
citiesDf.show
+---+------+
| id| name|
+---+------+
| 1|London|
| 2| Paris|
| 3| Tokyo|
+---+------+
你提到在 SQL 中这是一个简单的问题,所以我想你可以从这里开始。执行 SQL 是这样的:
sqlContext.sql ("SELECT COUNT(*) FROM routes")
rdd 方式:
routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ])
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")])
print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) \
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). \
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect()
打印:
[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]
以及 SQLContext 方式:
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
df_routes = sqlContext.createDataFrame(\
routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(\
cities, ["CityID", "CityName"])
temp = df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \
.select("Route", "DestinationCityID", "CityName")
.withColumnRenamed("CityName", "SourceCity")
print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) \
.select("Route", "SourceCity", "CityName")
.withColumnRenamed("CityName", "DestinationCity").collect()
打印:
[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]
我很难实现一些看起来应该很容易的东西:
我的目标是在 RDD/dataframe 中使用第二个 RDD/dataframe 作为查找 table 或翻译词典进行翻译。我想在多个栏中进行这些翻译。
解释问题的最简单方法是举例。假设我有以下两个 RDD 作为我的输入:
Route SourceCityID DestinationCityID
A 1 2
B 1 3
C 2 1
和
CityID CityName
1 London
2 Paris
3 Tokyo
我想要的输出 RDD 是:
Route SourceCity DestinationCity
A London Paris
B London Tokyo
C Paris London
我应该如何制作它?
这是 SQL 中的一个简单问题,但我不知道在 Spark 中使用 RDD 的明显解决方案。 join、cogroup 等方法似乎不太适合多列 RDD,并且不允许指定要连接的列.
有什么想法吗? SQL上下文是答案吗?
假设我们有两个包含路线和城市的 RDD:
val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1)))
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo")))
有几种方法可以实现城市查找。假设与包含许多项目的路线相比,城市查找包含的项目很少。在这种情况下,让我们开始收集城市作为由 driver 发送给每个任务的地图。
val citiesByID = citiesByIDRDD.collectAsMap
routes.map{r => (r._1, citiesByID(r._2), citiesByID(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))
为避免向每个任务发送查找 table,但只向工作人员发送一次,您可以扩展现有代码广播查找映射。
val bCitiesByID = sc.broadcast(citiesByID)
routes.map{r => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3))}.collect
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London))
我看不出这里需要数据框,但如果你愿意,你可以:
import sqlContext.implicits._
case class Route(id: String, from: Int, to: Int)
case class City(id: Int, name: String)
val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo"))
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1))
val citiesDf = cities.df
citiesDf.registerTempTable("cities")
val routesDf = routes.df
citiesDf.registerTempTable("routes")
routesDf.show
+---+----+---+
| id|from| to|
+---+----+---+
| A| 1| 2|
| B| 1| 3|
| C| 2| 1|
+---+----+---+
citiesDf.show
+---+------+
| id| name|
+---+------+
| 1|London|
| 2| Paris|
| 3| Tokyo|
+---+------+
你提到在 SQL 中这是一个简单的问题,所以我想你可以从这里开始。执行 SQL 是这样的:
sqlContext.sql ("SELECT COUNT(*) FROM routes")
rdd 方式:
routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ])
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")])
print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) \
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). \
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect()
打印:
[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]
以及 SQLContext 方式:
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
df_routes = sqlContext.createDataFrame(\
routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(\
cities, ["CityID", "CityName"])
temp = df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \
.select("Route", "DestinationCityID", "CityName")
.withColumnRenamed("CityName", "SourceCity")
print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) \
.select("Route", "SourceCity", "CityName")
.withColumnRenamed("CityName", "DestinationCity").collect()
打印:
[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]