Does the Spark Java API have classes like Hadoop's MultipleOutputs / FSDataOutputStream?
I am trying to output some specific records in the reduce part, depending on the values of the key-value records. In Hadoop MapReduce this can be done with code like:
private FSDataOutputStream hdfsOutWriter;
private String fileName; // base name of the side-output file

public void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    int taskID = context.getTaskAttemptID().getTaskID().getId();
    // one side-output file per reduce task
    hdfsOutWriter = fs.create(new Path(fileName + taskID), true); // FSDataOutputStream
}

public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
    boolean isSpecificRecord = false;
    ArrayList<String> valueList = new ArrayList<>();
    for (Text val : value) {
        String element = val.toString();
        if (filterFunction(element)) return;
        if (specificFunction(element)) isSpecificRecord = true;
        valueList.add(element);
    }
    String returnValue = anyFunction(valueList);
    String specificInfo = anyFunction2(valueList);
    if (isSpecificRecord) hdfsOutWriter.writeBytes(key.toString() + "\t" + specificInfo);
    context.write(key, new Text(returnValue));
}
I want to run this process on a Spark cluster. Can the Spark Java API do what the code above does?
Just to give an idea of how to simulate it:
yoursRDD.mapPartitions(iter => {
  val fs = FileSystem.get(new Configuration())
  val ds = fs.create(new Path("outfileName_" + TaskContext.get.partitionId))
  ds.writeBytes("Put your results here")
  ds.close()
  iter
})
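If you need the same thing through the Java API, here is a minimal sketch of that mapPartitions idea, assuming Spark 2.x (where the FlatMapFunction passed to mapPartitions returns an Iterator) and assuming yoursRDD is a JavaRDD<String>; the file name prefix and the written string are just placeholders:

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;

// yoursRDD is assumed to be a JavaRDD<String>
JavaRDD<String> result = yoursRDD.mapPartitions((Iterator<String> iter) -> {
    // one side-output file per partition, analogous to one file per reduce task above
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream ds =
            fs.create(new Path("outfileName_" + TaskContext.get().partitionId()), true);
    ds.writeBytes("Put your results here"); // placeholder: write your specific records instead
    ds.close();
    return iter; // pass the partition through unchanged
});

Note that mapPartitions is lazy, so nothing is written until an action runs on result (for example result.count() or saving it), and each task overwrites its own file on re-execution because of the overwrite flag passed to fs.create.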