有没有办法改善 运行 时间或优化 Python / Pyspark 代码?
Is there a way to improve the run time or optimize Python / Pyspark code?
我致力于从 Azure SQL 服务器中提取 table 计数信息超过 350+ tables。由于系统元数据 tables 没有定期刷新,所以我不能依赖它。我写了下面的代码来帮助我实现同样的目标 -
import pyodbc
from pyspark.sql.types import *
pyodbc.pooling = False
def get_table_count(query ,server, username, password, database):
conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = conn.cursor()
cursor.execute(query)
row = cursor.fetchone()
columns = StructType([StructField('tableCount', LongType(), True) , StructField('tableName', StringType(), True), StructField('databaseName', StringType(), True)])
data = [(row[0], row[1], row[2])]
df = spark.createDataFrame( data = data,schema = columns)
cursor.close()
del cursor
conn.close()
return df
import pyspark.sql.functions as F
dbList = [ SQLServerDB1 , SQLServerDB1 ]
SQLServerDB1_query = ""
SQLServerDB2_query = ""
for db in dbList:
print("Currently loading for "+db+" database")
serverName = db + "SQLServerName"
serverUser = db + "SQLServerUser"
serverPassword = db + "SQLServerPassword"
serverDB = db + "SQLServerDB"
tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']
for tablename in list(tables):
if tablename != list(tables)[-1]:
vars()["%s_query"%db] = f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
else:
vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"
vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )
# exec(f'{db}_DF = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )')
# print(tablename + " Loaded")
低于错误 -
('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Parse error at line: 3, column: 1: Incorrect syntax near 'union'. (103010) (SQLExecDirectW)")
我尝试打印 SQL 语句,它在 SQL 服务器数据库中没有任何问题。
请指出我哪里写错了代码。
尝试使用以下代码,它有效。谢谢大家的建议!
def get_table_count(query ,server, username, password, database):
jdbc_url = f"jdbc:sqlserver://{server}:1433;databaseName={database}"
df_read = spark.read \
.format("jdbc") \
.option("url",jdbc_url) \
.option("query", query) \
.option("user", username) \
.option("password", password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
df_save = df_read.write.mode('overwrite').parquet('/tmp/' + f"{database}" + '.parquet')
df = spark.read.parquet('/tmp/' + f"{database}" + '.parquet')
return df
import pyspark.sql.functions as F
dbList = [ SQLServerDB1 , SQLServerDB1 ]
SQLServerDB1_query = ""
SQLServerDB2_query = ""
for db in dbList:
print("Currently loading for "+db+" database")
serverName = db + "SQLServerName"
serverUser = db + "SQLServerUser"
serverPassword = db + "SQLServerPassword"
serverDB = db + "SQLServerDB"
tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']
for tablename in list(tables):
if tablename != list(tables)[-1]:
vars()["%s_query"%db] = f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
else:
vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"
print(vars()["%s_query"%db])
vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )
vars()["%s_DF"%db].createOrReplaceTempView(f"{db}_tablesCount")
print(f"{db}"+ " Loaded")
我致力于从 Azure SQL 服务器中提取 table 计数信息超过 350+ tables。由于系统元数据 tables 没有定期刷新,所以我不能依赖它。我写了下面的代码来帮助我实现同样的目标 -
import pyodbc
from pyspark.sql.types import *
pyodbc.pooling = False
def get_table_count(query ,server, username, password, database):
conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = conn.cursor()
cursor.execute(query)
row = cursor.fetchone()
columns = StructType([StructField('tableCount', LongType(), True) , StructField('tableName', StringType(), True), StructField('databaseName', StringType(), True)])
data = [(row[0], row[1], row[2])]
df = spark.createDataFrame( data = data,schema = columns)
cursor.close()
del cursor
conn.close()
return df
import pyspark.sql.functions as F
dbList = [ SQLServerDB1 , SQLServerDB1 ]
SQLServerDB1_query = ""
SQLServerDB2_query = ""
for db in dbList:
print("Currently loading for "+db+" database")
serverName = db + "SQLServerName"
serverUser = db + "SQLServerUser"
serverPassword = db + "SQLServerPassword"
serverDB = db + "SQLServerDB"
tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']
for tablename in list(tables):
if tablename != list(tables)[-1]:
vars()["%s_query"%db] = f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
else:
vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"
vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )
# exec(f'{db}_DF = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )')
# print(tablename + " Loaded")
低于错误 -
('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Parse error at line: 3, column: 1: Incorrect syntax near 'union'. (103010) (SQLExecDirectW)")
我尝试打印 SQL 语句,它在 SQL 服务器数据库中没有任何问题。 请指出我哪里写错了代码。
尝试使用以下代码,它有效。谢谢大家的建议!
def get_table_count(query ,server, username, password, database):
jdbc_url = f"jdbc:sqlserver://{server}:1433;databaseName={database}"
df_read = spark.read \
.format("jdbc") \
.option("url",jdbc_url) \
.option("query", query) \
.option("user", username) \
.option("password", password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
df_save = df_read.write.mode('overwrite').parquet('/tmp/' + f"{database}" + '.parquet')
df = spark.read.parquet('/tmp/' + f"{database}" + '.parquet')
return df
import pyspark.sql.functions as F
dbList = [ SQLServerDB1 , SQLServerDB1 ]
SQLServerDB1_query = ""
SQLServerDB2_query = ""
for db in dbList:
print("Currently loading for "+db+" database")
serverName = db + "SQLServerName"
serverUser = db + "SQLServerUser"
serverPassword = db + "SQLServerPassword"
serverDB = db + "SQLServerDB"
tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']
for tablename in list(tables):
if tablename != list(tables)[-1]:
vars()["%s_query"%db] = f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
else:
vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"
print(vars()["%s_query"%db])
vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )
vars()["%s_DF"%db].createOrReplaceTempView(f"{db}_tablesCount")
print(f"{db}"+ " Loaded")