UDFs Use Array to Split String in Column
I have a df with two columns. One column is a string and the other is an array of integers.
root
|-- col1: string (nullable = true)
|-- col2: array (nullable = true)
| |-- element: integer (containsNull = true)
The dataframe looks like:
+--------------------+------------+
| col1| col2|
+--------------------+------------+
|Barkley likes peo...|[22, 22, 25]|
+--------------------+------------+
The array tells me where I need to split the sentences in col1.
If the value in col1 is "Barkley likes people. Barkley likes treats. Barkley likes everything.", the array tells me that characters 0-22 are the first sentence, 22 to 44 (22+22) are the second sentence, and the last sentence runs from 44 (22+22) to 69 (44+25).
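To make the arithmetic concrete, the slicing I have in mind looks like this in plain Python (a minimal sketch of the indexing logic only; this driver-side loop is exactly what I want to avoid):
text = "Barkley likes people. Barkley likes treats. Barkley likes everything."
lengths = [22, 22, 25]
start = 0
for length in lengths:
    print(text[start:start + length])  # each slice is one sentence
    start += length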
I need to avoid sending anything to the driver node and maintain parallelism. So my question is: how can I create a UDF that uses the integers in the array to split the sentences in col1? The output could use withColumn and return three new columns, or a map of each sentence. Can I do this without a for loop, list comprehension, collect() or select()?
For Spark version >= 2.4, we can use higher-order functions to manipulate arrays, including for this problem. Assume df is the dataframe.
df = spark.createDataFrame([
    ("Barkley likes people. Barkley likes treats. Barkley likes everything.", [22, 22, 25]),
    ("A sentence. Another sentence.", [13, 18]),
    ("One sheep. Two sheep. Three sheep. Four sheep.", [11, 12, 13, 12])],
    "col1:string, col2:array<int>")
df.show()
# +--------------------+----------------+
# | col1| col2|
# +--------------------+----------------+
# |Barkley likes peo...| [22, 22, 25]|
# |A sentence. Anoth...| [13, 18]|
# |One sheep. Two sh...|[11, 12, 13, 12]|
# +--------------------+----------------+
To split the sentences from col1, the substring function will be used; it takes start position and length arguments. col2 holds the length of each sentence in the string. The start position of each sentence is the cumulative sum of the array col2 from 0 to n-1, as implied in the question. To compute those start positions, use the higher-order functions transform and aggregate. After that, take each sentence and use map_from_entries to create a map of each sentence and its index. Here is an example of doing this.
import pyspark.sql.functions as F

df = (df
    # start: exclusive cumulative sum of col2, i.e. the start offset of each sentence
    .withColumn("start", F.expr("transform(transform(col2, (v1,i) -> slice(col2, 1, i)), v2 -> aggregate(v2, 0, (a,b) -> a + b))"))
    # pair each sentence with its 1-based index in a struct
    .withColumn("sentences", F.expr("transform(col2, (v, i) -> struct(i+1 as index, substring(col1, start[i], col2[i]) as sentence))"))
    # turn the array of (index, sentence) structs into a map
    .selectExpr("col1", "map_from_entries(sentences) as sentences")
)
df.show(truncate=False)
# +---------------------------------------------------------------------+------------------------------------------------------------------------------------------+
# |col1 |sentences |
# +---------------------------------------------------------------------+------------------------------------------------------------------------------------------+
# |Barkley likes people. Barkley likes treats. Barkley likes everything.|[1 -> Barkley likes people. , 2 -> Barkley likes treats., 3 -> Barkley likes everything]|
# |A sentence. Another sentence. |[1 -> A sentence. A, 2 -> Another sentence.] |
# |One sheep. Two sheep. Three sheep. Four sheep. |[1 -> One sheep. , 2 -> Two sheep. , 3 -> Three sheep. , 4 -> Four sheep.] |
# +---------------------------------------------------------------------+------------------------------------------------------------------------------------------+
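If three separate columns are preferred over a single map, the entries can be pulled out of the map by key (a small follow-up sketch; the column names are illustrative, and with default non-ANSI settings a lookup on a missing key simply yields null):
import pyspark.sql.functions as F
df_cols = (df
    .withColumn("sentence_1", F.col("sentences")[1])
    .withColumn("sentence_2", F.col("sentences")[2])
    .withColumn("sentence_3", F.col("sentences")[3])  # null for rows with fewer than 3 sentences
)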