如何在 PySpark 中对嵌套 for 循环使用列表理解

How to use list comprehension for nested for loops in PySpark

我打算在以下 PySpark 数据帧上使用 difflib.SequenceMatcher()。

tech.show()

+-----------------------------+----------------------+
|        concat_tech          |Vendor_product        |
+-----------------------------+----------------------+
|AWS Cloud Administration     |AWS Cloud Map         |
|Grad Portal                  |CA Identity Portal    |
|Html/php                     |HTML                  |
|UX Xpect                     |HP-UX                 |
|Debian-based                 |Debian                |
|Microsoft Office excel       |Microsoft Office      |
|Oracle EBusiness Suite 12.2.4|Oracle Primavera Suite|
|Solaris 10.XX                |Solaris               |
|CA7 Job Scheduler            |CA Scheduler          |
|Windows NT/XP/Vista          |Windows XP            |
+-----------------------------+----------------------+

techno.show()
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
|vendor                        |product                        |category                        |sub_category                                       |
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
|Notion Labs, Inc.             |Notion                         |Project Management              |Project Management                                 |
|Apptricity Corporation        |Apptricity                     |Enterprise Applications         |Enterprise Resource Planning (ERP)                 |
|Resolution Software, Ltd.     |Xcase                          |IT Governance                   |Application Development & Management               |
|The Apache Software Foundation|Apache Mynewt                  |IT Governance                   |Application Development & Management               |
|NetApp, Inc.                  |NetApp iSCSI SAN Storage System|Data Center Solutions           |Data Management & Storage (Hardware)               |
|HP, Inc.                      |HP Z820                        |Hardware (Basic)                |Consumer Electronics, Personal Computers & Software|
|Dell Technologies, Inc.       |Dell EMC FormWare              |Customer Relationship Management|Help Desk Management                               |
|ServiceMax, Inc.              |ServiceMax                     |Customer Relationship Management|Service & Field Support Management                 |
|MaxMind, Inc.                 |MaxMind GeoIP                  |Software (Basic)                |Server Technologies (Software)                     |
|Campus Management Corporation |Campus Management              |Vertical Markets                |Academic & Education Management Software           |
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
import pandas as pd
from difflib import SequenceMatcher
def similar(a,b):
    if pd.isnull(a) or pd.isnull(b):
        return 0
    else:
        return SequenceMatcher(None, a, b).ratio()

函数 SequenceMatcher(a, b) 从数据帧 tech 中获取 concat_tech 作为 'a' 和product 来自数据框 techno 作为 'b' 和 returns a 和 b 之间的比率。目标是在所有 product 和 return 数据框中找到 concat_tech 的最佳匹配最佳匹配,即一个 product 将从 product 列中 returned,为 a 生成最佳(最大)比率concat_tech 使用 SequenceMatcher() 的值。

它应该是一个一对多的操作,可以在 pandas 中使用列表理解来完成,但是如何在 PySpark 中实现相同的操作?我在两个数据框中都有数百万行,这里我给出了 10 个样本。

您正在尝试将数据帧 tech 中的每个元素与数据帧 techno 中的每个元素进行比较。这种操作的结果是 crossJoin。除非此连接的一侧相当小,或者有一种方法可以减少可能组合的数量(从而避免交叉连接),否则这将是一个非常昂贵的操作。

实际代码很简单:进行连接,在 udf 的帮助下计算每对的比率,然后从 tech:[=18 中找到每个元素的最大值=]

import pandas as pd
from difflib import SequenceMatcher
from pyspark.sql import functions as F

@F.udf("double")
def similar(a,b):
    if pd.isnull(a) or pd.isnull(b):
        return 0
    else:
        return SequenceMatcher(None, a, b).ratio()

df = tech.select("concat_tech").crossJoin(techno.select("product")) \
    .withColumn("ratio", similar("concat_tech", "product")) \
    .groupBy("concat_tech").agg(F.expr("max_by(product, ratio)"), F.max("ratio"))
df.show(truncate=False)

示例数据的输出:

+--------------------------+----------------------+-------------------+         
|concat_tech               |max_by(product, ratio)|max(ratio)         |
+--------------------------+----------------------+-------------------+
|UXXpect                   |Apptricity            |0.35294117647058826|
|GradPortal                |Notion                |0.25               |
|OracleEBusinessSuite12.2.4|ApacheMynewt          |0.3157894736842105 |
|MicrosoftOfficeexcel      |ServiceMax            |0.3333333333333333 |
|AWSCloudAdministration    |Notion                |0.35714285714285715|
|CA7JobScheduler           |ApacheMynewt          |0.37037037037037035|
|Html/php                  |HPZ820                |0.14285714285714285|
|WindowsNT/XP/Vista        |MaxMindGeoIP          |0.3333333333333333 |
|Debian-based              |Xcase                 |0.35294117647058826|
|Solaris10.XX              |Xcase                 |0.23529411764705882|
+--------------------------+----------------------+-------------------+

使用 Vectorized UDF 可能会稍微提高性能,但交叉连接对于大型数据帧仍然是个问题。