从pyspark中的Dataframe方法访问广播字典

Accessing broadcast Dictionary from within Dataframe methods in pyspark

我有一个广播字典,我想用它来映射我 DataFrame 中的列值。假设我为此调用了 withColumn() 方法。

我只能让它与 UDF 一起工作,但不能直接使用:

sc = SparkContext()
ss = SparkSession(sc)
df = ss.createDataFrame( [ "a", "b" ], StringType() ).toDF("key")
# +---+                                                                           
# |key|
# +---+
# |  a|
# |  b|
# +---+
thedict={"a":"A","b":"B","c":"C"}
thedict_bc=sc.broadcast(thedict)

用文字引用或使用 UDF 工作正常:

df.withColumn('upper',lit(thedict_bc.value.get('c',"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|    C|
# |  b|    C|
# +---+-----+
df.withColumn('upper',udf(lambda x : thedict_bc.value.get(x,"--"), StringType())('key')).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|    A|
# |  b|    B|
# +---+-----+

但是,直接从命令访问字典不会:

df.withColumn('upper',lit(thedict_bc.value.get(col('key'),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key,"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key.cast("string"),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# |  a|   --|
# |  b|   --|
# +---+-----+

我是不是遗漏了什么明显的东西?

TL;DR 你混淆了属于完全不同上下文的东西。符号 SQL 表达式(litcol 等)和纯 Python 代码。

你混淆了上下文。下一行:

thedict_bc.value.get(col('key'),"--")))

在驱动程序的Python中执行,字面意思是本地字典查找。 thedict 不包含 col('key') (字面意思,不涉及扩展)你总是得到默认值。

我个人会使用简单的 join:

lookup = sc.parallelize(thedict.items()).toDF(["key", "upper"])
df.join(lookup, ["key"], "left").na.fill("upper", "--").show()
+---+-----+                                                                     
|key|upper|
+---+-----+
|  b|    B|
|  a|    A|
+---+-----+

但是 udf(正如您已经确定的那样)或文字 map 也可以工作:

from pyspark.sql.functions import coalesce, create_map
from itertools import chain

thedict_col = create_map(*chain.from_iterable(
    (lit(k), lit(v)) for k, v in thedict.items()
))

df.withColumn('upper', coalesce(thedict_col[col("key")], lit("--"))).show()
+---+-----+
|key|upper|
+---+-----+
|  a|    A|
|  b|    B|
+---+-----+

备注:

  • 当然如果你想转换成大写,就用pyspark.sql.functions.upper.
  • 使用 some_broadcast.value 作为函数的参数根本不起作用。变量替换将在本地应用,并且不会使用广播。 value应该是在函数体中调用,所以在executor context中执行。