根据字典中的值创建列
Create a column based on values in a dictionary
我有这样一本字典:
sample_dict = {
"A": ["aaaa\.com", "aaaa\.es"],
"B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
"C": ["ccccc\.com"],
# many more entries here
}
我想在执行以下操作的 Spark DataFrame 中添加一列:
(
df
.withColumn(
"new_col",
F.when(
(F.col("filter_col").rlike("aaaa\.com")) |
(F.col("filter_col").rlike("aaaa\.es")),
F.lit("A")
)
.when(
(F.col("filter_col").rlike("bbbb\.com")) |
(F.col("filter_col").rlike("bbbb\.es")) |
(F.col("filter_col").rlike("bbbb\.net")),
F.lit("B")
)
.when(
(F.col("filter_col").rlike("cccc\.com")),
F.lit("C")
)
.otherwise(None)
)
)
但是,当然,我希望它是动态的,这样我就可以将新组件添加到我的字典中,并且该列会自动考虑它们并根据规则添加一个新类别。
这可能吗?
如果您可以更改您的列以便查找完全匹配的内容,您可以使用 df.replace()
:
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
Row(filter_col='aaa.de'),
Row(filter_col='aaa.es'),
Row(filter_col='bbb.de'),
Row(filter_col='bbb.es'),
])
d = {
'aaa.de': 'A',
'aaa.es': 'A',
'bbb.de': 'B',
'bbb.es': 'B',
}
(
df
.withColumn('new_col', F.col('filter_col'))
.withColumn('new_col', F.when(F.col('new_col').isin(list(d.keys())), F.col('new_col')))
.replace(d, None, subset='new_col')
.show()
)
# Output:
+----------+-------+
|filter_col|new_col|
+----------+-------+
| aaa.de| A|
| aaa.es| A|
| bbb.de| B|
| bbb.es| B|
| foo| null|
+----------+-------+
可能有一种更高效的方法来用“None”(您的“否则”条件)替换字典中未提及的值。
更新:
如果无法重新格式化,您将不得不遍历您的字典:
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
Row(filter_col='aaa.de/foo'),
Row(filter_col='aaa.es/foo'),
Row(filter_col='bbb.de/foo'),
Row(filter_col='bbb.es/foo'),
Row(filter_col='foo'),
])
d = {
'aaa\.de': 'A',
'aaa\.es': 'A',
'bbb\.de': 'B',
'bbb\.es': 'B',
}
df = df.withColumn('new_col', F.lit(None).cast('string'))
for k,v in d.items():
df = df.withColumn('new_col', F.when(F.col('filter_col').rlike(k), v).otherwise(F.col('new_col')))
df.show()
# Output
+----------+-------+
|filter_col|new_col|
+----------+-------+
|aaa.de/foo| A|
|aaa.es/foo| A|
|bbb.de/foo| B|
|bbb.es/foo| B|
| foo| null|
+----------+-------+
您可以通过遍历 dict
来构建列表达式,并将此表达式分配给您的 withColumn
调用。
from pyspark.sql import functions as F
sample_dict = {
"A": ["aaaa\.com", "aaaa\.es"],
"B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
"C": ["ccccc\.com"],
# many more entries here
}
data = [("aaaa.com", ), ("aaaa.es", ), ("bbbb.com", ), ("zzzz.com", ), ]
df = spark.createDataFrame(data, ("filter_col", ))
column_expression = F
for k, conditions in sample_dict.items():
condition_expression = F.col("filter_col").rlike(conditions[0])
for condition in conditions[1:]:
condition_expression |= F.col("filter_col").rlike(condition)
column_expression = column_expression.when(condition_expression, F.lit(k))
df.withColumn("new_col", column_expression.otherwise(None)).show()
输出
# column_expression Equivalent to writing the expression by hand
Column<'CASE WHEN (RLIKE(filter_col, aaaa\.com) OR RLIKE(filter_col, aaaa\.es)) THEN A WHEN ((RLIKE(filter_col, bbbb\.com) OR RLIKE(filter_col, bbbb\.es)) OR RLIKE(filter_col, bbbb\.net)) THEN B WHEN RLIKE(filter_col, ccccc\.com) THEN C END'>
## Df with expression applied
+----------+-------+
|filter_col|new_col|
+----------+-------+
| aaaa.com| A|
| aaaa.es| A|
| bbbb.com| B|
| zzzz.com| null|
+----------+-------+
我有这样一本字典:
sample_dict = {
"A": ["aaaa\.com", "aaaa\.es"],
"B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
"C": ["ccccc\.com"],
# many more entries here
}
我想在执行以下操作的 Spark DataFrame 中添加一列:
(
df
.withColumn(
"new_col",
F.when(
(F.col("filter_col").rlike("aaaa\.com")) |
(F.col("filter_col").rlike("aaaa\.es")),
F.lit("A")
)
.when(
(F.col("filter_col").rlike("bbbb\.com")) |
(F.col("filter_col").rlike("bbbb\.es")) |
(F.col("filter_col").rlike("bbbb\.net")),
F.lit("B")
)
.when(
(F.col("filter_col").rlike("cccc\.com")),
F.lit("C")
)
.otherwise(None)
)
)
但是,当然,我希望它是动态的,这样我就可以将新组件添加到我的字典中,并且该列会自动考虑它们并根据规则添加一个新类别。
这可能吗?
如果您可以更改您的列以便查找完全匹配的内容,您可以使用 df.replace()
:
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
Row(filter_col='aaa.de'),
Row(filter_col='aaa.es'),
Row(filter_col='bbb.de'),
Row(filter_col='bbb.es'),
])
d = {
'aaa.de': 'A',
'aaa.es': 'A',
'bbb.de': 'B',
'bbb.es': 'B',
}
(
df
.withColumn('new_col', F.col('filter_col'))
.withColumn('new_col', F.when(F.col('new_col').isin(list(d.keys())), F.col('new_col')))
.replace(d, None, subset='new_col')
.show()
)
# Output:
+----------+-------+
|filter_col|new_col|
+----------+-------+
| aaa.de| A|
| aaa.es| A|
| bbb.de| B|
| bbb.es| B|
| foo| null|
+----------+-------+
可能有一种更高效的方法来用“None”(您的“否则”条件)替换字典中未提及的值。
更新:
如果无法重新格式化,您将不得不遍历您的字典:
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
Row(filter_col='aaa.de/foo'),
Row(filter_col='aaa.es/foo'),
Row(filter_col='bbb.de/foo'),
Row(filter_col='bbb.es/foo'),
Row(filter_col='foo'),
])
d = {
'aaa\.de': 'A',
'aaa\.es': 'A',
'bbb\.de': 'B',
'bbb\.es': 'B',
}
df = df.withColumn('new_col', F.lit(None).cast('string'))
for k,v in d.items():
df = df.withColumn('new_col', F.when(F.col('filter_col').rlike(k), v).otherwise(F.col('new_col')))
df.show()
# Output
+----------+-------+
|filter_col|new_col|
+----------+-------+
|aaa.de/foo| A|
|aaa.es/foo| A|
|bbb.de/foo| B|
|bbb.es/foo| B|
| foo| null|
+----------+-------+
您可以通过遍历 dict
来构建列表达式,并将此表达式分配给您的 withColumn
调用。
from pyspark.sql import functions as F
sample_dict = {
"A": ["aaaa\.com", "aaaa\.es"],
"B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
"C": ["ccccc\.com"],
# many more entries here
}
data = [("aaaa.com", ), ("aaaa.es", ), ("bbbb.com", ), ("zzzz.com", ), ]
df = spark.createDataFrame(data, ("filter_col", ))
column_expression = F
for k, conditions in sample_dict.items():
condition_expression = F.col("filter_col").rlike(conditions[0])
for condition in conditions[1:]:
condition_expression |= F.col("filter_col").rlike(condition)
column_expression = column_expression.when(condition_expression, F.lit(k))
df.withColumn("new_col", column_expression.otherwise(None)).show()
输出
# column_expression Equivalent to writing the expression by hand
Column<'CASE WHEN (RLIKE(filter_col, aaaa\.com) OR RLIKE(filter_col, aaaa\.es)) THEN A WHEN ((RLIKE(filter_col, bbbb\.com) OR RLIKE(filter_col, bbbb\.es)) OR RLIKE(filter_col, bbbb\.net)) THEN B WHEN RLIKE(filter_col, ccccc\.com) THEN C END'>
## Df with expression applied
+----------+-------+
|filter_col|new_col|
+----------+-------+
| aaaa.com| A|
| aaaa.es| A|
| bbbb.com| B|
| zzzz.com| null|
+----------+-------+