PySpark - How to use a row value from one column to access another column that has the same name as the row value
I have a PySpark df:
+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|
+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| 8| 9| 5| b1|
| 1| 2| 43| 8| 10| 20| 43| e1|
| 2| 3| 15| 0| 1| 23| 7| b1|
| 3| 4| 2| 6| 11| 5| 8| d1|
| 4| 5| 6| 7| 2| 8| 1| f1|
+---+---+---+---+---+---+---+---+
I ultimately want to create another column "out" whose value is based on the "ref" column. For example, in the first row the value of the ref column is b1, so in the "out" column I want the value of the "b1" column, which is 23.
Here is the expected output:
+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| 8| 9| 5| b1| 23|
| 1| 2| 43| 8| 10| 20| 43| e1| 20|
| 2| 3| 15| 0| 1| 23| 7| b1| 15|
| 3| 4| 2| 6| 11| 5| 8| d1| 11|
| 4| 5| 6| 7| 2| 8| 1| f1| 1|
+---+---+---+---+---+---+---+---+---+
Please advise how to build the "out" column. I am using Spark 1.6. Thanks.
Independent of the version, you can convert to an RDD, map, and then convert back to a DataFrame:
df = spark.createDataFrame(
    [(0, 1, 23, 4, 8, 9, 5, "b1"), (1, 2, 43, 8, 10, 20, 43, "e1")],
    ("id", "a1", "b1", "c1", "d1", "e1", "f1", "ref")
)
df.rdd.map(lambda row: row + (row[row.ref], )).toDF(df.columns + ["out"])
+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| 8| 9| 5| b1| 23|
| 1| 2| 43| 8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+
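Note that toDF infers the schema of the result from the data again. If you want to verify what it produced, a quick check might look like this (a sketch, assuming the df above):
df.rdd.map(lambda row: row + (row[row.ref], )).toDF(df.columns + ["out"]).printSchema()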
You can also preserve the schema:
from pyspark.sql.types import LongType, StructField
spark.createDataFrame(
    df.rdd.map(lambda row: row + (row[row.ref], )),
    df.schema.add(StructField("out", LongType())))
With DataFrames you can also compose complex Columns. In 1.6:
from pyspark.sql.functions import array, col, udf
from pyspark.sql.types import LongType, MapType, StringType
data_cols = [x for x in df.columns if x not in {"id", "ref"}]
# Literal map from column name to index
name_to_index = udf(
    lambda: {x: i for i, x in enumerate(data_cols)},
    MapType(StringType(), LongType())
)()
# Array of data
data_array = array(*[col(c) for c in data_cols])
df.withColumn("out", data_array[name_to_index[col("ref")]])
+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| 8| 9| 5| b1| 23|
| 1| 2| 43| 8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+
In 2.x you can skip the intermediate objects:
from pyspark.sql.functions import create_map, lit, col
from itertools import chain
# Map from column name to column value
name_to_value = create_map(*chain.from_iterable(
    (lit(c), col(c)) for c in data_cols
))
df.withColumn("out", name_to_value[col("ref")])
+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| 8| 9| 5| b1| 23|
| 1| 2| 43| 8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+
Finally, you can use when:
from pyspark.sql.functions import col, lit, when
from functools import reduce
out = reduce(
    lambda acc, x: when(col("ref") == x, col(x)).otherwise(acc),
    data_cols,
    lit(None)
)
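The expression is then attached to the DataFrame to produce the output below (a minimal usage sketch, assuming the df from the earlier snippets):
df.withColumn("out", out)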
+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| 8| 9| 5| b1| 23|
| 1| 2| 43| 8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+
The OP asked for a Python solution; I am just answering the same question in spark-scala 2.x for reference. Hope it helps someone.
scala> val df = Seq((0, 1, 23, 4, 8, 9, 5, "b1"), (1, 2, 43, 8, 10, 20, 43, "e1"), (2, 3, 15, 0, 1, 23, 7, "b1"),(3, 4, 2, 6, 11, 5, 8, "d1"),(4, 5, 6, 7, 2, 8, 1, "f1")).toDF("id", "a1", "b1", "c1", "d1", "e1", "f1", "ref")
df: org.apache.spark.sql.DataFrame = [id: int, a1: int ... 6 more fields]
scala> df.show(false)
+---+---+---+---+---+---+---+---+
|id |a1 |b1 |c1 |d1 |e1 |f1 |ref|
+---+---+---+---+---+---+---+---+
|0 |1 |23 |4 |8 |9 |5 |b1 |
|1 |2 |43 |8 |10 |20 |43 |e1 |
|2 |3 |15 |0 |1 |23 |7 |b1 |
|3 |4 |2 |6 |11 |5 |8 |d1 |
|4 |5 |6 |7 |2 |8 |1 |f1 |
+---+---+---+---+---+---+---+---+
scala> val colx = df.columns.filter(x=>x!="ref").filter(x=>x!="id")
colx: Array[String] = Array(a1, b1, c1, d1, e1, f1)
scala> val colm = colx.map( x=> when(col("ref")===lit(x),col(x)) )
colm: Array[org.apache.spark.sql.Column] = Array(CASE WHEN (ref = a1) THEN a1 END, CASE WHEN (ref = b1) THEN b1 END, CASE WHEN (ref = c1) THEN c1 END, CASE WHEN (ref = d1) THEN d1 END, CASE WHEN (ref = e1) THEN e1 END, CASE WHEN (ref = f1) THEN f1 END)
scala> df.select(col("*"),concat_ws("",array(colm:_*)).as("res1")).show(false)
+---+---+---+---+---+---+---+---+----+
|id |a1 |b1 |c1 |d1 |e1 |f1 |ref|res1|
+---+---+---+---+---+---+---+---+----+
|0 |1 |23 |4 |8 |9 |5 |b1 |23 |
|1 |2 |43 |8 |10 |20 |43 |e1 |20 |
|2 |3 |15 |0 |1 |23 |7 |b1 |15 |
|3 |4 |2 |6 |11 |5 |8 |d1 |11 |
|4 |5 |6 |7 |2 |8 |1 |f1 |1 |
+---+---+---+---+---+---+---+---+----+
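For reference, a rough PySpark equivalent of this Scala approach might look like the following (a sketch, assuming the same df and the data_cols list defined in the Python answer above; note that concat_ws yields a string column):
from pyspark.sql.functions import array, col, concat_ws, lit, when
# One CASE WHEN column per data column; non-matching entries stay null
colm = [when(col("ref") == lit(c), col(c)) for c in data_cols]
# Concatenate the single non-null match into a result column
df.select("*", concat_ws("", array(*colm)).alias("res1")).show()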