Update rows of a dataframe according to the content of a map
I have this dataframe:
+------+----------+------+
|brand |Timestamp |Weight|
+------+----------+------+
|BR1   |1632899456|4.0   |
|BR1   |1632901256|4.0   |
|BR300 |1632901796|2.0   |
|BR300 |1632899155|2.0   |
|BR200 |1632899155|2.0   |
+------+----------+------+
And this map:
val map: Map[String, Double] = Map("BR1" -> 70.0, "BR300" -> 90.0)
I want to update the Weight column according to the content of the map.
The goal is to add the value already in the row to the value found in the map.
The output should look like this:
+------+----------+------+
|brand |Timestamp |Weight|
+------+----------+------+
|BR1   |1632899456|74.0  |
|BR1   |1632901256|74.0  |
|BR300 |1632901796|92.0  |
|BR300 |1632899155|92.0  |
|BR200 |1632899155|2.0   |
+------+----------+------+
I am using Spark version 3.0.2 with a SQLContext, and the Scala language.
You can use a UDF to fetch the value from the map and then add it to the column value.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

val df = Seq(("BR1", 1632901256, 4.0),
  ("BR300", 1632901796, 2.0),
  ("BR200", 1632899155, 2.0)).toDF("brand", "Timestamp", "Weight")

val map: Map[String, Double] = Map("BR1" -> 70.0, "BR300" -> 90.0)

// Broadcast the map so every executor can look values up locally.
val broadcastedMap = spark.sparkContext.broadcast(map)

// Look up the brand in the map, defaulting to 0.0 when it is missing.
val getvalueFromMap = udf((s: String) => broadcastedMap.value.getOrElse(s, 0.0))

df.withColumn("Weight", getvalueFromMap('brand) + 'Weight).show()
/*
+-----+----------+------+
|brand| Timestamp|Weight|
+-----+----------+------+
| BR1|1632901256| 74.0|
|BR300|1632901796| 92.0|
|BR200|1632899155| 2.0|
+-----+----------+------+*/
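As an aside (not part of the original answer), the same lookup can also be done with built-in functions only, by turning the map into a literal map column. A minimal sketch, assuming the df and map defined above; element_at returns null for a missing key, so coalesce supplies the 0.0 fallback:

import org.apache.spark.sql.functions.{coalesce, col, element_at, lit, typedLit}

// typedLit builds a MapType literal column from the Scala map;
// element_at(mapCol, key) is null when the brand is not in the map.
df.withColumn("Weight",
  col("Weight") + coalesce(element_at(typedLit(map), col("brand")), lit(0.0))).show()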
The map can also be translated into a SQL expression. This avoids the UDF and may therefore improve performance.
val df = ...  // the dataframe from the question
val map: Map[String, Double] = Map("BR1" -> 70.0, "BR300" -> 90.0)
// Fold the map into a single "Weight + case brand when ... then ... else 0.0 end" expression.
val sql = map.foldLeft("Weight + case brand ")((a, b) => s"$a when '${b._1}' then ${b._2}") + " else 0.0 end"
df.withColumn("Weight", expr(sql)).show()
The generated SQL string is
Weight + case brand when 'BR1' then 70.0 when 'BR300' then 90.0 else 0.0 end
Output:
+-----+----------+------+
|brand| Timestamp|Weight|
+-----+----------+------+
| BR1|1632899456| 74.0|
| BR1|1632901256| 74.0|
|BR300|1632901796| 92.0|
|BR300|1632899155| 92.0|
|BR200|1632899155| 2.0|
+-----+----------+------+
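Since the question mentions SQLContext, the generated expression can just as well be used in a plain SQL query against a temporary view. A small sketch, assuming the df, map and sql values from the snippet above (the view name brands is only illustrative):

// Register the dataframe and reuse the generated CASE expression in a SQL statement.
df.createOrReplaceTempView("brands")
spark.sql(s"SELECT brand, Timestamp, $sql AS Weight FROM brands").show()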