如何使用 AvroParquetWriter 并通过 AmazonS3 api 写入 S3?

How can I use the AvroParquetWriter and write to S3 via the AmazonS3 api?

我目前正在使用下面的代码通过 Avro 编写镶木地板。此代码将其写入文件系统,但我想写入 S3。

try {
    StopWatch sw = StopWatch.createStarted();
    Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
    final String parquetFile = "parquet/data.parquet";
    final Path path = new Path(parquetFile);

    ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
        .withSchema(avroSchema)
        .withConf(new org.apache.hadoop.conf.Configuration())
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
        .build();

    for (Map<String, Object> row : message.getTransformedMessage()) {
      StopWatch stopWatch = StopWatch.createStarted();
      final GenericRecord record = new GenericData.Record(avroSchema);
      row.forEach((k, v) -> {
        record.put(k, v);
      });
      writer.write(record);
    }
    //todo:  Write to S3.  We should probably write via the AWS objects.  This does not show that.
    //
    writer.close();
    System.out.println("Total Time: " + sw);

  } catch (Exception e) {
    //do somethign here.  retryable?  non-retryable?  Wrap this excetion in one of these?
    transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
  }

这可以很好地写入文件,但如何才能将其流式传输到 AmazonS3 api?我在网上找到了一些使用 Hadoop-aws jar 的代码,但这需要一些 Windows exe 文件才能工作,当然,我们希望避免这种情况。目前我只使用:

 <dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
</dependency>

所以问题是,有没有办法拦截 AvroParquetWriter 上的输出流,以便我可以将其流式传输到 S3?我想这样做的主要原因是重试。 S3 自动重试最多 3 次。这对我们有很大帮助。

希望我没有误解这个问题,但您在这里所做的似乎是将 avro 转换为镶木地板,并且您想将镶木地板上传到 s3

关闭 ParquetWriter 后,您应该调用如下所示的方法(假设这不会拦截从 avro 到 parquet 的流写入,它只是流式传输不再写入的 parquet 文件):

        AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))).build();
        S3Path outputPath = new S3Path();
        outputPath.setBucket("YOUR_BUCKET");
        outputPath.setKey("YOUR_FOLDER_PATH");
        try {
            InputStream parquetStream = new FileInputStream(new File(parquetFile));
            s3Client.putObject(outputPath.getBucket(), outputPath.getKey(), parquetStream, null);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

使用 AWS SDK

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.749</version>
</dependency>

当然,该方法将驻留在不同的 utils class 中,并且此方法的构造函数应使用凭据初始化 AmazonS3 s3Client,因此您需要做的就是调用和访问它的 s3Client 成员放置对象

希望这对您有所帮助

这确实取决于 hadoop-aws jar,所以如果您不愿意使用它,我不确定我能否帮助您。但是,我 运行 在 mac 上并且没有任何 windows exe 文件,所以我不确定你说的这些是从哪里来的。 AvroParquetWriter 已经依赖于 Hadoop,所以即使这种额外的依赖性对你来说是不可接受的,但对其他人来说可能不是什么大问题:

您可以使用 AvroParquetWriter 直接流式传输到 S3,方法是将其传递给使用 URI 参数创建的 Hadoop 路径并设置适当的配置。

val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)

val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)

val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()

我使用了以下依赖项(sbt 格式):

"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"