Read Two Different ORC Schema Files in Spark

We are saving processing output to HDFS in ORC format. We now have files with more than one schema, and I want to read all of them and create a single Dataset.

One option is to write a job that converts all of these files to a single schema. I want to avoid that because the data is too large, it would only be a one-time fix, and if the schema changes again some day I would have to regenerate all the data.

My question is: is there any mechanism that lets me read these files by supplying a superset ("higher") schema at read time, so that if some field does not exist in a given ORC file, the reader automatically assigns it a null value?
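Roughly, something like the following is what I have in mind (a rough sketch only: the column names are made up, and whether the ORC reader actually fills missing fields with nulls depends on the Spark version and ORC implementation):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ReadWithSupersetSchema {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("orc-superset-schema").getOrCreate();

    // the "higher" (superset) schema covering every field that may appear in any file
    StructType supersetSchema = new StructType()
        .add("first", DataTypes.IntegerType)
        .add("second", DataTypes.IntegerType)
        .add("fourth", DataTypes.StringType);

    // ideally, fields missing from a given ORC file would simply come back as null
    Dataset<Row> ds = spark.read()
        .schema(supersetSchema)
        .orc("hdfs:///path/to/orc/output");

    ds.show();
  }
}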

ORC schema merge is an open feature request; like the other users in your comments, we also switched to Parquet.

It is still possible (not recommended, because it is slow) to load the files one by one, save each of them to .parquet, then load all the .parquet files with automatic schema merging and save that big in-memory dataset back to .orc.
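A rough sketch of that workaround (the paths and the staging layout are illustrative assumptions):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class OrcToParquetMerge {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("orc-parquet-merge").getOrCreate();

    String[] orcFiles = {"file1.orc", "file2.orc", "file3.orc"};

    // 1. load file by file and save each one to its own .parquet output (slow)
    for (int i = 0; i < orcFiles.length; i++) {
      spark.read().orc(orcFiles[i])
           .write().parquet("staging/part" + i);
    }

    // 2. load all .parquet outputs with automatic schema merging...
    Dataset<Row> merged = spark.read()
        .option("mergeSchema", "true")
        .parquet("staging/*");

    // 3. ...and save the combined in-memory dataset back to .orc
    merged.write().orc("merged-orc");
  }
}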

AIBOTNET--

Use this to combine ORC files with different schemas into a single ORC file. My schemas were:

  1. file 1: first:int, second:int
  2. file 2: first:int, fourth:string
  3. file 3: first:int, third:map

I can post the file generators as well if you need them.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.google.common.collect.ImmutableList;

public class ReaderSample {

  public static void main(String[] args) throws IOException {

    Path testFilePath1 = new Path("file1.orc");
    Path testFilePath2 = new Path("file2.orc");
    Path testFilePath3 = new Path("file3.orc");
    Path mergePath = new Path("merge.orc");
    Configuration conf = new Configuration();

    FileSystem fileSystem = mergePath.getFileSystem(conf);
    fileSystem.delete(mergePath, false);

    List<Path> fileList = ImmutableList.of(testFilePath1, testFilePath2, testFilePath3);

    TypeDescription schema = mergeSchema(conf, fileList);
    System.out.println(schema);
    try (Writer writer = OrcFile.createWriter(mergePath, OrcFile.writerOptions(conf)
                                                                .setSchema(schema))) {
      VectorizedRowBatch writerBatch = schema.createRowBatch();
      for (Path file : fileList) {
        merge(file, conf, writer, writerBatch, schema);
      }
    }
  }

  // Builds a struct schema containing the union of all top-level fields across the input files.
  private static TypeDescription mergeSchema(Configuration conf, List<Path> fileList) throws IOException {
    List<TypeDescription> schemaList = new ArrayList<>();
    for (Path path : fileList) {
      Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
      schemaList.add(reader.getSchema());
    }

    TypeDescription masterSchema = new TypeDescription(TypeDescription.Category.STRUCT);
    for (TypeDescription td : schemaList) {
      List<String> fieldNames = td.getFieldNames();
      for (int f = 0; f < fieldNames.size(); f++) {
        String field = fieldNames.get(f);
        List<String> mergeFields = masterSchema.getFieldNames();
        int indexOf = mergeFields.indexOf(field);
        if (indexOf < 0) {
          // field not seen before: add it and its type to the merged schema
          masterSchema.addField(field, td.getChildren()
                                         .get(f));
        } else {
          // field already present: type compatibility should be checked at some point...
        }
      }
    }
    return masterSchema;

  }

  // Copies every row of one input file into the writer, marking fields the file lacks as null.
  private static void merge(Path file, Configuration conf, Writer writer, VectorizedRowBatch writerBatch,
      TypeDescription masterSchema) throws IOException {
    Reader reader = OrcFile.createReader(file, OrcFile.readerOptions(conf));

    int[] mapping = createMapping(masterSchema, reader.getSchema());

    try (RecordReader rows = reader.rows()) {
      VectorizedRowBatch readerBatch = reader.getSchema()
                                             .createRowBatch();
      while (rows.nextBatch(readerBatch)) {
        for (int r = 0; r < readerBatch.size; ++r) {
          for (int c = 0; c < mapping.length; c++) {
            int index = mapping[c];
            if (index == -1) {
              // field missing in this file: mark this row as null in the writer batch
              writerBatch.cols[c].isNull[writerBatch.size] = true;
              writerBatch.cols[c].noNulls = false;
            } else {
              // field present: reuse the reader's column vector for this batch
              writerBatch.cols[c] = readerBatch.cols[index];
            }
          }
          writerBatch.size++;
        }
        writer.addRowBatch(writerBatch);
        writerBatch.reset();
      }
    }
  }

  // Maps each field position in the merged schema to its position in the current
  // file's schema, or -1 if that file does not contain the field.
  private static int[] createMapping(TypeDescription masterSchema, TypeDescription currentSchema) {
    List<String> masterFieldNames = masterSchema.getFieldNames();
    List<String> fieldNames = currentSchema.getFieldNames();
    int[] mappings = new int[masterFieldNames.size()];
    for (int f = 0; f < masterFieldNames.size(); f++) {
      String name = masterFieldNames.get(f);
      int indexOf = fieldNames.indexOf(name);
      if (indexOf < 0) {
        mappings[f] = -1;
      } else {
        mappings[f] = indexOf;
      }
    }
    return mappings;
  }

}
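
Once merge.orc has been written, it can be read back in Spark as a single dataset; a minimal sketch, assuming the same Spark SQL imports as the snippets above:

    SparkSession spark = SparkSession.builder().appName("read-merged-orc").getOrCreate();

    // merge.orc carries the union of the three input schemas,
    // so Spark sees one consistent set of columns: first, second, fourth, third
    Dataset<Row> merged = spark.read().orc("merge.orc");
    merged.printSchema();
    merged.show();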