在 MapReduce 中读取由于 /n 而分成两行的记录

Reading a record broken down into two lines because of /n in MapReduce

我正在尝试编写一个自定义 reader,它为我提供读取具有定义字段数的记录(位于两行中)的目的。

例如

1,2,3,4("," can be there or not)
,5,6,7,8

我的要求是读取记录并将其作为单个记录推送到映射器中,如 {1,2,3,4,5,6,7,8}。请提供一些意见。

更新:

public boolean nextKeyValue() throws IOException, InterruptedException {
    if(key == null) {
        key = new LongWritable();
    }

    //Current offset is the key
    key.set(pos); 

    if(value == null) {
        value = new Text();
    }

    int newSize = 0;
    int numFields = 0;
    Text temp = new Text();
    boolean firstRead = true;

    while(numFields < reqFields) {
        while(pos < end) {
            //Read up to the '\n' character and store it in 'temp'
            newSize = in.readLine(  temp, 
                                    maxLineLength, 
                                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), 
                                             maxLineLength));

            //If 0 bytes were read, then we are at the end of the split
            if(newSize == 0) {
                break;
            }

            //Otherwise update 'pos' with the number of bytes read
            pos += newSize;

            //If the line is not too long, check number of fields
            if(newSize < maxLineLength) {
                break;
            }

            //Line too long, try again
            LOG.info("Skipped line of size " + newSize + " at pos " + 
                        (pos - newSize));
        }

        //Exit, since we're at the end of split
        if(newSize == 0) {
            break;
        }
        else {
            String record = temp.toString();
            StringTokenizer fields = new StringTokenizer(record,"|");

            numFields += fields.countTokens();

            //Reset 'value' if this is the first append
            if(firstRead) {
                value = new Text();
                firstRead = false;
            }

            if(numFields != reqFields) {
                value.append(temp.getBytes(), 0, temp.getLength());
            }
            else {
                value.append(temp.getBytes(), 0, temp.getLength());
            }
        }
    }

    if(newSize == 0) {
        key = null;
        value = null;
        return false;
    }
    else {
        return true;
    }
}

}

这是我正在尝试处理的 nextKeyValue 方法。但是映射器仍然没有获得正确的值。 reqFields 为 4

看看TextInputFormat是如何实现的。看看它的超类 FileInputFormat 也是如此。您必须继承 FileInputFormat 的 Either TextInputFormat 并实现您自己的记录处理。

实现任何类型的文件输入格式时要注意的是:

框架将拆分文件并为您提供您必须读取的文件片段的起始偏移量和字节长度。很可能会将文件拆分到一些记录中。这就是为什么如果该记录未完全包含在拆分中,您的 reader 必须跳过拆分开头的记录字节,以及读取拆分的最后一个字节以读取整个最后一条记录的原因如果那个没有完全包含在拆分中。

例如,TextInoutFormat 将 \n 字符视为记录分隔符,因此当它进行拆分时,它会跳过字节直到第一个 \n 字符并读取拆分的末尾直到 \n 字符。

至于代码示例:

您需要问自己以下问题:假设您打开文件,寻找到一个随机位置并开始向前阅读。 你如何检测记录的开始?我在你的代码中没有看到任何处理它的东西,没有它,你不能写出好的输入格式,因为你不不知道记录边界是什么。

现在,通过使 isSplittable(JobContext,Path) 方法 return 为 false,仍然可以使输入格式从头到尾读取整个文件。这使得文件完全由单个映射任务读取,从而降低了并行度。

您的内部 while 循环似乎有问题,因为它正在检查太长的行并跳过它们。鉴于您的记录是使用多行编写的,因此在阅读记录时可能会合并一条记录的一部分和另一条记录的另一部分。

必须使用 StringTokenizer 对字符串进行标记化,而不是拆分。代码已更新为新的实现。