Azure Databricks Spark SQL 查询 CosmosDB 从其他文档获取列

Azure Databricks Spark SQL Query to CosmosDB getting columns from other documents

当使用 Azure-Cosmosdb-spark 连接器将 sql 查询传递给 CosmosDB 时,它似乎添加了集合中其他文档的列。集合中有许多文档,例如用户和公司,它们由实体类型分隔。例如用户设置为

{   "id": "user-000003",
    "email": "someemail",
    "firstName": "firstname",
    "lastName": "lastname",
    "username": "someusername",
    "companyId": "company-000003",
    "entity": "user"
}

并且公司设置为:

{   "id": "company-000003",
    "contactName": "namegoes here",
    "addressLine1": "Address line 1",
    "addressLine2": "Address line 2",
    "entity": "company"
}

我使用 Azure-Cosmosdb-spark sdk 创建连接

cosmosConfig = {
                "Endpoint" : "my endpoint goes here",
                "Masterkey" : "my key goes here",
                "Database" : "my database goes here",
                "preferredRegions" : "my region goes here",
                "Collection" : "my collection", 
                "SamplingRatio" : "1.0",
                "schema_samplesize" : "1000",
                "query_pagesize" : "2147483647",
               }

然后是

将其设置为使用该连接

cosmosdbConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
cosmosdbConnection.createOrReplaceTempView("c")

然后我运行查询

exampleQuery= 'SELECT c.* FROM c WHERE c.entity = "user"'
users = spark.sql(exampleQuery)

我希望得到一个数据框,其中包含用户文档中定义的列、id、电子邮件、名字、姓氏、用户名、公司 ID 和实体。但是,它也从公司文档中提取列名称,但所有值为空值。 运行 Azure Cosmos DB 存储资源管理器或 Azure 门户中的相同查询,只是带回用户文档。

我可以只指定我想要的列名称,但如果模式发生变化,我将需要添加这些列。

我假设它是查询?我正在寻找从 sql 查询中的文档中获取列的方法。我确实认为它只会将 sql 查询传递给 cosmosdb SQL API.

这是我第一次将 databricks 与 cosmos db 一起使用,用谷歌搜索了一下,但似乎看不出我做错了什么。

问题是在配置设置中,没有指定对集合的 SQL 查询,它只是读取集合中的所有文档,而不是不同的文档,应该按 entity/theme 类型拆分。

通过在配置中添加SQL查询

cosmosConfig = {
                "Endpoint" : "my endpoint goes here",
                "Masterkey" : "my key goes here",
                "Database" : "my database goes here",
                "preferredRegions" : "my region goes here",
                "Collection" : "my collection", 
                "SamplingRatio" : "1.0",
                "schema_samplesize" : "1000",
                "Query" : "SELECT * FROM c WHERE C.entity = 'SomeEntity"
                "query_pagesize" : "2147483647",
               }

它将基于此创建连接。如果需要添加多个实体,创建一个函数例如:

def createCosmosDBObject(useEntity):
  query = "SELECT * FROM c WHERE c.entity = " + "'" + useEntity + "'"
  # Create connection setting
  cosmosConfig = {
                "Endpoint" : "Kyour endpoint",
                "Masterkey" : "Your Key",
                "Database" : "Your Database",
                "preferredRegions" : "Azure Region",
                "Collection" : "Your Collection", 
                "ConnectionMode": "DirectHttps", 
                "SamplingRatio" : "1.0",
                "schema_samplesize" : "20000",
                "query_pagesize" : "2147483647",
                "query_custom" : query
               }

  createConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
  createConnection.createOrReplaceTempView(useEntity)

然后您可以通过文档实体名称调用它:

createCosmosDBObject("customer")

然后它将 'customer' 插入到查询中,然后创建一个您可以查询的临时视图,不会与 Cosmos DB 中的其他文档实体类型重叠

希望对您有所帮助