Error while deserializing objects larger than 95 KB; works fine for objects smaller than 95 KB

When I deserialize an object larger than 95 KB I get a StreamCorruptedException, but the same code works fine for objects smaller than 95 KB. Here is my code:

<!-- language: lang-java -->

package hadoop;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopClient {

    static ArrayList<Long> list = new ArrayList<Long>();

    public long readFileFromHDFS(String source) throws IOException {

        Configuration obj = new Configuration();
        obj.set("fs.default.name", "hdfs://127.0.0.1:9000");

        FileSystem fs = FileSystem.get(obj);
        Path sourcePath = new Path(fs.getHomeDirectory() + source + ".txt");
        FSDataInputStream in = fs.open(sourcePath);

        byte[] b = new byte[in.available()];

        final long startTime = System.nanoTime();
        in.read(b, 0, in.available());
        final long endTime = System.nanoTime();

        in.close();
        fs.close();

        TestObject objj = null;
        try {

            ByteArrayInputStream bi = new ByteArrayInputStream(b);
            ObjectInputStream si = new ObjectInputStream(bi);
            objj = (TestObject) si.readObject();
            objj.printHello();

        } catch (Exception e) {
            System.out.println(e);
        }
        return endTime - startTime;

    }


    public long copyBuffertoHDFS(byte[] array, String destFileName, boolean replace, short replicationFactor, long blockSize, int bufferSize) throws IOException {

        Configuration obj = new Configuration();

        obj.set("fs.default.name", "hdfs://127.0.0.1:9000");

        FileSystem fs = FileSystem.get(obj);

        String s = fs.getHomeDirectory() + destFileName + ".txt";
        Path outFile = new Path(s);

        final long startTime = System.nanoTime();
        FSDataOutputStream out = fs.create(outFile, replace, bufferSize, replicationFactor, blockSize);

        final long endTime = System.nanoTime();

        out.write(array);
        out.close();
        return endTime - startTime;
    }


    public static void main(String[] args) throws Exception {

        HadoopClient hadoopJava = new HadoopClient();

        short replicationFactor;
        long blockSize;
        int bufferSize;
        int noOfBytes;
        int noOfEntries;
        boolean replacement;
        String str = "";


        for (int testCases = 0; testCases < args.length; testCases += 6) {

            blockSize = Integer.parseInt(args[0 + testCases]);

            replicationFactor = Short.parseShort(args[1 + testCases]);

            bufferSize = Integer.parseInt(args[2 + testCases]);

            noOfBytes = Integer.parseInt(args[3 + testCases]);

            noOfEntries = Integer.parseInt(args[4 + testCases]);

            replacement = Boolean.parseBoolean(args[5 + testCases]);

            TestObject testObject = new TestObject();
            testObject.setString(noOfBytes);

            str = hadoopJava.toStringMethod(testObject);

            hadoopJava.publishByteArrayTimer(str.getBytes("windows-1252"), noOfEntries, replacement, replicationFactor, blockSize, bufferSize);
            hadoopJava.retrieveByteArrayTimer(noOfEntries);

            Collections.sort(list);
            for (Long ll : list) {
                System.out.println(ll);
            }
            System.out.println("");
        }

    }

    public String toStringMethod(TestObject test) {
        String serializedObject = "";
        try {
            ByteArrayOutputStream bo = new ByteArrayOutputStream();
            ObjectOutputStream so = new ObjectOutputStream(bo);
            so.writeObject(test);
            so.flush();
            so.close();
            serializedObject = bo.toString("windows-1252");
            bo.flush();
            bo.close();
        } catch (Exception e) {
            System.out.println(e);
        }
        return serializedObject;

    }

    public void publishByteArrayTimer(byte[] array, int numberOfInsertions, boolean replace, short replicationFactor, long blockSize, int bufferSize) throws IOException, InterruptedException {

        long timeTaken = 0;

        for (int fileName = 0; fileName < numberOfInsertions; fileName++) {
            timeTaken = copyBuffertoHDFS(array, String.valueOf(fileName), replace, replicationFactor, blockSize, bufferSize);
            list.add(timeTaken / 1000);
            TimeUnit.MICROSECONDS.sleep(10000);
        }
    }

    public void retrieveByteArrayTimer(Integer numberOfInsertions) throws IOException {

        long timeTaken = 0;
        for (int fileName = 0; fileName < numberOfInsertions; fileName++) {

            timeTaken += readFileFromHDFS(String.valueOf(fileName));

        }
    }
}

class TestObject implements Serializable {

    char[] chars;
    String str;

    public String setString(int numberOfBytes) {
        numberOfBytes = numberOfBytes / 2;
        chars = new char[numberOfBytes];
        Arrays.fill(chars, 'a');
        str = new String(chars);
        return str;
    }

    public String getString() {
        return str;
    }

    public void printHello() {
        System.out.println("hello tester");
    }
}

Here is the error trace:

    java.io.StreamCorruptedException: invalid type code: 00

And the full stack trace:

    java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
        at hadoop.HadoopClient.readFileFromHDFS(HadoopClient.java:47)
        at hadoop.HadoopClient.retrieveByteArrayTimer(HadoopClient.java:159)
        at hadoop.HadoopClient.main(HadoopClient.java:114)

Please help. Thanks.

Most likely this is your problem:

    byte[] b = new byte[in.available()];
    in.read(b, 0, in.available());

In general, it is wrong to assume that this code reads all of the data: available() only returns an estimate of how many bytes can be read without blocking, and a single read() call may return fewer bytes than requested. Whatever part of the array never gets filled stays zero, which is exactly what the `invalid type code: 00` in the exception points to.

You can use org.apache.commons.io.IOUtils#toByteArray(java.io.InputStream) from Apache commons-io, or read in a loop yourself:

    ByteArrayOutputStream bos = new ByteArrayOutputStream(in.available());
    byte[] buf = new byte[4096 * 16];
    int c;
    while ((c = in.read(buf)) != -1) {
        bos.write(buf, 0, c);
    }
    byte[] data = bos.toByteArray();
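
With commons-io on the classpath, the loop above collapses to a single call (a minimal sketch; the only assumption is that the commons-io dependency is available):

    import org.apache.commons.io.IOUtils;

    // IOUtils.toByteArray keeps reading until EOF, so the
    // resulting array always contains the complete file.
    byte[] data = IOUtils.toByteArray(in);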

You can also pass the FSDataInputStream directly to the ObjectInputStream, without going through a byte array at all:

public long readFileFromHDFS(String source) throws IOException {

    Configuration obj = new Configuration();
    obj.set("fs.default.name", "hdfs://127.0.0.1:9000");

    FileSystem fs = FileSystem.get(obj);
    Path sourcePath = new Path(fs.getHomeDirectory() + source + ".txt");
    FSDataInputStream in = fs.open(sourcePath);
    TestObject objj = null;

    final long startTime = System.nanoTime();
    try {
        ObjectInputStream si = new ObjectInputStream(in);
        objj = (TestObject) si.readObject();
        objj.printHello();
        si.close();
    } catch (Exception e) {
        System.out.println(e);
    } finally {
        in.close();
        fs.close();            
    }

    return System.nanoTime() - startTime;
}
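
If you do need the bytes in memory first (for example, to time the raw read separately from deserialization), a hedged alternative is to ask HDFS for the exact file length and use readFully, which blocks until the whole buffer is filled (a sketch using the standard FileSystem#getFileStatus and DataInputStream#readFully APIs; error handling omitted):

    // Use the real file size instead of the available() estimate.
    long len = fs.getFileStatus(sourcePath).getLen();
    byte[] b = new byte[(int) len];

    // FSDataInputStream extends DataInputStream, so readFully is
    // available; it loops internally until b is completely filled.
    in.readFully(b);

Note that this assumes the file fits into a single int-indexed array, which holds for the object sizes discussed here.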