aws Glue 作业:如何在 s3 中合并多个输出 .csv 文件
aws Glue job: how to merge multiple output .csv files in s3
我创建了一个 aws Glue Crawler 和作业。目的是将数据从 postgres RDS 数据库 table 传输到 S3 中的一个 .csv 文件。一切正常,但我在 S3 中总共获得了 19 个文件。除了三个文件外,每个文件都是空的,其中包含一行数据库 table 以及 headers。所以数据库的每一行都被写入一个单独的 .csv 文件。
我可以在这里做什么来指定我只想要一个文件,其中第一行是 headers,然后是数据库的每一行?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("password", "string", "password", "string"), ("user_id", "string", "user_id", "string"), ("username", "string", "username", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("user_id", "string", "user_id", "string"), ("username", "string", "username", "string"),("password", "string", "password", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
数据库如下所示:
Databse picture
在 S3 中看起来像这样:
S3 Bucket
S3 中的一个示例 .csv 如下所示:
password,user_id,username
346sdfghj45g,user3,dieter
正如我所说,每个 table 行有一个文件。
编辑:
到 s3 的 multipartupload 似乎无法正常工作。它只是上传零件,但在完成后不会将它们合并在一起。这是作业日志的最后几行:
这是日志的最后几行:
19/04/04 13:26:41 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
19/04/04 13:26:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
19/04/04 13:26:41 INFO Executor: Finished task 16.0 in stage 2.0 (TID 18). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:false s3://bucketname/run-1554384396528-part-r-00018
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00017
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:false s3://bucketname/run-1554384396528-part-r-00019
19/04/04 13:26:41 INFO Executor: Finished task 17.0 in stage 2.0 (TID 19). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00018
19/04/04 13:26:41 INFO Executor: Finished task 18.0 in stage 2.0 (TID 20). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00019
19/04/04 13:26:41 INFO Executor: Finished task 19.0 in stage 2.0 (TID 21). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver from 172.31.20.76:39779 disconnected during shutdown
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver from 172.31.20.76:39779 disconnected during shutdown
19/04/04 13:26:41 INFO MemoryStore: MemoryStore cleared
19/04/04 13:26:41 INFO BlockManager: BlockManager stopped
19/04/04 13:26:41 INFO ShutdownHookManager: Shutdown hook called
End of LogType:stderr
你能试试下面的方法吗?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("user_id", "string", "user_id", "string"), ("username", "string", "username", "string"),("password", "string", "password", "string")], transformation_ctx = "applymapping1")
## Force one partition, so it can save only 1 file instead of 19
repartition = applymapping1.repartition(1)
datasink2 = glueContext.write_dynamic_frame.from_options(frame = repartition, connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
另外,如果你想查看当前有多少个分区,你可以试试下面的代码。我猜有 19 个,这就是将 19 个文件保存回 s3 的原因:
## Change to Pyspark Dataframe
dataframe = DynamicFrame.toDF(applymapping1)
## Print number of partitions
print(dataframe.rdd.getNumPartitions())
## Change back to DynamicFrame
datasink2 = DynamicFrame.fromDF(dataframe, glueContext, "datasink2")
我创建了一个 aws Glue Crawler 和作业。目的是将数据从 postgres RDS 数据库 table 传输到 S3 中的一个 .csv 文件。一切正常,但我在 S3 中总共获得了 19 个文件。除了三个文件外,每个文件都是空的,其中包含一行数据库 table 以及 headers。所以数据库的每一行都被写入一个单独的 .csv 文件。 我可以在这里做什么来指定我只想要一个文件,其中第一行是 headers,然后是数据库的每一行?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("password", "string", "password", "string"), ("user_id", "string", "user_id", "string"), ("username", "string", "username", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("user_id", "string", "user_id", "string"), ("username", "string", "username", "string"),("password", "string", "password", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
数据库如下所示: Databse picture
在 S3 中看起来像这样: S3 Bucket
S3 中的一个示例 .csv 如下所示:
password,user_id,username
346sdfghj45g,user3,dieter
正如我所说,每个 table 行有一个文件。
编辑: 到 s3 的 multipartupload 似乎无法正常工作。它只是上传零件,但在完成后不会将它们合并在一起。这是作业日志的最后几行: 这是日志的最后几行:
19/04/04 13:26:41 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
19/04/04 13:26:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
19/04/04 13:26:41 INFO Executor: Finished task 16.0 in stage 2.0 (TID 18). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:false s3://bucketname/run-1554384396528-part-r-00018
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00017
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:false s3://bucketname/run-1554384396528-part-r-00019
19/04/04 13:26:41 INFO Executor: Finished task 17.0 in stage 2.0 (TID 19). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00018
19/04/04 13:26:41 INFO Executor: Finished task 18.0 in stage 2.0 (TID 20). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00019
19/04/04 13:26:41 INFO Executor: Finished task 19.0 in stage 2.0 (TID 21). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver from 172.31.20.76:39779 disconnected during shutdown
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver from 172.31.20.76:39779 disconnected during shutdown
19/04/04 13:26:41 INFO MemoryStore: MemoryStore cleared
19/04/04 13:26:41 INFO BlockManager: BlockManager stopped
19/04/04 13:26:41 INFO ShutdownHookManager: Shutdown hook called
End of LogType:stderr
你能试试下面的方法吗?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("user_id", "string", "user_id", "string"), ("username", "string", "username", "string"),("password", "string", "password", "string")], transformation_ctx = "applymapping1")
## Force one partition, so it can save only 1 file instead of 19
repartition = applymapping1.repartition(1)
datasink2 = glueContext.write_dynamic_frame.from_options(frame = repartition, connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
另外,如果你想查看当前有多少个分区,你可以试试下面的代码。我猜有 19 个,这就是将 19 个文件保存回 s3 的原因:
## Change to Pyspark Dataframe
dataframe = DynamicFrame.toDF(applymapping1)
## Print number of partitions
print(dataframe.rdd.getNumPartitions())
## Change back to DynamicFrame
datasink2 = DynamicFrame.fromDF(dataframe, glueContext, "datasink2")