使用 pyspark 从文本中提取代码和名称用户

extract code and name user from text using pyspark

我想使用 pyspark 在描述字段中提取用户名和用户名。我的数据框包含 3 列:日期、ID 和描述

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

input = [{"date":'08-04-22',"id":'001',"description":"""XXX\A9
 04.09.2019 08:44:38 Martin Tyaunli (AH306FC)
 Localisation:FUSELAGE
 SECTION 15 (CADRES 23 À 29),*,DROIT,EXTÉRIEUR
 De Cadre 24 à 25,De Lisse 14 à 15
 ______________________________
 Non-conformité 0001 :
 Type de non-conf. : Equipement

 la valve FCV dans la BF RHS a etait deposée car le
 potentiomètre IEV n´avait pas de valeurs stables.
 ______________________________"""},
        {"date":'08-04-22',"id":'002',"description":"""XXXX/YYY
 03.06.2019 15:51:45 HANA HUBO (CO97908) Tél. 00
 Localisation:FUSELAGE
 SECTION 15 (CADRES 23 À 29),*,GAUCHE,EXTÉRIEUR
 ______________________________
 Non-conformité 0001 :
 Type de non-conf. : Assemblage structural et mécanique

 ______________________________
 04.06.2019 07:56:03 HANA HUBO (CO97908) Tél. 00
 SUITE AM 9691295
 Impossibilité de monter le panneau Belly fairing neuf côté gauche (191
 RL PN: 14S53974103-060)
 Modification du panneau d'origine
 Documents envoyés par mail
 Demande Avis du BE en attente réception panneau neuf montable
 05.06.2019 08:24:16 LAURENT DUQ (ST10568) Tél. 
 1) Modification du panneau 191RL actuel acceptable en l'état.
 2) Veuillez nous fournir un numéro de dérogation pour l'identification
 du doubleur et du cover.
 3) Dérogation à créer pour obturation du panneau 191RL à l'aide d'un
 doubleur et d'un cover.
 05.06.2019 10:39:27 HYU TOFFI (NG34CF4) Tél. 
 Dérogation TR-005462740 émise pour solution BE.
 10.07.2019 16:23:50 HYU TOFFI (NG34CF4) Tél. 
 Veuillez mettre à jour la solution de réparation avec TR-005462740.
 10.07.2019 16:36:35 MICKAEL LESPIAUCQ (ST10568) Tél. 
 Veuillez prendre enc ompte l'indice B00 de l'annexe pour rédaction de la
 dérogation.
 10.07.2019 17:55:31 HYU TOFFI (NG34CF4) Tél. 
 Dérogation TR-005462740 émise.
 Veuillez vérifier que la description de la dérogation corresponde à
 l'état physique de l'avion.

 20.01.2020 08:56:25 HANA HUBO (CO97908) Tél. 
 Remplacé par belly fairing de série
 Dérogation DQ2 09/10/19"""}
        ]
df = spark.createDataFrame(input)
df.show()
+--------+--------------------+---+
|    date|         description| id|
+--------+--------------------+---+
|08-04-22|XXX\A9
 04.09.201...|001|
|08-04-22|XXXX/YYY
 03.06.2...|002|
+--------+--------------------+---+

预期的输出是一个包含 5 列 dat、id、description、name 的数据框,user_code。

+--------+--------------------+---+--------------------+--------------------+
|    date|         description| id|                name|           user_code|
+--------+--------------------+---+--------------------+--------------------+
|08-04-22|XXX\A9
 04.09.201...|001|      Martin Tyaunli|             AH306FC|
|08-04-22|XXXX/YYY
 03.06.2...|002|HANA HUBO,LAURENT...|CO97908,ST10568,N...|
+--------+--------------------+---+--------------------+--------------------+
the two columns added must contain the code and the name of the users:
{"name":"Martin Tyaunli","user_code":"AH306FC"}
{"name":"HANA HUBO,LAURENT DUQ,HYU TOFFI,HYU TOFFI,SARA ESPINAZ","user_code":"CO97908,ST10568,NG34CF4,NG34CF4,ST10568",}

我试试这个,但它提取了第一个匹配项:

df=df.withColumn('name', F.regexp_extract(F.col('description'), 'AT9-\d{6}/SA\d{3}\n[\s]*\d{2}.\d{2}.\d{4}[\s]*\d{2}.\d{2}.\d{2}\s(.*)\s\(', 1))
df=df.withColumn('user_code', F.regexp_extract(F.col('description'), '\((.+?)\)', 1))

感谢任何帮助。

无法提取所有匹配项(参见 ) because the function regexp_extract_all 仅适用于 Spark > 3.1(奇怪的是,我有 pyspark 3.1.2,但 regexp_extract_all 不可用!)。

df.withColumn('user_code', F.regexp_extract_all(F.col('description'), '\((.+?)\)', 1)).show()
# AttributeError: module 'pyspark.sql.functions' has no attribute 'regexp_extract_all'

作为解决方法,您可以将正则表达式搜索包装在 udf 函数中,例如提取代码:

import re
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as F

def extract(s):
    all_matches = re.findall(r'\((.+?)\)', s)
    return all_matches

extract_udf = F.udf(extract, ArrayType(StringType()))

df = df.withColumn('user_code', extract_udf('description'))
df[['user_code']].show(truncate=False)
# +-------------------------------------------------------------------------# ---------------+
# |user_code                                                                               # |
# +----------------------------------------------------------------------------------------+
# |[AH306FC, CADRES 23 À 29]                                                               # |
# |[CO97908, CADRES 23 À 29, CO97908, ST10568, NG34CF4, NG34CF4, ST10568, NG34CF4, CO97908]|
# +----------------------------------------------------------------------------------------+

(另见: