Apache Flink - writing stream to S3 error - null uri host

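I have a Flink data pipeline that transforms log files downloaded from S3 and writes them back to another S3 bucket in Parquet format. I have configured the S3 access key and secret key in flink-conf.yaml with: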
s3.access-key: "key"
s3.secret-key: "secret"

I also copied flink-s3-fs-hadoop-1.15.0.jar and aws-java-sdk-1.12.217.jar into the FLINK_HOME/plugins/s3-fs-presto directory.

When I submit the job to the cluster with the flink run command, I get the following exception:

Caused by: java.io.IOException: null uri host.
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:166)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter(FileSink.java:669)
    at org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer(FileSink.java:660)
    at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
    ... 35 more
Caused by: java.lang.NullPointerException: null uri host.
    at java.base/java.util.Objects.requireNonNull(Objects.java:246)
    at org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:73)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:486)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:246)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
    ... 41 more

The code that writes the data to S3 is as follows:

public static FileSink<Recc> getFileSink() {
    Item<Recc> item = new Item<>(Recc.class);
    ParquetWriterFactory<Recc> factory = AvroParquetWriters.forReflectRecord(item.getT());
    Path path = new Path("s3a:", "log-backup/", "vvra/logs"); // also tried s3
    return FileSink.forBulkFormat(path, factory)
            .build();
}

I even tried MinIO, but I still get the same error. How can I fix this? What else needs to be configured?

The problem is the : (colon) in the scheme argument. This is not clearly documented.

You just need to remove the colon ':' from the scheme name. As the JavaDocs for java.net.URI state:

... If a scheme is given then it is appended to the result, followed by a colon character (':'). ...

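org.apache.flink.core.fs.Path relies on java.net.URI under the hood, so a scheme passed as "s3a:" ends up followed by a second colon, and the resulting URI has no host (hence "null uri host").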
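The effect can be reproduced with the JDK alone. The following is a minimal sketch, assuming (as the answer says) that Flink's Path(scheme, authority, path) constructor delegates to the five-argument java.net.URI constructor; the class name UriSchemeColonDemo and the helper build are hypothetical. With "s3a:" the string becomes "s3a:://log-backup/vvra/logs", which still has the scheme "s3a" (so the S3A plugin is selected) but parses as an opaque URI with no host.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical demo class: JDK only, no Flink dependency.
class UriSchemeColonDemo {

    // Builds a URI with the five-argument java.net.URI constructor,
    // which is assumed here to be what a (scheme, authority, path)
    // Path constructor uses internally.
    static URI build(String scheme, String authority, String path) {
        try {
            return new URI(scheme, authority, path, null, null);
        } catch (URISyntaxException e) {
            // Wrap the checked exception as an unchecked one.
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Scheme WITH a trailing colon, as in the question: URI appends its
        // own ':' and the result is an opaque URI without a host.
        URI broken = build("s3a:", "log-backup", "/vvra/logs");
        // prints: s3a:://log-backup/vvra/logs host=null
        System.out.println(broken + " host=" + broken.getHost());

        // Scheme WITHOUT the colon: a hierarchical URI whose host is the bucket.
        URI fixed = build("s3a", "log-backup", "/vvra/logs");
        // prints: s3a://log-backup/vvra/logs host=log-backup
        System.out.println(fixed + " host=" + fixed.getHost());
    }
}
```

In the sink from the question, the corresponding change is to pass "s3a" rather than "s3a:" as the scheme, or to pass the whole URI as one string, e.g. new Path("s3a://log-backup/vvra/logs"). The sketch passes the bucket name on its own and an absolute path ("/vvra/logs"), because java.net.URI rejects a relative path with URISyntaxException when a scheme is given.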