来自 DataBricks 问题的简单 COSMOSDB 查询

Simple COSMOSDB query from DataBricks problems

我正在为基于 pydocumentdb 库的函数编写替换 PySpark 例程,由于 pydocumentdb 库被弃用,该例程不再有效。函数调用看起来像这样 def QueryCosmos (container, query, schema, target) 我以前没有接触过 COSMOS,但我觉得这应该是一个相当简单的任务,但无论我采用哪种方法,我似乎都遇到了障碍。我尝试过的 3 种方法是:

我确定这是因为我缺乏经验,但我觉得我在某处遗漏了一些东西。

首先我在集群上设置的库如下:

第 2 行完整路径 (com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.10.0)

SDK 变体代码

#cosmos db libraries

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey
import os
import json

def QueryCosmos_example (container, query, schema, target):

  #get the connection settings
  Env = dbutils.secrets.get(scope = "AzureKeyVault", key = "xxxxxxxx")
  CosmosAddress = "https://xxxxxxxxxxx" + Env + ".documents.azure.com:443/"
  CosmosSecretKey = dbutils.secrets.get(scope = "AzureKeyVault", key = "xxxxxxxxxxx")

  #get the container link
  database = "dbs/xxxxxx-cosmosdb-01-" + Env
  collection = database + "/colls/" + container

  #perform query
  client = cosmos_client.CosmosClient(CosmosAddress, {'masterKey': CosmosSecretKey} )
  print(CosmosAddress)  
  
  db = client.get_database_client(database)
  cont = db.get_container_client(container)
  print(cont)

  test_query="select * from c WHERE c.id = '245655b424852a89ea75ebe7fdf812df9de4e6220cdeba1489597ce6d1cd686d'" 
    
  items = list(cont.query_items(
        query=test_query,
        enable_cross_partition_query=True
    ))

  print(items)

  #convert to data frame
  df = spark.createDataFrame(items, schema=schema)
 
  
  #create a temp view based on the results
  df.createOrReplaceTempView(target)

上面的代码已被更改以删除敏感信息。然而,正如您从下面的输出中看到的那样,容器信息正在通过,所以看起来查询调用是不正确的,尽管环顾四周我看不出它有什么问题。

有没有办法查看来自 DataBricks 的请求,或者有关响应包含的内容的更多信息。我假设它可能包含更多异常信息...

COSMOS 连接器代码为:

..........
 database = "xxxxxxxxxxxx" + Env
# The code above here is same as the previous snippet

 test_query= "select * from c WHERE c.id = '245655b424852a89ea75ebe7fdf812df9de4e6220cdeba1489597ce6d1cd686d'"
  cfg = {
    "spark.cosmos.accountEndpoint" : CosmosAddress,
    "spark.cosmos.accountKey" : CosmosSecretKey,
    "spark.cosmos.database" : database,
    "spark.cosmos.container" : container,
    "spark.cosmos.read.customQuery" : test_query
    
  }
  
  item = spark.read.format("cosmos.oltp").options(**cfg).load()
  item.createOrReplaceTempView("supporter")
  display(item)

当我 运行 找不到数据时,但是如果我 运行 在门户中查询,我会返回结果。

非常感谢 理查德

所以我设法找到了问题所在。

查看第一个示例,当您查看返回的容器地址时:

可以看到dbs路径层级重复了。代码片段直接取自 pydocumentdb 代码,因此这两个库调用之间存在轻微的不一致。

原始数据库参考是:

database = "dbs/xxxxxx-cosmosdb-01-" + Env

更正参考

database = "xxxxxx-cosmosdb-01-" + Env