Improve performance of Fuzzywuzzy between two large datasets

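I have installed Apache Spark on my machine and run it locally. My machine has these specs: 2.3 GHz quad-core Intel Core i7, 16 GB 3733 MHz LPDDR4X.

So I configured a local Spark session with local[6].

My problem is: I have to match two large Spark datasets based on strings. One has 2.4 million records and the other has 38k records. I am using the Fuzzywuzzy library in Python 3.9, and the performance of my code is really poor. The matching has been running for 7 hours and still has not finished.

I want to find the best match in the smaller dataset to complete the information in the larger dataset. So I concatenated the fields of the large dataset that I want to look up in the smaller dataset and applied process.extractBests(). I know there are a lot of comparisons to make, but I have not found any other way to reduce their number. On the other hand, I feel that I am not taking advantage of parallel processing. Can you give me some advice?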

header1 = ["name_make_ad", "name_model_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km",
                    "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "nr_door_ad",
                    "nr_seats_ad"]

header2 = ["name_make", "name_model", "name_fueltype", "cap_ccm", "pwr_km_base", "pwr_kw_base",
                     "name_transmission_et", "name_body_et", "name_drivetype_et", "cnt_door", "cnt_seat", "id"]

dataset1 = df1.withColumn("concatenated1", F.concat_ws(", ", *[F.col(x) for x in header1]))

dataset2 = df2 \
    .withColumn("id", F.monotonically_increasing_id()) \
    .withColumn("concatenated2", F.concat_ws(", ", *[F.col(x) for x in header2]))

matches = []
# Pull both concatenated columns onto the driver as plain Python lists.
list1 = dataset1.select("concatenated1").rdd.flatMap(lambda x: x).collect()

list2 = dataset2.select("concatenated2").rdd.flatMap(lambda x: x).collect()

# Single-threaded loop on the driver: each item1 is scored against all of list2.
for item1 in list1:
    matches.append((item1,process.extractBests(item1, list2, scorer=fuzz.token_set_ratio, score_cutoff=50, limit=2)[0][0]
                    .split(", ")[-1]))
dfIndexes = sparkSession.createDataFrame(matches).toDF("concatenated1","id")
dfAdsIndexed = dataset1.join(dfIndexes, on="concatenated1", how="inner").drop("concatenated1")
dfMapping = dfAdsIndexed.join(dataset2, on="id", how="inner").drop("concatenated2","id")

Dataset 1:

name_make_ad | name_model_ad | prod_year_ad | name_fueltype_ad | engine_capacity_ccm | engine_power_km | engine_power_kw | name_transmission_ad | name_body_ad | name_drivetype_ad | name_color_ad | nr_door_ad | nr_seats_ad | mileage_ad | currency_ad | price_ad
fiat | freemont | 2016 | diesel | 1956.0 | 170.0 | 125.0 | automatic | suv | all-wheel-auto | grey | 5 | 7 | 82000.0 | PLN | 66500.0
fiat | freemont | 2015 | diesel | 1956.0 | 170.0 | 125.0 | manual | suv | front-wheel | grey | 5 | 7 | 140000.0 | PLN | 64900.0
fiat | freemont | 2013 | diesel | 1956.0 | 140.0 | 103.0 | manual | suv | front-wheel | black | 5 | 7 | 189000.0 | PLN | 47970.0

Dataset 2:

name_make | name_model | modnamegrp2 | name_vehtype_et | y_modbegin | y_modend | name_body_et | cnt_seat | cnt_door | name_fueltype | pwr_km_base | pwr_kw_base | pwr_km_hyb | pwr_kw_hyb | cap_ccm | torque_base | torque_hyb | cnt_hyb | name_drivetype_et | name_transmission_et
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Petrol | 170 | 125 | 0 | 0 | 2360 | 220 | 0 | 0 | Front wheel drive | Automatic transmission
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Petrol | 280 | 206 | 0 | 0 | 3605 | 342 | 0 | 0 | 4 wheel drive general | Automatic transmission
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | 4 wheel drive general | Automatic transmission
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 5 | 5 | Petrol | 170 | 125 | 0 | 0 | 2360 | 220 | 0 | 0 | Front wheel drive | Automatic transmission
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 140 | 103 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox

So, for each record in dataset 1, I have to find its best match in dataset 2 based on similarity.

The output for this example would be:

A | B | C | D | E | F | G | H | I | J | K | L | M | N | O | P | Q | R | S | T
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | 4 wheel drive general | Automatic transmission
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 170 | 125 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox
FIAT | Freemont | Freemont | Passenger Car | 2011 | 2016 | Van | 7 | 5 | Diesel | 140 | 103 | 0 | 0 | 1956 | 350 | 0 | 0 | Front wheel drive | Manual gearbox

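The collect and the for loop that follows it are not parallelized by Spark: collect pulls every row onto the driver, and the loop then runs there in a single Python process. That is why performance suffers. Instead, you can apply a Cartesian join (crossJoin) between df1 and df2 and then call a UDF to score each pair of rows. Finally, rank the pairs by score and keep the top n rows for each record.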
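For a sense of scale, the dataset sizes in the question imply a very large number of pairwise comparisons. The sketch below only does that arithmetic; `assumed_pairs_per_second` is a made-up throughput chosen for illustration, not a measurement of Fuzzywuzzy, and `hours_needed` is a hypothetical helper.

```python
# Sizes taken from the question.
n_large = 2_400_000   # records in the large dataset
n_small = 38_000      # records in the small dataset

# Every large record is scored against every small record.
pairs = n_large * n_small

# Hypothetical throughput, for illustration only; measure your own.
assumed_pairs_per_second = 100_000


def hours_needed(total_pairs, pairs_per_second, workers=1):
    """Wall-clock hours if the work splits evenly across `workers`."""
    return total_pairs / (pairs_per_second * workers) / 3600


print(f"{pairs:,} pairs to score")
print(f"1 worker:  {hours_needed(pairs, assumed_pairs_per_second):.0f} h")
print(f"6 workers: {hours_needed(pairs, assumed_pairs_per_second, workers=6):.0f} h")
```

Spreading the scoring across cores divides the wall-clock time, but it does not reduce the number of pairs that have to be scored.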


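from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from fuzzywuzzy import process, fuzz
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# Assumption: no session exists yet (shells and notebooks usually predefine `spark`).
spark = SparkSession.builder.master("local[6]").getOrCreate()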

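cutoff = 50        # minimum score for a pair to count as a match
max_matches = 1    # number of best matches to keep per dataset1 row

@udf(returnType=DoubleType())
def scorer(query: str, choice: str):
    # Score a single (query, choice) pair; a pair below the cutoff yields nothing.
    match_score = list(process.extractWithoutOrder(query, [choice], scorer=fuzz.token_set_ratio, score_cutoff=cutoff))
    if len(match_score) == 0:
        return 0.0
    return float(match_score[0][1])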

df_1_data = [("fiat", "freemont", 2016, "diesel", 1956.0, 170.0, 125.0, "automatic", "suv", "all-wheel-auto", "grey", 5, 7, 82000.0, "PLN", 66500.0),
("fiat", "freemont", 2015, "diesel", 1956.0, 170.0, 125.0, "manual", "suv", "front-wheel", "grey", 5, 7, 140000.0, "PLN", 64900.0),
("fiat", "freemont", 2013, "diesel", 1956.0, 140.0, 103.0, "manual", "suv", "front-wheel", "lack", 5, 7, 189000.0, "PLN", 47970.0),]

df_2_data = [("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Petrol", 170, 125, 0, 0, 2360, 220, 0, 0, "Front wheel drive", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Petrol", 280, 206, 0, 0, 3605, 342, 0, 0, "4 wheel drive general", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 170, 125, 0, 0, 1956, 350, 0, 0, "4 wheel drive general", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 5, 5, "Petrol", 170, 125, 0, 0, 2360, 220, 0, 0, "Front wheel drive", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 170, 125, 0, 0, 1956, 350, 0, 0, "Front wheel drive", "Manual gearbox"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 140, 103, 0, 0, 1956, 350, 0, 0, "Front wheel drive", "Manual gearbox"),]


header1 = ["name_make_ad", "name_model_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km",
                    "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "nr_door_ad",
                    "nr_seats_ad"]

header2 = ["name_make", "name_model", "name_fueltype", "cap_ccm", "pwr_km_base", "pwr_kw_base",
                     "name_transmission_et", "name_body_et", "name_drivetype_et", "cnt_door", "cnt_seat"]

df1 = spark.createDataFrame(df_1_data, ("name_make_ad", "name_model_ad", "prod_year_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km", "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "name_color_ad", "nr_door_ad", "nr_seats_ad", "mileage_ad", "currency_ad", "price_ad",))

df2 = spark.createDataFrame(df_2_data, ("name_make", "name_model", "modnamegrp2", "name_vehtype_et", "y_modbegin", "y_modend", "name_body_et", "cnt_seat", "cnt_door", "name_fueltype", "pwr_km_base", "pwr_kw_base", "pwr_km_hyb", "pwr_kw_hyb", "cap_ccm", "torque_base", "torque_hyb", "cnt_hyb",  "name_drivetype_et",  "name_transmission_et",))

dataset1 = df1.withColumn("id", F.monotonically_increasing_id())\
              .withColumn("concatenated1", F.concat_ws(", ", *[F.col(x) for x in header1]))

dataset2 = df2.withColumn("concatenated2", F.concat_ws(", ", *[F.col(x) for x in header2]))

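# Broadcast the smaller side (dataset2 is the 38k-row reference data in the
# question) so that each executor gets a full copy of it.
combined_df = dataset1.crossJoin(F.broadcast(dataset2))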

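# Score every pair and keep only those at or above the cutoff.
df_score_matching_cutoff = combined_df.withColumn("fuzz_score", scorer(col("concatenated1"), col("concatenated2")))\
                      .filter(col("fuzz_score") >= cutoff)

# Rank the candidates for each dataset1 row by descending score.
window_spec = Window.partitionBy("id").orderBy(F.desc("fuzz_score"))

# Keep the top max_matches candidates per dataset1 row.
results = df_score_matching_cutoff.withColumn("rn", F.row_number().over(window_spec))\
                        .filter(col("rn") <= max_matches)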

results.show(truncate=False)

Output:

+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
|name_make_ad|name_model_ad|prod_year_ad|name_fueltype_ad|engine_capacity_ccm|engine_power_km|engine_power_kw|name_transmission_ad|name_body_ad|name_drivetype_ad|name_color_ad|nr_door_ad|nr_seats_ad|mileage_ad|currency_ad|price_ad|id |concatenated1                                                                     |name_make|name_model|modnamegrp2|name_vehtype_et|y_modbegin|y_modend|name_body_et|cnt_seat|cnt_door|name_fueltype|pwr_km_base|pwr_kw_base|pwr_km_hyb|pwr_kw_hyb|cap_ccm|torque_base|torque_hyb|cnt_hyb|name_drivetype_et    |name_transmission_et  |concatenated2                                                                                   |fuzz_score|rn |
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
|fiat        |freemont     |2016        |diesel          |1956.0             |170.0          |125.0          |automatic           |suv         |all-wheel-auto   |grey         |5         |7          |82000.0   |PLN        |66500.0 |0  |fiat, freemont, diesel, 1956.0, 170.0, 125.0, automatic, suv, all-wheel-auto, 5, 7|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |4 wheel drive general|Automatic transmission|FIAT, Freemont, Diesel, 1956, 170, 125, Automatic transmission, Van, 4 wheel drive general, 5, 7|88.0      |1  |
|fiat        |freemont     |2015        |diesel          |1956.0             |170.0          |125.0          |manual              |suv         |front-wheel      |grey         |5         |7          |140000.0  |PLN        |64900.0 |1  |fiat, freemont, diesel, 1956.0, 170.0, 125.0, manual, suv, front-wheel, 5, 7      |FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |FIAT, Freemont, Diesel, 1956, 170, 125, Manual gearbox, Van, Front wheel drive, 5, 7            |95.0      |1  |
|fiat        |freemont     |2013        |diesel          |1956.0             |140.0          |103.0          |manual              |suv         |front-wheel      |lack         |5         |7          |189000.0  |PLN        |47970.0 |2  |fiat, freemont, diesel, 1956.0, 140.0, 103.0, manual, suv, front-wheel, 5, 7      |FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |140        |103        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |FIAT, Freemont, Diesel, 1956, 140, 103, Manual gearbox, Van, Front wheel drive, 5, 7            |95.0      |1  |
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
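
The cross join above still scores every pair. One common way to cut the number of comparisons is blocking: only compare records that already agree on a cheap exact key. The following is a minimal pure-Python sketch of that idea, assuming that make and model match exactly once lower-cased (as they do in the sample rows). The function names, the tuple layout, and `jaccard_score` are hypothetical; `jaccard_score` is a simple token-overlap stand-in, not `fuzz.token_set_ratio`.

```python
from collections import defaultdict


def tokens(text):
    """Lower-case the text and split it on commas and whitespace."""
    return set(text.lower().replace(",", " ").split())


def jaccard_score(a, b):
    """Stand-in scorer on a 0-100 scale (token overlap), not fuzz.token_set_ratio."""
    ta, tb = tokens(a), tokens(b)
    if not ta or not tb:
        return 0.0
    return 100.0 * len(ta & tb) / len(ta | tb)


def block_key(record):
    """Hypothetical blocking key: (make, model), lower-cased."""
    make, model, _description = record
    return (make.lower(), model.lower())


def best_matches(ads, catalog, scorer=jaccard_score, cutoff=50.0):
    """Return (matches, comparisons); each ad is scored only within its own block."""
    blocks = defaultdict(list)
    for row in catalog:
        blocks[block_key(row)].append(row)

    matches, comparisons = [], 0
    for ad in ads:
        best_row, best_score = None, -1.0
        for candidate in blocks.get(block_key(ad), []):
            comparisons += 1
            score = scorer(ad[2], candidate[2])
            if score > best_score:
                best_row, best_score = candidate, score
        if best_score < cutoff:
            best_row = None  # nothing in the block reached the cutoff
        matches.append((ad, best_row, best_score if best_row else 0.0))
    return matches, comparisons


# Toy records: (make, model, remaining attributes as one string).
ads = [("fiat", "freemont", "diesel 1956 170 125 manual")]
catalog = [
    ("FIAT", "Freemont", "Diesel 1956 170 125 Manual gearbox"),
    ("FIAT", "Freemont", "Petrol 2360 170 125 Automatic transmission"),
    ("FORD", "Focus", "Petrol 1596 125 92 Manual gearbox"),
]

matches, comparisons = best_matches(ads, catalog)
print(matches[0][1])  # best catalog row for the first ad
print(comparisons, "comparisons instead of", len(ads) * len(catalog))
```

In Spark, the same idea would roughly correspond to replacing the crossJoin with an equi-join on normalized make and model columns before scoring. Records whose key fields are misspelled would never be compared, so this only fits if those fields are reliable.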