在 Spark 中读取两个不同的 ORC 模式文件
Read Two Different ORC Schema File In Spark
我们正在以 ORC 格式将处理输出保存到 HDFS。现在我们有多个不同模式的文件,我想读取所有这些文件并创建一个数据集。
一个选择是编写一些作业把所有这些文件转换成单一模式,但我想避免这样做,因为数据量太大,而且这只是一次性的解决方案;如果有一天模式再次更改,我就必须重新生成所有数据。
我的问题是有什么机制可以让我读取这些文件
假设我在读取时提供一个包含所有字段的超集模式,如果某个字段在某个 ORC 文件中不存在,reader 会自动为该字段填入空值。
ORC 的模式合并(schema merge)目前还是一个尚未实现的功能请求(open feature request),我们也像您评论中的其他用户一样改用了 Parquet。
不过仍然可以(不推荐,因为速度很慢)逐个加载文件并另存为 .parquet,然后借助自动模式合并一次性加载所有 .parquet 文件,最后把合并后的数据集保存为 .orc。
AIBOTNET--
使用它可以将不同模式的 ORC 文件组合成单个 ORC 文件。我的模式是:
- 文件 1:first:int, second:int
- 文件 2:first:int, fourth:string
- 文件 3:first:int, third:map
如果您也需要文件生成器,我可以把它贴出来。
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import com.google.common.collect.ImmutableList;
/**
 * Merges several ORC files whose schemas differ into one ORC file whose schema is
 * the union of every field seen in the inputs. A field that is absent from a given
 * input file is written as NULL for all of that file's rows.
 */
public class ReaderSample {
    public static void main(String[] args) throws IOException {
        Path testFilePath1 = new Path("file1.orc");
        Path testFilePath2 = new Path("file2.orc");
        Path testFilePath3 = new Path("file3.orc");
        Path mergePath = new Path("merge.orc");
        Configuration conf = new Configuration();
        FileSystem fileSystem = mergePath.getFileSystem(conf);
        fileSystem.delete(mergePath, false);
        List<Path> fileList = ImmutableList.of(testFilePath1, testFilePath2, testFilePath3);
        TypeDescription schema = mergeSchema(conf, fileList);
        System.out.println(schema);
        try (Writer writer = OrcFile.createWriter(mergePath, OrcFile.writerOptions(conf)
                .setSchema(schema))) {
            VectorizedRowBatch writerBatch = schema.createRowBatch();
            for (Path file : fileList) {
                merge(file, conf, writer, writerBatch, schema);
            }
            // flush any rows still buffered after the last file
            if (writerBatch.size > 0) {
                writer.addRowBatch(writerBatch);
                writerBatch.reset();
            }
        }
    }

    /**
     * Builds the union schema: every field name that occurs in any input file appears
     * exactly once, typed as in the first file that declares it.
     * NOTE(review): same-named fields with conflicting types are not detected yet.
     */
    private static TypeDescription mergeSchema(Configuration conf, List<Path> fileList) throws IOException {
        List<TypeDescription> schemaList = new ArrayList<>();
        for (Path path : fileList) {
            // try-with-resources: the original leaked the reader's file handle
            try (Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf))) {
                schemaList.add(reader.getSchema());
            }
        }
        TypeDescription masterSchema = TypeDescription.createStruct();
        for (TypeDescription td : schemaList) {
            List<String> fieldNames = td.getFieldNames();
            for (int f = 0; f < fieldNames.size(); f++) {
                String field = fieldNames.get(f);
                if (!masterSchema.getFieldNames().contains(field)) {
                    masterSchema.addField(field, td.getChildren().get(f));
                }
                // else: TODO verify the types of same-named fields agree
            }
        }
        return masterSchema;
    }

    /**
     * Appends all rows of {@code file} to {@code writer}, remapping its columns onto
     * {@code masterSchema}; columns the file lacks are written as NULL.
     */
    private static void merge(Path file, Configuration conf, Writer writer, VectorizedRowBatch writerBatch,
            TypeDescription masterSchema) throws IOException {
        try (Reader reader = OrcFile.createReader(file, OrcFile.readerOptions(conf))) {
            int[] mapping = createMapping(masterSchema, reader.getSchema());
            try (RecordReader rows = reader.rows()) {
                VectorizedRowBatch readerBatch = reader.getSchema().createRowBatch();
                while (rows.nextBatch(readerBatch)) {
                    for (int r = 0; r < readerBatch.size; ++r) {
                        int out = writerBatch.size;
                        for (int c = 0; c < mapping.length; c++) {
                            int index = mapping[c];
                            if (index == -1) {
                                writerBatch.cols[c].isNull[out] = true;
                                writerBatch.cols[c].noNulls = false;
                            } else {
                                // Copy the single value. The original aliased the whole
                                // reader column (writerBatch.cols[c] = readerBatch.cols[index]),
                                // so null-marking for a later file scribbled on the previous
                                // file's vectors and row alignment was only accidental.
                                writerBatch.cols[c].setElement(out, r, readerBatch.cols[index]);
                            }
                        }
                        writerBatch.size++;
                        // flush when full so the writer batch can never overflow
                        // its backing arrays
                        if (writerBatch.size == writerBatch.getMaxSize()) {
                            writer.addRowBatch(writerBatch);
                            writerBatch.reset();
                        }
                    }
                }
            }
        }
    }

    /**
     * For each field of the master schema, returns the index of the matching field in
     * the current file's schema, or -1 when the file does not contain that field.
     */
    private static int[] createMapping(TypeDescription masterSchema, TypeDescription currentSchema) {
        List<String> masterFieldNames = masterSchema.getFieldNames();
        List<String> fieldNames = currentSchema.getFieldNames();
        int[] mappings = new int[masterFieldNames.size()];
        for (int f = 0; f < masterFieldNames.size(); f++) {
            // List.indexOf already yields -1 for "not found"
            mappings[f] = fieldNames.indexOf(masterFieldNames.get(f));
        }
        return mappings;
    }
}
我们正在以 ORC 格式将处理输出保存到 HDFS。现在我们有多个不同模式的文件,我想读取所有这些文件并创建一个数据集。
一个选择是编写一些作业把所有这些文件转换成单一模式,但我想避免这样做,因为数据量太大,而且这只是一次性的解决方案;如果有一天模式再次更改,我就必须重新生成所有数据。
我的问题是:有什么机制可以让我读取这些文件?假设我在读取时提供一个包含所有字段的超集模式,如果某个字段在某个 ORC 文件中不存在,reader 会自动为该字段填入空值。
ORC 的模式合并(schema merge)目前还是一个尚未实现的功能请求(open feature request),我们也像您评论中的其他用户一样改用了 Parquet。
不过仍然可以(不推荐,因为速度很慢)逐个加载文件并另存为 .parquet,然后借助自动模式合并一次性加载所有 .parquet 文件,最后把合并后的数据集保存为 .orc。
AIBOTNET--
使用它可以将不同模式的 ORC 文件组合成单个 ORC 文件。我的模式是:
- 文件 1:first:int, second:int
- 文件 2:first:int, fourth:string
- 文件 3:first:int, third:map
如果您也需要文件生成器,我可以把它贴出来。
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import com.google.common.collect.ImmutableList;
/**
 * Merges several ORC files whose schemas differ into one ORC file whose schema is
 * the union of every field seen in the inputs. A field that is absent from a given
 * input file is written as NULL for all of that file's rows.
 */
public class ReaderSample {
    public static void main(String[] args) throws IOException {
        Path testFilePath1 = new Path("file1.orc");
        Path testFilePath2 = new Path("file2.orc");
        Path testFilePath3 = new Path("file3.orc");
        Path mergePath = new Path("merge.orc");
        Configuration conf = new Configuration();
        FileSystem fileSystem = mergePath.getFileSystem(conf);
        fileSystem.delete(mergePath, false);
        List<Path> fileList = ImmutableList.of(testFilePath1, testFilePath2, testFilePath3);
        TypeDescription schema = mergeSchema(conf, fileList);
        System.out.println(schema);
        try (Writer writer = OrcFile.createWriter(mergePath, OrcFile.writerOptions(conf)
                .setSchema(schema))) {
            VectorizedRowBatch writerBatch = schema.createRowBatch();
            for (Path file : fileList) {
                merge(file, conf, writer, writerBatch, schema);
            }
            // flush any rows still buffered after the last file
            if (writerBatch.size > 0) {
                writer.addRowBatch(writerBatch);
                writerBatch.reset();
            }
        }
    }

    /**
     * Builds the union schema: every field name that occurs in any input file appears
     * exactly once, typed as in the first file that declares it.
     * NOTE(review): same-named fields with conflicting types are not detected yet.
     */
    private static TypeDescription mergeSchema(Configuration conf, List<Path> fileList) throws IOException {
        List<TypeDescription> schemaList = new ArrayList<>();
        for (Path path : fileList) {
            // try-with-resources: the original leaked the reader's file handle
            try (Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf))) {
                schemaList.add(reader.getSchema());
            }
        }
        TypeDescription masterSchema = TypeDescription.createStruct();
        for (TypeDescription td : schemaList) {
            List<String> fieldNames = td.getFieldNames();
            for (int f = 0; f < fieldNames.size(); f++) {
                String field = fieldNames.get(f);
                if (!masterSchema.getFieldNames().contains(field)) {
                    masterSchema.addField(field, td.getChildren().get(f));
                }
                // else: TODO verify the types of same-named fields agree
            }
        }
        return masterSchema;
    }

    /**
     * Appends all rows of {@code file} to {@code writer}, remapping its columns onto
     * {@code masterSchema}; columns the file lacks are written as NULL.
     */
    private static void merge(Path file, Configuration conf, Writer writer, VectorizedRowBatch writerBatch,
            TypeDescription masterSchema) throws IOException {
        try (Reader reader = OrcFile.createReader(file, OrcFile.readerOptions(conf))) {
            int[] mapping = createMapping(masterSchema, reader.getSchema());
            try (RecordReader rows = reader.rows()) {
                VectorizedRowBatch readerBatch = reader.getSchema().createRowBatch();
                while (rows.nextBatch(readerBatch)) {
                    for (int r = 0; r < readerBatch.size; ++r) {
                        int out = writerBatch.size;
                        for (int c = 0; c < mapping.length; c++) {
                            int index = mapping[c];
                            if (index == -1) {
                                writerBatch.cols[c].isNull[out] = true;
                                writerBatch.cols[c].noNulls = false;
                            } else {
                                // Copy the single value. The original aliased the whole
                                // reader column (writerBatch.cols[c] = readerBatch.cols[index]),
                                // so null-marking for a later file scribbled on the previous
                                // file's vectors and row alignment was only accidental.
                                writerBatch.cols[c].setElement(out, r, readerBatch.cols[index]);
                            }
                        }
                        writerBatch.size++;
                        // flush when full so the writer batch can never overflow
                        // its backing arrays
                        if (writerBatch.size == writerBatch.getMaxSize()) {
                            writer.addRowBatch(writerBatch);
                            writerBatch.reset();
                        }
                    }
                }
            }
        }
    }

    /**
     * For each field of the master schema, returns the index of the matching field in
     * the current file's schema, or -1 when the file does not contain that field.
     */
    private static int[] createMapping(TypeDescription masterSchema, TypeDescription currentSchema) {
        List<String> masterFieldNames = masterSchema.getFieldNames();
        List<String> fieldNames = currentSchema.getFieldNames();
        int[] mappings = new int[masterFieldNames.size()];
        for (int f = 0; f < masterFieldNames.size(); f++) {
            // List.indexOf already yields -1 for "not found"
            mappings[f] = fieldNames.indexOf(masterFieldNames.get(f));
        }
        return mappings;
    }
}