如何使用 AvroParquetWriter 并通过 AmazonS3 api 写入 S3?
How can I use the AvroParquetWriter and write to S3 via the AmazonS3 api?
我目前正在使用下面的代码通过 Avro 编写镶木地板。此代码将其写入文件系统,但我想写入 S3。
try {
StopWatch sw = StopWatch.createStarted();
Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
final String parquetFile = "parquet/data.parquet";
final Path path = new Path(parquetFile);
ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
.withSchema(avroSchema)
.withConf(new org.apache.hadoop.conf.Configuration())
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
.build();
for (Map<String, Object> row : message.getTransformedMessage()) {
StopWatch stopWatch = StopWatch.createStarted();
final GenericRecord record = new GenericData.Record(avroSchema);
row.forEach((k, v) -> {
record.put(k, v);
});
writer.write(record);
}
//todo: Write to S3. We should probably write via the AWS objects. This does not show that.
//
writer.close();
System.out.println("Total Time: " + sw);
} catch (Exception e) {
//do somethign here. retryable? non-retryable? Wrap this excetion in one of these?
transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
}
这可以很好地写入文件,但如何才能将其流式传输到 AmazonS3 api?我在网上找到了一些使用 Hadoop-aws jar 的代码,但这需要一些 Windows exe 文件才能工作,当然,我们希望避免这种情况。目前我只使用:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
所以问题是,有没有办法拦截 AvroParquetWriter 上的输出流,以便我可以将其流式传输到 S3?我想这样做的主要原因是重试。 S3 自动重试最多 3 次。这对我们有很大帮助。
希望我没有误解这个问题,但您在这里所做的似乎是将 avro 转换为镶木地板,并且您想将镶木地板上传到 s3
关闭 ParquetWriter 后,您应该调用如下所示的方法(假设这不会拦截从 avro 到 parquet 的流写入,它只是流式传输不再写入的 parquet 文件):
AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))).build();
S3Path outputPath = new S3Path();
outputPath.setBucket("YOUR_BUCKET");
outputPath.setKey("YOUR_FOLDER_PATH");
try {
InputStream parquetStream = new FileInputStream(new File(parquetFile));
s3Client.putObject(outputPath.getBucket(), outputPath.getKey(), parquetStream, null);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
使用 AWS SDK
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.749</version>
</dependency>
当然,该方法将驻留在不同的 utils class 中,并且此方法的构造函数应使用凭据初始化 AmazonS3 s3Client,因此您需要做的就是调用和访问它的 s3Client 成员放置对象
希望这对您有所帮助
这确实取决于 hadoop-aws jar,所以如果您不愿意使用它,我不确定我能否帮助您。但是,我 运行 在 mac 上并且没有任何 windows exe 文件,所以我不确定你说的这些是从哪里来的。 AvroParquetWriter 已经依赖于 Hadoop,所以即使这种额外的依赖性对你来说是不可接受的,但对其他人来说可能不是什么大问题:
您可以使用 AvroParquetWriter 直接流式传输到 S3,方法是将其传递给使用 URI 参数创建的 Hadoop 路径并设置适当的配置。
val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)
val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)
val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()
我使用了以下依赖项(sbt 格式):
"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"
我目前正在使用下面的代码通过 Avro 编写镶木地板。此代码将其写入文件系统,但我想写入 S3。
try {
StopWatch sw = StopWatch.createStarted();
Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
final String parquetFile = "parquet/data.parquet";
final Path path = new Path(parquetFile);
ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
.withSchema(avroSchema)
.withConf(new org.apache.hadoop.conf.Configuration())
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
.build();
for (Map<String, Object> row : message.getTransformedMessage()) {
StopWatch stopWatch = StopWatch.createStarted();
final GenericRecord record = new GenericData.Record(avroSchema);
row.forEach((k, v) -> {
record.put(k, v);
});
writer.write(record);
}
//todo: Write to S3. We should probably write via the AWS objects. This does not show that.
//
writer.close();
System.out.println("Total Time: " + sw);
} catch (Exception e) {
//do somethign here. retryable? non-retryable? Wrap this excetion in one of these?
transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
}
这可以很好地写入文件,但如何才能将其流式传输到 AmazonS3 api?我在网上找到了一些使用 Hadoop-aws jar 的代码,但这需要一些 Windows exe 文件才能工作,当然,我们希望避免这种情况。目前我只使用:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
所以问题是,有没有办法拦截 AvroParquetWriter 上的输出流,以便我可以将其流式传输到 S3?我想这样做的主要原因是重试。 S3 自动重试最多 3 次。这对我们有很大帮助。
希望我没有误解这个问题,但您在这里所做的似乎是将 avro 转换为镶木地板,并且您想将镶木地板上传到 s3
关闭 ParquetWriter 后,您应该调用如下所示的方法(假设这不会拦截从 avro 到 parquet 的流写入,它只是流式传输不再写入的 parquet 文件):
AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))).build();
S3Path outputPath = new S3Path();
outputPath.setBucket("YOUR_BUCKET");
outputPath.setKey("YOUR_FOLDER_PATH");
try {
InputStream parquetStream = new FileInputStream(new File(parquetFile));
s3Client.putObject(outputPath.getBucket(), outputPath.getKey(), parquetStream, null);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
使用 AWS SDK
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.749</version>
</dependency>
当然,该方法将驻留在不同的 utils class 中,并且此方法的构造函数应使用凭据初始化 AmazonS3 s3Client,因此您需要做的就是调用和访问它的 s3Client 成员放置对象
希望这对您有所帮助
这确实取决于 hadoop-aws jar,所以如果您不愿意使用它,我不确定我能否帮助您。但是,我 运行 在 mac 上并且没有任何 windows exe 文件,所以我不确定你说的这些是从哪里来的。 AvroParquetWriter 已经依赖于 Hadoop,所以即使这种额外的依赖性对你来说是不可接受的,但对其他人来说可能不是什么大问题:
您可以使用 AvroParquetWriter 直接流式传输到 S3,方法是将其传递给使用 URI 参数创建的 Hadoop 路径并设置适当的配置。
val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)
val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)
val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()
我使用了以下依赖项(sbt 格式):
"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"