使用 ArrayType 列将 UDF 重写为 pandas udf
Rewrite UDF to pandas udf with ArrayType column
我正在尝试将 UDF 重写为 pandas UDF。
但是,当涉及到包含ArrayType 的列时。我正在努力寻找正确的解决方案。
我有一个数据框如下:
+-----------+--------------------+
| genre| ids|
+-----------+--------------------+
| Crime|[6, 22, 42, 47, 5...|
| Romance|[3, 7, 11, 15, 17...|
| Thriller|[6, 10, 16, 18, 2...|
| Adventure|[2, 8, 10, 15, 29...|
| Children|[1, 2, 8, 13, 34,...|
| Drama|[4, 11, 14, 16, 1...|
| War|[41, 110, 151, 15...|
|Documentary|[37, 77, 99, 108,...|
| Fantasy|[2, 56, 60, 126, ...|
| Mystery|[59, 113, 123, 16...|
+-----------+--------------------+
以下 UDF 运行良好:
pairs_udf = udf(lambda x: itertools.combinations(x, 2), transformer.schema)
df = df.select("genre", pairs_udf("ids").alias("ids"))
输出如下:
+-----------+--------------------+
| genre| ids|
+-----------+--------------------+
| Crime|[[6, 22], [6, 42]...|
| Romance|[[3, 7], [3, 11],...|
| Thriller|[[6, 10], [6, 16]...|
| Adventure|[[2, 8], [2, 10],...|
| Children|[[1, 2], [1, 8], ...|
| Drama|[[4, 11], [4, 14]...|
| War|[[41, 110], [41, ...|
|Documentary|[[37, 77], [37, 9...|
| Fantasy|[[2, 56], [2, 60]...|
| Mystery|[[59, 113], [59, ...|
+-----------+--------------------+
但是,在 pandas udf
.
中编写函数时等效的是什么
PS:我明白了,或者,我可以使用交叉连接来达到相同的结果。
但是,我更好奇 pandas udf 如何处理 ArrayType 的列。
我将在这里分享我的发现:
要使 pandas udf 为您的项目工作,有 3 个方面:
1。 pandas UDF,或者更准确地说,Apache Arrow 不像普通的 udf 那样支持复杂类型。(截至 pyspark 3.0.1
、pyarrow 2.0.0
)
例如:
ArrayType(StringType())
由 pandas udf 支持。
不支持 ArrayType(StructType([...]))
。
你可以查看更多:https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types
2。如果您是 运行 Java 11,这是 (py)Spark 3 中的默认值。您需要添加以下内容作为您的 spark 配置的一部分:
spark.driver.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
spark.executor.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
这将解决上面提到的java.lang.UnsupportedOperationException
。
3。确保您的虚拟环境 python 路径已添加到您的 pyspark_python
即
environ['PYSPARK_PYTHON']='./your/virtual/enviroment/path'
我正在尝试将 UDF 重写为 pandas UDF。
但是,当涉及到包含ArrayType 的列时。我正在努力寻找正确的解决方案。
我有一个数据框如下:
+-----------+--------------------+
| genre| ids|
+-----------+--------------------+
| Crime|[6, 22, 42, 47, 5...|
| Romance|[3, 7, 11, 15, 17...|
| Thriller|[6, 10, 16, 18, 2...|
| Adventure|[2, 8, 10, 15, 29...|
| Children|[1, 2, 8, 13, 34,...|
| Drama|[4, 11, 14, 16, 1...|
| War|[41, 110, 151, 15...|
|Documentary|[37, 77, 99, 108,...|
| Fantasy|[2, 56, 60, 126, ...|
| Mystery|[59, 113, 123, 16...|
+-----------+--------------------+
以下 UDF 运行良好:
pairs_udf = udf(lambda x: itertools.combinations(x, 2), transformer.schema)
df = df.select("genre", pairs_udf("ids").alias("ids"))
输出如下:
+-----------+--------------------+
| genre| ids|
+-----------+--------------------+
| Crime|[[6, 22], [6, 42]...|
| Romance|[[3, 7], [3, 11],...|
| Thriller|[[6, 10], [6, 16]...|
| Adventure|[[2, 8], [2, 10],...|
| Children|[[1, 2], [1, 8], ...|
| Drama|[[4, 11], [4, 14]...|
| War|[[41, 110], [41, ...|
|Documentary|[[37, 77], [37, 9...|
| Fantasy|[[2, 56], [2, 60]...|
| Mystery|[[59, 113], [59, ...|
+-----------+--------------------+
但是,在 pandas udf
.
PS:我明白了,或者,我可以使用交叉连接来达到相同的结果。
但是,我更好奇 pandas udf 如何处理 ArrayType 的列。
我将在这里分享我的发现:
要使 pandas udf 为您的项目工作,有 3 个方面:
1。 pandas UDF,或者更准确地说,Apache Arrow 不像普通的 udf 那样支持复杂类型。(截至 pyspark 3.0.1
、pyarrow 2.0.0
)
例如:
ArrayType(StringType())
由 pandas udf 支持。
不支持 ArrayType(StructType([...]))
。 你可以查看更多:https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types
2。如果您是 运行 Java 11,这是 (py)Spark 3 中的默认值。您需要添加以下内容作为您的 spark 配置的一部分:
spark.driver.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
spark.executor.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
这将解决上面提到的java.lang.UnsupportedOperationException
。
3。确保您的虚拟环境 python 路径已添加到您的 pyspark_python
即
environ['PYSPARK_PYTHON']='./your/virtual/enviroment/path'