Match dataframe rows with row-wise pattern conditions and add array column
Suppose you have a dataframe and you want to filter for row-wise patterns by adding a new pattern_name column. The pattern_name column should be of array type, since each row may match multiple patterns.
# Input
df = spark.createDataFrame(
    [(1, 21, 'A foo text'),
     (2, 42, 'A foo'),
     (2, 42, 'A foobar text'),
     (2, 42, 'barz')],
    ['id_1', 'id_2', 'text']
)
# Patterns:
# pattern_foo_1: id_1 = 1, id_2 = 21, text.rlike('foo')
# pattern_foo_2: id_1 = 2, id_2 = 42, text.rlike('foo')
# pattern_foobar: id_1 = 2, id_2 = 42, text.rlike('foobar')
# Desired output: (null can also be empty string, doesn't matter)
+------+------+----------------+------------------------------------+
| id_1| id_2| text| pattern_name|
+------+------+----------------+------------------------------------+
| 1| 21| 'A foo text'| ['pattern_foo_1', ]|
| 2| 42| 'A foo'| ['pattern_foo_2', ]|
| 2| 42| 'A foobar text'| ['pattern_foo_2', 'pattern_foobar']|
| 2| 42| 'barz'| null|
+------+------+----------------+------------------------------------+
How do you do this in an efficient way (without a UDF), given that the input is very large?
In the past, my df had at most one match per row, so I used the when function (example below). However, that does not work when a row can have multiple matches and you need an array.
from pyspark.sql.functions import col, when

pattern_name_col = None
for pattern in pattern_list:
    if pattern_name_col is None:
        # pseudocode example
        pattern_name_col = when(
            (col('id_1') == 1) & (col('id_2') == 21)
            & (col('text').rlike('foo')),
            'pattern_foo_1')
    else:
        pattern_name_col = pattern_name_col.when(..., ...)

df = df.withColumn('pattern_name', pattern_name_col).filter(col('pattern_name').isNotNull())
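For reference, a minimal sketch (using the three patterns listed above) of what the fully chained when expression looks like; since a chained when returns only the first branch whose condition is true, a row like 'A foobar text' would only be tagged 'pattern_foo_2', which is why an array column is needed instead:

from pyspark.sql.functions import col, when

# Sketch: a chained when() returns the FIRST matching branch only,
# so 'A foobar text' gets 'pattern_foo_2' and never 'pattern_foobar'.
single_match_col = (
    when((col('id_1') == 1) & (col('id_2') == 21) & col('text').rlike('foo'), 'pattern_foo_1')
    .when((col('id_1') == 2) & (col('id_2') == 42) & col('text').rlike('foo'), 'pattern_foo_2')
    .when((col('id_1') == 2) & (col('id_2') == 42) & col('text').rlike('foobar'), 'pattern_foobar')
)
df_single = df.withColumn('pattern_name', single_match_col)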
You can define the patterns list as:
patterns = [
    (1, 21, "foo", "pattern_foo_1"),  # (id_1, id_2, pattern, pattern_name)
    (2, 42, "foo", "pattern_foo_2"),
    (2, 42, "foobar", "pattern_foobar"),
]
Then, using the array function with a list comprehension over the patterns together with when, you get an array column of pattern names:
import pyspark.sql.functions as F

df1 = df.withColumn(
    "pattern_name",
    F.array(*[
        F.when((F.col("id_1") == p[0]) & (F.col("id_2") == p[1]) & F.col("text").rlike(p[2]), p[3])
        for p in patterns
    ])
).withColumn(
    "pattern_name",
    F.expr("filter(pattern_name, x -> x is not null)")
)
df1.show(truncate=False)
#+----+----+-------------+-------------------------------+
#|id_1|id_2|text |pattern_name |
#+----+----+-------------+-------------------------------+
#|1 |21 |A foo text |[pattern_foo_1] |
#|2 |42 |A foo |[pattern_foo_2] |
#|2 |42 |A foobar text|[pattern_foo_2, pattern_foobar]|
#|2 |42 |barz |[] |
#+----+----+-------------+-------------------------------+
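If you want null instead of an empty array for rows with no match (as in the desired output above), one possible follow-up step, assuming the df1 from above, is to null out empty arrays:

# Sketch: replace empty arrays with null so unmatched rows (e.g. 'barz')
# show null, matching the desired output in the question.
df2 = df1.withColumn(
    "pattern_name",
    F.when(F.size("pattern_name") > 0, F.col("pattern_name"))  # null otherwise
)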
You can also create a patterns_df dataframe from the list above, then use a join followed by groupBy + collect_list:
patterns_df = spark.createDataFrame(patterns, ["id_1", "id_2", "pattern", "pattern_name"])

df1 = df.alias("df").join(
    patterns_df.alias("p"),
    F.expr("df.id_1 = p.id_1 and df.id_2 = p.id_2 and df.text rlike p.pattern")
).groupBy("df.id_1", "df.id_2", "text").agg(
    F.collect_list("pattern_name").alias("pattern_name")
)
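Note that this is an inner join, so rows with no matching pattern (e.g. 'barz') are dropped from the result. If you need to keep them, one possible variant is a left join; broadcasting patterns_df is an extra assumption that the pattern list is small enough to fit on the executors:

# Sketch: a left join keeps unmatched rows (collect_list ignores nulls,
# so they end up with an empty array); broadcasting the small patterns_df
# avoids shuffling the large df.
df1 = df.alias("df").join(
    F.broadcast(patterns_df).alias("p"),
    F.expr("df.id_1 = p.id_1 and df.id_2 = p.id_2 and df.text rlike p.pattern"),
    "left"
).groupBy("df.id_1", "df.id_2", "text").agg(
    F.collect_list("pattern_name").alias("pattern_name")
)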