使用 Spark 的 Cosmos DB 空间查询
Cosmos DB spatial query using Spark
我想使用空间查询来查询 cosmos 数据库集合。特别是 ST_DISTANCE
查询。此查询使用 azure-cosmos
Python SDK 按预期工作。
我希望通过 Apache Spark 使用此查询来获得更复杂的查询模式。但是,在笔记本的 SQL
单元格中使用 ST_DISTANCE
查询会导致以下错误。
Error in SQL statement: AnalysisException: Undefined function: 'ST_DISTANCE'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.
笔记本初始化如下
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
from pyspark.sql.functions import col
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.createOrReplaceTempView("outlets")
_______________________________________________________________________
%sql
SELECT * FROM outlets f WHERE ST_DISTANCE(f.boundary, POINT(0,0)) < 600
根据我从 Cosmos DB Spark 连接器 github 存储库 [1] 中了解到的情况,并非所有 Cosmos DB 筛选器查询都通过连接器支持(还?)。因此空间系列中的 ST_DISTANCE
和其他过滤函数将无法工作,因为它们不是 Spark 原生支持的谓词,无法被推送到数据库。
找到了至少可以暂时解决此问题的方法。查询配置 [2] 允许将自定义查询直接发送到 Cosmos DB。可以构建和查询临时视图。这不适用于所有用例,但这解决了我需要完成距离过滤的单个视图的问题。其余的可以通过 Spark SQL.
处理
参考下面示例中的spark.cosmos.read.customQuery
[2]。
outlets_cfg = {
"spark.cosmos.accountEndpoint" : cosmosEndpoint,
"spark.cosmos.accountKey" : cosmosMasterKey,
"spark.cosmos.database" : cosmosDatabaseName,
"spark.cosmos.container" : cosmosContainerName,
"spark.cosmos.read.customQuery" : "SELECT * FROM c WHERE ST_DISTANCE(c.location,{\"type\":\"Point\",\"coordinates\": [12.832489, 18.9553242]}) < 1000"
}
df = spark.read.format("cosmos.oltp").options(**outlets_cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.createOrReplaceTempView("outlets")
[1] https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/
我想使用空间查询来查询 cosmos 数据库集合。特别是 ST_DISTANCE
查询。此查询使用 azure-cosmos
Python SDK 按预期工作。
我希望通过 Apache Spark 使用此查询来获得更复杂的查询模式。但是,在笔记本的 SQL
单元格中使用 ST_DISTANCE
查询会导致以下错误。
Error in SQL statement: AnalysisException: Undefined function: 'ST_DISTANCE'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.
笔记本初始化如下
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
from pyspark.sql.functions import col
df = spark.read.format("cosmos.oltp").options(**cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.createOrReplaceTempView("outlets")
_______________________________________________________________________
%sql
SELECT * FROM outlets f WHERE ST_DISTANCE(f.boundary, POINT(0,0)) < 600
根据我从 Cosmos DB Spark 连接器 github 存储库 [1] 中了解到的情况,并非所有 Cosmos DB 筛选器查询都通过连接器支持(还?)。因此空间系列中的 ST_DISTANCE
和其他过滤函数将无法工作,因为它们不是 Spark 原生支持的谓词,无法被推送到数据库。
找到了至少可以暂时解决此问题的方法。查询配置 [2] 允许将自定义查询直接发送到 Cosmos DB。可以构建和查询临时视图。这不适用于所有用例,但这解决了我需要完成距离过滤的单个视图的问题。其余的可以通过 Spark SQL.
处理参考下面示例中的spark.cosmos.read.customQuery
[2]。
outlets_cfg = {
"spark.cosmos.accountEndpoint" : cosmosEndpoint,
"spark.cosmos.accountKey" : cosmosMasterKey,
"spark.cosmos.database" : cosmosDatabaseName,
"spark.cosmos.container" : cosmosContainerName,
"spark.cosmos.read.customQuery" : "SELECT * FROM c WHERE ST_DISTANCE(c.location,{\"type\":\"Point\",\"coordinates\": [12.832489, 18.9553242]}) < 1000"
}
df = spark.read.format("cosmos.oltp").options(**outlets_cfg)\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()
df.createOrReplaceTempView("outlets")
[1] https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/