从 Databricks 到 Cosmos 的 UPSERT /INSERT/ UPDATE

UPSERT /INSERT/ UPDATE between Databricks to Cosmos

目前我们使用 Azure Databricks 作为转换层,转换后的数据通过连接器加载到 Cosmos DB。

场景:

我们有 2 个文件作为源文件。

第一个文件包含姓名、年龄

第二个文件包含姓名、州、国家/地区

在 Cosmos 中,我使用 id、分区键创建了集合

在数据块中,我将这 2 个文件加载为 Dataframe 并创建一个临时文件 table 来查询内容。

我正在查询第一个文件 [select name as id, name, age from file ] 的内容并将其加载到 Cosmos Collection。

来自第二个文件。我正在使用 [ select name as id, state, country] 并加载到同一个集合,期望第二个文件的内容根据 id 字段插入到同一文档的同一个集合中。

这里的问题是,当我从第二个文件加载内容时,第一个文件的属性 'age' 被删除,并且在 cosmos 文档中只能看到 id、name、state、country。发生这种情况是因为我在数据块中使用 UPSERT 加载到 Cosmos。

当我将 UPSERT 更改为 INSERT 或 UPDATE 时,它抛出错误 'Resource with id already exists'

Databricks 与 Cosmos 的连接:

val configMap = Map(
  "Endpoint" -> {"https://"},
  "Masterkey" -> {""},
  "Database" -> {"ods"},
  "Collection" -> {"tval"},
  "preferredRegions" -> {"West US"},
  "upsert" -> {"true"}) 
  val config = com.microsoft.azure.cosmosdb.spark.config.Config(configMap)

有没有办法从第二个文件插入属性而不删除已经存在的属性。我没有使用 JOIN 操作,因为用例不适合使用。

根据模糊的记忆,您需要在数据框上设置 id 属性以匹配两个数据集。 如果你省略这个字段,Cosmos 会生成一个新记录——这就是你正在发生的事情。

所以如果 df1 和 df2 在第一条记录上有 id=1 那么第一个将插入它,第二个将更新它。

但如果它们是相同的记录,那么加入 Spark 会更有效率。