
Pyspark use sql.transform to nullify all empty strings in a column containing an array of structs

I have a column in a pyspark df that contains an array of maps like the below:

[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]

df.printSchema() returns the following for the column:

 |-- constituencies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |    |-- stateProvince: string (nullable = true)

I want to nullify all those empty strings, so I thought this would be a perfect problem to solve with F.transform(col, f).

So I created the function, and then used it in the transform expression, like below:

def nullify_vals(d):
  def nullify_string(str_):
    if str_.strip() == "":
      return None
    return str_.strip()
  
  return (
    dict((k, nullify_string(v)) for k, v in d.items())  
  )

Note that the above works when tested on a dictionary:

dd = {"my": "map", "is": "", "not": "   ", "entierly": "  empty , right?"}
d_cln = nullify_vals(dd)  
d_cln["not"] is None # returns True
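(As a side note, nullify_string assumes every value is a string; if a field is already None, .strip() raises AttributeError. A null-safe sketch of the same pure-Python helper:)

```python
def nullify_vals(d):
  def nullify_string(str_):
    # Treat missing values and whitespace-only strings the same way.
    if str_ is None or str_.strip() == "":
      return None
    return str_.strip()

  return {k: nullify_string(v) for k, v in d.items()}

dd = {"my": "map", "is": "", "not": "   ", "gone": None}
d_cln = nullify_vals(dd)
d_cln["gone"] is None  # returns True
```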

But when I use it in Pyspark, it gives me an error:

import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))

TypeError: 'Column' object is not callable

These are the last lines of the stack trace:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))

File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
   4214 def transform(col, f):
   4215     """
   4216     Returns an array of elements after applying a transformation to each element in the input array.
   4217 
   (...)
   4258     +--------------+
   4259     """
-> 4260     return _invoke_higher_order_function("ArrayTransform", [col], [f])

File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
   4206 expr = getattr(expressions, name)
   4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
   4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))

I'm still investigating the error you're getting, and I'll update the post once I figure out what's wrong. In the meantime, you can do something like this to work around it:

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(
    StructType([
      StructField('address', StringType()),
      StructField('city', StringType()),
      StructField('country', StringType()),
      StructField('note', StringType()),
      StructField('stateProvince', StringType()),
  ]), True)
nullify_udf = udf(
    lambda arr: [
        # Guard against None so .strip() doesn't raise on already-null fields
        [(v if v is not None and v.strip() != "" else None) for v in area]
        for area in arr
    ],
    schema,
)

result = kyclean.withColumn('constituencies', nullify_udf('constituencies'))

The specific error you're getting is saying that you can't call d.items() as a function, and the function you pass in does need to handle the Column object d that gets passed to it.

The description of pyspark.sql.functions.transform says, "Returns an array of elements after applying a transformation to each element in the input array."

But in the description of the function f it accepts, it says, "...and can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052)." So it can't take custom Python UserDefinedFunctions yet, which is sort of what you were trying to do.

Your function nullify_vals should take a Column object of type StructType, because your array elements are structs. But you were passing in a regular Python object instead.

Try rewriting it like this:

from typing import List

from pyspark.sql import functions as F, Column

def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    for f in fields:
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )

    return struct_col

For each field in the inner struct, we use the column method withField to update it: if the field equals an empty string, we set it to null.

Applied to your input example:

json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))

You can get the list of the constituencies struct fields from the dataframe schema:

constituencies_fields = df.selectExpr("inline(constituencies)").columns

df1 = df.withColumn(
    "constituencies",
    F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)

df1.show(truncate=False)
#+----------------------------------------+
#|constituencies                          |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+