有没有办法改善 运行 时间或优化 Python / Pyspark 代码?

Is there a way to improve the run time or optimize Python / Pyspark code?

我致力于从 Azure SQL 服务器中提取 table 计数信息超过 350+ tables。由于系统元数据 tables 没有定期刷新,所以我不能依赖它。我写了下面的代码来帮助我实现同样的目标 -

import pyodbc
from pyspark.sql.types import *
pyodbc.pooling = False

def get_table_count(query ,server, username, password, database):

  conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
  cursor = conn.cursor()  
  cursor.execute(query)
  row = cursor.fetchone()
  
  columns = StructType([StructField('tableCount', LongType(), True) , StructField('tableName', StringType(), True), StructField('databaseName', StringType(), True)])
  data = [(row[0], row[1], row[2])]  
  df = spark.createDataFrame( data = data,schema = columns)

  cursor.close()
  del cursor
  conn.close()
  
  return df
import pyspark.sql.functions  as F

dbList = [ SQLServerDB1 , SQLServerDB1 ]

SQLServerDB1_query = ""
SQLServerDB2_query = ""

for db in dbList:
  print("Currently loading for "+db+" database")
  serverName = db + "SQLServerName"
  serverUser = db + "SQLServerUser"
  serverPassword = db + "SQLServerPassword"
  serverDB = db + "SQLServerDB"  
  tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']

  for tablename in list(tables):
    if tablename != list(tables)[-1]:
      vars()["%s_query"%db] = f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
    else:
      vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"
    
    vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName),  eval(serverUser),  eval(serverPassword), eval(serverDB) )

#     exec(f'{db}_DF = get_table_count( vars()["%s_query"%db] , eval(serverName),  eval(serverUser),  eval(serverPassword), eval(serverDB) )')

    
#     print(tablename + " Loaded")

低于错误 -

('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Parse error at line: 3, column: 1: Incorrect syntax near 'union'. (103010) (SQLExecDirectW)")

我尝试打印 SQL 语句,它在 SQL 服务器数据库中没有任何问题。 请指出我哪里写错了代码。

尝试使用以下代码,它有效。谢谢大家的建议!

def get_table_count(query ,server, username, password, database):
  
  jdbc_url = f"jdbc:sqlserver://{server}:1433;databaseName={database}"
  
  df_read = spark.read \
            .format("jdbc") \
            .option("url",jdbc_url) \
            .option("query", query) \
            .option("user", username) \
            .option("password", password) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .load()
  
  df_save = df_read.write.mode('overwrite').parquet('/tmp/' + f"{database}" + '.parquet')
  
  df = spark.read.parquet('/tmp/' + f"{database}" + '.parquet')
    
  return df
import pyspark.sql.functions  as F

dbList = [ SQLServerDB1 , SQLServerDB1 ]

SQLServerDB1_query = ""
SQLServerDB2_query = ""

for db in dbList:
  print("Currently loading for "+db+" database")
  serverName = db + "SQLServerName"
  serverUser = db + "SQLServerUser"
  serverPassword = db + "SQLServerPassword"
  serverDB = db + "SQLServerDB"  
  tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']

  for tablename in list(tables):
    if tablename != list(tables)[-1]:
      vars()["%s_query"%db] = f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
    else:
      vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"    
    
  print(vars()["%s_query"%db])
    
  vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName),  eval(serverUser),  eval(serverPassword), eval(serverDB) )
  
  vars()["%s_DF"%db].createOrReplaceTempView(f"{db}_tablesCount")
  
  print(f"{db}"+ " Loaded")