Improve performance of Fuzzywuzzy between two large datasets
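I have installed Apache Spark on my machine and run it locally. My machine has these specs: 2.3 GHz quad-core Intel Core i7, 16 GB 3733 MHz LPDDR4X.
So I configured a local Spark session with local[6].
My problem: I have to match two large Spark datasets based on strings. One has 2.4 million records and the other has 38k records. I am using the Fuzzywuzzy library with Python 3.9, and the performance of my code is really poor. The matching has been running for 7 hours and still has not finished.
I want to find the best match in the smaller dataset to complete the information in the larger one. So I concatenated the fields of the large dataset that I want to look up in the smaller dataset and applied process.extractBests(). I know there are a lot of comparisons to make, but I have not found any other way to reduce the number of comparisons. On the other hand, I feel I am not taking advantage of parallel processing. Can you give me some advice?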
header1 = ["name_make_ad", "name_model_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km",
           "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "nr_door_ad",
           "nr_seats_ad"]
header2 = ["name_make", "name_model", "name_fueltype", "cap_ccm", "pwr_km_base", "pwr_kw_base",
           "name_transmission_et", "name_body_et", "name_drivetype_et", "cnt_door", "cnt_seat", "id"]
# Build one comparison string per row; the id is the last field of concatenated2
dataset1 = df1.withColumn("concatenated1", F.concat_ws(", ", *[F.col(x) for x in header1]))
dataset2 = df2 \
    .withColumn("id", F.monotonically_increasing_id()) \
    .withColumn("concatenated2", F.concat_ws(", ", *[F.col(x) for x in header2]))
matches = []
# Pull both concatenated columns to the driver as plain Python lists
list1 = dataset1.select("concatenated1").rdd.flatMap(lambda x: x).collect()
list2 = dataset2.select("concatenated2").rdd.flatMap(lambda x: x).collect()
for item1 in list1:
    # Best match for item1 in list2; the trailing field of the matched string is the id
    best = process.extractBests(item1, list2, scorer=fuzz.token_set_ratio, score_cutoff=50, limit=2)[0][0]
    matches.append((item1, best.split(", ")[-1]))
dfIndexes = sparkSession.createDataFrame(matches).toDF("concatenated1", "id")
dfAdsIndexed = dataset1.join(dfIndexes, on="concatenated1", how="inner").drop("concatenated1")
dfMapping = dfAdsIndexed.join(dataset2, on="id", how="inner").drop("concatenated2", "id")
Dataset 1:
name_make_ad | name_model_ad | prod_year_ad | name_fueltype_ad | engine_capacity_ccm | engine_power_km | engine_power_kw | name_transmission_ad | name_body_ad | name_drivetype_ad | name_color_ad | nr_door_ad | nr_seats_ad | mileage_ad | currency_ad | price_ad |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
fiat | freemont | 2016 | diesel | 1956.0 | 170.0 | 125.0 | automatic | suv | all-wheel-auto | grey | 5 | 7 | 82000.0 | PLN | 66500.0 |
fiat | freemont | 2015 | diesel | 1956.0 | 170.0 | 125.0 | manual | suv | front-wheel | grey | 5 | 7 | 140000.0 | PLN | 64900.0 |
fiat | freemont | 2013 | diesel | 1956.0 | 140.0 | 103.0 | manual | suv | front-wheel | black | 5 | 7 | 189000.0 | PLN | 47970.0 |
Dataset 2:
name_make | name_model | modnamegrp2 | name_vehtype_et | y_modbegin | y_modend | name_body_et | cnt_seat | cnt_door | name_fueltype | pwr_km_base | pwr_kw_base | pwr_km_hyb | pwr_kw_hyb | cap_ccm | torque_base | torque_hyb | cnt_hyb | name_drivetype_et | name_transmission_et |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Petrol | 170 | 125 | 0 | 0 | 2360 | 220 | 0 | 0 | Front wheel drive | Automatic transmission |
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Petrol | 280 | 206 | 0 | 0 | 3605 | 342 | 0 | 0 | 4 wheel drive general | Automatic transmission |
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | 4 wheel drive general | Automatic transmission |
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 5 | 5 | Petrol | 170 | 125 | 0 | 0 | 2360 | 220 | 0 | 0 | Front wheel drive | Automatic transmission |
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox |
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 140 | 103 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox |
So, for each record in dataset 1, I have to find its best match in dataset 2 based on similarity.
The output for this example would be:
A | B | C | D | E | F | G | H | I | J | K | L | M | N | O | P | Q | R | S | T |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | 4 wheel drive general | Automatic transmission |
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox |
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 140 | 103 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox |
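The collect calls and the subsequent for loop are not parallelized by Spark: everything runs in a single Python process on the driver, which is why performance suffers. Instead, you can apply a Cartesian join (crossJoin) on df1 and df2 and then call a udf to score each pair of rows. Finally, you can sort by score within each record and select the top n rows.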
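The three steps above (Cartesian join, per-pair scoring, top-n selection) can be sketched in plain Python to make the logic easy to check in isolation. This is only an illustration under stated assumptions: the standard library's difflib stands in for fuzz.token_set_ratio, so the scores differ from Fuzzywuzzy's, and the names score, top_matches, ads and catalogue are hypothetical.

```python
from difflib import SequenceMatcher
from itertools import product


def score(query: str, choice: str) -> float:
    """Stand-in scorer on a 0-100 scale (difflib, NOT fuzz.token_set_ratio)."""
    return 100.0 * SequenceMatcher(None, query.lower(), choice.lower()).ratio()


def top_matches(queries, choices, cutoff=50.0, max_matches=1):
    """Cross join, score every pair, keep the best max_matches per query."""
    hits = {q: [] for q in queries}
    for q, c in product(queries, choices):  # step 1: Cartesian join
        s = score(q, c)                     # step 2: score the pair
        if s >= cutoff:                     # drop pairs below the cutoff
            hits[q].append((s, c))
    # step 3: sort by score (descending) and keep the top n per query
    return {q: sorted(h, reverse=True)[:max_matches] for q, h in hits.items()}


# Hypothetical, shortened sample strings
ads = ["fiat, freemont, diesel, manual",
       "fiat, freemont, petrol, automatic"]
catalogue = ["FIAT, Freemont, Diesel, Manual gearbox",
             "FIAT, Freemont, Petrol, Automatic transmission"]

result = top_matches(ads, catalogue)
for ad, best in result.items():
    print(ad, "->", best)
```

The Spark code that follows applies the same three steps with DataFrame operations, so that the scoring is spread across the executors instead of running in one loop.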
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from fuzzywuzzy import process, fuzz
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# Local session as in the question; reuses an existing session if one is active
spark = SparkSession.builder.master("local[6]").getOrCreate()

cutoff = 50
max_matches = 1

@udf(returnType=DoubleType())
def scorer(query: str, choice: str):
    # Score one (query, choice) pair; a pair below the cutoff yields no result and scores 0.0
    match_score = list(process.extractWithoutOrder(query, [choice], scorer=fuzz.token_set_ratio, score_cutoff=cutoff))
    if len(match_score) == 0:
        return 0.0
    return float(match_score[0][1])

# Sample rows from the question
df_1_data = [("fiat", "freemont", 2016, "diesel", 1956.0, 170.0, 125.0, "automatic", "suv", "all-wheel-auto", "grey", 5, 7, 82000.0, "PLN", 66500.0),
             ("fiat", "freemont", 2015, "diesel", 1956.0, 170.0, 125.0, "manual", "suv", "front-wheel", "grey", 5, 7, 140000.0, "PLN", 64900.0),
             ("fiat", "freemont", 2013, "diesel", 1956.0, 140.0, 103.0, "manual", "suv", "front-wheel", "lack", 5, 7, 189000.0, "PLN", 47970.0),]
df_2_data = [("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Petrol", 170, 125, 0, 0, 2360, 220, 0, 0, "Front wheel drive", "Automatic transmission"),
             ("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Petrol", 280, 206, 0, 0, 3605, 342, 0, 0, "4 wheel drive general", "Automatic transmission"),
             ("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 170, 125, 0, 0, 1956, 350, 0, 0, "4 wheel drive general", "Automatic transmission"),
             ("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 5, 5, "Petrol", 170, 125, 0, 0, 2360, 220, 0, 0, "Front wheel drive", "Automatic transmission"),
             ("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 170, 125, 0, 0, 1956, 350, 0, 0, "Front wheel drive", "Manual gearbox"),
             ("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 140, 103, 0, 0, 1956, 350, 0, 0, "Front wheel drive", "Manual gearbox"),]

# Columns that are concatenated into the comparison strings
header1 = ["name_make_ad", "name_model_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km",
           "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "nr_door_ad",
           "nr_seats_ad"]
header2 = ["name_make", "name_model", "name_fueltype", "cap_ccm", "pwr_km_base", "pwr_kw_base",
           "name_transmission_et", "name_body_et", "name_drivetype_et", "cnt_door", "cnt_seat"]

df1 = spark.createDataFrame(df_1_data, ("name_make_ad", "name_model_ad", "prod_year_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km", "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "name_color_ad", "nr_door_ad", "nr_seats_ad", "mileage_ad", "currency_ad", "price_ad",))
df2 = spark.createDataFrame(df_2_data, ("name_make", "name_model", "modnamegrp2", "name_vehtype_et", "y_modbegin", "y_modend", "name_body_et", "cnt_seat", "cnt_door", "name_fueltype", "pwr_km_base", "pwr_kw_base", "pwr_km_hyb", "pwr_kw_hyb", "cap_ccm", "torque_base", "torque_hyb", "cnt_hyb", "name_drivetype_et", "name_transmission_et",))

# The id identifies each dataset1 row so that matches can be ranked per row
dataset1 = df1.withColumn("id", F.monotonically_increasing_id())\
    .withColumn("concatenated1", F.concat_ws(", ", *[F.col(x) for x in header1]))
dataset2 = df2.withColumn("concatenated2", F.concat_ws(", ", *[F.col(x) for x in header2]))

# Cartesian join; dataset1 is tiny in this sample. With the real data, the smaller
# 38k-row table is usually the better candidate for broadcast()
combined_df = F.broadcast(dataset1).crossJoin(dataset2)

# Score every pair and drop pairs below the cutoff
df_score_matching_cutoff = combined_df.withColumn("fuzz_score", scorer(col("concatenated1"), col("concatenated2")))\
    .filter(col("fuzz_score") >= cutoff)

# Rank the candidates of each dataset1 row by score and keep the top max_matches
window_spec = Window.partitionBy("id").orderBy(F.desc("fuzz_score"))
results = df_score_matching_cutoff.withColumn("rn", F.row_number().over(window_spec))\
    .filter(col("rn") <= max_matches)
results.show(truncate=False)
Output
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
|name_make_ad|name_model_ad|prod_year_ad|name_fueltype_ad|engine_capacity_ccm|engine_power_km|engine_power_kw|name_transmission_ad|name_body_ad|name_drivetype_ad|name_color_ad|nr_door_ad|nr_seats_ad|mileage_ad|currency_ad|price_ad|id |concatenated1 |name_make|name_model|modnamegrp2|name_vehtype_et|y_modbegin|y_modend|name_body_et|cnt_seat|cnt_door|name_fueltype|pwr_km_base|pwr_kw_base|pwr_km_hyb|pwr_kw_hyb|cap_ccm|torque_base|torque_hyb|cnt_hyb|name_drivetype_et |name_transmission_et |concatenated2 |fuzz_score|rn |
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
|fiat |freemont |2016 |diesel |1956.0 |170.0 |125.0 |automatic |suv |all-wheel-auto |grey |5 |7 |82000.0 |PLN |66500.0 |0 |fiat, freemont, diesel, 1956.0, 170.0, 125.0, automatic, suv, all-wheel-auto, 5, 7|FIAT |Freemont |Freemont |Passenger Car |2011 |2016 |Van |7 |5 |Diesel |170 |125 |0 |0 |1956 |350 |0 |0 |4 wheel drive general|Automatic transmission|FIAT, Freemont, Diesel, 1956, 170, 125, Automatic transmission, Van, 4 wheel drive general, 5, 7|88.0 |1 |
|fiat |freemont |2015 |diesel |1956.0 |170.0 |125.0 |manual |suv |front-wheel |grey |5 |7 |140000.0 |PLN |64900.0 |1 |fiat, freemont, diesel, 1956.0, 170.0, 125.0, manual, suv, front-wheel, 5, 7 |FIAT |Freemont |Freemont |Passenger Car |2011 |2016 |Van |7 |5 |Diesel |170 |125 |0 |0 |1956 |350 |0 |0 |Front wheel drive |Manual gearbox |FIAT, Freemont, Diesel, 1956, 170, 125, Manual gearbox, Van, Front wheel drive, 5, 7 |95.0 |1 |
|fiat |freemont |2013 |diesel |1956.0 |140.0 |103.0 |manual |suv |front-wheel |lack |5 |7 |189000.0 |PLN |47970.0 |2 |fiat, freemont, diesel, 1956.0, 140.0, 103.0, manual, suv, front-wheel, 5, 7 |FIAT |Freemont |Freemont |Passenger Car |2011 |2016 |Van |7 |5 |Diesel |140 |103 |0 |0 |1956 |350 |0 |0 |Front wheel drive |Manual gearbox |FIAT, Freemont, Diesel, 1956, 140, 103, Manual gearbox, Van, Front wheel drive, 5, 7 |95.0 |1 |
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
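On the question of reducing the number of comparisons: a full cross join of 2.4 million by 38k rows is roughly 91 billion scored pairs, so parallelism alone may not be enough. One common technique, which is not part of the answer above, is blocking: only compare rows that share an exact key. The sketch below assumes the make is a reliable blocking key, which would need checking against the real data (a misspelled make would never find its match). It again uses difflib as a stand-in scorer, and build_blocks and best_match are hypothetical names.

```python
from collections import defaultdict
from difflib import SequenceMatcher


def score(a: str, b: str) -> float:
    """Stand-in scorer on a 0-100 scale (difflib, NOT Fuzzywuzzy)."""
    return 100.0 * SequenceMatcher(None, a.lower(), b.lower()).ratio()


def build_blocks(catalogue):
    """Group catalogue strings by a normalized blocking key (the make)."""
    blocks = defaultdict(list)
    for make, text in catalogue:
        blocks[make.strip().lower()].append(text)
    return blocks


def best_match(ad_make, ad_text, blocks, cutoff=50.0):
    """Score the ad only against catalogue rows with the same make."""
    candidates = blocks.get(ad_make.strip().lower(), [])
    scored = [(score(ad_text, c), c) for c in candidates]
    scored = [pair for pair in scored if pair[0] >= cutoff]
    return max(scored)[1] if scored else None


# Hypothetical, shortened sample rows: (make, concatenated string)
catalogue = [("FIAT", "FIAT, Freemont, Diesel, Manual gearbox"),
             ("FIAT", "FIAT, Freemont, Petrol, Automatic transmission"),
             ("BMW", "BMW, X5, Diesel, Automatic transmission")]

blocks = build_blocks(catalogue)
match = best_match("fiat", "fiat, freemont, diesel, manual", blocks)
compared = len(blocks["fiat"])  # candidates actually scored for this ad
print(match, compared)
```

In Spark, the equivalent idea would be to replace crossJoin with an ordinary join on a normalized key, for example the lower-cased name_make_ad and name_make columns, and to run the scoring udf only on the joined pairs.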