遍历数据块中的文件失败

Looping through files in databricks fails

继续 Managing huge zip files in dataBricks

Databricks 在 30 个文件后挂起。怎么办?

我已将巨大的 32Gb zip 分成 100 stand-alone 个部分。我已经从文件中拆分 header,因此可以像任何 CSV-file 一样处理它。我需要根据列过滤数据。文件位于 Azure Data Lake Storage Gen1 中,必须存储在那里。

尝试一次读取单个文件(或所有 100 个文件)在工作约 30 分钟后失败。 (参见上面的链接问题。)

我做了什么:

def lookup_csv(CR_nro, hlo_lista =[], output = my_output_dir ): 

  base_lib = 'adl://azuredatalakestore.net/<address>'
  all_files = pd.DataFrame(dbutils.fs.ls(base_lib + f'CR{CR_nro}'), columns = ['full', 'name', 'size'])
  done = pd.DataFrame(dbutils.fs.ls(output), columns = ['full', 'name', 'size'])
  all_files = all_files[~all_files['name'].isin(tehdyt['name'].str.replace('/', ''))]
  all_files = all_files[~all_files['name'].str.contains('header')]

  my_scema = spark.read.csv(base_lib + f'CR{CR_nro}/header.csv', sep='\t', header=True, maxColumns = 1000000).schema
  tmp_lst = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT'] + [i for i in hlo_lista  if i in my_scema.fieldNames()]

  for my_file in all_files.iterrows(): 
    print(my_file[1]['name'], time.ctime(time.time()))
    data = spark.read.option('comment', '#').option('maxColumns', 1000000).schema(my_scema).csv(my_file[1]['full'], sep='\t').select(tmp_lst)
    data.write.csv( output + my_file[1]['name'], header=True, sep='\t')

这行得通……有点。它可以工作 ~30 个文件然后挂断

Py4JJavaError: An error occurred while calling o70690.csv. ​ Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 154.0 failed 4 times, most recent failure: Lost task 0.3 in stage 154.0 (TID 1435, 10.11.64.46, executor 7): com.microsoft.azure.datalake.store.ADLException: Error creating file <my_output_dir>CR03_pt29.vcf.gz/_started_1438828951154916601 Operation CREATE failed with HTTP401 : null Last encountered exception thrown after 2 tries. [HTTP401(null),HTTP401(null)]

我尝试添加一些删除和休眠:

   ​data.unpersist()
   ​data = []
   ​time.sleep(5)

还有一些 try-exception 尝试。

for j in range(1,24): 
    for i in range(4): 
        try: 
            lookup_csv(j, hlo_lista =FN_list, output = blake +f'<my_output>/CR{j}/' )
        except Exception as e:
            print(i, j, e)
            time.sleep(60)

运气不好。一旦失败,就一直失败。

知道如何处理这个问题吗?我认为与 ADL-drive 的连接在一段时间后会失败,但是如果我 queue 命令:

lookup_csv(<inputs>) 
<next cell> 
lookup_csv(<inputs>) 

它工作,失败并且在下一个单元格工作得很好。我可以接受这一点,但非常烦人的是基本循环无法在这种环境中工作。

最好的解决方案是永久安装 ADSL 存储并为此使用 azure 应用程序。

在 Azure 中,请转到应用程序注册 - 使用名称例如“databricks_mount”注册应用程序。在您的增量湖存储中为该应用程序添加 IAM 角色“Storage Blob Data Contributor”。

configs = {"fs.azure.account.auth.type": "OAuth",
      "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", 
      "fs.azure.account.oauth2.client.id": "<your-client-id>",
      "fs.azure.account.oauth2.client.secret": "<your-secret>",
      "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<your-endpoint>/oauth2/token"}

dbutils.fs.mount(
 source = "abfss://delta@yourdatalake.dfs.core.windows.net/",
 mount_point = "/mnt/delta",
 extra_configs = configs)

您无需挂载即可访问,但您仍然需要注册一个应用程序并通过笔记本中的 spark 设置应用配置才能访问 ADLS。由于 azure 应用程序,它应该在整个会话中是永久的:

spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"), 
spark.conf.set("fs.azure.account.oauth2.client.id", "<your-client-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<your-secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<your-endpoint>/oauth2/token")

这个解释是最好的https://docs.databricks.com/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access.html#access-adls-gen2-directly虽然我记得第一次我也有问题。该页面还解释了如何注册应用程序。也许您的公司政策没问题。