Move data from Google Cloud Storage to S3 using a Dataproc Hadoop cluster and Airflow
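I am trying to transfer a large amount of data from GCS to an S3 bucket. I have spun up a Hadoop cluster using Google Dataproc.
I can run the job through the Hadoop CLI with the following command: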
hadoop distcp -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder
I am new to MapReduce and Hadoop. I am trying to add this job to my Airflow workflow using DataProcHadoopOperator:
export_to_s3 = DataProcHadoopOperator(
    task_id='export_to_s3',
    main_jar=None,
    main_class=None,
    arguments=None,
    archives=None,
    files=None,
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name='optimize-m',
    dataproc_hadoop_properties=None,
    dataproc_hadoop_jars=None,
    gcp_conn_id='google_cloud_default',
    delegate_to=None,
    region='global',
    dag=dag
)
My Airflow runs dockerized on a Compute Engine instance.
I am not sure how to configure the operator so that it creates the following job:
hadoop distcp -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder
I followed the suggestion and built the following Airflow task:
export_to_s3 = DataProcHadoopOperator(
    task_id='export_to_s3',
    main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar',
    main_class=None,
    arguments='-update gs://umg-comm-tech-dev/data/apollo/QA/ s3a://[mys3accessid]:[mys3secret]@s3://umg-ers-analytics/qubole/user-data/pitched/optimize/QA/'.split(' '),
    archives=None,
    files=None,
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name='optimize',
    dataproc_hadoop_properties=None,
    dataproc_hadoop_jars=None,
    gcp_conn_id='google_cloud_default',
    delegate_to=None,
    region='global',
    dag=dag
)
However, I now get the following error:
18/01/18 10:13:42 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.2-hadoop2
18/01/18 10:13:42 WARN s3native.S3xLoginHelper: The Filesystem URI contains login details. This is insecure and may be unsupported in future.
18/01/18 10:13:43 WARN s3a.S3AFileSystem: Client: Amazon S3 error 400: 400 Bad Request; Bad Request (retryable)
com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:276)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:236)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:100)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:226)
at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.tools.DistCp.main(DistCp.java:462)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
18/01/18 10:13:43 ERROR tools.DistCp: Invalid arguments:
org.apache.hadoop.fs.s3a.AWSS3IOException: doesBucketExist on s3: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:282)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:236)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:100)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:226)
at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.tools.DistCp.main(DistCp.java:462)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:276)
... 18 more
Invalid arguments: doesBucketExist on s3: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696)
usage: distcp OPTIONS [source_path...] <target_path>
OPTIONS
-append Reuse existing data in target files and
append new data to them if possible
-async Should distcp execution be blocking
-atomic Commit all changes or none
-bandwidth <arg> Specify bandwidth per map in MB
-delete Delete from target, files missing in source
-diff <arg> Use snapshot diff report to identify the
difference between source and target
-f <arg> List of files that need to be copied
-filelimit <arg> (Deprecated!) Limit number of files copied
to <= n
-filters <arg> The path to a file containing a list of
strings for paths to be excluded from the
copy.
-i Ignore failures during copy
-log <arg> Folder on DFS where distcp execution logs
are saved
-m <arg> Max number of concurrent maps to use for
copy
-mapredSslConf <arg> Configuration for ssl config file, to use
with hftps://. Must be in the classpath.
-numListstatusThreads <arg> Number of threads to use for building file
listing (max 40).
-overwrite Choose to overwrite target files
unconditionally, even if they exist.
-p <arg> preserve status (rbugpcaxt)(replication,
block-size, user, group, permission,
checksum-type, ACL, XATTR, timestamps). If
-p is specified with no <arg>, then
preserves replication, block size, user,
group, permission, checksum type and
timestamps. raw.* xattrs are preserved when
both the source and destination paths are
in the /.reserved/raw hierarchy (HDFS
only). raw.* xattrpreservation is
independent of the -p flag. Refer to the
DistCp documentation for more details.
-sizelimit <arg> (Deprecated!) Limit number of files copied
to <= n bytes
-skipcrccheck Whether to skip CRC checks between source
and target paths.
-strategy <arg> Copy strategy to use. Default is dividing
work based on file sizes
-tmp <arg> Intermediate work path to be used for
atomic commit
-update Update target, copying only missingfiles or
directories
Any help is greatly appreciated.
Thanks!
Why use Dataproc at all? Wouldn't a gsutil command be simpler?
For example:
gsutil -m rsync -r gs://GCS s3://S3
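Something like this will copy your data from GCS to S3. Note the -m flag, which makes gsutil run the sync in parallel. You can also use other flags, such as -d, which deletes files at the destination that are not present in the source.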
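If the gsutil route is taken, the command could be assembled in Python before handing it to whatever runs it (a shell task in Airflow, for instance). The sketch below is only an illustration: the helper name `build_gsutil_rsync_cmd` and its `delete_extra` parameter are hypothetical, and it assumes gsutil is installed on the worker and already has AWS credentials configured (typically in its boto configuration file).

```python
import shlex


def build_gsutil_rsync_cmd(source, destination, delete_extra=False):
    """Return the gsutil rsync command as a list of arguments.

    Hypothetical helper: it only builds the command, it does not run it.
    """
    cmd = ["gsutil", "-m", "rsync", "-r"]  # -m: parallel, -r: recursive
    if delete_extra:
        # -d removes destination files that are missing from the source.
        cmd.append("-d")
    cmd += [source, destination]
    return cmd


cmd = build_gsutil_rsync_cmd("gs://GCS", "s3://S3")
# Quote each part so the string is safe to paste into a shell command.
print(" ".join(shlex.quote(part) for part in cmd))
```

Keeping the command as a list until the last moment avoids quoting mistakes when bucket paths contain unusual characters.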
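I am not familiar with Airflow, but it looks like DataProcHadoopOperator is just a wrapper around gcloud dataproc jobs submit hadoop.
I believe hadoop distcp is just a wrapper around hadoop jar /usr/lib/hadoop-mapreduce/hadoop-distcp.jar, so submitting distcp through the Dataproc API would look like this: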
gcloud dataproc jobs submit hadoop --cluster=optimize --jar file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar -- -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder
So for Airflow you probably want something like this:
main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar',
arguments='-update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder'.split(' '),
FYI, if optimize-m is the name of the master VM, then the cluster argument for Airflow is probably just optimize.
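A note on the error reported in the question: the message "doesBucketExist on s3" suggests that the bucket name was read as "s3", which would be consistent with the extra s3:// left inside the target URI after the "@". The sketch below is one possible way to build and sanity-check the distcp arguments before passing them to the operator. The helpers `s3a_bucket` and `build_distcp_args` are hypothetical, the parsing is a simplified approximation rather than Hadoop's own URI handling, and passing fs.s3a.access.key and fs.s3a.secret.key through dataproc_hadoop_properties instead of the URI is an assumption that has not been verified on this cluster.

```python
DISTCP_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar"


def s3a_bucket(uri):
    """Return the bucket (host) part of an s3a:// URI, ignoring login details.

    Simplified approximation of URI parsing, for sanity checks only.
    """
    prefix = "s3a://"
    if not uri.startswith(prefix):
        raise ValueError("not an s3a:// URI: %r" % uri)
    authority = uri[len(prefix):].split("/", 1)[0]  # text up to the first "/"
    host = authority.rpartition("@")[2]             # drop optional "id:secret@"
    return host.partition(":")[0]                   # drop optional ":port"


def build_distcp_args(source, target):
    """Return the distcp arguments as a list, one element per argument."""
    return ["-update", source, target]


# The target from the question (placeholder credentials) resolves to bucket "s3".
broken = "s3a://ID:SECRET@s3://umg-ers-analytics/qubole/user-data/pitched/optimize/QA/"
fixed = "s3a://umg-ers-analytics/qubole/user-data/pitched/optimize/QA/"
print(s3a_bucket(broken))
print(s3a_bucket(fixed))

# Assumption: credentials supplied as Hadoop properties rather than in the URI.
hadoop_properties = {
    "fs.s3a.access.key": "ID",      # placeholder value
    "fs.s3a.secret.key": "SECRET",  # placeholder value
}
arguments = build_distcp_args("gs://umg-comm-tech-dev/data/apollo/QA/", fixed)
```

Here `DISTCP_JAR`, `arguments`, and `hadoop_properties` would correspond to the operator's main_jar, arguments, and dataproc_hadoop_properties parameters. Keeping the secret out of the URI would also avoid the "URI contains login details" warning shown in the log.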