Synapse spark select 列 space

Synapse spark select column with space

我正在尝试读取突触 table,它的列名中有空格。

阅读 table 一直在工作,直到我 selecting 没有空格或特殊字符的列:

%%spark
val df = spark.read.synapsesql("<Pool>.<schema>.<table>").select("TYPE", "Year").limit(100)
df.show()

输出:

+------+----+
|  TYPE|Year|
+------+----+
|BOUGHT|LAST|
|BOUGHT|LAST|
|BOUGHT|LAST|
|BOUGHT|LAST|

当我开始 selecting 带空格的列时,出现错误。我尝试了很多变体:

.select(col("""`Country Code`"""))
.select(col("`Country Code`"))
.select(col("""[Country Code]"""))
.select(col("Country Code"))
.select($"`Country Code`")
.select("`Country Code`")
.select("""`Country Code`""")

会return这个错误: 错误:com.microsoft.sqlserver.jdbc.SQLServerException:列名称无效 'Country'。

如果我在 select 中省略 ` 例如:

.select("[Country Code]")

错误:com.microsoft.sqlserver.jdbc.SQLServerException:列名“[国家代码]”无效。

在突触中使用反引号火花只需将第一个单词作为列。

有经验吗?

select 本身会起作用,添加 show(或任何其他类似 count 的操作)不会。 Synapse synapsesql API 似乎确实存在问题。 Invalid column name 'country' 错误来自 SQL 引擎,因为似乎无法将方括号传回给它。镶木地板文件也不支持列名中的空格,因此它可能已连接。

解决方法是简单地在列名中使用空格。如果需要,修复先前 Synapse 管道步骤中的 tables。我会调查一下,但可能没有其他答案。

如果你想重命名数据库中现有的列,你可以使用sp_rename,例如

EXEC sp_rename 'dbo.countries.country Type', 'countryType', 'COLUMN';

此代码已经在 Synapse 专用 SQL 池上进行了测试。

不幸的是,

那个特定的 API (sysnapsesql.read) 无法处理视图。您必须在之前的 Synapse Pipeline 步骤中使用 CTAS 来具体化它。 API 对于简单模式很有用(get table -> process -> put back)但是非常有限。您甚至无法管理 table 分布(散列、round_robin、复制)或索引(聚集列存储、聚集索引、堆)或分区,但您永远不知道它们有一天可能会增加。无论如何,我会在下一次 MS 会议期间密切关注。

我已经使用 JDBC 创建了 运行 查询函数。感谢这个我能够从视图中读取。我添加了 saplme 代码如何使用 TokenLibrary.

从 KeyVault 获取密码
def spark_query(db, query):
    jdbc_hostname = "<synapse_db>.sql.azuresynapse.net"
    user = "<spark_db_client>"
    password = "<strong_password>"
    # password_from_kv = TokenLibrary.getSecret("<Linked_Key_Vault_Service_Name>", "<Key_Vault_Key_Name>", "<Key_Vault_Name>")
    return spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{jdbc_hostname }:1433;databaseName={db};user={user};password={password}") \
    .option("query", query) \
    .load()

然后我创建了 VIEW,列名没有空格:

CREATE VIEW v_my_table
AS
SELECT [Country code] as country_code from my_table

已授予对 <spark_db_client> 的访问权限:

GRANT SELECT ON v_my_table to <spark_db_client>

完成整个准备工作后,我能够从 VIEW 中读取 table 并保存到 spark 数据库:

query = """
SELECT country_code FROM dbo.v_my_table
"""

df = spark_query(db="<my_database>", query=query)
spark.sql("CREATE DATABASE IF NOT EXISTS spark_poc")
df.write.mode("overwrite").saveAsTable("spark_poc.my_table")
df.registerTempTable("my_table")

这是<placeholder_variables>