Apache Spark -- 将 UDF 的结果分配给多个数据框列
Apache Spark -- Assign the result of UDF to multiple dataframe columns
我正在使用 pyspark,使用 spark-csv 将大型 csv 文件加载到数据框中,作为预处理步骤,我需要对其中一列中可用的数据应用各种操作(即包含一个 json 字符串)。这将 return X 个值,每个值都需要存储在它们自己单独的列中。
该功能将在 UDF 中实现。但是,我不确定如何 return 来自该 UDF 的值列表并将它们输入到单独的列中。下面是一个简单的例子:
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
产生以下结果:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
将由 udf return编辑的两个(在此示例中)值存储在单独的列中的最佳方法是什么?现在它们被输入为字符串:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
无法通过单个 UDF 调用创建多个顶级列,但您可以创建一个新的 struct
。它需要一个具有指定 returnType
:
的 UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType
schema = StructType([
StructField("foo", FloatType(), False),
StructField("bar", FloatType(), False)
])
def udf_test(n):
return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))
test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])
foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
## |-- foobar: struct (nullable = true)
## | |-- foo: float (nullable = false)
## | |-- bar: float (nullable = false)
您使用简单 select
:
进一步扁平化架构
foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+
另见 Derive multiple columns from a single column in a Spark DataFrame
您可以使用 flatMap 一次性获取所需数据帧的列
df=df.withColumn('udf_results',udf)
df4=df.select('udf_results').rdd.flatMap(lambda x:x).toDF(schema=your_new_schema)
我正在使用 pyspark,使用 spark-csv 将大型 csv 文件加载到数据框中,作为预处理步骤,我需要对其中一列中可用的数据应用各种操作(即包含一个 json 字符串)。这将 return X 个值,每个值都需要存储在它们自己单独的列中。
该功能将在 UDF 中实现。但是,我不确定如何 return 来自该 UDF 的值列表并将它们输入到单独的列中。下面是一个简单的例子:
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
产生以下结果:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
将由 udf return编辑的两个(在此示例中)值存储在单独的列中的最佳方法是什么?现在它们被输入为字符串:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
无法通过单个 UDF 调用创建多个顶级列,但您可以创建一个新的 struct
。它需要一个具有指定 returnType
:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType
schema = StructType([
StructField("foo", FloatType(), False),
StructField("bar", FloatType(), False)
])
def udf_test(n):
return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))
test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])
foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
## |-- foobar: struct (nullable = true)
## | |-- foo: float (nullable = false)
## | |-- bar: float (nullable = false)
您使用简单 select
:
foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+
另见 Derive multiple columns from a single column in a Spark DataFrame
您可以使用 flatMap 一次性获取所需数据帧的列
df=df.withColumn('udf_results',udf)
df4=df.select('udf_results').rdd.flatMap(lambda x:x).toDF(schema=your_new_schema)