如何在 AWS Glue PySpark 中 运行 并行线程?
How to run parallel threads in AWS Glue PySpark?
我有一个 spark 作业,它只会从具有相同转换的多个 table 中提取数据。基本上是一个遍历 table 列表的 for 循环,查询目录 table,添加时间戳,然后推送到 Redshift(下面的示例)。
这项工作大约需要 30 分钟才能完成。有没有办法在相同的 spark/glue 上下文中并行 运行 这些?如果可以避免,我不想创建单独的粘合作业。
import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *
# query the runtime arguments
args = getResolvedOptions(
sys.argv,
["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)
# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()
tables = []
for table in tables:
catalog_table = glueContext.create_dynamic_frame.from_catalog(
database="test", table_name=table, transformation_ctx=table
)
data_set = catalog_table.toDF().withColumn(
"batchLoadTimestamp", lit(job_execution_timestamp)
)
# covert back to glue dynamic frame
export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")
# remove null rows from dynamic frame
non_null_records = DropNullFields.apply(
frame=export_frame, transformation_ctx="non_null_records"
)
temp_dir = os.path.join(args["TempDir"], redshift_table_name)
stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=non_null_records,
catalog_connection=args["redshift_catalog_connection"],
connection_options={
"dbtable": f"{args['target_schema']}.{redshift_table_name}",
"database": args["target_database"],
"preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
},
redshift_tmp_dir=temp_dir,
transformation_ctx="stores_redshiftSink",
) ```
您可以执行以下操作来加快此过程
- 启用并发执行作业。
- 分配足够数量的 DPU。
- 将 table 的列表作为参数传递
- 使用 Glue 工作流或步骤函数并行执行作业。
现在假设您有 100 个 table 要摄取,您可以将列表分成 10 个 table,每个 运行 作业并发 10 次。
由于您的数据将并行加载,因此 Glue 作业的时间 运行 将减少,因此产生的成本将减少。
更快的替代方法是直接使用 redshift 实用程序。
- 在 redshift 中创建 table 并保持 batchLoadTimestamp 列默认为 current_timestamp。
- 现在创建复制命令并将数据直接从 s3 加载到 table。
- 运行 使用 Glue 的复制命令 python shell 利用 pg8000 的作业。
为什么这种方法会更快??
因为 spark redshift jdbc 连接器首先将 spark 数据帧卸载到 s3,然后准备一个复制命令到 redshift table。当 运行ning 直接复制命令时,您正在消除 运行ning 卸载命令的开销,并将数据读入 spark df。
我有一个 spark 作业,它只会从具有相同转换的多个 table 中提取数据。基本上是一个遍历 table 列表的 for 循环,查询目录 table,添加时间戳,然后推送到 Redshift(下面的示例)。
这项工作大约需要 30 分钟才能完成。有没有办法在相同的 spark/glue 上下文中并行 运行 这些?如果可以避免,我不想创建单独的粘合作业。
import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *
# query the runtime arguments
args = getResolvedOptions(
sys.argv,
["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)
# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()
tables = []
for table in tables:
catalog_table = glueContext.create_dynamic_frame.from_catalog(
database="test", table_name=table, transformation_ctx=table
)
data_set = catalog_table.toDF().withColumn(
"batchLoadTimestamp", lit(job_execution_timestamp)
)
# covert back to glue dynamic frame
export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")
# remove null rows from dynamic frame
non_null_records = DropNullFields.apply(
frame=export_frame, transformation_ctx="non_null_records"
)
temp_dir = os.path.join(args["TempDir"], redshift_table_name)
stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=non_null_records,
catalog_connection=args["redshift_catalog_connection"],
connection_options={
"dbtable": f"{args['target_schema']}.{redshift_table_name}",
"database": args["target_database"],
"preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
},
redshift_tmp_dir=temp_dir,
transformation_ctx="stores_redshiftSink",
) ```
您可以执行以下操作来加快此过程
- 启用并发执行作业。
- 分配足够数量的 DPU。
- 将 table 的列表作为参数传递
- 使用 Glue 工作流或步骤函数并行执行作业。
现在假设您有 100 个 table 要摄取,您可以将列表分成 10 个 table,每个 运行 作业并发 10 次。
由于您的数据将并行加载,因此 Glue 作业的时间 运行 将减少,因此产生的成本将减少。
更快的替代方法是直接使用 redshift 实用程序。
- 在 redshift 中创建 table 并保持 batchLoadTimestamp 列默认为 current_timestamp。
- 现在创建复制命令并将数据直接从 s3 加载到 table。
- 运行 使用 Glue 的复制命令 python shell 利用 pg8000 的作业。
为什么这种方法会更快?? 因为 spark redshift jdbc 连接器首先将 spark 数据帧卸载到 s3,然后准备一个复制命令到 redshift table。当 运行ning 直接复制命令时,您正在消除 运行ning 卸载命令的开销,并将数据读入 spark df。