如何确定 Apache Spark 中的偏移量?
How do I determine an offset in Apache Spark?
我正在搜索一些数据文件 (~20GB)。我想在该数据中找到一些特定的术语并标记匹配项的偏移量。有没有办法让 Spark 识别我正在操作的数据块的偏移量?
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import java.util.regex.*;
public class Grep {
public static void main( String args[] ) {
SparkConf conf = new SparkConf().setMaster( "spark://ourip:7077" );
JavaSparkContext jsc = new JavaSparkContext( conf );
JavaRDD<String> data = jsc.textFile( "hdfs://ourip/test/testdata.txt" ); // load the data from HDFS
JavaRDD<String> filterData = data.filter( new Function<String, Boolean>() {
// I'd like to do something here to get the offset in the original file of the string "babe ruth"
public Boolean call( String s ) { return s.toLowerCase().contains( "babe ruth" ); } // case insens matching
});
long matches = filterData.count(); // count the hits
// execute the RDD filter
System.out.println( "Lines with search terms: " + matches );
);
} // end main
} // end class Grep
我想在 "filter" 操作中做一些事情来计算 "babe ruth" 在原始文件中的偏移量。我可以获得当前行中 "babe ruth" 的偏移量,但是告诉我该行在文件中的偏移量的过程或函数是什么?
您可以使用 wholeTextFiles(String path, int minPartitions)
方法从 JavaSparkContext
到 return a JavaPairRDD<String,String>
其中键是文件名,值是包含 a 的全部内容的字符串文件(因此,该 RDD 中的每条记录代表一个文件)。从这里开始,只需 运行 一个 map()
即可对每个值调用 indexOf(String searchString)
。这应该 return 每个文件中出现相关字符串的第一个索引。
(编辑:)
因此,以分布式方式为一个文件找到偏移量(根据下面评论中的用例)是可能的。下面是一个在 Scala 中工作的例子。
val searchString = *search string*
val rdd1 = sc.textFile(*input file*, *num partitions*)
// Zip RDD lines with their indices
val zrdd1 = rdd1.zipWithIndex()
// Find the first RDD line that contains the string in question
val firstFind = zrdd1.filter { case (line, index) => line.contains(searchString) }.first()
// Grab all lines before the line containing the search string and sum up all of their lengths (and then add the inline offset)
val filterLines = zrdd1.filter { case (line, index) => index < firstFind._2 }
val offset = filterLines.map { case (line, index) => line.length }.reduce(_ + _) + firstFind._1.indexOf(searchString)
请注意,您还需要在此之上手动添加任何换行符,因为它们没有被考虑在内(输入格式使用换行符作为记录之间的分界)。新行数只是包含搜索字符串的行之前的行数,因此添加起来很简单。
不幸的是,我对 Java API 并不完全熟悉,而且测试起来也不是很容易,所以我不确定下面的代码是否有效,但我确实做到了(另外,我使用 Java 1.7 但 1.8 使用 lambda 表达式压缩了很多代码。):
String searchString = *search string*;
JavaRDD<String> data = jsc.textFile("hdfs://ourip/test/testdata.txt");
JavaRDD<Tuple2<String, Long>> zrdd1 = data.zipWithIndex();
Tuple2<String, Long> firstFind = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> input) { return input.productElement(0).contains(searchString); }
}).first();
JavaRDD<Tuple2<String, Long>> filterLines = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> input) { return input.productElement(1) < firstFind.productElement(1); }
});
Long offset = filterLines.map(new Function<Tuple2<String, Long>, Int>() {
public Int call(Tuple2<String, Long> input) { return input.productElement(0).length(); }
}).reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
}) + firstFind.productElement(0).indexOf(searchString);
这只能在您的输入是 一个 文件时完成(因为否则,zipWithIndex()
不能保证文件内的偏移量)但此方法适用于任意数量分区的 RDD,因此请随意将文件分成任意数量的块。
Spark common Hadoop Input Format可以使用。要从文件中读取字节偏移量,您可以使用 Hadoop 中的 class TextInputFormat (org.apache.hadoop.mapreduce.lib.input)。它已经与 Spark 捆绑在一起。
它将读取文件作为 key(字节偏移量)和 value(文本行):
An InputFormat for plain text files. Files are broken into lines. Either linefeed or carriage-return are used to signal end of line. Keys are the position in the file, and values are the line of text.
在Spark中可以通过调用newAPIHadoopFile()
来使用
SparkConf conf = new SparkConf().setMaster("");
JavaSparkContext jsc = new JavaSparkContext(conf);
// read the content of the file using Hadoop format
JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
"file_path", // input path
TextInputFormat.class, // used input format class
LongWritable.class, // class of the value
Text.class, // class of the value
new Configuration());
JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() {
@Override
public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
// you will get each line from as a tuple (offset, text)
long pos = tuple._1().get(); // extract offset
String line = tuple._2().toString(); // extract text
return pos + " " + line;
}
});
我正在搜索一些数据文件 (~20GB)。我想在该数据中找到一些特定的术语并标记匹配项的偏移量。有没有办法让 Spark 识别我正在操作的数据块的偏移量?
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import java.util.regex.*;
public class Grep {
public static void main( String args[] ) {
SparkConf conf = new SparkConf().setMaster( "spark://ourip:7077" );
JavaSparkContext jsc = new JavaSparkContext( conf );
JavaRDD<String> data = jsc.textFile( "hdfs://ourip/test/testdata.txt" ); // load the data from HDFS
JavaRDD<String> filterData = data.filter( new Function<String, Boolean>() {
// I'd like to do something here to get the offset in the original file of the string "babe ruth"
public Boolean call( String s ) { return s.toLowerCase().contains( "babe ruth" ); } // case insens matching
});
long matches = filterData.count(); // count the hits
// execute the RDD filter
System.out.println( "Lines with search terms: " + matches );
);
} // end main
} // end class Grep
我想在 "filter" 操作中做一些事情来计算 "babe ruth" 在原始文件中的偏移量。我可以获得当前行中 "babe ruth" 的偏移量,但是告诉我该行在文件中的偏移量的过程或函数是什么?
您可以使用 wholeTextFiles(String path, int minPartitions)
方法从 JavaSparkContext
到 return a JavaPairRDD<String,String>
其中键是文件名,值是包含 a 的全部内容的字符串文件(因此,该 RDD 中的每条记录代表一个文件)。从这里开始,只需 运行 一个 map()
即可对每个值调用 indexOf(String searchString)
。这应该 return 每个文件中出现相关字符串的第一个索引。
(编辑:)
因此,以分布式方式为一个文件找到偏移量(根据下面评论中的用例)是可能的。下面是一个在 Scala 中工作的例子。
val searchString = *search string*
val rdd1 = sc.textFile(*input file*, *num partitions*)
// Zip RDD lines with their indices
val zrdd1 = rdd1.zipWithIndex()
// Find the first RDD line that contains the string in question
val firstFind = zrdd1.filter { case (line, index) => line.contains(searchString) }.first()
// Grab all lines before the line containing the search string and sum up all of their lengths (and then add the inline offset)
val filterLines = zrdd1.filter { case (line, index) => index < firstFind._2 }
val offset = filterLines.map { case (line, index) => line.length }.reduce(_ + _) + firstFind._1.indexOf(searchString)
请注意,您还需要在此之上手动添加任何换行符,因为它们没有被考虑在内(输入格式使用换行符作为记录之间的分界)。新行数只是包含搜索字符串的行之前的行数,因此添加起来很简单。
不幸的是,我对 Java API 并不完全熟悉,而且测试起来也不是很容易,所以我不确定下面的代码是否有效,但我确实做到了(另外,我使用 Java 1.7 但 1.8 使用 lambda 表达式压缩了很多代码。):
String searchString = *search string*;
JavaRDD<String> data = jsc.textFile("hdfs://ourip/test/testdata.txt");
JavaRDD<Tuple2<String, Long>> zrdd1 = data.zipWithIndex();
Tuple2<String, Long> firstFind = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> input) { return input.productElement(0).contains(searchString); }
}).first();
JavaRDD<Tuple2<String, Long>> filterLines = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> input) { return input.productElement(1) < firstFind.productElement(1); }
});
Long offset = filterLines.map(new Function<Tuple2<String, Long>, Int>() {
public Int call(Tuple2<String, Long> input) { return input.productElement(0).length(); }
}).reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
}) + firstFind.productElement(0).indexOf(searchString);
这只能在您的输入是 一个 文件时完成(因为否则,zipWithIndex()
不能保证文件内的偏移量)但此方法适用于任意数量分区的 RDD,因此请随意将文件分成任意数量的块。
Spark common Hadoop Input Format可以使用。要从文件中读取字节偏移量,您可以使用 Hadoop 中的 class TextInputFormat (org.apache.hadoop.mapreduce.lib.input)。它已经与 Spark 捆绑在一起。
它将读取文件作为 key(字节偏移量)和 value(文本行):
An InputFormat for plain text files. Files are broken into lines. Either linefeed or carriage-return are used to signal end of line. Keys are the position in the file, and values are the line of text.
在Spark中可以通过调用newAPIHadoopFile()
SparkConf conf = new SparkConf().setMaster("");
JavaSparkContext jsc = new JavaSparkContext(conf);
// read the content of the file using Hadoop format
JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
"file_path", // input path
TextInputFormat.class, // used input format class
LongWritable.class, // class of the value
Text.class, // class of the value
new Configuration());
JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() {
@Override
public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
// you will get each line from as a tuple (offset, text)
long pos = tuple._1().get(); // extract offset
String line = tuple._2().toString(); // extract text
return pos + " " + line;
}
});