Create a column based on values in a dictionary

I have a dictionary like this:

sample_dict = {
    "A": [r"aaaa\.com", r"aaaa\.es"],
    "B": [r"bbbb\.com", r"bbbb\.es", r"bbbb\.net"],
    "C": [r"ccccc\.com"],
    # many more entries here
}

and I want to add a column to a Spark DataFrame that does the following:

from pyspark.sql import functions as F

(
    df
    .withColumn(
        "new_col",
        F.when(
            (F.col("filter_col").rlike(r"aaaa\.com")) |
            (F.col("filter_col").rlike(r"aaaa\.es")),
            F.lit("A")
        )
        .when(
            (F.col("filter_col").rlike(r"bbbb\.com")) |
            (F.col("filter_col").rlike(r"bbbb\.es")) |
            (F.col("filter_col").rlike(r"bbbb\.net")),
            F.lit("B")
        )
        .when(
            F.col("filter_col").rlike(r"ccccc\.com"),
            F.lit("C")
        )
        .otherwise(None)
    )
)

But, of course, I want this to be dynamic, so that I can add new entries to my dictionary and the column automatically takes them into account and assigns the new category according to the rules.

Is this possible?

If you can change your column so that you are looking for exact matches, you can use df.replace():

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(filter_col='aaa.de'),
    Row(filter_col='aaa.es'),
    Row(filter_col='bbb.de'),
    Row(filter_col='bbb.es'),
    Row(filter_col='foo'),
])

d = {
    'aaa.de': 'A',
    'aaa.es': 'A',
    'bbb.de': 'B',
    'bbb.es': 'B',
}

(
    df
    .withColumn('new_col', F.col('filter_col'))
    # Null out any value that is not a key of the dict (your "otherwise" case)
    .withColumn('new_col', F.when(F.col('new_col').isin(list(d.keys())), F.col('new_col')))
    # Map each remaining key to its category
    .replace(d, None, subset='new_col')
    .show()
)

# Output:
+----------+-------+
|filter_col|new_col|
+----------+-------+
|    aaa.de|      A|
|    aaa.es|      A|
|    bbb.de|      B|
|    bbb.es|      B|
|       foo|   null|
+----------+-------+

There is probably a more efficient way to replace the values not mentioned in the dict with None (your "otherwise" condition).
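
One possibly more direct route (a sketch, not benchmarked; d and df as above): build a literal map column from the dict with F.create_map and index into it. A map lookup yields null for keys that are absent, which covers the "otherwise" case without the extra when/isin step:

from itertools import chain
from pyspark.sql import functions as F

# Flatten {'aaa.de': 'A', ...} into [lit('aaa.de'), lit('A'), ...]
mapping = F.create_map(*[F.lit(x) for x in chain(*d.items())])

# Keys missing from the map come back as null automatically
df.withColumn('new_col', mapping[F.col('filter_col')]).show()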


Update:

If reformatting is not possible, you will have to loop over your dictionary:

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(filter_col='aaa.de/foo'),
    Row(filter_col='aaa.es/foo'),
    Row(filter_col='bbb.de/foo'),
    Row(filter_col='bbb.es/foo'),
    Row(filter_col='foo'),
])

d = {
    r'aaa\.de': 'A',
    r'aaa\.es': 'A',
    r'bbb\.de': 'B',
    r'bbb\.es': 'B',
}

# Start with an all-null column, then overwrite it whenever a pattern matches
df = df.withColumn('new_col', F.lit(None).cast('string'))
for k, v in d.items():
    df = df.withColumn('new_col', F.when(F.col('filter_col').rlike(k), v).otherwise(F.col('new_col')))

df.show()
# Output
+----------+-------+
|filter_col|new_col|
+----------+-------+
|aaa.de/foo|      A|
|aaa.es/foo|      A|
|bbb.de/foo|      B|
|bbb.es/foo|      B|
|       foo|   null|
+----------+-------+

You can build the column expression by iterating over the dict and pass this expression to your withColumn call.


from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

sample_dict = {
    "A": [r"aaaa\.com", r"aaaa\.es"],
    "B": [r"bbbb\.com", r"bbbb\.es", r"bbbb\.net"],
    "C": [r"ccccc\.com"],
    # many more entries here
}

data = [("aaaa.com", ), ("aaaa.es", ), ("bbbb.com", ), ("zzzz.com", ), ]

df = spark.createDataFrame(data, ("filter_col", ))

# Seed with the functions module: the first .when() is F.when(),
# every later .when() chains on the Column it returned.
column_expression = F
for k, conditions in sample_dict.items():
    # OR together one rlike per pattern for this category
    condition_expression = F.col("filter_col").rlike(conditions[0])
    for condition in conditions[1:]:
        condition_expression |= F.col("filter_col").rlike(condition)
    column_expression = column_expression.when(condition_expression, F.lit(k))

df.withColumn("new_col", column_expression.otherwise(None)).show()

Output

# column_expression is equivalent to writing the expression by hand:
Column<'CASE WHEN (RLIKE(filter_col, aaaa\.com) OR RLIKE(filter_col, aaaa\.es)) THEN A WHEN ((RLIKE(filter_col, bbbb\.com) OR RLIKE(filter_col, bbbb\.es)) OR RLIKE(filter_col, bbbb\.net)) THEN B WHEN RLIKE(filter_col, ccccc\.com) THEN C END'>


# DataFrame with the expression applied
+----------+-------+
|filter_col|new_col|
+----------+-------+
|  aaaa.com|      A|
|   aaaa.es|      A|
|  bbbb.com|      B|
|  zzzz.com|   null|
+----------+-------+
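
If seeding column_expression with the F module itself feels too clever, the same chain can be folded with functools.reduce. This is only a sketch, assuming the sample_dict and df defined above; the resulting expression is identical:

from functools import reduce
from pyspark.sql import functions as F

def any_match(patterns):
    # OR together one rlike per pattern in the list
    return reduce(lambda a, b: a | b,
                  [F.col("filter_col").rlike(p) for p in patterns])

# Build the first WHEN explicitly, then fold the remaining entries onto it
items = list(sample_dict.items())
first_key, first_patterns = items[0]
column_expression = reduce(
    lambda acc, kv: acc.when(any_match(kv[1]), F.lit(kv[0])),
    items[1:],
    F.when(any_match(first_patterns), F.lit(first_key)),
)

df.withColumn("new_col", column_expression.otherwise(None)).show()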