How to use list comprehension for nested for loops in PySpark
I intend to use difflib.SequenceMatcher() on the following PySpark dataframes.
tech.show()
+-----------------------------+----------------------+
| concat_tech |Vendor_product |
+-----------------------------+----------------------+
|AWS Cloud Administration |AWS Cloud Map |
|Grad Portal |CA Identity Portal |
|Html/php |HTML |
|UX Xpect |HP-UX |
|Debian-based |Debian |
|Microsoft Office excel |Microsoft Office |
|Oracle EBusiness Suite 12.2.4|Oracle Primavera Suite|
|Solaris 10.XX |Solaris |
|CA7 Job Scheduler |CA Scheduler |
|Windows NT/XP/Vista |Windows XP |
+-----------------------------+----------------------+
techno.show()
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
|vendor |product |category |sub_category |
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
|Notion Labs, Inc. |Notion |Project Management |Project Management |
|Apptricity Corporation |Apptricity |Enterprise Applications |Enterprise Resource Planning (ERP) |
|Resolution Software, Ltd. |Xcase |IT Governance |Application Development & Management |
|The Apache Software Foundation|Apache Mynewt |IT Governance |Application Development & Management |
|NetApp, Inc. |NetApp iSCSI SAN Storage System|Data Center Solutions |Data Management & Storage (Hardware) |
|HP, Inc. |HP Z820 |Hardware (Basic) |Consumer Electronics, Personal Computers & Software|
|Dell Technologies, Inc. |Dell EMC FormWare |Customer Relationship Management|Help Desk Management |
|ServiceMax, Inc. |ServiceMax |Customer Relationship Management|Service & Field Support Management |
|MaxMind, Inc. |MaxMind GeoIP |Software (Basic) |Server Technologies (Software) |
|Campus Management Corporation |Campus Management |Vertical Markets |Academic & Education Management Software |
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
import pandas as pd
from difflib import SequenceMatcher
def similar(a, b):
    if pd.isnull(a) or pd.isnull(b):
        return 0
    else:
        return SequenceMatcher(None, a, b).ratio()
The similar(a, b) function takes concat_tech from dataframe tech as 'a' and product from dataframe techno as 'b', and returns the SequenceMatcher ratio between a and b. The goal is to find, for each concat_tech, the best match across all product values: that is, return the single product from the product column that yields the highest ratio for that concat_tech.
This is a one-to-many operation that could be done in pandas with a list comprehension, but how do I achieve the same thing in PySpark? I have millions of rows in both dataframes; the 10 rows shown above are just samples.
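For reference, a minimal pandas sketch of the one-to-many matching described above (a toy example with made-up two-row/three-row data; assumes everything fits in memory, which is exactly what does not hold at millions of rows):

```python
import pandas as pd
from difflib import SequenceMatcher

tech = pd.DataFrame({"concat_tech": ["Html/php", "Debian-based"]})
techno = pd.DataFrame({"product": ["HTML", "Debian", "Notion"]})

def best_match(a, products):
    # score every product against a and keep the one with the highest ratio
    scored = [(p, SequenceMatcher(None, a, p).ratio()) for p in products]
    return max(scored, key=lambda t: t[1])

# one row in, one (best_product, ratio) pair out
tech[["best_product", "ratio"]] = tech["concat_tech"].apply(
    lambda a: pd.Series(best_match(a, techno["product"]))
)
print(tech)
```

This scans all of `techno` once per row of `tech`, i.e. the same quadratic amount of work that the cross join below performs in a distributed fashion.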
You are trying to compare every element of dataframe tech against every element of dataframe techno. The result of such an operation is a crossJoin. Unless one side of this join is fairly small, or there is a way to reduce the number of possible combinations (and thus avoid the cross join), this will be a very expensive operation.
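To illustrate why reducing the combinations helps (a plain-Python sketch, not part of the original answer, and it assumes matching names share their first letter, which is not true for every row above):

```python
from collections import defaultdict
from difflib import SequenceMatcher

left = ["Debian-based", "Solaris 10.XX", "Windows NT/XP/Vista"]
right = ["Debian", "Solaris", "Windows XP", "Notion", "Xcase"]

# index the right side by a cheap blocking key (lowercased first character)
blocks = defaultdict(list)
for p in right:
    blocks[p[0].lower()].append(p)

# compare each left value only against its block instead of all of right
pairs = [(a, b) for a in left for b in blocks[a[0].lower()]]
print(len(pairs), "pairs scored instead of", len(left) * len(right))
```

In Spark the same idea becomes an equi-join on the blocking key instead of a crossJoin, at the cost of missing matches that fall outside the block.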
The actual code is straightforward: perform the join, compute the ratio for each pair with the help of a udf, and then find the maximum for each element of tech:
import pandas as pd
from difflib import SequenceMatcher
from pyspark.sql import functions as F
@F.udf("double")
def similar(a, b):
    if pd.isnull(a) or pd.isnull(b):
        return 0.0  # return a float to match the declared "double" type
    else:
        return SequenceMatcher(None, a, b).ratio()
df = tech.select("concat_tech").crossJoin(techno.select("product")) \
    .withColumn("ratio", similar("concat_tech", "product")) \
    .groupBy("concat_tech").agg(F.expr("max_by(product, ratio)"), F.max("ratio"))
df.show(truncate=False)
Output for the sample data:
+--------------------------+----------------------+-------------------+
|concat_tech |max_by(product, ratio)|max(ratio) |
+--------------------------+----------------------+-------------------+
|UXXpect |Apptricity |0.35294117647058826|
|GradPortal |Notion |0.25 |
|OracleEBusinessSuite12.2.4|ApacheMynewt |0.3157894736842105 |
|MicrosoftOfficeexcel |ServiceMax |0.3333333333333333 |
|AWSCloudAdministration |Notion |0.35714285714285715|
|CA7JobScheduler |ApacheMynewt |0.37037037037037035|
|Html/php |HPZ820 |0.14285714285714285|
|WindowsNT/XP/Vista |MaxMindGeoIP |0.3333333333333333 |
|Debian-based |Xcase |0.35294117647058826|
|Solaris10.XX |Xcase |0.23529411764705882|
+--------------------------+----------------------+-------------------+
Using a vectorized UDF may improve performance somewhat, but the cross join remains the bottleneck for large dataframes.
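A sketch of that vectorized variant (assumes Spark >= 2.3 with Arrow available; the scoring logic itself is plain pandas, so it is shown standalone here and the Spark wiring is left as comments):

```python
import pandas as pd
from difflib import SequenceMatcher

def similar_series(a: pd.Series, b: pd.Series) -> pd.Series:
    # score each aligned pair of strings; a null on either side scores 0.0
    def score(x, y):
        if pd.isnull(x) or pd.isnull(y):
            return 0.0
        return SequenceMatcher(None, x, y).ratio()
    return pd.Series([score(x, y) for x, y in zip(a, b)], dtype="float64")

# In Spark, the same function becomes a vectorized UDF:
#   from pyspark.sql.functions import pandas_udf
#   similar = pandas_udf(similar_series, "double")
# and is then used exactly like the row-at-a-time udf above.

print(similar_series(pd.Series(["Debian-based"]), pd.Series(["Debian"])))
```

The batching amortizes serialization overhead, but SequenceMatcher itself still runs once per pair in Python, so the speedup is moderate.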