Does an AWS Glue Python job limit the amount of data written to an S3 bucket?
I created a Glue job that reads data from the Glue catalog and saves it to an S3 bucket in Parquet format. It works fine, but the number of items appears to be limited to 20: every time the job is triggered, only 20 items end up in the bucket, and I want all of them saved. Maybe I am missing some additional property in the Python script.
Here is the script (generated by AWS):
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
transformation_ctx = "datasource0"
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cargoprobe_data", table_name = "dev_scv_completed_executions", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [*field list*], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucketname"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
This happens automatically in the background and is called partitioning. You can repartition your DynamicFrame into a single output file by calling
partitioned_df = dropnullfields3.repartition(1)
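As a minimal sketch (reusing the names and bucket path from the script above, which are the question's own placeholders), the repartitioned frame would then be passed to the sink in place of dropnullfields3:

# Collapse the DynamicFrame to a single partition before writing,
# so the Parquet sink produces one output file instead of one file per partition
partitioned_df = dropnullfields3.repartition(1)

# Write the repartitioned frame instead of dropnullfields3
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = partitioned_df,
    connection_type = "s3",
    connection_options = {"path": "s3://bucketname"},
    format = "parquet",
    transformation_ctx = "datasink4")
job.commit()

Note that repartition(1) forces all data through a single partition, so for large datasets the write step can become slow; the 20 files you are seeing most likely already contain all the rows, just split across partitions.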