使用正则表达式或任何其他转换的 Pyspark 列转换

Pyspark Column transformation using regex or any other transformations

能否请您告诉我如何使用 pyspark 将 url 列转换为 Dataframe 中的输出列行。

转换规则:

  1. 将 https 替换为 abfss
  2. 将“blob.core.windows.net”替换为dfs.core.windows.net
  3. 提取第三个“/”和最后一个“/”之间的文本,+“@”+第二个“/”和“.”之间的文本+ 剩余的字符串。

从pyspark.sql导入行

lst = [Row(url='https://inputfile.blob.core.windows.net/inputstorage/AvailabilityZones_1.csv', \
       output='abfss://inputstorage@inputfile.dfs.core.windows.net/AvailabilityZones_1.csv'), \
   Row(url='https://inputfile.blob.core.windows.net/inputstorage/AvailabilityZones_2.csv', \
       output='abfss://inputstorage@inputfile.dfs.core.windows.net/AvailabilityZones_2.csv'), \
   Row(url='https://inputfile.blob.core.windows.net/inputstorage/newfolder/AvailabilityZones_3.csv', \
       output='abfss://inputstorage/newfolder@inputfile.dfs.core.windows.net/AvailabilityZones_3.csv')]

df= spark.createDataFrame(lst)

预期数据框如下: expected output

没人回答,我自己回答

from pyspark.sql.functions import col, udf
def parseurl(url):
    fin_url=""
    url = url.replace('https://', 'abfss://')
    url = url.replace('blob.core.windows.net', 'dfs.core.windows.net')
    arr = url.split('/')
    sub_arr = []
    for pos in range(4,len(arr)-1):
        sub_arr.append(arr[pos])
    subFolder = ""
    if len(sub_arr)>0:
        subFolder = "/".join(str(x) for x in sub_arr)
    if subFolder != "":
        fin_url = url[:8] + arr[3] + '/' + subFolder + '@' + arr[2] + '/' + arr[-1]
    else:
        fin_url = url[:8] + arr[3] + '/' + subFolder + '@' + arr[2] + '/' + arr[-1]
    return fin_url
urlUDF = udf(lambda z: parseurl(z))
df.select(col("url"), \
    urlUDF(col("url")).alias("fin_url") ) \
   .show(truncate=False)