hadoop custominputformat nullpointerexception

ecbunoof  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(425)

我为 Hadoop 开发了一个自定义的不可拆分 InputFormat,但在调用记录读取器(RecordReader)时一直得到 NullPointerException。奇怪的是,即使我更新了代码、重新构建,并用 Hive 的 ADD JAR 命令添加了 jar,我也不确定 InputFormat 是否真的被更新了,因为即使我修改了日志消息,输出的日志也始终相同。以下是相关的代码片段和消息:

错误

2018-01-13 01:48:03,202 WARN  org.apache.hadoop.security.UserGroupInformation: [HiveServer2-Handler-Pool: Thread-70]: PriviledgedActionException as:user (auth:SIMPLE) cause:org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.NullPointerException
2018-01-13 01:48:03,202 WARN  org.apache.hive.service.cli.thrift.ThriftCLIService: [HiveServer2-Handler-Pool: Thread-70]: Error fetching results: 
org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.NullPointerException
    at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:463)
    at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:294)
    at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:769)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
    at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
    at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
    at com.sun.proxy.$Proxy30.fetchResults(Unknown Source)
    at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:462)
    at org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:694)
    at org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1553)
    at org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1538)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:508)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:415)
    at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
    at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2069)
    at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:458)
    ... 24 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:343)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:446)
    ... 28 more

自定义输入格式

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import java.io.DataInputStream;

import custom.util.Parser;

import org.apache.hadoop.mapred.RecordReader;

/**
 * Non-splittable input format that reads each file with a single
 * {@link CustomRecordReader}.
 *
 * <p>Keys are {@link LongWritable} record counters and values are
 * {@link ObjectWritable} wrappers around whatever the {@link Parser} yields.
 * Files whose name maps to a registered {@link CompressionCodec} are
 * decompressed transparently before being handed to the parser.
 */
public class CustomInputFormat extends FileInputFormat<LongWritable, ObjectWritable> {
    public static final Log LOG = LogFactory.getLog(CustomInputFormat.class);

    /**
     * Builds the record reader for one split.
     *
     * @param split    the split to read; must be a {@link FileSplit}
     * @param config   job configuration used to resolve the file system and codecs
     * @param reporter progress reporter handed to the record reader
     * @return a reader positioned at the beginning of the split's file
     * @throws IOException if the file cannot be opened or the reader cannot be set up
     */
    @Override
    public RecordReader<LongWritable, ObjectWritable> getRecordReader(InputSplit split, JobConf config, Reporter reporter)
            throws IOException {
        FileSplit fileSplit = (FileSplit)split;
        Path path = fileSplit.getPath();
        // Files are never split (see isSplitable), so reading always starts at offset 0.
        long start = 0L;
        long length = fileSplit.getLength();
        return initCustomRecordReader(path, start, length, reporter, config);
    }

    /**
     * Opens {@code path}, wraps it in a decompressing stream when a codec matches
     * the file name, and creates a {@link CustomRecordReader} on top of it.
     *
     * <p>If anything fails after the file has been opened, the underlying stream
     * is closed before the exception is propagated so the file handle is not leaked.
     *
     * @param path     file to read
     * @param start    offset passed to the reader as the start of its range
     * @param length   value passed to the reader as the end of its range
     * @param reporter progress reporter handed to the reader
     * @param conf     configuration used to resolve the file system and codecs
     * @return the initialised record reader, which owns the opened stream
     * @throws IOException if the file cannot be opened or the streams cannot be created
     */
    public static CustomRecordReader initCustomRecordReader(Path path, long start, long length, Reporter reporter,Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream baseStream = fs.open(path);
        try {
            DataInputStream stream = baseStream;

            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
            final CompressionCodec codec = compressionCodecs.getCodec(path);
            if (codec != null) {
                stream = new DataInputStream(codec.createInputStream(stream));
            }
            LOG.info("Reading FILE record: " + path.toUri().getPath());
            Parser parser = new Parser(stream);
            LOG.info("Initialized Parser");
            return new CustomRecordReader(baseStream, stream, reporter, start, length, parser);
        } catch (IOException | RuntimeException e) {
            // The reader never took ownership of the stream, so release it here.
            try {
                baseStream.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Always reports files as non-splittable so each file is parsed from its
     * first byte by exactly one reader.
     */
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

}

自定义RecordReader

import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptContext;

import custom.util.Parser;

import org.apache.hadoop.mapred.RecordReader;

/**
 * {@link RecordReader} that pulls items from a {@link Parser} and emits them as
 * (sequence number, {@link ObjectWritable}) pairs.
 *
 * <p>Position and progress are derived from the underlying {@link Seekable}
 * stream, while {@link #close()} closes the (possibly wrapping) data stream.
 */
public class CustomRecordReader implements RecordReader<LongWritable, ObjectWritable> {

    // Never assigned anywhere in this class.
    TaskAttemptContext context;
    // Stream queried for the current position.
    Seekable baseStream;
    // Stream closed by close().
    DataInputStream stream;
    Reporter reporter;
    Parser parser;

    // Shared instances handed out by createKey() / createValue().
    private LongWritable key = new LongWritable();
    private ObjectWritable value = new ObjectWritable();
    // Number of records returned so far; also used as the record key.
    long packetCount = 0;
    long start;
    long end;

    /**
     * Creates a reader over an already opened stream.
     *
     * @param baseStream stream used to report the current read position
     * @param stream     stream that is closed when this reader is closed
     * @param reporter   receives a status line and a progress tick per record
     * @param start      start of the range used for progress calculation
     * @param end        end of the range used for progress calculation
     * @param parser     source of the records returned by {@link #next}
     */
    public CustomRecordReader(Seekable baseStream, DataInputStream stream, Reporter reporter, long start, long end,
            Parser parser) {
        this.start = start;
        this.end = end;
        this.baseStream = baseStream;
        this.stream = stream;
        this.parser = parser;
        this.reporter = reporter;
    }

    /**
     * Fills {@code key} with the next sequence number and {@code value} with the
     * next parsed item, then reports status and progress.
     *
     * @return {@code true} if a record was produced, {@code false} once the parser is exhausted
     */
    @Override
    public boolean next(LongWritable key, ObjectWritable value) throws IOException {
        if (parser.hasNext()) {
            packetCount++;
            key.set(packetCount);
            value.set(parser.next());

            final long position = getPos();
            reporter.setStatus("Read " + position + " of " + end + " bytes");
            reporter.progress();
            return true;
        }
        return false;
    }

    /** Returns this reader's single key instance. */
    @Override
    public LongWritable createKey() {
        return this.key;
    }

    /** Returns this reader's single value instance. */
    @Override
    public ObjectWritable createValue() {
        return this.value;
    }

    /** Returns the current position of the base stream. */
    @Override
    public long getPos() throws IOException {
        return this.baseStream.getPos();
    }

    /** Closes the data stream. */
    @Override
    public void close() throws IOException {
        this.stream.close();
    }

    /**
     * Returns the consumed fraction of the {@code start..end} range, capped at
     * {@code 1.0}; an empty range reports {@code 0}.
     */
    @Override
    public float getProgress() throws IOException {
        final long span = end - start;
        if (span == 0) {
            return 0f;
        }
        final float fraction = (getPos() - start) / (float) span;
        return Math.min(1.0f, fraction);
    }

}

创建表操作

-- External table whose files under LOCATION are read through the custom
-- input format and turned into rows by the custom deserializer.
-- `timestamp` is back-quoted because it is a reserved keyword in newer Hive
-- releases; the column name itself is unchanged.
CREATE EXTERNAL TABLE table1 (`timestamp` bigint,
protocol string,
src string,
dst int,
length int,
id bigint)
PARTITIONED BY (direction VARCHAR(64), minutes int)
ROW FORMAT SERDE 'custom.CustomDeserializer'
STORED AS INPUTFORMAT 'custom.CustomInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/user/user/input/raw';

SerDe

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;

import custom.util.Flow;

/**
 * Hive {@link Deserializer} that maps a {@link Flow} carried inside an
 * {@link ObjectWritable} onto the table's columns by column name.
 *
 * <p>The same row list is reused for every record, so callers must consume a
 * row before requesting the next one.
 */
@SuppressWarnings("deprecation")
public class CustomDeserializer implements Deserializer {

    ObjectInspector inspector;
    // Reused for every deserialized record; one slot per column.
    ArrayList<Object> row;
    int numColumns;
    List<String> columnNames;

    /**
     * Reads the column names and types from the table properties and builds the
     * matching struct object inspector.
     *
     * @param conf job configuration (unused)
     * @param tbl  table properties holding the column name and type lists
     * @throws SerDeException if the number of column names and column types differ
     */
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
        // A missing or empty property means "no columns"; splitting "" would
        // otherwise yield a single bogus empty column name.
        if (columnNameProperty == null || columnNameProperty.isEmpty()) {
            columnNames = new ArrayList<String>();
        } else {
            columnNames = Arrays.asList(columnNameProperty.split(","));
        }
        numColumns = columnNames.size();

        String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
        List<TypeInfo> columnTypes;
        if (columnTypeProperty == null || columnTypeProperty.isEmpty()) {
            columnTypes = new ArrayList<TypeInfo>();
        } else {
            columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
        }

        // Ensure we have the same number of column names and types. This is a
        // real check rather than an assert because assertions are normally
        // disabled at runtime.
        if (numColumns != columnTypes.size()) {
            throw new SerDeException("Column count mismatch: " + numColumns + " column names but "
                    + columnTypes.size() + " column types");
        }

        List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>(numColumns);
        row = new ArrayList<Object>(numColumns);
        for (int c = 0; c < numColumns; c++) {
            ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(columnTypes.get(c));
            inspectors.add(oi);
            row.add(null);
        }
        inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);

    }

    /**
     * Copies one value per column out of the {@link Flow} wrapped by {@code w}.
     *
     * @param w an {@link ObjectWritable} whose payload is a {@link Flow}
     * @return the shared row list, overwritten with this record's values
     * @throws SerDeException if {@code w} is not an {@link ObjectWritable} or
     *                        does not carry a {@link Flow}
     */
    public Object deserialize(Writable w) throws SerDeException {
        if (!(w instanceof ObjectWritable)) {
            throw new SerDeException("Expected ObjectWritable but got "
                    + (w == null ? "null" : w.getClass().getName()));
        }
        Object payload = ((ObjectWritable)w).get();
        if (!(payload instanceof Flow)) {
            throw new SerDeException("Expected Flow payload but got "
                    + (payload == null ? "null" : payload.getClass().getName()));
        }
        Flow flow = (Flow)payload;

        for (int i = 0; i < numColumns; i++) {
            String columName = columnNames.get(i);
            Object value = flow.get(columName);
            row.set(i, value);
        }
        return row;
    }

    /** Returns the struct inspector built in {@link #initialize}. */
    public ObjectInspector getObjectInspector() throws SerDeException {
        return inspector;
    }

    /** Returns a fresh, empty statistics object; no statistics are tracked. */
    public SerDeStats getSerDeStats() {
        return new SerDeStats();
    }

}

有谁能帮我,告诉我做错了什么,或者至少有什么方法可以让jar更新,这样我至少可以用日志获取信息?谢谢您。

sqyvllje

sqyvllje1#

我找到了问题所在。原来 Hive 缓存了建表时使用的旧 jar。在调试包名相同的 jar 的不同版本之前,需要先重启 Hive。

相关问题