Pyspark 使用 when() otherwise() 检查字典或映射中的值是否
Pyspark check if value in dictionary or map using when() otherwise()
我想在 when().otherwise() 代码块中测试某列的值是否存在于普通的 Python 字典或 PySpark 映射(map)中,但找不到正确的语法。后续会有多个基于 "Count" 列不同条件的 when() 子句,因此需要类似 if/elif/else 的结构。字典/映射会很大,且不会作为数据框中的一列。
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from itertools import chain

# Create (or reuse) the Spark session for this example.
spark = SparkSession.builder.getOrCreate()

# Sample rows: (Category, Count, Description); one Description is NULL.
data = [
    ('Category A', 100, "This is category A"),
    ('Category B', 120, "This is category B"),
    ('Category C', 150, None),
]

# Explicit schema so Count is an integer and every field is nullable.
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True),
])

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, schema)
### Can either match regular python dict, or pyspark map ###
sec_lookup = {120: "new_category"}
# Flatten {k: v, ...} into F.lit(k), F.lit(v), ... for create_map().
sec_lookup_map = F.create_map(*[F.lit(x) for x in chain(*sec_lookup.items())])
user_df = df.withColumn(
    "new_col",
    F.when(
        # Column.isin() tests membership against plain Python values;
        # a bare `x in dict` expression cannot be applied to a Column.
        F.col("Count").isin(list(sec_lookup.keys())),
        F.concat(F.col("Category"), F.lit("_add"))
    ).when(
        # Equivalent test against the MapType literal: looking up a
        # missing key yields NULL, so isNotNull() means "key present".
        sec_lookup_map[F.col("Count")].isNotNull(),
        F.concat(F.col("Category"), F.lit("_map"))
    ).otherwise(
        F.concat(F.col("Category"), F.lit("_old"))
    )
)
使用 isin
我相信您不需要 if/elif/else 而 if/else 应该没问题,因为您只需检查字典键的成员资格:
sec_lookup = {120: "new_category"}

# Rows whose Count matches a lookup key get "_new" appended to the
# Category; all other rows get "_old".
lookup_keys = list(sec_lookup.keys())
df = df.withColumn(
    "new",
    F.when(F.col("Count").isin(lookup_keys), F.concat("Category", F.lit("_new")))
     .otherwise(F.concat("Category", F.lit("_old"))),
)
df.show()
+----------+-----+------------------+--------------+
| Category|Count| Description| new|
+----------+-----+------------------+--------------+
|Category A| 100|This is category A|Category A_old|
|Category B| 120|This is category B|Category B_new|
|Category C| 150| null|Category C_old|
+----------+-----+------------------+--------------+
我想在 when().otherwise() 代码块中测试某列的值是否存在于普通的 Python 字典或 PySpark 映射(map)中,但找不到正确的语法。后续会有多个基于 "Count" 列不同条件的 when() 子句,因此需要类似 if/elif/else 的结构。字典/映射会很大,且不会作为数据框中的一列。
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from itertools import chain

# Create (or reuse) the Spark session for this example.
spark = SparkSession.builder.getOrCreate()

# Sample rows: (Category, Count, Description); one Description is NULL.
data = [
    ('Category A', 100, "This is category A"),
    ('Category B', 120, "This is category B"),
    ('Category C', 150, None),
]

# Explicit schema so Count is an integer and every field is nullable.
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True),
])

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, schema)
### Can either match regular python dict, or pyspark map ###
sec_lookup = {120: "new_category"}
# Flatten {k: v, ...} into F.lit(k), F.lit(v), ... for create_map().
sec_lookup_map = F.create_map(*[F.lit(x) for x in chain(*sec_lookup.items())])
user_df = df.withColumn(
    "new_col",
    F.when(
        # Column.isin() tests membership against plain Python values;
        # a bare `x in dict` expression cannot be applied to a Column.
        F.col("Count").isin(list(sec_lookup.keys())),
        F.concat(F.col("Category"), F.lit("_add"))
    ).when(
        # Equivalent test against the MapType literal: looking up a
        # missing key yields NULL, so isNotNull() means "key present".
        sec_lookup_map[F.col("Count")].isNotNull(),
        F.concat(F.col("Category"), F.lit("_map"))
    ).otherwise(
        F.concat(F.col("Category"), F.lit("_old"))
    )
)
使用 isin
我相信您不需要 if/elif/else 而 if/else 应该没问题,因为您只需检查字典键的成员资格:
sec_lookup = {120: "new_category"}

# Rows whose Count matches a lookup key get "_new" appended to the
# Category; all other rows get "_old".
lookup_keys = list(sec_lookup.keys())
df = df.withColumn(
    "new",
    F.when(F.col("Count").isin(lookup_keys), F.concat("Category", F.lit("_new")))
     .otherwise(F.concat("Category", F.lit("_old"))),
)
df.show()
+----------+-----+------------------+--------------+
| Category|Count| Description| new|
+----------+-----+------------------+--------------+
|Category A| 100|This is category A|Category A_old|
|Category B| 120|This is category B|Category B_new|
|Category C| 150| null|Category C_old|
+----------+-----+------------------+--------------+