来自 DataBricks 问题的简单 COSMOSDB 查询
Simple COSMOSDB query from DataBricks problems
我正在为基于 pydocumentdb 库的函数编写替换 PySpark 例程,由于 pydocumentdb 库被弃用,该例程不再有效。函数调用看起来像这样 def QueryCosmos (container, query, schema, target)
我以前没有接触过 COSMOS,但我觉得这应该是一个相当简单的任务,但无论我采用哪种方法,我似乎都遇到了障碍。我尝试过的 3 种方法是:
- 使用 COSMOS SDK
- 使用 COSMOS 连接器
- 使用 COSMOS 目录 API
我确定这是因为我缺乏经验,但我觉得我在某处遗漏了一些东西。
首先我在集群上设置的库如下:
第 2 行完整路径 (com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.10.0)
SDK 变体代码
#cosmos db libraries
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey
import os
import json
def QueryCosmos_example (container, query, schema, target):
#get the connection settings
Env = dbutils.secrets.get(scope = "AzureKeyVault", key = "xxxxxxxx")
CosmosAddress = "https://xxxxxxxxxxx" + Env + ".documents.azure.com:443/"
CosmosSecretKey = dbutils.secrets.get(scope = "AzureKeyVault", key = "xxxxxxxxxxx")
#get the container link
database = "dbs/xxxxxx-cosmosdb-01-" + Env
collection = database + "/colls/" + container
#perform query
client = cosmos_client.CosmosClient(CosmosAddress, {'masterKey': CosmosSecretKey} )
print(CosmosAddress)
db = client.get_database_client(database)
cont = db.get_container_client(container)
print(cont)
test_query="select * from c WHERE c.id = '245655b424852a89ea75ebe7fdf812df9de4e6220cdeba1489597ce6d1cd686d'"
items = list(cont.query_items(
query=test_query,
enable_cross_partition_query=True
))
print(items)
#convert to data frame
df = spark.createDataFrame(items, schema=schema)
#create a temp view based on the results
df.createOrReplaceTempView(target)
上面的代码已被更改以删除敏感信息。然而,正如您从下面的输出中看到的那样,容器信息正在通过,所以看起来查询调用是不正确的,尽管环顾四周我看不出它有什么问题。
有没有办法查看来自 DataBricks 的请求,或者有关响应包含的内容的更多信息。我假设它可能包含更多异常信息...
COSMOS 连接器代码为:
..........
database = "xxxxxxxxxxxx" + Env
# The code above here is same as the previous snippet
test_query= "select * from c WHERE c.id = '245655b424852a89ea75ebe7fdf812df9de4e6220cdeba1489597ce6d1cd686d'"
cfg = {
"spark.cosmos.accountEndpoint" : CosmosAddress,
"spark.cosmos.accountKey" : CosmosSecretKey,
"spark.cosmos.database" : database,
"spark.cosmos.container" : container,
"spark.cosmos.read.customQuery" : test_query
}
item = spark.read.format("cosmos.oltp").options(**cfg).load()
item.createOrReplaceTempView("supporter")
display(item)
当我 运行 找不到数据时,但是如果我 运行 在门户中查询,我会返回结果。
非常感谢
理查德
所以我设法找到了问题所在。
查看第一个示例,当您查看返回的容器地址时:
可以看到dbs路径层级重复了。代码片段直接取自 pydocumentdb 代码,因此这两个库调用之间存在轻微的不一致。
原始数据库参考是:
database = "dbs/xxxxxx-cosmosdb-01-" + Env
更正参考
database = "xxxxxx-cosmosdb-01-" + Env
我正在为基于 pydocumentdb 库的函数编写替换 PySpark 例程,由于 pydocumentdb 库被弃用,该例程不再有效。函数调用看起来像这样 def QueryCosmos (container, query, schema, target)
我以前没有接触过 COSMOS,但我觉得这应该是一个相当简单的任务,但无论我采用哪种方法,我似乎都遇到了障碍。我尝试过的 3 种方法是:
- 使用 COSMOS SDK
- 使用 COSMOS 连接器
- 使用 COSMOS 目录 API
我确定这是因为我缺乏经验,但我觉得我在某处遗漏了一些东西。
首先我在集群上设置的库如下:
第 2 行完整路径 (com.azure.cosmos.spark:azure-cosmos-spark_3-2_2-12:4.10.0)
SDK 变体代码
#cosmos db libraries
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey
import os
import json
def QueryCosmos_example (container, query, schema, target):
#get the connection settings
Env = dbutils.secrets.get(scope = "AzureKeyVault", key = "xxxxxxxx")
CosmosAddress = "https://xxxxxxxxxxx" + Env + ".documents.azure.com:443/"
CosmosSecretKey = dbutils.secrets.get(scope = "AzureKeyVault", key = "xxxxxxxxxxx")
#get the container link
database = "dbs/xxxxxx-cosmosdb-01-" + Env
collection = database + "/colls/" + container
#perform query
client = cosmos_client.CosmosClient(CosmosAddress, {'masterKey': CosmosSecretKey} )
print(CosmosAddress)
db = client.get_database_client(database)
cont = db.get_container_client(container)
print(cont)
test_query="select * from c WHERE c.id = '245655b424852a89ea75ebe7fdf812df9de4e6220cdeba1489597ce6d1cd686d'"
items = list(cont.query_items(
query=test_query,
enable_cross_partition_query=True
))
print(items)
#convert to data frame
df = spark.createDataFrame(items, schema=schema)
#create a temp view based on the results
df.createOrReplaceTempView(target)
上面的代码已被更改以删除敏感信息。然而,正如您从下面的输出中看到的那样,容器信息正在通过,所以看起来查询调用是不正确的,尽管环顾四周我看不出它有什么问题。
有没有办法查看来自 DataBricks 的请求,或者有关响应包含的内容的更多信息。我假设它可能包含更多异常信息...
COSMOS 连接器代码为:
..........
database = "xxxxxxxxxxxx" + Env
# The code above here is same as the previous snippet
test_query= "select * from c WHERE c.id = '245655b424852a89ea75ebe7fdf812df9de4e6220cdeba1489597ce6d1cd686d'"
cfg = {
"spark.cosmos.accountEndpoint" : CosmosAddress,
"spark.cosmos.accountKey" : CosmosSecretKey,
"spark.cosmos.database" : database,
"spark.cosmos.container" : container,
"spark.cosmos.read.customQuery" : test_query
}
item = spark.read.format("cosmos.oltp").options(**cfg).load()
item.createOrReplaceTempView("supporter")
display(item)
当我 运行 找不到数据时,但是如果我 运行 在门户中查询,我会返回结果。
非常感谢 理查德
所以我设法找到了问题所在。
查看第一个示例,当您查看返回的容器地址时:
可以看到dbs路径层级重复了。代码片段直接取自 pydocumentdb 代码,因此这两个库调用之间存在轻微的不一致。
原始数据库参考是:
database = "dbs/xxxxxx-cosmosdb-01-" + Env
更正参考
database = "xxxxxx-cosmosdb-01-" + Env