Read parquet data from AWS S3 bucket
I need to read parquet data from an AWS S3 bucket. Using the AWS SDK, I can get an InputStream like this:
S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, bucketKey));
InputStream inputStream = object.getObjectContent();
But the Apache Parquet reader only works with a local file, like this:
ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path(file.getAbsolutePath()))
                .withConf(conf)
                .build();
reader.read();
So I don't know how to parse an InputStream of a parquet file. For CSV files, for example, there is a CSVParser that accepts an InputStream.
I know of a solution that achieves this with Spark, like this:
SparkSession spark = SparkSession
        .builder()
        .getOrCreate();
Dataset<Row> ds = spark.read().parquet("s3a://bucketName/file.parquet");
But I can't use Spark.
Can anyone suggest a solution for reading parquet data from S3?
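The following works for me. Instead of wrapping the SDK's InputStream, point AvroParquetReader directly at an s3a:// path and let the Hadoop S3A filesystem from hadoop-aws do the reading; an Avro projection schema limits the read to the columns you actually need.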
String SCHEMA_TEMPLATE = "{" +
        "\"type\": \"record\",\n" +
        " \"name\": \"schema\",\n" +
        " \"fields\": [\n" +
        "   {\"name\": \"timeStamp\", \"type\": \"string\"},\n" +
        "   {\"name\": \"temperature\", \"type\": \"double\"},\n" +
        "   {\"name\": \"pressure\", \"type\": \"double\"}\n" +
        " ]" +
        "}";
String PATH_SCHEMA = "s3a";
Path internalPath = new Path(PATH_SCHEMA, bucketName, folderName);
Schema schema = new Schema.Parser().parse(SCHEMA_TEMPLATE);

Configuration configuration = new Configuration();
// Only the projected columns are read from S3
AvroReadSupport.setRequestedProjection(configuration, schema);

// Build the reader directly against the s3a:// path; no local copy needed
ParquetReader<GenericRecord> parquetReader =
        AvroParquetReader.<GenericRecord>builder(internalPath)
                .withConf(configuration)
                .build();

GenericRecord genericRecord = parquetReader.read();
while (genericRecord != null) {
    // Effectively final copy so the record can be referenced inside the lambda
    final GenericRecord record = genericRecord;
    Map<String, String> valuesMap = new HashMap<>();
    record.getSchema().getFields().forEach(field ->
            valuesMap.put(field.name(), record.get(field.name()).toString()));
    genericRecord = parquetReader.read();  // read() returns null at end of input
}
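Note that the Configuration above must also be able to authenticate against S3. If credentials are not already picked up from the environment or an instance profile, they can be set on the Configuration itself via the standard S3A properties; a minimal sketch, assuming static keys (the placeholder values are mine):

Configuration configuration = new Configuration();
configuration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");  // placeholder value
configuration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");  // placeholder value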
Gradle dependencies
compile 'com.amazonaws:aws-java-sdk:1.11.213'
compile 'org.apache.parquet:parquet-avro:1.9.0'
compile 'org.apache.parquet:parquet-hadoop:1.9.0'
compile 'org.apache.hadoop:hadoop-common:2.8.1'
compile 'org.apache.hadoop:hadoop-aws:2.8.1'
compile 'org.apache.hadoop:hadoop-client:2.8.1'
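One caveat: the hadoop-aws version must match the other hadoop-* artifacts, and hadoop-aws 2.8.1 was built against an older AWS SDK than 1.11.213, so having both on the classpath can cause runtime errors such as NoSuchMethodError; if that happens, align the SDK version with the one hadoop-aws expects.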