Performance improvement on Databricks Delta table while performing merge operation (upsert)
I have a Delta table 'targetTable' with 35 billion records.
Every day I receive 100 million records from the source, which I have to upsert into targetTable.
Target table:
- 25 columns
- 25 billion records
- 1 column, commID, that is unique per row
As of now the job takes 45 minutes to 1 hour to complete, and the run time keeps increasing.
Should I use Z-ordering or file partitioning for better performance, or is there any other suggestion?
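For reference, a minimal sketch of what the two options could look like on this table. Everything here except commID is an assumed placeholder: source_df stands for the daily 100M-record batch, load_date for a date partition column, and /mnt/delta/targetTable for the table path.

# Sketch only - source_df, load_date and the table path are assumptions.
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/mnt/delta/targetTable")   # assumed path

# Option 1: restrict the merge to the partitions actually touched by the
# incoming batch (assuming the table is partitioned by load_date), so Delta
# can prune files instead of scanning the whole table.
dates = [r["load_date"] for r in source_df.select("load_date").distinct().collect()]
date_list = ", ".join(f"'{d}'" for d in dates)

(target.alias("t")
    .merge(source_df.alias("s"),
           f"t.load_date IN ({date_list}) AND t.commID = s.commID")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# Option 2 (can be combined with option 1): periodically co-locate data on
# the join key so each merge rewrites fewer files.
spark.sql("OPTIMIZE delta.`/mnt/delta/targetTable` ZORDER BY (commID)")

Restricting the merge condition to the incoming dates only helps if the updates are concentrated in a few partitions; Z-ordering on commID is aimed at the case where the incoming keys are spread across the whole table.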
You can use a parallel-thread approach. Please find below the sample code we wrote for an S3 write workload; you can adapt the same logic for ADLS writes.
dbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"
query = query.replace("startdt", "'"+start_date+"'").replace("enddt", "'"+end_date+"'")
print(f"Query - {query}")
data_df = spark.read \
.format('jdbc') \
.options(url= jdbcurl, user= username,password= password, query=query, driver= driver,numPartitions=100) \
.option('customSchema', schema[1:-1]) \
.option('ConnectionRetries', '3') \
.option('ConnectionRetryInterval', '2000') \
.option("fetchSize",1000000) \
.load()
print(data_df.count())
# DBTITLE 1,Multithreaded S3 raw/serve write
from datetime import timedelta, date, datetime
from concurrent import futures
from pyspark.sql import functions as F

def daterange(start_date, end_date):
    # Yield each date from start_date (inclusive) up to end_date (exclusive)
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)

def writeS3(curr_date):
    # Write one day's slice of data_df to the raw and serve buckets,
    # overwriting only that day's partition via replaceWhere
    print(f"Starting S3 write for date - {curr_date}")
    curr_df = data_df.filter(f"dt_id='{curr_date}'")
    print(f"curr_date - {curr_date} and count - {curr_df.count()}")
    curr_df.write.format(format).mode("overwrite") \
        .option("replaceWhere", f"{partition_column}= '{curr_date}'") \
        .partitionBy(partition_column) \
        .save(f"{raw_bucket}/{db}/{table}/")
    serve_df = curr_df.withColumn('az_ld_ts', F.current_timestamp())
    serve_df.write.format(format).mode("overwrite") \
        .option("replaceWhere", f"{partition_column}= '{curr_date}'") \
        .partitionBy(partition_column) \
        .save(f"{serve_bucket}/{db}/{table}/")
    print(f"completed for {curr_date}")
    return curr_date
# Create one thread per day in the date range and run the writes in parallel
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
total_days = abs(end_date - start_date).days
print(f"total days - {total_days}. Creating {total_days} threads..")

jobs = []
with futures.ThreadPoolExecutor(max_workers=total_days + 1) as e:
    print(f"{raw_bucket}/{db}/{table}/")
    for single_date in daterange(start_date, end_date):
        curr_date = single_date.strftime("%Y-%m-%d")
        jobs.append(e.submit(writeS3, curr_date))
    for job in futures.as_completed(jobs):
        result_done = job.result()
        print(f"Job Completed - {result_done}")
print("Task complete")