PySpark column transformation using regex or any other transformations
Could you please tell me how to transform the url column into the output column shown in the DataFrame below, using PySpark?
Transformation rules:
- Replace https with abfss
- Replace "blob.core.windows.net" with "dfs.core.windows.net"
- Take the text between the third "/" and the last "/", then append "@", then the text between the second "/" and the first ".", then the remaining string.
from pyspark.sql import Row
lst = [Row(url='https://inputfile.blob.core.windows.net/inputstorage/AvailabilityZones_1.csv', \
output='abfss://inputstorage@inputfile.dfs.core.windows.net/AvailabilityZones_1.csv'), \
Row(url='https://inputfile.blob.core.windows.net/inputstorage/AvailabilityZones_2.csv', \
output='abfss://inputstorage@inputfile.dfs.core.windows.net/AvailabilityZones_2.csv'), \
Row(url='https://inputfile.blob.core.windows.net/inputstorage/newfolder/AvailabilityZones_3.csv', \
output='abfss://inputstorage/newfolder@inputfile.dfs.core.windows.net/AvailabilityZones_3.csv')]
df = spark.createDataFrame(lst)
The expected DataFrame is as follows (the target values are shown in the output column of the rows above).
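To make the third rule concrete, here is how the pieces break down for the third sample URL (plain Python, purely for illustration; the index positions come from splitting on "/"):

url = 'https://inputfile.blob.core.windows.net/inputstorage/newfolder/AvailabilityZones_3.csv'
parts = url.split('/')
# parts[2]    -> 'inputfile.blob.core.windows.net'  (host: account + endpoint)
# parts[3]    -> 'inputstorage'                     (container)
# parts[4:-1] -> ['newfolder']                      (optional subfolders)
# parts[-1]   -> 'AvailabilityZones_3.csv'          (file name)
# target: 'abfss://inputstorage/newfolder@inputfile.dfs.core.windows.net/AvailabilityZones_3.csv'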
Since nobody answered, I'm answering my own question:
from pyspark.sql.functions import col, udf

def parseurl(url):
    # Rules 1 and 2: swap the scheme and the endpoint suffix.
    url = url.replace('https://', 'abfss://')
    url = url.replace('blob.core.windows.net', 'dfs.core.windows.net')
    arr = url.split('/')
    # arr[2] is the host, arr[3] is the container, arr[4:-1] are any subfolders.
    subFolder = "/".join(arr[4:-1])
    if subFolder != "":
        fin_url = url[:8] + arr[3] + '/' + subFolder + '@' + arr[2] + '/' + arr[-1]
    else:
        # No subfolder: the container is followed directly by '@'.
        fin_url = url[:8] + arr[3] + '@' + arr[2] + '/' + arr[-1]
    return fin_url

urlUDF = udf(parseurl)

df.select(col("url"),
          urlUDF(col("url")).alias("fin_url")) \
  .show(truncate=False)
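For reference, since the title mentions regex: the same transformation can also be written with the built-in regexp_replace and capture groups, with no Python UDF. This is a minimal sketch that assumes every URL matches the https://<account>.blob.core.windows.net/<container>/.../<file> pattern; rows that don't match are passed through unchanged by regexp_replace.

from pyspark.sql.functions import regexp_replace, col

# Group 1 = storage account, group 2 = container plus any subfolders,
# group 3 = file name. The replacement reorders the groups and swaps
# the scheme and endpoint suffix in a single pass.
pattern = r'^https://([^.]+)\.blob\.core\.windows\.net/(.+)/([^/]+)$'

df.select(
    col("url"),
    regexp_replace(col("url"), pattern,
                   r'abfss://$2@$1.dfs.core.windows.net/$3').alias("fin_url")
).show(truncate=False)

Avoiding the UDF keeps the whole transformation inside the JVM, so Spark does not have to serialize each row out to a Python worker and back.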