Azure Databricks Spark SQL 查询 CosmosDB 从其他文档获取列
Azure Databricks Spark SQL Query to CosmosDB getting columns from other documents
当使用 Azure-Cosmosdb-spark 连接器将 sql 查询传递给 CosmosDB 时,它似乎添加了集合中其他文档的列。集合中有许多文档,例如用户和公司,它们由实体类型分隔。例如用户设置为
{ "id": "user-000003",
"email": "someemail",
"firstName": "firstname",
"lastName": "lastname",
"username": "someusername",
"companyId": "company-000003",
"entity": "user"
}
并且公司设置为:
{ "id": "company-000003",
"contactName": "namegoes here",
"addressLine1": "Address line 1",
"addressLine2": "Address line 2",
"entity": "company"
}
我使用 Azure-Cosmosdb-spark sdk 创建连接
cosmosConfig = {
"Endpoint" : "my endpoint goes here",
"Masterkey" : "my key goes here",
"Database" : "my database goes here",
"preferredRegions" : "my region goes here",
"Collection" : "my collection",
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647",
}
然后是
将其设置为使用该连接
cosmosdbConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
cosmosdbConnection.createOrReplaceTempView("c")
然后我运行查询
exampleQuery= 'SELECT c.* FROM c WHERE c.entity = "user"'
users = spark.sql(exampleQuery)
我希望得到一个数据框,其中包含用户文档中定义的列、id、电子邮件、名字、姓氏、用户名、公司 ID 和实体。但是,它也从公司文档中提取列名称,但所有值为空值。
运行 Azure Cosmos DB 存储资源管理器或 Azure 门户中的相同查询,只是带回用户文档。
我可以只指定我想要的列名称,但如果模式发生变化,我将需要添加这些列。
我假设它是查询?我正在寻找从 sql 查询中的文档中获取列的方法。我确实认为它只会将 sql 查询传递给 cosmosdb SQL API.
这是我第一次将 databricks 与 cosmos db 一起使用,用谷歌搜索了一下,但似乎看不出我做错了什么。
问题是在配置设置中,没有指定对集合的 SQL 查询,它只是读取集合中的所有文档,而不是不同的文档,应该按 entity/theme 类型拆分。
通过在配置中添加SQL查询
cosmosConfig = {
"Endpoint" : "my endpoint goes here",
"Masterkey" : "my key goes here",
"Database" : "my database goes here",
"preferredRegions" : "my region goes here",
"Collection" : "my collection",
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"Query" : "SELECT * FROM c WHERE C.entity = 'SomeEntity"
"query_pagesize" : "2147483647",
}
它将基于此创建连接。如果需要添加多个实体,创建一个函数例如:
def createCosmosDBObject(useEntity):
query = "SELECT * FROM c WHERE c.entity = " + "'" + useEntity + "'"
# Create connection setting
cosmosConfig = {
"Endpoint" : "Kyour endpoint",
"Masterkey" : "Your Key",
"Database" : "Your Database",
"preferredRegions" : "Azure Region",
"Collection" : "Your Collection",
"ConnectionMode": "DirectHttps",
"SamplingRatio" : "1.0",
"schema_samplesize" : "20000",
"query_pagesize" : "2147483647",
"query_custom" : query
}
createConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
createConnection.createOrReplaceTempView(useEntity)
然后您可以通过文档实体名称调用它:
createCosmosDBObject("customer")
然后它将 'customer' 插入到查询中,然后创建一个您可以查询的临时视图,不会与 Cosmos DB 中的其他文档实体类型重叠
希望对您有所帮助
当使用 Azure-Cosmosdb-spark 连接器将 sql 查询传递给 CosmosDB 时,它似乎添加了集合中其他文档的列。集合中有许多文档,例如用户和公司,它们由实体类型分隔。例如用户设置为
{ "id": "user-000003",
"email": "someemail",
"firstName": "firstname",
"lastName": "lastname",
"username": "someusername",
"companyId": "company-000003",
"entity": "user"
}
并且公司设置为:
{ "id": "company-000003",
"contactName": "namegoes here",
"addressLine1": "Address line 1",
"addressLine2": "Address line 2",
"entity": "company"
}
我使用 Azure-Cosmosdb-spark sdk 创建连接
cosmosConfig = {
"Endpoint" : "my endpoint goes here",
"Masterkey" : "my key goes here",
"Database" : "my database goes here",
"preferredRegions" : "my region goes here",
"Collection" : "my collection",
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647",
}
然后是
将其设置为使用该连接
cosmosdbConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
cosmosdbConnection.createOrReplaceTempView("c")
然后我运行查询
exampleQuery= 'SELECT c.* FROM c WHERE c.entity = "user"'
users = spark.sql(exampleQuery)
我希望得到一个数据框,其中包含用户文档中定义的列、id、电子邮件、名字、姓氏、用户名、公司 ID 和实体。但是,它也从公司文档中提取列名称,但所有值为空值。 运行 Azure Cosmos DB 存储资源管理器或 Azure 门户中的相同查询,只是带回用户文档。
我可以只指定我想要的列名称,但如果模式发生变化,我将需要添加这些列。
我假设它是查询?我正在寻找从 sql 查询中的文档中获取列的方法。我确实认为它只会将 sql 查询传递给 cosmosdb SQL API.
这是我第一次将 databricks 与 cosmos db 一起使用,用谷歌搜索了一下,但似乎看不出我做错了什么。
问题是在配置设置中,没有指定对集合的 SQL 查询,它只是读取集合中的所有文档,而不是不同的文档,应该按 entity/theme 类型拆分。
通过在配置中添加SQL查询
cosmosConfig = {
"Endpoint" : "my endpoint goes here",
"Masterkey" : "my key goes here",
"Database" : "my database goes here",
"preferredRegions" : "my region goes here",
"Collection" : "my collection",
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"Query" : "SELECT * FROM c WHERE C.entity = 'SomeEntity"
"query_pagesize" : "2147483647",
}
它将基于此创建连接。如果需要添加多个实体,创建一个函数例如:
def createCosmosDBObject(useEntity):
query = "SELECT * FROM c WHERE c.entity = " + "'" + useEntity + "'"
# Create connection setting
cosmosConfig = {
"Endpoint" : "Kyour endpoint",
"Masterkey" : "Your Key",
"Database" : "Your Database",
"preferredRegions" : "Azure Region",
"Collection" : "Your Collection",
"ConnectionMode": "DirectHttps",
"SamplingRatio" : "1.0",
"schema_samplesize" : "20000",
"query_pagesize" : "2147483647",
"query_custom" : query
}
createConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
createConnection.createOrReplaceTempView(useEntity)
然后您可以通过文档实体名称调用它:
createCosmosDBObject("customer")
然后它将 'customer' 插入到查询中,然后创建一个您可以查询的临时视图,不会与 Cosmos DB 中的其他文档实体类型重叠
希望对您有所帮助