使用 python 数据块转换 Azure 数据工厂中的数据

transform data in azure data factory using python data bricks

我的任务是将数百万个 JSON 文件转换并合并为大型 CSV 文件。

使用副本activity并映射模式,操作会非常简单,我已经测试过,问题是大量文件的JSON格式不正确。

我知道错误是什么,修复也很简单,我想我可以使用 Python 数据块 activity 来修复字符串,然后将输出传递给一个副本activity 可以将记录合并到一个大的 CSV 文件中。

我有这样的想法,我不确定这是否是解决此任务的正确方法。我不知道在数据块中使用 Copy Activy 的输出 activity

将 JSON 文件复制到存储(例如 BLOB),您可以从 Databricks 访问存储。然后您可以使用 Python 修复文件,甚至转换为具有簇 运行.

的所需格式

因此,在复制数据 activity 中将文件复制到 BLOB(如果您还没有的话)。

听起来您想使用 Azure 数据工厂转换大量单个 JSON 文件,但正如@KamilNowinski 所说,它现在在 Azure 上不支持。但是,既然你使用的是 Azure Databricks,编写一个简单的 Python 脚本来完成同样的事情对你来说会更容易。因此,一个变通的解决方案是直接使用 Azure Storage SDK 和 pandas Python 包通过 Azure Databricks 上的几个步骤来做到这一点。

  1. 可能这些JSON文件都在Azure Blob Storage的一个容器中,所以你需要通过list_blob_names and generate their urls with sas token for pandas read_json函数将它们列在容器中,代码如下。

    from azure.storage.blob.baseblobservice import BaseBlobService
    from azure.storage.blob import ContainerPermissions
    from datetime import datetime, timedelta
    
    account_name = '<your account name>'
    account_key = '<your account key>'
    container_name = '<your container name>'
    
    service = BaseBlobService(account_name=account_name, account_key=account_key)
    token = service.generate_container_shared_access_signature(container_name, permission=ContainerPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1),)
    
    blob_names = service.list_blob_names(container_name)
    blob_urls_with_token = (f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{token}" for blob_name in blob_names)
    
    #print(list(blob_urls_with_token))
    
  2. 然后,您可以通过 read_json 函数直接从 blob 读取这些 JSON 文件,以创建它们的 pandas Dataframe。

    import pandas as pd
    
    for blob_url_with_token in blob_urls_with_token:
        df = pd.read_json(blob_url_with_token)
    

    即使您想将它们合并到一个大的 CSV 文件中,您也可以先通过 Combining / joining / merging 中列出的 pandas 函数将它们合并到一个大的 Dataframe 中,例如 append.

  3. 要将数据帧写入csv文件,我认为通过to_csv函数非常容易。或者您可以将 pandas 数据帧转换为 Azure Databricks 上的 PySpark 数据帧,如下面的代码。

    from pyspark.sql import SQLContext
    from pyspark import SparkContext
    
    sc = SparkContext()
    sqlContest = SQLContext(sc)
    spark_df = sqlContest.createDataFrame(df)
    

所以接下来,无论你想做什么,都很简单。如果你想在 Azure Databricks 中将脚本安排为笔记本,你可以参考官方文档 Jobs to 运行 Spark jobs.

希望对您有所帮助。