如何将小型 ORC 文件合并或合并为大型 ORC 文件?
How do I Combine or Merge Small ORC files into Larger ORC file?
SO 和网络上的大多数 questions/answers 讨论使用 Hive 将一堆小的 ORC 文件组合成一个更大的文件,但是,我的 ORC 文件是按天分隔的日志文件,我需要保留他们分开。我只想每天 "roll-up" ORC 文件(HDFS 中的目录)。
我最有可能需要在 Java 中编写解决方案并且遇到了 OrcFileMergeOperator 这可能是我需要使用的,但现在下结论还为时过早。
解决此问题的最佳方法是什么?
你不需要重新发明轮子。
ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE
可用于将较小的 ORC 文件合并为较大的文件,因为 Hive 0.14.0.
合并发生在条带级别,这避免了对数据进行解压缩和解码。它工作得很快。我建议创建一个按天分区的外部 table(分区是目录),然后将它们全部合并,指定 PARTITION (day_column)
作为分区规范。
这里有很好的答案,但是 none 允许我 运行 一个 cron 作业,这样我就可以进行每日汇总。我们每天都有 journald 日志文件写入 HDFS,我不想 运行 我每天进来时都在 Hive 中查询。
我最终做的事情对我来说似乎更直接。我编写了一个 Java 程序,它使用 ORC 库扫描目录中的所有文件并创建这些文件的列表。然后打开一个新的 Writer,它是 "combined" 文件(以“.”开头,因此它对 Hive 隐藏,否则 Hive 将失败)。然后程序打开列表中的每个文件并读取内容并写出到组合文件中。读取所有文件后,它会删除这些文件。我还一次向 运行 一个目录添加了功能,以备不时之需。
注意:您将需要一个架构文件。 Journald 日志可以在 json "journalctl -o json" 中输出,然后您可以使用 Apache ORC 工具生成一个模式文件,或者您可以手动生成一个。 ORC 的自动生成很好,但手动总是更好。
注意:要按原样使用此代码,您需要一个有效的密钥表并在类路径中添加 -Dkeytab=。
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import com.cloudera.org.joda.time.LocalDate;
public class OrcFileRollUp {
private final static String SCHEMA = "journald.schema";
private final static String UTF_8 = "UTF-8";
private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
private static final String keytabLocation = System.getProperty("keytab");
private static final String kerberosUser = "<userName>";
private static Writer writer;
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
InetAddress myHost = InetAddress.getLocalHost();
String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);
int currentDay = LocalDate.now().getDayOfMonth();
int currentMonth = LocalDate.now().getMonthOfYear();
int currentYear = LocalDate.now().getYear();
Path path = new Path(HDFS_BASE_LOGS_DIR);
FileSystem fileSystem = path.getFileSystem(conf);
System.out.println("The URI is: " + fileSystem.getUri());
//Get Hosts:
List<String> allHostsPath = getHosts(path, fileSystem);
TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
.replaceAll("\n", ""));
//Open each file for reading and write contents
for(int i = 0; i < allHostsPath.size(); i++) {
String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working"; //filename: .2018_04_24.orc.working
//Create list of files from directory and today's date OR pass a directory in via the command line in format
//hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
String directory = "";
Path outFilePath;
Path argsPath;
List<String> orcFiles;
if(args.length == 0) {
directory = currentYear + "/" + currentMonth + "/" + currentDay;
outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
try {
orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
} catch (Exception e) {
continue;
}
} else {
outFilePath = new Path(args[0] + "/" + outFile);
argsPath = new Path(args[0]);
try {
orcFiles = getAllFilePath(argsPath, fileSystem);
} catch (Exception e) {
continue;
}
}
//Create List of files in the directory
FileSystem fs = outFilePath.getFileSystem(conf);
//Writer MUST be below ^^ or the combination file will be deleted as well.
if(fs.exists(outFilePath)) {
System.out.println(outFilePath + " exists, delete before continuing.");
} else {
writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
.setSchema(schema));
}
for(int j = 0; j < orcFiles.size(); j++ ) {
Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
RecordReader rows = reader.rows();
while (rows.nextBatch(batch)) {
if (batch != null) {
writer.addRowBatch(batch);
}
}
rows.close();
fs.delete(new Path(orcFiles.get(j)), false);
}
//Close File
writer.close();
//Remove leading "." from ORC file to make visible to Hive
outFile = fileSystem.getFileStatus(outFilePath)
.getPath()
.getName();
if (outFile.startsWith(".")) {
outFile = outFile.substring(1);
int lastIndexOf = outFile.lastIndexOf(".working");
outFile = outFile.substring(0, lastIndexOf);
}
Path parent = outFilePath.getParent();
fileSystem.rename(outFilePath, new Path(parent, outFile));
if(args.length != 0)
break;
}
}
private static String getSchema(String resource) throws IOException {
try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
return IOUtils.toString(input, UTF_8);
}
}
public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
List<String> hostsList = new ArrayList<String>();
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
hostsList.add(fileStat.getPath().toString());
}
return hostsList;
}
private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
List<String> fileList = new ArrayList<String>();
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
if (fileStat.isDirectory()) {
fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
} else {
fileList.add(fileStat.getPath()
.toString());
}
}
for(int i = 0; i< fileList.size(); i++) {
if(!fileList.get(i).endsWith(".orc"))
fileList.remove(i);
}
return fileList;
}
}
这是 Python 中的一个小脚本,使用 PyORC 将小的 ORC 文件连接在一起。我知道它没有直接回答你的问题,因为它不在 Java 中,但我发现它比当前的解决方案或使用 Hive 更简单。
import pyorc
import argparse
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-o', '--output', type=argparse.FileType(mode='wb'))
parser.add_argument('files', type=argparse.FileType(mode='rb'), nargs='+')
args = parser.parse_args()
schema = str(pyorc.Reader(args.files[0]).schema)
with pyorc.Writer(args.output, schema) as writer:
for i, f in enumerate(args.files):
reader = pyorc.Reader(f)
if str(reader.schema) != schema:
raise RuntimeError(
"Inconsistent ORC schemas.\n"
"\tFirst file schema: {}\n"
"\tFile #{} schema: {}"
.format(schema, i, str(reader.schema))
)
for line in reader:
writer.write(line)
if __name__ == '__main__':
main()
SO 和网络上的大多数 questions/answers 讨论使用 Hive 将一堆小的 ORC 文件组合成一个更大的文件,但是,我的 ORC 文件是按天分隔的日志文件,我需要保留他们分开。我只想每天 "roll-up" ORC 文件(HDFS 中的目录)。
我最有可能需要在 Java 中编写解决方案并且遇到了 OrcFileMergeOperator 这可能是我需要使用的,但现在下结论还为时过早。
解决此问题的最佳方法是什么?
你不需要重新发明轮子。
ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE
可用于将较小的 ORC 文件合并为较大的文件,因为 Hive 0.14.0.
合并发生在条带级别,这避免了对数据进行解压缩和解码。它工作得很快。我建议创建一个按天分区的外部 table(分区是目录),然后将它们全部合并,指定 PARTITION (day_column)
作为分区规范。
这里有很好的答案,但是 none 允许我 运行 一个 cron 作业,这样我就可以进行每日汇总。我们每天都有 journald 日志文件写入 HDFS,我不想 运行 我每天进来时都在 Hive 中查询。
我最终做的事情对我来说似乎更直接。我编写了一个 Java 程序,它使用 ORC 库扫描目录中的所有文件并创建这些文件的列表。然后打开一个新的 Writer,它是 "combined" 文件(以“.”开头,因此它对 Hive 隐藏,否则 Hive 将失败)。然后程序打开列表中的每个文件并读取内容并写出到组合文件中。读取所有文件后,它会删除这些文件。我还一次向 运行 一个目录添加了功能,以备不时之需。
注意:您将需要一个架构文件。 Journald 日志可以在 json "journalctl -o json" 中输出,然后您可以使用 Apache ORC 工具生成一个模式文件,或者您可以手动生成一个。 ORC 的自动生成很好,但手动总是更好。
注意:要按原样使用此代码,您需要一个有效的密钥表并在类路径中添加 -Dkeytab=。
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import com.cloudera.org.joda.time.LocalDate;
public class OrcFileRollUp {
private final static String SCHEMA = "journald.schema";
private final static String UTF_8 = "UTF-8";
private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
private static final String keytabLocation = System.getProperty("keytab");
private static final String kerberosUser = "<userName>";
private static Writer writer;
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
InetAddress myHost = InetAddress.getLocalHost();
String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);
int currentDay = LocalDate.now().getDayOfMonth();
int currentMonth = LocalDate.now().getMonthOfYear();
int currentYear = LocalDate.now().getYear();
Path path = new Path(HDFS_BASE_LOGS_DIR);
FileSystem fileSystem = path.getFileSystem(conf);
System.out.println("The URI is: " + fileSystem.getUri());
//Get Hosts:
List<String> allHostsPath = getHosts(path, fileSystem);
TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
.replaceAll("\n", ""));
//Open each file for reading and write contents
for(int i = 0; i < allHostsPath.size(); i++) {
String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working"; //filename: .2018_04_24.orc.working
//Create list of files from directory and today's date OR pass a directory in via the command line in format
//hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
String directory = "";
Path outFilePath;
Path argsPath;
List<String> orcFiles;
if(args.length == 0) {
directory = currentYear + "/" + currentMonth + "/" + currentDay;
outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
try {
orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
} catch (Exception e) {
continue;
}
} else {
outFilePath = new Path(args[0] + "/" + outFile);
argsPath = new Path(args[0]);
try {
orcFiles = getAllFilePath(argsPath, fileSystem);
} catch (Exception e) {
continue;
}
}
//Create List of files in the directory
FileSystem fs = outFilePath.getFileSystem(conf);
//Writer MUST be below ^^ or the combination file will be deleted as well.
if(fs.exists(outFilePath)) {
System.out.println(outFilePath + " exists, delete before continuing.");
} else {
writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
.setSchema(schema));
}
for(int j = 0; j < orcFiles.size(); j++ ) {
Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
RecordReader rows = reader.rows();
while (rows.nextBatch(batch)) {
if (batch != null) {
writer.addRowBatch(batch);
}
}
rows.close();
fs.delete(new Path(orcFiles.get(j)), false);
}
//Close File
writer.close();
//Remove leading "." from ORC file to make visible to Hive
outFile = fileSystem.getFileStatus(outFilePath)
.getPath()
.getName();
if (outFile.startsWith(".")) {
outFile = outFile.substring(1);
int lastIndexOf = outFile.lastIndexOf(".working");
outFile = outFile.substring(0, lastIndexOf);
}
Path parent = outFilePath.getParent();
fileSystem.rename(outFilePath, new Path(parent, outFile));
if(args.length != 0)
break;
}
}
private static String getSchema(String resource) throws IOException {
try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
return IOUtils.toString(input, UTF_8);
}
}
public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
List<String> hostsList = new ArrayList<String>();
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
hostsList.add(fileStat.getPath().toString());
}
return hostsList;
}
private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
List<String> fileList = new ArrayList<String>();
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
if (fileStat.isDirectory()) {
fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
} else {
fileList.add(fileStat.getPath()
.toString());
}
}
for(int i = 0; i< fileList.size(); i++) {
if(!fileList.get(i).endsWith(".orc"))
fileList.remove(i);
}
return fileList;
}
}
这是 Python 中的一个小脚本,使用 PyORC 将小的 ORC 文件连接在一起。我知道它没有直接回答你的问题,因为它不在 Java 中,但我发现它比当前的解决方案或使用 Hive 更简单。
import pyorc
import argparse
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-o', '--output', type=argparse.FileType(mode='wb'))
parser.add_argument('files', type=argparse.FileType(mode='rb'), nargs='+')
args = parser.parse_args()
schema = str(pyorc.Reader(args.files[0]).schema)
with pyorc.Writer(args.output, schema) as writer:
for i, f in enumerate(args.files):
reader = pyorc.Reader(f)
if str(reader.schema) != schema:
raise RuntimeError(
"Inconsistent ORC schemas.\n"
"\tFirst file schema: {}\n"
"\tFile #{} schema: {}"
.format(schema, i, str(reader.schema))
)
for line in reader:
writer.write(line)
if __name__ == '__main__':
main()