PySpark based approach to inline regex matching like Pandas

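I have a code snippet that works well in Pandas, but my data is large and Pandas consumes a lot of memory. So I am trying a PySpark- or Koalas-based solution, since both are built on Spark and are highly scalable. I am new to Spark, and I'm not sure how to optimize the regex matching and string replacement at this scale.

My code snippet: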

import re

flags = re.IGNORECASE
pd_dataset['details_trunc'] = (
    pd_dataset['details']
    .str.replace(r'[0-9]+GB? ', '', regex=True, flags=flags)       # e.g. "1GB "
    .str.replace(r'[0-9]+MB?P?S? ', '', regex=True, flags=flags)   # e.g. "1MB ", "100MBPS "
    .str.replace(r'[0-9]+\s?mins? ', '', regex=True, flags=flags)  # e.g. "30 mins "
    .str.replace(r'\(.*\)', '', regex=True)                        # text in parentheses
    .str.split('$').str[0]        # a one-character pattern is split on literally
    .str.split('-').str[0]
    .str.replace(r'\b[0-9]+\b', '', regex=True)                    # standalone numbers
    .str.split('fr').str[0]
    .str.split('ends').str[0]
    .str.split(':').str[0]
    .str.strip()
)
# Assign the result instead of calling replace(..., inplace=True) on a column selection
pd_dataset['details_trunc'] = (
    pd_dataset['details_trunc']
    .replace(r'Apple App Store.*$', 'Apple App Store', regex=True)
    .replace(r'Google Play.*$', 'Google Play', regex=True)
    .replace('', 'NA')
)
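To get a reference output for checking any Spark port, it can help to apply the same steps as the pandas chain above to a single string with Python's `re` module. This is a minimal sketch: the helper name `truncate_details` is hypothetical, it relies on pandas treating a one-character `split` pattern such as `"$"` as a literal, and it does not handle missing values (NaN), which pandas would propagate.

```python
import re

def truncate_details(s: str) -> str:
    """Apply the pandas steps from the question to one string (hypothetical helper)."""
    s = re.sub(r'[0-9]+GB? ', '', s, flags=re.IGNORECASE)       # e.g. "1GB ", "2G "
    s = re.sub(r'[0-9]+MB?P?S? ', '', s, flags=re.IGNORECASE)   # e.g. "1MB ", "100MBPS "
    s = re.sub(r'[0-9]+\s?mins? ', '', s, flags=re.IGNORECASE)  # e.g. "30 mins "
    s = re.sub(r'\(.*\)', '', s)            # greedy: first "(" to last ")"
    s = s.split('$')[0]                     # literal dollar sign
    s = s.split('-')[0]
    s = re.sub(r'\b[0-9]+\b', '', s)        # standalone numbers only
    for sep in ('fr', 'ends', ':'):         # keep the text before each separator
        s = s.split(sep)[0]
    s = s.strip()
    # Series.replace(..., regex=True) substitutes the matched span, keeping any prefix
    s = re.sub(r'Apple App Store.*$', 'Apple App Store', s)
    s = re.sub(r'Google Play.*$', 'Google Play', s)
    return 'NA' if s == '' else s
```

For example, `truncate_details("Local Calls - Incoming 0.00")` returns `"Local Calls"`, which matches the corresponding row in the table.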

Edit 1

In the table below, details is the input and details_trunc is the output.

| details_trunc | details | Class |
|---|---|---|
| Local Airtime Call Charge | Local Airtime Call Charge | AAB |
| Local Airtime Call Charge | Local Airtime Call Charge | AAB |
| Local Calls | Local Calls - Incoming 0.00 | AAB |
| Local Calls | Local Calls - Outgoing 0.00 | AAB |
| STD Call | STD Call - E STD 020 Call Mobile No. | AAB |
| v019 Call | v019 Call - $0.66 | AAB |
| v019 Call | v019 Call - .80 | AAB |
| v019 Call | v019 Call - Mobile No. $0.92 | AAB |
| v019 Call | v019 Call - Mobile No. $0.25 | AAB |
| v019 Call | v019 Call - Mobile No. .84 | AAB |
| IDD Call | IDD 001 Call - E Mobile No. | AAB |
| IDD Call | IDD 001 Call - IDD 001 Call - S | AAB |
| Roaming Incoming Call | Roaming Incoming Call 193813 | RRE |
| Roaming Incoming Call | Roaming Incoming Call 204459 | RRE |
| Roaming Incoming Call | Roaming Outgoing Call 000911 Int'l Call | ILL |
| Roaming Incoming Call | Roaming Outgoing Call 000954 Int'l Call(S'pore) | INL |
| Roaming Incoming Call | Roaming Outgoing Call 001447 Int'l Call(S'pore) | INL |
| AutoRoam Rerouted IDD/STD Call | AutoRoam Rerouted IDD/STD Call - AutoRoam Rerouted IDD Call - D -(TSM: quantity set to 0 as counted under corresponding AutoRoam Call) | AAB |
| Local Mobile Data/GPRS Data | Local Mobile Data/GPRS Data (1GB = 1024MB; 1MB = 1024KB) | AAB |
| Local MMS | Local MMS (M1/StarHub) | BRQ |
| SmartMessage | SmartMessage (Local) | BRQ |
| Global SMS | Global SMS | AKK |
| Global SMS | Global SMS | AKK |

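If I understand the first part (the chain of replacements and splits) correctly, you can write it in PySpark like this:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "details_trunc",
    F.trim(  # counterpart of .str.strip(): removes leading/trailing spaces
        F.split(
            F.regexp_replace(
                "details",
                # (?i) makes the match case-insensitive, like flags=re.IGNORECASE
                r"(?i)[0-9]+GB? |[0-9]+MB?P?S? |[0-9]+\s?mins? |\(.*\)|\b[0-9]+\b",
                ""
            ),
            r"\$|-|fr|ends|:"  # "$" is escaped so it splits on a literal dollar sign
        )[0]
    )
)

Basically, regexp_replace removes every match of the combined pattern in a single pass. split then breaks the result on the regex \$|-|fr|ends|: and [0] keeps the first element, which mirrors the chain of .str.split(...).str[0] calls. Finally, trim plays the role of .str.strip().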
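Spark evaluates these patterns with Java's regex engine. For ASCII input and the constructs used here (`[0-9]`, `?`, `\s`, `\b`, `(?i)`, escaped `\(` and `\$`), Python's `re` should behave the same way, so the regexp_replace + split approach can be tried on a few sample rows without starting a Spark job. This sketch uses a case-insensitive removal pattern, a literal `\$` delimiter and a final trim to match the pandas steps; the names `REMOVE`, `DELIMS` and `spark_like_trunc` are hypothetical, and `str.strip(" ")` stands in for `F.trim`.

```python
import re

# One-pass removal pattern; (?i) at the start applies case-insensitivity to all of it
REMOVE = r"(?i)[0-9]+GB? |[0-9]+MB?P?S? |[0-9]+\s?mins? |\(.*\)|\b[0-9]+\b"
# Delimiters: a literal "$", "-", "fr", "ends" or ":"
DELIMS = r"\$|-|fr|ends|:"

def spark_like_trunc(s: str) -> str:
    """Emulate trim(split(regexp_replace(s, REMOVE, ''), DELIMS)[0]) in plain Python."""
    removed = re.sub(REMOVE, "", s)          # like F.regexp_replace(col, REMOVE, "")
    first = re.split(DELIMS, removed)[0]     # like F.split(col, DELIMS)[0]
    return first.strip(" ")                  # approximates F.trim (spaces only)
```

Here `spark_like_trunc("v019 Call - $0.66")` gives `"v019 Call"`: the standalone `0` and `66` are removed first, then the split on `$` and `-` drops the rest. The digits in `v019` survive because there is no word boundary between `v` and `0`.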

For the second part, you can use rlike inside a when expression:

df2 = df1.withColumn(
    "details_trunc",
    F.when(
        F.col("details_trunc").rlike(r'Apple App Store.*$'), 'Apple App Store'
    ).when(
        F.col("details_trunc").rlike(r'Google Play.*$'), 'Google Play'
    ).when(
        F.col("details_trunc") == '', 'NA'
    ).otherwise(F.col("details_trunc"))
)
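One behavioural difference is worth checking: `rlike` only tests whether the pattern is found somewhere in the value, so the `when` chain above replaces the entire value, whereas pandas `replace(..., regex=True)` substitutes just the matched span and keeps any text before it. The plain-Python sketch below shows both behaviours side by side; the helper names are hypothetical and `re.search` stands in for `rlike`.

```python
import re

def when_rlike(value: str) -> str:
    """Mimic the when/rlike chain: a match anywhere replaces the whole value."""
    if re.search(r"Apple App Store.*$", value):
        return "Apple App Store"
    if re.search(r"Google Play.*$", value):
        return "Google Play"
    return "NA" if value == "" else value

def pandas_replace(value: str) -> str:
    """Mimic Series.replace(..., regex=True): only the matched span is substituted."""
    value = re.sub(r"Apple App Store.*$", "Apple App Store", value)
    value = re.sub(r"Google Play.*$", "Google Play", value)
    return "NA" if value == "" else value
```

The two agree whenever the value starts with the store name, but differ for something like `"Refund Apple App Store xyz"`. If that leading text should be kept, `F.regexp_replace("details_trunc", r"Apple App Store.*$", "Apple App Store")` (and likewise for Google Play) is the closer Spark counterpart of the pandas call.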

I haven't tested every case, but this is a good starting point for seeing how to rewrite the pandas logic in PySpark.