Execute SQL statement starting with "WITH" keyword in Spark
The following query runs fine in SQL Server:
with tt AS
(
select schema_name(fk_tab.schema_id) + '.' + fk_tab.name as foreign_table,
'>-' as rel,
schema_name(pk_tab.schema_id) + '.' + pk_tab.name as primary_table,
fk_cols.constraint_column_id as no,
fk_col.name as fk_column_name,
' = ' as [join],
pk_col.name as pk_column_name,
fk.name as fk_constraint_name
from sys.foreign_keys fk
inner join sys.tables fk_tab
on fk_tab.object_id = fk.parent_object_id
inner join sys.tables pk_tab
on pk_tab.object_id = fk.referenced_object_id
inner join sys.foreign_key_columns fk_cols
on fk_cols.constraint_object_id = fk.object_id
inner join sys.columns fk_col
on fk_col.column_id = fk_cols.parent_column_id
and fk_col.object_id = fk_tab.object_id
inner join sys.columns pk_col
on pk_col.column_id = fk_cols.referenced_column_id
and pk_col.object_id = pk_tab.object_id
)
select tt.foreign_table, tt.primary_table, tt.fk_constraint_name
,(
SELECT SUBSTRING(
(
SELECT ',' + t.fk_column_name
FROM tt as t
WHERE tt.fk_constraint_name = t.fk_constraint_name FOR XML PATH('')), 2, 200000)
) AS fk_column_name
,(
SELECT SUBSTRING(
(
SELECT ',' + t.pk_column_name
FROM tt as t
WHERE tt.fk_constraint_name = t.fk_constraint_name FOR XML PATH('')), 2, 200000)
) AS pk_column_name
from tt
group by tt.foreign_table, tt.primary_table, tt.fk_constraint_name
It returns a table containing all the foreign keys in my database.
I am now trying to execute it from a Spark script on Databricks. Here is what I did:
// ch contains my query
val df = spark.sql(ch)
But I get this error:
ParseException:
no viable alternative at input 'with tt AS\n(\nselect schema_name(fk_tab.schema_id) + '.' + fk_tab.name as foreign_table,\n'>-' as rel,\nschema_name(pk_tab.schema_id) + '.' + pk_tab.name as primary_table,\nfk_cols.constraint_column_id as no,\nfk_col.name as fk_column_name,\n' = ' as
Is it not possible to execute this kind of query with Spark SQL?
If not, how can I do it?
OK, so I figured it out. Spark SQL does support CTEs (WITH ... queries), but spark.sql(ch) runs the string through Spark's own parser, which cannot handle T-SQL-specific syntax such as the [join] bracket alias or FOR XML PATH, and in any case the sys.* catalog views only exist in SQL Server, not in the Spark catalog. For a request like this, the query has to be sent to SQL Server itself over a JDBC connection, using createStatement:
import java.sql._

// jdbcUrl must point at the SQL Server instance; the query runs there, not in Spark.
val connection = DriverManager.getConnection(jdbcUrl)
val stmt1 = connection.createStatement
val rs: ResultSet = stmt1.executeQuery(ch)
while (rs.next) {
  println(rs.getString("col1"))
  // ...
}
rs.close()
stmt1.close()
connection.close()
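If you want the result back as a DataFrame (as in the original val df = spark.sql(ch) attempt) rather than iterating a ResultSet, here is a minimal sketch. It assumes the same jdbcUrl and ch as above, picks three of the query's output columns for illustration, and collects all rows on the driver, which is fine for a small catalog query like this one but not for large result sets. Note that spark.read.format("jdbc").option("query", ch) would not help here: Spark wraps that query in a subquery, and SQL Server does not allow a CTE (WITH ...) inside a subquery.

import java.sql._
import scala.collection.mutable.ArrayBuffer
import spark.implicits._  // `spark` is predefined on Databricks; needed for toDF

// Run the T-SQL on SQL Server over JDBC and buffer the rows driver-side.
val conn = DriverManager.getConnection(jdbcUrl)
val stmt = conn.createStatement
val rs = stmt.executeQuery(ch)
val rows = ArrayBuffer[(String, String, String)]()
while (rs.next) {
  rows += ((rs.getString("foreign_table"),
            rs.getString("primary_table"),
            rs.getString("fk_constraint_name")))
}
conn.close()

// Turn the buffered rows into a Spark DataFrame.
val df = rows.toSeq.toDF("foreign_table", "primary_table", "fk_constraint_name")
df.show(false)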