将 JSON 数据从 DocumentDB(或 CosmosDB)移动到 Azure Data Lake

Move JSON Data from DocumentDB (or CosmosDB) to Azure Data Lake

我在 Cosmos DB(以前称为 Document DB)中有很多 JSON 个文件(以百万计),我想将其移动到 Azure Data Lake 中进行冷存储。

我找到了这份文件 https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.documents.client.documentclient.readdocumentfeedasync?view=azure-dotnet,但它没有任何示例。

我该如何继续,非常感谢任何代码示例。

谢谢。

是的,更改提要就可以了。

你有两个选择。第一个(在这种情况下可能是您想要的)是通过 SDK 使用它。

Microsoft 有一个详细的页面,介绍了如何在此处包含代码示例:https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed#rest-apis

第二个是 Change Feed Library,它允许您有一项服务 运行 随时侦听更改并根据您的需要处理它们。有关更改提要库的代码示例的更多详细信息,请参见此处:https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed#change-feed-processor

(两个页面(实际上是相同的,只是不同的部分)包含 link 到包含代码示例的 Microsoft github 存储库。)

请记住,根据 RU/s,您仍然需要为使用此功能付费,但据我所知,费用相对较低(或至少低于您开始阅读时支付的费用集合本身。)

我建议您使用 Azure Data Factory 来实现您的要求。

有关如何将数据导入 ADL,请参阅此doc about how to export json documents from cosmos db and this doc

希望对你有帮助。


更新答案:

请参考:Azure Cosmos DB as source,您可以在管道中创建 query

您也可以使用逻辑应用程序。可以使用定时器触发器。这将是一个无代码解决方案

  1. 查询文件
  2. 遍历文档
  3. 添加到数据湖

优点是您可以在发送到数据湖之前应用任何规则

您还可以通过 Spark 阅读更改提要。以下 python 代码示例生成按加载日期分区的镶木地板文件以更改数据。每天在 Azure Databricks 笔记本中工作:

    # Get DB secrets
    endpoint = dbutils.preview.secret.get(scope = "cosmosdb", key = "endpoint")
    masterkey = dbutils.preview.secret.get(scope = "cosmosdb", key = "masterkey")

    # database & collection
    database = "<yourdatabase>"
    collection = "<yourcollection"

    # Configs
    dbConfig = {
    "Endpoint" : endpoint,
    "Masterkey" : masterkey,
    "Database" : database,
    "Collection" : collection, 
    "ReadChangeFeed" : "True",
    "ChangeFeedQueryName" : database + collection + " ",
    "ChangeFeedStartFromTheBeginning" : "False",
    "ChangeFeedUseNextToken" : "True",
    "RollingChangeFeed" : "False",
    "ChangeFeedCheckpointLocation" : "/tmp/changefeedcheckpointlocation",
    "SamplingRatio" : "1.0"
    }

    # Connect via Spark connector to create Spark DataFrame
    df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**dbConfig).load()     

    # set partition to current date
    import datetime
    from pyspark.sql.functions import lit

    partition_day= datetime.date.today()
    partition_datetime=datetime.datetime.now().isoformat()

    # new dataframe with ingest date (=partition key)
    df_part= df.withColumn("ingest_date", lit(partition_day))

    # write parquet file
    df_part.write.partitionBy('ingest_date').mode('append').json('dir')