从pyspark中的Dataframe方法访问广播字典
Accessing broadcast Dictionary from within Dataframe methods in pyspark
我有一个广播字典,我想用它来映射我 DataFrame
中的列值。假设我为此调用了 withColumn()
方法。
我只能让它与 UDF 一起工作,但不能直接使用:
sc = SparkContext()
ss = SparkSession(sc)
df = ss.createDataFrame( [ "a", "b" ], StringType() ).toDF("key")
# +---+
# |key|
# +---+
# | a|
# | b|
# +---+
thedict={"a":"A","b":"B","c":"C"}
thedict_bc=sc.broadcast(thedict)
用文字引用或使用 UDF 工作正常:
df.withColumn('upper',lit(thedict_bc.value.get('c',"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| C|
# | b| C|
# +---+-----+
df.withColumn('upper',udf(lambda x : thedict_bc.value.get(x,"--"), StringType())('key')).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| A|
# | b| B|
# +---+-----+
但是,直接从命令访问字典不会:
df.withColumn('upper',lit(thedict_bc.value.get(col('key'),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| --|
# | b| --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key,"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| --|
# | b| --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key.cast("string"),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| --|
# | b| --|
# +---+-----+
我是不是遗漏了什么明显的东西?
TL;DR 你混淆了属于完全不同上下文的东西。符号 SQL 表达式(lit
、col
等)和纯 Python 代码。
你混淆了上下文。下一行:
thedict_bc.value.get(col('key'),"--")))
在驱动程序的Python中执行,字面意思是本地字典查找。 thedict
不包含 col('key')
(字面意思,不涉及扩展)你总是得到默认值。
我个人会使用简单的 join
:
lookup = sc.parallelize(thedict.items()).toDF(["key", "upper"])
df.join(lookup, ["key"], "left").na.fill("upper", "--").show()
+---+-----+
|key|upper|
+---+-----+
| b| B|
| a| A|
+---+-----+
但是 udf
(正如您已经确定的那样)或文字 map
也可以工作:
from pyspark.sql.functions import coalesce, create_map
from itertools import chain
thedict_col = create_map(*chain.from_iterable(
(lit(k), lit(v)) for k, v in thedict.items()
))
df.withColumn('upper', coalesce(thedict_col[col("key")], lit("--"))).show()
+---+-----+
|key|upper|
+---+-----+
| a| A|
| b| B|
+---+-----+
备注:
- 当然如果你想转换成大写,就用
pyspark.sql.functions.upper
.
- 使用
some_broadcast.value
作为函数的参数根本不起作用。变量替换将在本地应用,并且不会使用广播。 value
应该是在函数体中调用,所以在executor context中执行。
我有一个广播字典,我想用它来映射我 DataFrame
中的列值。假设我为此调用了 withColumn()
方法。
我只能让它与 UDF 一起工作,但不能直接使用:
sc = SparkContext()
ss = SparkSession(sc)
df = ss.createDataFrame( [ "a", "b" ], StringType() ).toDF("key")
# +---+
# |key|
# +---+
# | a|
# | b|
# +---+
thedict={"a":"A","b":"B","c":"C"}
thedict_bc=sc.broadcast(thedict)
用文字引用或使用 UDF 工作正常:
df.withColumn('upper',lit(thedict_bc.value.get('c',"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| C|
# | b| C|
# +---+-----+
df.withColumn('upper',udf(lambda x : thedict_bc.value.get(x,"--"), StringType())('key')).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| A|
# | b| B|
# +---+-----+
但是,直接从命令访问字典不会:
df.withColumn('upper',lit(thedict_bc.value.get(col('key'),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| --|
# | b| --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key,"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| --|
# | b| --|
# +---+-----+
df.withColumn('upper',lit(thedict_bc.value.get(df.key.cast("string"),"--"))).show()
# +---+-----+
# |key|upper|
# +---+-----+
# | a| --|
# | b| --|
# +---+-----+
我是不是遗漏了什么明显的东西?
TL;DR 你混淆了属于完全不同上下文的东西。符号 SQL 表达式(lit
、col
等)和纯 Python 代码。
你混淆了上下文。下一行:
thedict_bc.value.get(col('key'),"--")))
在驱动程序的Python中执行,字面意思是本地字典查找。 thedict
不包含 col('key')
(字面意思,不涉及扩展)你总是得到默认值。
我个人会使用简单的 join
:
lookup = sc.parallelize(thedict.items()).toDF(["key", "upper"])
df.join(lookup, ["key"], "left").na.fill("upper", "--").show()
+---+-----+
|key|upper|
+---+-----+
| b| B|
| a| A|
+---+-----+
但是 udf
(正如您已经确定的那样)或文字 map
也可以工作:
from pyspark.sql.functions import coalesce, create_map
from itertools import chain
thedict_col = create_map(*chain.from_iterable(
(lit(k), lit(v)) for k, v in thedict.items()
))
df.withColumn('upper', coalesce(thedict_col[col("key")], lit("--"))).show()
+---+-----+
|key|upper|
+---+-----+
| a| A|
| b| B|
+---+-----+
备注:
- 当然如果你想转换成大写,就用
pyspark.sql.functions.upper
. - 使用
some_broadcast.value
作为函数的参数根本不起作用。变量替换将在本地应用,并且不会使用广播。value
应该是在函数体中调用,所以在executor context中执行。