Why does my AWS Glue job use only one executor and the driver?
In my script I convert all the DynamicFrames to DataFrames in PySpark and run groupBy and join operations. In the metrics view I then see that only one executor is active, no matter how many DPUs I allocate. The job fails after about 2 hours with:
Diagnostics: Container [pid=8417,containerID=container_1532458272694_0001_01_000001] is running beyond physical memory limits. Current usage: 5.5 GB of 5.5 GB physical memory used; 7.7 GB of 27.5 GB virtual memory used. Killing container.
I have about 2 billion rows of data, and the DPU count is set to 80.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import count

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read both tables from the Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "in_json", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "out_json", transformation_ctx = "datasource1")
applymapping0 = ApplyMapping.apply(frame = datasource0, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping0")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = [("fieldA", "int", "fieldA", "int"), ("fieldB", "string", "fieldB", "string")], transformation_ctx = "applymapping1")

# Convert to DataFrames, count rows per key, and join the two aggregates
df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))
result_joined = df1.join(df2, "fieldB")

result = DynamicFrame.fromDF(result_joined, glueContext, "result")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = {"path": "s3://test-bucket"}, format = "json", transformation_ctx = "datasink2")
job.commit()
Am I missing something?
Try repartitioning your DataFrame. You can repartition based on a column, to an arbitrary number of partitions, or both. For example:
df1 = applymapping0.toDF().groupBy("fieldA").agg(count('*').alias("total_number_1"))
df2 = applymapping1.toDF().groupBy("fieldA").agg(count('*').alias("total_number_2"))
df1_r = df1.repartition(df1["fieldB"])
df2_r = df2.repartition(df2["fieldB"])
result_joined = df1_r.join(df2_r, "fieldB")
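For completeness, here is a minimal sketch of the other two variants mentioned above (repartitioning to a fixed number of partitions, or to a number of partitions plus a column); the partition count 200 and the column fieldA are illustrative choices, not values from the original post:

# Repartition to a fixed number of partitions (200 is an arbitrary example)
df1_r = df1.repartition(200)

# Repartition to a fixed number of partitions and by a key column at the same
# time, so rows with the same key end up in the same partition
df1_r = df1.repartition(200, "fieldA")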
It turned out that the job was stuck at the beginning with only one executor active because my input data was so large. Once the actual computation started, I could see multiple executors active.
df1.repartition(df1["fieldB"]) actually made it slower; maybe I wasn't using it correctly.
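As a side note, a quick way to check how much parallelism the executors can actually use is to look at a DataFrame's partition count before and after repartitioning; this is plain PySpark, with df1_r referring to the repartitioned frame from the snippet above:

# Partitions of the raw input: the maximum number of tasks (and therefore
# busy executor cores) the first stage can run in parallel
print(applymapping0.toDF().rdd.getNumPartitions())

# Partitions after the explicit repartition
print(df1_r.rdd.getNumPartitions())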