pandas_udf 从包含地图的列中提取值

pandas_udf to extract a value from a column containing maps

我有以下 spark df

id | country
------------------
1  | Null
2  | {"date": null, "value": "BRA", "context": "nationality", "state": null}
3  | {"date": null, "value": "ITA", "context": "residence", "state": null}
4  | {"date": null, "value": null, "context": null, "state": null}

我想创建一个 pandas 用户定义的函数,当 运行 如下所示时,将输出如下所示的 df:

(我在 databricks notebooks 上工作,显示功能只是在控制台打印括号内命令的输出)

display(df.withColumn("country_context", get_country_context(col("country"))))

会输出

id | country      | country_context
-----------------------------------
1  | Null         | null
2  | {"date": n...| nationality 
3  | {"date": n...| residence
4  | {"date": n...| null

我创建的pandas_udf如下:

from pyspark.sql.functions import pandas_udf, col
import pandas as pd

@pandas_udf("string")
def get_country_context(country_series: pd.Series) -> pd.Series:
  return country_series.map(lambda d:
                            d.get("context", "Null") 
                            if d else "Null")

display(df
        .withColumn("country_context", get_country_context(col("country"))))

我收到以下错误:

PythonException: 'AttributeError: 'DataFrame' object has no attribute 'map''

我知道我不需要 udf,也不需要 pandas_udf - 但我想了解为什么我的函数不起作用。

我将语法从 Series -> Series 更改为 It[Series] -> It[Series] 并且它有效。不知道为什么,但确实如此。

@pandas_udf('string')
def my_udf(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    return map(lambda d:d.get("context", "Null"), iterator)