Spark 将 JavaPairDStream 流式传输到文本文件
Spark streaming JavaPairDStream to text file
我对 Spark streaming 很陌生,我一直在保存我的输出。
我的问题是,如何将我的 JavaPairDStream 的输出保存在文本文件中,该文本文件仅使用 DStream 中的元素为每个文件更新?
例如,以 wordCount 为例,
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
我将使用 wordCounts.print()
、
得到以下输出
(Hello,1)
(World,1)
我想将最后几行写入一个文本文件,该文件每批刷新一次 wordCounts
。
我试过以下方法,
mappedRDD.dstream().saveAsTextFiles("output","txt");
这会在每次批处理时生成一堆包含多个无意义文件的目录。
另一种方法是,
mappedRDD.foreachRDD(new Function2<JavaPairDStream<String, Integer>, Time, Void>() {
public Void Call(JavaPairDStream<String, Integer> rdd, Time time)
{
//Something over rdd to save its content on a file???
return null;
}
});
非常感谢您的帮助。
谢谢
你可以像下面那样做。 Here 与 saveAsTextFile 输出多个文件有关 post。
wordCounts.foreachRDD(rdd ->{
if(!rdd.isEmpty()){
rdd.coalesce(1).saveAsTextFile("c:\temp\count\");
}
});
我对 Spark streaming 很陌生,我一直在保存我的输出。
我的问题是,如何将我的 JavaPairDStream 的输出保存在文本文件中,该文本文件仅使用 DStream 中的元素为每个文件更新?
例如,以 wordCount 为例,
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
我将使用 wordCounts.print()
、
(Hello,1)
(World,1)
我想将最后几行写入一个文本文件,该文件每批刷新一次 wordCounts
。
我试过以下方法,
mappedRDD.dstream().saveAsTextFiles("output","txt");
这会在每次批处理时生成一堆包含多个无意义文件的目录。
另一种方法是,
mappedRDD.foreachRDD(new Function2<JavaPairDStream<String, Integer>, Time, Void>() {
public Void Call(JavaPairDStream<String, Integer> rdd, Time time)
{
//Something over rdd to save its content on a file???
return null;
}
});
非常感谢您的帮助。
谢谢
你可以像下面那样做。 Here 与 saveAsTextFile 输出多个文件有关 post。
wordCounts.foreachRDD(rdd ->{
if(!rdd.isEmpty()){
rdd.coalesce(1).saveAsTextFile("c:\temp\count\");
}
});