Java - empty ORC file
I'm trying to write an ORC file using orc-core so that it can later be read by Hive.
The file being written has the correct number of rows, but there is no content in the columns. I can see this both when trying to read the file with a SELECT query in Hive and when using hive --orcfiledump -d.
I tried the example provided in the guide, which writes two long-type columns, and the resulting file is read correctly by Hive. I suspect this has something to do with the fact that I'm writing string columns, but I still can't make it work.
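For reference, the guide's two-long-column example is roughly the following (condensed from the orc-core documentation; the schema, output path and row count are just the guide's placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class LongColumnsExample {
    public static void main(String[] args) throws java.io.IOException {
        Configuration conf = new Configuration();
        TypeDescription schema = TypeDescription.fromString("struct<x:bigint,y:bigint>");
        Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
                OrcFile.writerOptions(conf).setSchema(schema));
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        LongColumnVector y = (LongColumnVector) batch.cols[1];
        for (int r = 0; r < 10000; ++r) {
            int row = batch.size++;
            // Long columns are filled by assigning directly into the vector array.
            x.vector[row] = r;
            y.vector[row] = r * 3;
            // If the batch is full, write it out and start over.
            if (batch.size == batch.getMaxSize()) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size != 0) {
            writer.addRowBatch(batch);
            batch.reset();
        }
        writer.close();
    }
}

That file is read back correctly, so the problem must be in how I fill the string columns below.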
This is how I'm currently writing the file:
// File schema
String outputFormat = "struct<";
for (int i = 0; i < outputSchema.length; i++) {
    outputFormat += outputSchema[i] + ":string,";
}
outputFormat += "lastRecordHash:string,currentHash:string>";
TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

// Initializes buffers
VectorizedRowBatch batch = orcSchema.createRowBatch();
ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields + 2);
for (int i = 0; i < numFields + 2; i++) {
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
    orcBuffers.add(i, bcv);
}
...
// Initializes writer
Writer writer = null;
try {
    writer = OrcFile.createWriter(new Path(hdfsUri + outputPath),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
    partitionCounter++;
}
catch (IOException e) {
    log.error("Cannot open hdfs file. Reason: " + e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}

// Writes content
String[] records = ...
for (int i = 0; i < records.length; i++) {
    fields = records[i].split(fieldSeparator);
    int row = batch.size++;
    // Filling the orc buffers
    for (int j = 0; j < numFields; j++) {
        orcBuffers.get(j).vector[row] = fields[j].getBytes();
        hashDigest.append(fields[j]);
    }
    if (batch.size == batch.getMaxSize()) {
        try {
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch (IOException e) {
            log.error("Cannot write to hdfs. Reason: " + e.getMessage());
            return;
        }
    }
}
if (batch.size != 0) {
    try {
        writer.addRowBatch(batch);
        batch.reset();
    }
    catch (IOException e) {
        log.error("Cannot write to hdfs. Reason: " + e.getMessage());
        return;
    }
}
writer.close();
Any advice or useful reference is greatly appreciated.
Thank you all.
It seems a deeper review of the API documentation was what I needed. What I was missing:
- calling initBuffer() on every BytesColumnVector during the initialization phase;
- assigning the values of the columns by calling setVal(). This can also be done using setRef(), which is documented as the fastest of the two, but I don't know whether it fits my specific case; I'll give it a try (see the sketch after the updated code).
Here is the updated code:
// File schema
String outputFormat = "struct<";
for (int i = 0; i < outputSchema.length; i++) {
    outputFormat += outputSchema[i] + ":string,";
}
outputFormat += "lastRecordHash:string,currentHash:string>";
TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

// Initializes buffers
VectorizedRowBatch batch = orcSchema.createRowBatch();
ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields + 2);
for (int i = 0; i < numFields + 2; i++) {
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
    bcv.initBuffer();
    orcBuffers.add(i, bcv);
}
...
// Initializes writer
Writer writer = null;
try {
    writer = OrcFile.createWriter(new Path(hdfsUri + outputPath),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
    partitionCounter++;
}
catch (IOException e) {
    log.error("Cannot open hdfs file. Reason: " + e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}

// Writes content
String[] records = ...
for (int i = 0; i < records.length; i++) {
    fields = records[i].split(fieldSeparator);
    int row = batch.size++;
    // Filling the orc buffers
    for (int j = 0; j < numFields; j++) {
        orcBuffers.get(j).setVal(row, fields[j].getBytes());
        hashDigest.append(fields[j]);
    }
    if (batch.size == batch.getMaxSize()) {
        try {
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch (IOException e) {
            log.error("Cannot write to hdfs. Reason: " + e.getMessage());
            return;
        }
    }
}
if (batch.size != 0) {
    try {
        writer.addRowBatch(batch);
        batch.reset();
    }
    catch (IOException e) {
        log.error("Cannot write to hdfs. Reason: " + e.getMessage());
        return;
    }
}
writer.close();
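On the setVal()/setRef() distinction mentioned above, this is roughly how the two calls would compare inside the inner loop (a sketch I haven't benchmarked; the explicit StandardCharsets.UTF_8, which needs import java.nio.charset.StandardCharsets, is my addition to make the encoding explicit, since ORC stores strings as UTF-8):

byte[] utf8 = fields[j].getBytes(StandardCharsets.UTF_8);

// setVal() copies the bytes into the buffer allocated by initBuffer(),
// so 'utf8' can be reused or modified right away.
orcBuffers.get(j).setVal(row, utf8);

// setRef() only stores a reference to 'utf8'; it avoids the copy, but the
// array must stay untouched until writer.addRowBatch(batch) has been called.
orcBuffers.get(j).setRef(row, utf8, 0, utf8.length);

Since my records are split into fresh String objects per row anyway, setRef() should be safe here, but I'll verify before switching.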