从 S3 文档中获取特定字段

Fetching specific fields from an S3 document

我在我的应用程序中使用 AWS Java SDK 与我的 S3 存储桶之一对话,该存储桶以 JSON 格式保存对象。

文档可能如下所示:

{
    "a" : dataA,
    "b" : dataB,
    "c" : dataC,
    "d" : dataD,
    "e" : dataE
} 

现在,对于某个文档,假设 document1 我需要获取对应于字段 ab 的值,而不是获取整个文档。

这听起来不太可能,因为 S3 存储桶中可以包含任何类型的文档,而不仅仅是 JSONs。

这是可以实现的吗?

实际上是 doable。您可以像您描述的那样执行 selects,但仅限于特定格式:JSON、CSV、Parquet。

假设在 eu-central-1so67315601 存储桶中有一个 data.json 文件:

{
  "a": "dataA",
  "b": "dataB",
  "c": "dataC",
  "d": "dataD",
  "e": "dataE"
}

首先,了解如何通过 S3 控制台 select 字段。使用“对象操作”→“使用 S3 Select 查询”:


AWS Java SDK 1.x

这是使用 AWS Java SDK 1.x:

执行 select 的代码
@ExtendWith(S3.class)
class SelectTest {
    @AWSClient(endpoint = Endpoint.class)
    private AmazonS3 client;

    @Test
    void test() throws IOException {
        // LINES: Each line in the input data contains a single JSON object
        // DOCUMENT: A single JSON object can span multiple lines in the input
        final JSONInput input = new JSONInput();
        input.setType(JSONType.DOCUMENT);

        // Configure input format and compression
        final InputSerialization inputSerialization = new InputSerialization();
        inputSerialization.setJson(input);
        inputSerialization.setCompressionType(CompressionType.NONE);

        // Configure output format
        final OutputSerialization outputSerialization = new OutputSerialization();
        outputSerialization.setJson(new JSONOutput());

        // Build the request
        final SelectObjectContentRequest request = new SelectObjectContentRequest();
        request.setBucketName("so67315601");
        request.setKey("data.json");
        request.setExpression("SELECT s.a, s.b FROM s3object s LIMIT 5");
        request.setExpressionType(ExpressionType.SQL);
        request.setInputSerialization(inputSerialization);
        request.setOutputSerialization(outputSerialization);

        // Run the query
        final SelectObjectContentResult result = client.selectObjectContent(request);

        // Parse the results
        final InputStream stream = result.getPayload().getRecordsInputStream();

        IOUtils.copy(stream, System.out);
    }
}

输出为:

{"a":"dataA","b":"dataB"}

AWS Java SDK 2.x

AWS Java SDK 2.x 的代码更加巧妙。有关详细信息,请参阅 this ticket

@ExtendWith(S3.class)
class SelectTest {
    @AWSClient(endpoint = Endpoint.class)
    private S3AsyncClient client;

    @Test
    void test() throws Exception {
        final InputSerialization inputSerialization = InputSerialization
            .builder()
            .json(JSONInput.builder().type(JSONType.DOCUMENT).build())
            .compressionType(CompressionType.NONE)
            .build();

        final OutputSerialization outputSerialization = OutputSerialization.builder()
            .json(JSONOutput.builder().build())
            .build();

        final SelectObjectContentRequest select = SelectObjectContentRequest.builder()
            .bucket("so67315601")
            .key("data.json")
            .expression("SELECT s.a, s.b FROM s3object s LIMIT 5")
            .expressionType(ExpressionType.SQL)
            .inputSerialization(inputSerialization)
            .outputSerialization(outputSerialization)
            .build();
        final TestHandler handler = new TestHandler();

        client.selectObjectContent(select, handler).get();

        RecordsEvent response = (RecordsEvent) handler.receivedEvents.stream()
            .filter(e -> e.sdkEventType() == SelectObjectContentEventStream.EventType.RECORDS)
            .findFirst()
            .orElse(null);

        System.out.println(response.payload().asUtf8String());
    }

    private static class TestHandler implements SelectObjectContentResponseHandler {
        private SelectObjectContentResponse response;
        private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
        private Throwable exception;

        @Override
        public void responseReceived(SelectObjectContentResponse response) {
            this.response = response;
        }

        @Override
        public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
            publisher.subscribe(receivedEvents::add);
        }

        @Override
        public void exceptionOccurred(Throwable throwable) {
            exception = throwable;
        }

        @Override
        public void complete() {
        }
    }
}

如您所见,可以通过编程方式制作 S3 select!

您可能想知道 @AWSClient@ExtendWith( S3.class ) 是什么?

这是一个用于在您的测试中注入 AWS 客户端的小型库,名为 aws-junit5. It would greatly simplify your tests. I am the author. The usage 非常简单 — 在您的下一个项目中试试吧!