Hadoop CustomInputFormat NullPointerException

Hadoop CustomInputFormat NullPointerException

我为 Hadoop 开发了一个自定义的不可拆分 InputFormat,但是在调用记录 reader 时我一直得到 NullPointerException。奇怪的是,即使我更新代码、重建并使用 Hive 的 ADD JAR 命令添加 jar,我也不确定格式是否已更新,因为日志消息始终相同,即使我更改了它。以下是相关的代码片段和消息:

错误

2018-01-13 01:48:03,202 WARN  org.apache.hadoop.security.UserGroupInformation: [HiveServer2-Handler-Pool: Thread-70]: PriviledgedActionException as:user (auth:SIMPLE) cause:org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.NullPointerException
2018-01-13 01:48:03,202 WARN  org.apache.hive.service.cli.thrift.ThriftCLIService: [HiveServer2-Handler-Pool: Thread-70]: Error fetching results: 
org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.NullPointerException
    at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:463)
    at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:294)
    at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:769)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
    at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
    at org.apache.hive.service.cli.session.HiveSessionProxy.run(HiveSessionProxy.java:63)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
    at com.sun.proxy.$Proxy30.fetchResults(Unknown Source)
    at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:462)
    at org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:694)
    at org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1553)
    at org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1538)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:508)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:415)
    at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
    at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2069)
    at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:458)
    ... 24 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:343)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:446)
    ... 28 more

自定义输入格式

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import java.io.DataInputStream;


import custom.util.Parser;

import org.apache.hadoop.mapred.RecordReader;


/**
 * Non-splitable {@link FileInputFormat} that feeds whole files (optionally
 * decompressed via a matching {@link CompressionCodec}) to a {@code Parser}
 * and emits one record per parsed object.
 */
public class CustomInputFormat extends FileInputFormat<LongWritable, ObjectWritable> {
    public static final Log LOG = LogFactory.getLog(CustomInputFormat.class);

    /**
     * Creates a record reader for the given file split.
     *
     * @param split    must be a {@link FileSplit}
     * @param config   job configuration used to open the file system
     * @param reporter progress sink forwarded to the reader
     * @throws IOException if the file cannot be opened or the parser cannot be initialized
     */
    @Override
    public RecordReader<LongWritable, ObjectWritable> getRecordReader(InputSplit split, JobConf config, Reporter reporter)
            throws IOException {
        FileSplit fileSplit = (FileSplit) split;
        Path path = fileSplit.getPath();
        // Use the split's real offset instead of hard-coding 0. Because this format
        // is not splitable the offset is always 0 today, but this keeps the byte
        // accounting correct should isSplitable() ever change.
        long start = fileSplit.getStart();
        long length = fileSplit.getLength();
        return initCustomRecordReader(path, start, length, reporter, config);
    }

    /**
     * Opens {@code path}, wraps the stream in a decompressor when a codec matches
     * the file name, and builds a {@link CustomRecordReader} backed by a Parser.
     *
     * @param path     file to read
     * @param start    first byte offset of the split within the file
     * @param length   number of bytes in the split
     * @param reporter progress sink (may be Reporter.NULL)
     * @param conf     configuration used to resolve the file system and codecs
     * @throws IOException if opening or initializing the stream/parser fails
     */
    public static CustomRecordReader initCustomRecordReader(Path path, long start, long length, Reporter reporter, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream baseStream = fs.open(path);
        DataInputStream stream = baseStream;
        try {
            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
            final CompressionCodec codec = compressionCodecs.getCodec(path);
            if (codec != null) {
                stream = new DataInputStream(codec.createInputStream(stream));
            }
            LOG.info("Reading FILE record: " + path.toUri().getPath());
            Parser parser = new Parser(stream);
            LOG.info("Initialized Parser");
            // end = start + length; with start == 0 this matches the old behavior
            // of passing the bare length, but stays correct for nonzero offsets.
            return new CustomRecordReader(baseStream, stream, reporter, start, start + length, parser);
        } catch (IOException | RuntimeException e) {
            // Don't leak the open HDFS stream when codec or parser setup fails.
            stream.close();
            throw e;
        }
    }

    /** Files must be parsed as a single continuous stream, so never split them. */
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

}

CustomRecordReader

import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptContext;

import custom.util.Parser;

import org.apache.hadoop.mapred.RecordReader;


/**
 * Old-API (mapred) RecordReader that emits a monotonically increasing record
 * number as the key and the Parser's next object (wrapped in an
 * {@link ObjectWritable}) as the value.
 */
public class CustomRecordReader implements RecordReader<LongWritable, ObjectWritable> {

    /** Underlying seekable file stream; used only to report the byte position. */
    Seekable baseStream;
    /** Possibly-decompressing stream the parser consumes; closed by close(). */
    DataInputStream stream;
    Reporter reporter;
    Parser parser;

    // Reusable key/value instances handed out by createKey()/createValue(),
    // per the mapred RecordReader contract.
    private LongWritable key = new LongWritable();
    private ObjectWritable value = new ObjectWritable();
    // Number of records emitted so far; doubles as the key for the next record.
    long packetCount = 0;
    // Byte offsets delimiting this split within the file.
    long start, end;

    /**
     * @param baseStream seekable stream over the raw file, for position reporting
     * @param stream     stream (possibly decompressing) the parser reads from
     * @param reporter   progress sink; may be null
     * @param start      first byte offset of the split
     * @param end        byte offset just past the end of the split
     * @param parser     parser already attached to {@code stream}
     */
    public CustomRecordReader(Seekable baseStream, DataInputStream stream, Reporter reporter, long start, long end,
            Parser parser) {
        this.baseStream = baseStream;
        this.stream = stream;
        this.reporter = reporter;
        this.parser = parser;
        this.start = start;
        this.end = end;
    }

    /**
     * Advances to the next parsed record.
     *
     * @return false when the parser is exhausted, true otherwise
     */
    @Override
    public boolean next(LongWritable key, ObjectWritable value) throws IOException {
        if (!this.parser.hasNext())
            return false;

        key.set(++packetCount);
        value.set(parser.next());

        // Guard against a null reporter (e.g. when invoked outside a task context).
        if (reporter != null) {
            reporter.setStatus("Read " + getPos() + " of " + end + " bytes");
            reporter.progress();
        }

        return true;
    }

    @Override
    public LongWritable createKey() {
        return key;
    }

    @Override
    public ObjectWritable createValue() {
        return value;
    }

    /** @return current byte position in the underlying (compressed) file */
    @Override
    public long getPos() throws IOException {
        return baseStream.getPos();
    }

    @Override
    public void close() throws IOException {
        // Closing the outermost stream also closes the wrapped base stream.
        stream.close();
    }

    /** @return fraction of the split consumed, clamped to [0, 1] */
    @Override
    public float getProgress() throws IOException {
        if (start == end)
            return 0;
        return Math.min(1.0f, (getPos() - start) / (float) (end - start));
    }

}

创建Table操作

CREATE EXTERNAL TABLE table1 (timestamp bigint,
protocol string,
src string,
dst int,
length int,
id bigint)
PARTITIONED BY (direction VARCHAR(64), minutes int)
ROW FORMAT SERDE 'custom.CustomDeserializer'
STORED AS INPUTFORMAT 'custom.CustomInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/user/user/input/raw';

SerDe

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;

import custom.util.Flow;

@SuppressWarnings("deprecation")
/**
 * Hive Deserializer that maps each {@code Flow} object (carried inside an
 * {@link ObjectWritable}) onto the table's columns by column name.
 */
@SuppressWarnings("deprecation")
public class CustomDeserializer implements Deserializer {

    ObjectInspector inspector;
    // Reusable row buffer, re-populated on every deserialize() call.
    ArrayList<Object> row;
    int numColumns;
    List<String> columnNames;

    /**
     * Reads the table's column names and types from the table properties and
     * builds the struct ObjectInspector plus the reusable row buffer.
     *
     * @throws SerDeException if the column metadata is missing or inconsistent
     */
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
        if (columnNameProperty == null) {
            // Fail fast instead of NPE-ing on split() below.
            throw new SerDeException("Table property " + Constants.LIST_COLUMNS + " is not set");
        }
        columnNames = Arrays.asList(columnNameProperty.split(","));
        numColumns = columnNames.size();

        String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
        List<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);

        // 'assert' is a no-op unless the JVM runs with -ea, so validate explicitly:
        // ensure we have the same number of column names and types.
        if (numColumns != columnTypes.size()) {
            throw new SerDeException("Column name count (" + numColumns
                    + ") does not match column type count (" + columnTypes.size() + ")");
        }

        List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>(numColumns);
        row = new ArrayList<Object>(numColumns);
        for (int c = 0; c < numColumns; c++) {
            ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(columnTypes.get(c));
            inspectors.add(oi);
            row.add(null);
        }
        inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
    }

    /**
     * Converts one ObjectWritable-wrapped Flow into a row, looking each column
     * up by name on the Flow.
     *
     * @throws SerDeException if the writable is not the expected type
     */
    public Object deserialize(Writable w) throws SerDeException {
        if (!(w instanceof ObjectWritable)) {
            throw new SerDeException("Expected ObjectWritable but got "
                    + (w == null ? "null" : w.getClass().getName()));
        }
        ObjectWritable obj = (ObjectWritable) w;
        Flow flow = (Flow) obj.get();

        for (int i = 0; i < numColumns; i++) {
            String columName = columnNames.get(i);
            Object value = flow.get(columName);
            row.set(i, value);
        }
        return row;
    }

    public ObjectInspector getObjectInspector() throws SerDeException {
        return inspector;
    }

    public SerDeStats getSerDeStats() {
        return new SerDeStats();
    }

}

任何人都可以告诉我我做错了什么,或者至少我是否可以通过任何方式更新 jar 以便我至少可以通过日志获取信息来帮助我?谢谢。

我找到问题所在了。事实证明,Hive 缓存了构建 table 时使用的旧 jar。在调试同一包名下不同版本的 jar 之前,必须先重启 Hive(HiveServer2),新的 jar 才会生效。