如何从 Databricks 集群 运行 SQL 语句
How to run SQL statement from Databricks cluster
我有一个处理各种 table 的 Azure Databricks 集群,然后作为最后一步,我将这些 table 推送到 Azure SQL 服务器以供其他一些进程使用.我在数据块中有一个看起来像这样的单元格:
def generate_connection():
jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
return connectionProperties
def generate_url():
jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
jdbcPort = 1433
return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
def persist_table(table, sql_table, mode):
jdbcUrl = generate_url();
connectionProperties = generate_connection()
table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)
persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")
这按预期工作。我遇到的问题是订单 table 非常大,每天只有一小部分行可能会发生变化,所以我想做的是将覆盖模式更改为追加模式并更改数据框架从整个 table 到仅可能已更改的行。所有这些我都知道如何轻松完成,但我想做的是 运行 一个针对 Azure SQL 数据库的简单 SQL 语句,以删除已经要删除的行在那里,以便他们可能更改的行将被插入。
我想 运行 针对 Azure SQL 数据库的 SQL 语句,例如
Delete From Sales.Orders Where CreateDate >= '01/01/2019'
您需要使用 pyodbc 库。您可以连接并使用 sql 语句。
import pyodbc
conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=mydatabe.database.azure.net;'
'DATABASE=AdventureWorks;UID=jonnyFast;'
'PWD=MyPassword')
# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
不幸的是,让它在数据块上工作有点痛苦。我不久前写了一篇博客 post,应该会有帮助。
https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark
我想分享我的发现。
1) pyodbc - 我问了微软技术支持,得到的答案如下:
####========================================================
### cell 1: install pyodbc
####========================================================
%sh
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
ACCEPT_EULA=Y apt-get install msodbcsql17
apt-get -y install unixodbc-dev
sudo apt-get install python3-pip -y
pip3 install --upgrade pyodbc
####========================================================
### cell 2: connect
####========================================================
import pyodbc
conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=xxxxx.database.windows.net;'
'DATABASE=xxxxx;UID=xxxxx;'
'PWD=xxxxx')
####========================================================
### cell 3: create table
####========================================================
conn.execute('CREATE TABLE dbo.Bob (Bob1 VARCHAR(30), Bob2 VARCHAR(30))')
####========================================================
### cell 4: insert into table
####========================================================
conn.execute('INSERT INTO dbo.Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
备注:
a) 运行时:6.2(Scala 2.11、Spark 2.4.4)
b) 此 Runtime 版本仅支持 Python 3.
2) Spark connector for Azure SQL Database and SQL Server - 在谷歌搜索安装 pyodbc 的解决方案时,我找到了这个。我更喜欢这个,我要试一试。
我有一个处理各种 table 的 Azure Databricks 集群,然后作为最后一步,我将这些 table 推送到 Azure SQL 服务器以供其他一些进程使用.我在数据块中有一个看起来像这样的单元格:
def generate_connection():
jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
return connectionProperties
def generate_url():
jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
jdbcPort = 1433
return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
def persist_table(table, sql_table, mode):
jdbcUrl = generate_url();
connectionProperties = generate_connection()
table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)
persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")
这按预期工作。我遇到的问题是订单 table 非常大,每天只有一小部分行可能会发生变化,所以我想做的是将覆盖模式更改为追加模式并更改数据框架从整个 table 到仅可能已更改的行。所有这些我都知道如何轻松完成,但我想做的是 运行 一个针对 Azure SQL 数据库的简单 SQL 语句,以删除已经要删除的行在那里,以便他们可能更改的行将被插入。
我想 运行 针对 Azure SQL 数据库的 SQL 语句,例如
Delete From Sales.Orders Where CreateDate >= '01/01/2019'
您需要使用 pyodbc 库。您可以连接并使用 sql 语句。
import pyodbc
conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=mydatabe.database.azure.net;'
'DATABASE=AdventureWorks;UID=jonnyFast;'
'PWD=MyPassword')
# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
不幸的是,让它在数据块上工作有点痛苦。我不久前写了一篇博客 post,应该会有帮助。 https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark
我想分享我的发现。
1) pyodbc - 我问了微软技术支持,得到的答案如下:
####========================================================
### cell 1: install pyodbc
####========================================================
%sh
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
ACCEPT_EULA=Y apt-get install msodbcsql17
apt-get -y install unixodbc-dev
sudo apt-get install python3-pip -y
pip3 install --upgrade pyodbc
####========================================================
### cell 2: connect
####========================================================
import pyodbc
conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=xxxxx.database.windows.net;'
'DATABASE=xxxxx;UID=xxxxx;'
'PWD=xxxxx')
####========================================================
### cell 3: create table
####========================================================
conn.execute('CREATE TABLE dbo.Bob (Bob1 VARCHAR(30), Bob2 VARCHAR(30))')
####========================================================
### cell 4: insert into table
####========================================================
conn.execute('INSERT INTO dbo.Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
备注: a) 运行时:6.2(Scala 2.11、Spark 2.4.4) b) 此 Runtime 版本仅支持 Python 3.
2) Spark connector for Azure SQL Database and SQL Server - 在谷歌搜索安装 pyodbc 的解决方案时,我找到了这个。我更喜欢这个,我要试一试。