Apache Spark in Azure Synapse: 'overwrite' method not working
I have a handy function that lets me overwrite and rename a file when saving the results of a query to ADLS, see below:
from pyspark.sql import SparkSession
from notebookutils import mssparkutils  # pre-imported in Synapse notebooks; shown here for clarity

spark = SparkSession.builder.appName('ops').getOrCreate()

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = mssparkutils.fs.ls(origin_path)  # list all files in the origin path
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]  # keep the names of the files that match the requested type
    if len(filtered_filelist) > 1:  # check whether there is more than one file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # check whether there are no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), changing the name in the process
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)
I normally use this function with Databricks, where I use dbutils instead of mssparkutils.
Anyway, I call the function above with the following code:
df_staging_ccd_probate = "abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/"
myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate + "/tempDelta")
rename_file(df_staging_ccd_probate + "/tempDelta", df_staging_ccd_probate, "parquet", "filename")
mssparkutils.fs.rm(df_staging_ccd_probate + "/tempDelta", True)
This works fine on Databricks, but with Apache Spark in Azure Synapse I get the following error:
Py4JJavaError: An error occurred while calling z:mssparkutils.fs.mv.
: org.apache.hadoop.fs.PathExistsException: `abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/filename.parquet': File exists
For some reason, the 'overwrite' method doesn't seem to work for Apache Spark in Synapse.
Can anyone tell me what the equivalent of 'overwrite' is, or what I'm missing?
Thanks
Just to let you know, this is what I use on Databricks:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ops').getOrCreate()

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = dbutils.fs.ls(origin_path)  # list all files in the origin path
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]  # keep the names of the files that match the requested type
    if len(filtered_filelist) > 1:  # check whether there is more than one file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # check whether there are no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), changing the name in the process
        dbutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)
The code below overwrites every time on Databricks, so it must be Synapse that isn't working properly:
myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate + "/tempDelta")
rename_file(df_staging_ccd_probate + "/tempDelta", df_staging_ccd_probate, "parquet", "filename")
dbutils.fs.rm(df_staging_ccd_probate + "/tempDelta", True)
You were close. Here is how to move the file while allowing it to overwrite the destination:
mssparkutils.fs.mv(source_path, dest_path, overwrite=True)
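For completeness, here is a minimal sketch of your helper with only the final mv call changed, assuming your runtime's mssparkutils.fs.mv supports the overwrite keyword (older builds may only expose a create_path flag):

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = mssparkutils.fs.ls(origin_path)
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]
    if len(filtered_filelist) > 1:
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:
        print("No " + file_type + " files found")
    else:
        # overwrite=True replaces an existing destination file instead of raising
        # PathExistsException (assumes the runtime accepts this keyword)
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0],
                           dest_path + "/" + new_name + "." + file_type,
                           overwrite=True)

If your build does not accept the overwrite keyword, deleting the destination first with mssparkutils.fs.rm before calling mv achieves the same effect.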