Apache Spark in Azure Synapse: 'overwrite' method not working

I have a handy function that lets me overwrite and rename the output file when saving query results to ADLS; see below:

from pyspark.sql import SparkSession
from notebookutils import mssparkutils  # Synapse equivalent of Databricks dbutils

spark = SparkSession.builder.appName('ops').getOrCreate()

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = mssparkutils.fs.ls(origin_path)  # list all files in the origin path
    # keep only the names of files that match the requested type
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]
    if len(filtered_filelist) > 1:  # more than one file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), renaming it in the process
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)

I normally use this function with Databricks, where I use dbutils instead of mssparkutils.

Anyway, I call the function above with the following code:

df_staging_ccd_probate = "abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/"

myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate + "/tempDelta")
rename_file(df_staging_ccd_probate + "/tempDelta", df_staging_ccd_probate, "parquet", "filename")
mssparkutils.fs.rm(df_staging_ccd_probate + "/tempDelta", True)

On Databricks this works fine, but with Apache Spark in Azure Synapse I get the following error:

Py4JJavaError: An error occurred while calling z:mssparkutils.fs.mv.
: org.apache.hadoop.fs.PathExistsException: `abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/filename.parquet': File exists

For some reason the 'overwrite' method doesn't seem to work with Apache Spark in Synapse.

Can anyone tell me what the equivalent of 'overwrite' is, or what I'm missing? Thanks.

Just so you know, here is the Databricks version:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ops').getOrCreate()

# dbutils is available by default in Databricks notebooks
def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = dbutils.fs.ls(origin_path)  # list all files in the origin path
    # keep only the names of files that match the requested type
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]
    if len(filtered_filelist) > 1:  # more than one file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), renaming it in the process
        dbutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)

The following overwrites the file every time on Databricks, so it must be something in Synapse that isn't working:

myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate + "/tempDelta")
rename_file(df_staging_ccd_probate + "/tempDelta", df_staging_ccd_probate, "parquet", "filename")
dbutils.fs.rm(df_staging_ccd_probate + "/tempDelta", True)

You're close. mode("overwrite") only applies to the DataFrame save into tempDelta; the subsequent mv is a separate filesystem call, and mssparkutils.fs.mv refuses to replace an existing destination file unless you tell it to. Here's how to move a file and allow overwriting:

mssparkutils.fs.mv(source_path, dest_path, overwrite=True)
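
For reference, here is a minimal sketch of your rename_file function with that flag applied; it assumes the same arguments as in your question, and only the final mv call changes:

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = mssparkutils.fs.ls(origin_path)  # list all files in the origin path
    # keep only the names of files that match the requested type
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]
    if len(filtered_filelist) > 1:
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:
        print("No " + file_type + " files found")
    else:
        # overwrite=True lets the move replace an existing destination file
        # instead of raising PathExistsException
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0],
                           dest_path + "/" + new_name + "." + file_type,
                           overwrite=True)

With overwrite=True, the move replaces the existing filename.parquet in LANDING rather than failing with the PathExistsException you saw.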