Fetching specific fields from an S3 document
I am using the AWS Java SDK in my application to talk to one of my S3 buckets, which holds objects in JSON format.
A document may look like this:
{
    "a" : dataA,
    "b" : dataB,
    "c" : dataC,
    "d" : dataD,
    "e" : dataE
}
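Now, for a particular document, say document1, I need to fetch the values corresponding to fields a and b instead of getting the entire document.
This does not sound possible, because S3 buckets can hold any type of document, not just JSON.
Is this achievable, though?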
It is actually doable. You can run selects like the one you describe, but only for particular formats: JSON, CSV, and Parquet.
Imagine a data.json file in the so67315601 bucket in eu-central-1:
{
    "a": "dataA",
    "b": "dataB",
    "c": "dataC",
    "d": "dataD",
    "e": "dataE"
}
First, learn how to select the fields via the S3 console. Use "Object actions" → "Query with S3 Select".
AWS Java SDK 1.x
Here is the code to do the select with AWS Java SDK 1.x:
@ExtendWith(S3.class)
class SelectTest {
    @AWSClient(endpoint = Endpoint.class)
    private AmazonS3 client;

    @Test
    void test() throws IOException {
        // LINES: Each line in the input data contains a single JSON object
        // DOCUMENT: A single JSON object can span multiple lines in the input
        final JSONInput input = new JSONInput();
        input.setType(JSONType.DOCUMENT);

        // Configure input format and compression
        final InputSerialization inputSerialization = new InputSerialization();
        inputSerialization.setJson(input);
        inputSerialization.setCompressionType(CompressionType.NONE);

        // Configure output format
        final OutputSerialization outputSerialization = new OutputSerialization();
        outputSerialization.setJson(new JSONOutput());

        // Build the request
        final SelectObjectContentRequest request = new SelectObjectContentRequest();
        request.setBucketName("so67315601");
        request.setKey("data.json");
        request.setExpression("SELECT s.a, s.b FROM s3object s LIMIT 5");
        request.setExpressionType(ExpressionType.SQL);
        request.setInputSerialization(inputSerialization);
        request.setOutputSerialization(outputSerialization);

        // Run the query
        final SelectObjectContentResult result = client.selectObjectContent(request);

        // Parse the results
        final InputStream stream = result.getPayload().getRecordsInputStream();
        IOUtils.copy(stream, System.out);
    }
}
The output is:
{"a":"dataA","b":"dataB"}
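The query above hardcodes the fields a and b. If the field list varies, the expression string could be assembled by a small helper before it is passed to setExpression. The following is only a sketch: the class SelectExpressions and its project method are hypothetical names, not part of the AWS SDK, and the sketch assumes field names are plain identifiers (letters, digits, underscores), rejecting anything else rather than trying to escape it.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the AWS SDK): builds an S3 Select
// projection such as "SELECT s.a, s.b FROM s3object s".
final class SelectExpressions {
    // Assumption: only simple identifiers are accepted as field names.
    private static final Pattern SIMPLE_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private SelectExpressions() {
    }

    static String project(List<String> fields) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("At least one field is required");
        }
        final String columns = fields.stream()
                .map(SelectExpressions::column)
                .collect(Collectors.joining(", "));
        return "SELECT " + columns + " FROM s3object s";
    }

    // Prefixes a validated field name with the table alias "s".
    private static String column(String field) {
        if (!SIMPLE_NAME.matcher(field).matches()) {
            throw new IllegalArgumentException("Unsupported field name: " + field);
        }
        return "s." + field;
    }
}
```

With this in place, the request from the example could be configured with request.setExpression(SelectExpressions.project(Arrays.asList("a", "b"))). Validating names up front keeps arbitrary text out of the SQL string.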
AWS Java SDK 2.x
The code for AWS Java SDK 2.x is trickier. Refer to this ticket for more information.
@ExtendWith(S3.class)
class SelectTest {
    @AWSClient(endpoint = Endpoint.class)
    private S3AsyncClient client;

    @Test
    void test() throws Exception {
        final InputSerialization inputSerialization = InputSerialization
            .builder()
            .json(JSONInput.builder().type(JSONType.DOCUMENT).build())
            .compressionType(CompressionType.NONE)
            .build();
        final OutputSerialization outputSerialization = OutputSerialization.builder()
            .json(JSONOutput.builder().build())
            .build();
        final SelectObjectContentRequest select = SelectObjectContentRequest.builder()
            .bucket("so67315601")
            .key("data.json")
            .expression("SELECT s.a, s.b FROM s3object s LIMIT 5")
            .expressionType(ExpressionType.SQL)
            .inputSerialization(inputSerialization)
            .outputSerialization(outputSerialization)
            .build();
        final TestHandler handler = new TestHandler();

        client.selectObjectContent(select, handler).get();

        RecordsEvent response = (RecordsEvent) handler.receivedEvents.stream()
            .filter(e -> e.sdkEventType() == SelectObjectContentEventStream.EventType.RECORDS)
            .findFirst()
            .orElse(null);

        System.out.println(response.payload().asUtf8String());
    }

    private static class TestHandler implements SelectObjectContentResponseHandler {
        private SelectObjectContentResponse response;
        private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
        private Throwable exception;

        @Override
        public void responseReceived(SelectObjectContentResponse response) {
            this.response = response;
        }

        @Override
        public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
            publisher.subscribe(receivedEvents::add);
        }

        @Override
        public void exceptionOccurred(Throwable throwable) {
            exception = throwable;
        }

        @Override
        public void complete() {
        }
    }
}
As you can see, S3 selects can be made programmatically!
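One caveat about the SDK 2.x example: it prints only the first Records event. As far as I know, a larger result can arrive as several Records events, and a single record may be split across two of them, so it is probably safer to join all payloads before splitting them into records. The sketch below shows that joining step with the standard library only. RecordChunks and toRecords are hypothetical names, the byte arrays stand in for what response.payload().asByteArray() would return for each Records event, and the newline record delimiter is assumed to be the JSON output default.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: joins the payloads of several Records events
// and splits the result into one string per record.
final class RecordChunks {
    private RecordChunks() {
    }

    static List<String> toRecords(List<byte[]> chunks) {
        // Concatenate the raw bytes first, so that a record (or a multi-byte
        // UTF-8 character) split across two chunks is decoded correctly.
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            buffer.write(chunk, 0, chunk.length);
        }
        final String text = new String(buffer.toByteArray(), StandardCharsets.UTF_8);

        // Assumption: records are separated by "\n".
        final List<String> records = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (!line.trim().isEmpty()) {
                records.add(line);
            }
        }
        return records;
    }
}
```

In the test above, the chunks could be collected by mapping every RECORDS event in handler.receivedEvents to its payload bytes, in the order the events were received.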
You may be wondering what @AWSClient and @ExtendWith(S3.class) are.
They come from aws-junit5, a small library for injecting AWS clients into your tests. It would greatly simplify your tests. I am the author. The usage is really simple; try it in your next project!