修改 ArrayType 中的所有元素

Modify all elements in ArrayType

我有一个 DataFrame,其列为 ArrayType(StringType):

+------------------------------------+
|colname                             |
+------------------------------------+
|[foo_XX_foo, bar_YY_bar]            |
|[qwe_ZZ_rty, asd_AA_fgh, zxc_BB_vbn]|
+------------------------------------+

我现在想提取第一个和第二个 _ 之间的字符串,即预期输出是:

+------------+
|newcolname  |
+------------+
|[XX, YY]    |
|[ZZ, AA, BB]|
+------------+

之后,我尝试将 expr()transform 一起使用,但未能成功。即使将所有字符串更改为大写的示例,如上面引用的答案,对我也不起作用,我收到以下错误:

pyspark.sql.utils.ParseException: u"\nextraneous input '>' expecting {'(', 'SELECT', ...

如何修改 ArrayType 中的所有元素?我想避免使用 udf.

有点不安全,但试试这样:

df = spark.sparkContext.parallelize([
  [["foo_XX_foo", "bar_YY_bar"]],
  [["qwe_ZZ_rty", "asd_AA_fgh", "zxc_BB_vbn"]]
]).toDF(['colname'])

df.selectExpr('transform(colname, x -> split(x, "_")[1]) as newcolname').show()

结果是:

+------------+
|  newcolname|
+------------+
|    [XX, YY]|
|[ZZ, AA, BB]|
+------------+

由于您使用的是 Spark 版本 2.3.2,transform is not available to you. Thus as explained in the ,通常最好的方法是使用 udf.

但是,在这种特定情况下,您可以使用一些老掉牙的正则表达式替换来避免 udf

from pyspark.sql.functions import col, concat_ws, regexp_replace, split, trim

df.withColumn(
    "newcolname",
    regexp_replace(concat_ws(",", col("colname")), "((?<=_)[^_,]+(?=_))", "  ")
).withColumn(
    "newcolname",
    regexp_replace(col("newcolname"), "(_[^_ ]+_)", "")
).withColumn(
    "newcolname",
    regexp_replace(col("newcolname"), "([^_ ]+_)", "")
).withColumn(
    "newcolname",
    regexp_replace(col("newcolname"), "_([^_ ]+)", "")
).withColumn(
    "newcolname",
    split(trim(col("newcolname")), "\s+")
).show(truncate=False)
#+------------------------------------+------------+
#|colname                             |newcolname  |
#+------------------------------------+------------+
#|[foo_XX_foo, bar_YY_bar]            |[XX, YY]    |
#|[qwe_ZZ_rty, asd_AA_fgh, zxc_BB_vbn]|[ZZ, AA, BB]|
#+------------------------------------+------------+

说明

首先我们获取 ArrayType(StringType()) 列并将元素连接在一起形成一个字符串。我使用逗号作为分隔符,仅当逗号未出现在您的数据中时才有效。

接下来我们执行一系列 regexp_replace 调用。

第一个模式 ((?<=_)[^_,]+(?=_)) 标识您实际要提取的内容:下划线括起来的文本。然后将匹配组替换为空格包围的匹配组" "。与之前使用逗号分隔符一样,这​​假定空格不会出现在您的数据中。

例如:

df.select(
    regexp_replace(
        concat_ws(",", col("colname")), 
        "((?<=_)[^_,]+(?=_))", 
        "  "
    ).alias("pattern1")
).show(truncate=False)
#+--------------------------------------+
#|pattern1                              |
#+--------------------------------------+
#|foo_ XX _foo,bar_ YY _bar             |
#|qwe_ ZZ _rty,asd_ AA _fgh,zxc_ BB _vbn|
#+--------------------------------------+

接下来的 3 次调用 regexp_replace 有选择地删除不需要的字符串部分。

终于到了最后,只剩下想要的内容了。字符串被修剪以删除 trailing/leading 空格并拆分空格以获得最终结果。