Spark Scala split column values in a dataframe to appended lists

I have data in a Spark dataframe where I need to search for elements by name, append the values to a list, and split the searched elements out into separate columns of the dataframe.

I am using Scala, and below is an example of my current code. It gets the first value, but I need to append all of the available values, not just the first.

I am new to Scala (coming from Python), so any help is greatly appreciated!

val getNumber: (String => String) = (colString: String) => {
  if (colString != null) {
    raw"number:(\d+)".r
      .findAllIn(colString)
      .group(1)
  }
  else
    null
}

val udfGetColumn = udf(getNumber)

val mydf = df.select(cols.....)
.withColumn("var_number", udfGetColumn($"var"))

Sample data:

+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|   key|           var                                                                                                                                                  |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |["[number:123456 rate:111970 position:1]","[number:123457 rate:662352 position:2]","[number:123458 rate:890 position:3]","[number:123459 rate:190 position:4]"] |                                                                                    |
|2     |["[number:654321 rate:211971 position:1]","[number:654322 rate:124 position:2]","[number:654323 rate:421 position:3]"]                                          |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

Desired result:

+------+-----------------+---------------------+--------------------+
|   key|     var_number  |  var_rate           |    var_position    |
+------+-----------------+---------------------+--------------------+
|1     |       123456    |   111970            |         1          |
|1     |       123457    |   662352            |         2          |
|1     |       123458    |   890               |         3          |
|1     |       123459    |   190               |         4          |
|2     |       654321    |   211971            |         1          |
|2     |       654322    |   124               |         2          |
|2     |       654323    |   421               |         3          |
+------+-----------------+---------------------+--------------------+

There is no need for a UDF here. After removing the square brackets ([]) with the regexp_replace function, you can simply transform the array column var by converting each element into a map using str_to_map. Finally, explode the transformed array and select the fields:

import org.apache.spark.sql.functions._  // col, explode, expr, regexp_replace, split
import spark.implicits._                 // toDF (available by default in spark-shell)

val df = Seq(
  (1, Seq("[number:123456 rate:111970 position:1]", "[number:123457 rate:662352 position:2]", "[number:123458 rate:890 position:3]", "[number:123459 rate:190 position:4]")),
  (2, Seq("[number:654321 rate:211971 position:1]", "[number:654322 rate:124 position:2]", "[number:654323 rate:421 position:3]"))
).toDF("key", "var")

val result = df.withColumn(
  "var", 
  explode(expr(raw"transform(var, x -> str_to_map(regexp_replace(x, '[\[\]]', ''), ' '))"))
).select(
  col("key"),
  col("var").getField("number").alias("var_number"),
  col("var").getField("rate").alias("var_rate"),
  col("var").getField("position").alias("var_position")
)

result.show
//+---+----------+--------+------------+
//|key|var_number|var_rate|var_position|
//+---+----------+--------+------------+
//|  1|    123456|  111970|           1|
//|  1|    123457|  662352|           2|
//|  1|    123458|     890|           3|
//|  1|    123459|     190|           4|
//|  2|    654321|  211971|           1|
//|  2|    654322|     124|           2|
//|  2|    654323|     421|           3|
//+---+----------+--------+------------+
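
For reference, str_to_map splits each cleaned string on the given pair delimiter (' ' here) and, by default, on ':' between keys and values, so every element becomes a map with the keys number, rate and position. If you want to inspect that intermediate map column before the final select, a minimal sketch against the same df:

df.select(
  explode(expr(raw"transform(var, x -> str_to_map(regexp_replace(x, '[\[\]]', ''), ' '))")).alias("var_map")
).show(false)
// each row now holds one map, e.g. number -> 123456, rate -> 111970, position -> 1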

From your comment, it appears the column var is of string type rather than an array. In that case, you can first clean it by removing the [, ] and " characters, then split on commas to get an array:

val df = Seq(
  (1, """["[number:123456 rate:111970 position:1]", "[number:123457 rate:662352 position:2]", "[number:123458 rate:890 position:3]", "[number:123459 rate:190 position:4]"]"""),
  (2, """["[number:654321 rate:211971 position:1]", "[number:654322 rate:124 position:2]", "[number:654323 rate:421 position:3]"]""")
).toDF("key", "var")

val result = df.withColumn(
  "var", 
  split(regexp_replace(col("var"), "[\\[\\]\"]", ""), ",")
).withColumn(
  "var", 
  explode(expr("transform(var, x -> str_to_map(x, ' '))"))
).select(
  // select your columns as above...
)
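
If you would rather stay close to the UDF approach from your question, another option is to return all regex matches as a sequence instead of just the first one, and then explode the resulting array column of the string-typed df above. This is only a sketch for the number field, and getAllNumbers / udfGetAllNumbers are just illustrative names:

import org.apache.spark.sql.functions.{col, explode, udf}

// Sketch: collect every "number:<digits>" match instead of only the first.
val getAllNumbers: String => Seq[String] = (colString: String) =>
  if (colString != null)
    raw"number:(\d+)".r.findAllMatchIn(colString).map(_.group(1)).toList
  else
    Seq.empty[String]

val udfGetAllNumbers = udf(getAllNumbers)

df.withColumn("var_number", explode(udfGetAllNumbers(col("var"))))
  .select("key", "var_number")
  .show()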