如何从 JavaStreamingContext 生成 JavaPairInputDStream?
How to generate JavaPairInputDStream from JavaStreamingContext?
我正在学习 Apache Spark Streaming,并尝试从 JavaStreamingContext 生成 JavaPairInputDStream。下面是我的代码:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
.......
.......
// Question's original snippet: the enclosing class/method and part of the imports are elided.
// NOTE(review): Tuple2 is used below; presumably scala.Tuple2 is imported in the elided part.
// Local Spark context wrapped in a streaming context with a 3-second batch interval.
SparkConf sc = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");;
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(3));
// First data set: String -> String key/value pairs.
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "ABC"));
data1.add(new Tuple2<String, String>("K2", "DE"));
data1.add(new Tuple2<String, String>("K1", "F"));
data1.add(new Tuple2<String, String>("K3", "GHI"));
JavaPairRDD<String, String> pairs1 = jssc.sparkContext().parallelizePairs(data1);
// Second data set: String -> Integer pairs (a different value type from data1).
List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
data2.add(new Tuple2<String, Integer>("K1", 123));
data2.add(new Tuple2<String, Integer>("K2", 456));
data2.add(new Tuple2<String, Integer>("K7", 0));
// NOTE(review): this parallelizes data1 a second time, so data2 is never used.
// Passing data2 here would not compile either: it yields JavaPairRDD<String, Integer>.
JavaPairRDD<String, String> pairs2 = jssc.sparkContext().parallelizePairs(data1);
Queue<JavaPairRDD<String, String>> inputQueue = new LinkedList<>(Arrays.asList(pairs1, pairs2));
// Compile error on this line: queueStream expects a Queue<JavaRDD<T>> (not a
// Queue<JavaPairRDD<K, V>>) and returns a JavaInputDStream<T>, not a JavaPairInputDStream<K, V>.
JavaPairInputDStream<String, String> lines = jssc.queueStream(inputQueue, true);
但是代码的最后一行无法通过编译,报出如下错误:
The method queueStream(Queue<JavaRDD<T>>, boolean) in the type JavaStreamingContext is not applicable for the arguments (Queue<JavaPairRDD<String,String>>, boolean)
我不知道如何使用 JavaStreamingContext 生成 JavaPairInputDStream。
如果查看 JavaStreamingContext 类中 queueStream 方法的 API,会发现它接受 java.util.Queue<JavaRDD<T>> 作为队列参数,因此不能直接传入 Queue<JavaPairRDD<K,V>>。我修改了您的程序,改为使用 Queue<JavaRDD<Tuple2<String,String>>> 类型的队列。queueStream 方法返回 JavaInputDStream<T> 类型,下面演示如何用 mapToPair 将其转换为 JavaPairDStream<String,String>。JavaPairDStream 类是 JavaPairInputDStream 类的超类,所以转换得到的 JavaPairDStream 同样可以使用各种按键(key)操作。希望这有帮助。
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
/**
 * Demonstrates obtaining a key/value stream from a {@link JavaStreamingContext}.
 *
 * <p>{@code JavaStreamingContext.queueStream} accepts only a {@code Queue<JavaRDD<T>>} and returns a
 * {@code JavaInputDStream<T>}. The pairs are therefore queued as plain {@code JavaRDD<Tuple2<K, V>>}
 * and then converted with {@code mapToPair} into a {@link JavaPairDStream}.
 */
public class SparkStreamTest {

    /**
     * Queues two RDDs of string pairs, streams them through a 5-second batch interval, lower-cases
     * every key and prints each batch until the streaming context terminates.
     *
     * @param args unused
     * @throws Exception if Spark fails or the wait in {@code awaitTermination} is interrupted
     */
    public static void main(String[] args) throws Exception {
        SparkConf sc = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5));

        // First data list -> javaRDD1.
        List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
        data1.add(new Tuple2<String, String>("K1", "ABC"));
        data1.add(new Tuple2<String, String>("K2", "DE"));
        data1.add(new Tuple2<String, String>("K1", "F"));
        data1.add(new Tuple2<String, String>("K3", "GHI"));
        JavaRDD<Tuple2<String, String>> javaRDD1 = jssc.sparkContext().parallelize(data1);

        // Second data list -> javaRDD2.
        List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
        data2.add(new Tuple2<String, String>("K1", "123"));
        data2.add(new Tuple2<String, String>("K2", "256"));
        data2.add(new Tuple2<String, String>("K7", "0"));
        JavaRDD<Tuple2<String, String>> javaRDD2 = jssc.sparkContext().parallelize(data2);

        // queueStream takes Queue<JavaRDD<T>>, not Queue<JavaPairRDD<K, V>>, so queue RDDs of tuples.
        Queue<JavaRDD<Tuple2<String, String>>> inputQueue =
                new LinkedList<JavaRDD<Tuple2<String, String>>>();
        inputQueue.add(javaRDD1);
        inputQueue.add(javaRDD2);

        // The second argument (true) asks Spark to consume one queued RDD per batch.
        JavaInputDStream<Tuple2<String, String>> javaDStream = jssc.queueStream(inputQueue, true);

        // Convert to a pair stream. The explicitly typed Tuple2 avoids a raw-type unchecked
        // conversion. Locale.ROOT keeps the key lower-casing independent of the JVM default locale
        // (e.g. Turkish dotted/dotless i).
        JavaPairDStream<String, String> javaPairDStream =
                javaDStream.mapToPair(
                        tuple -> new Tuple2<String, String>(
                                tuple._1().toLowerCase(java.util.Locale.ROOT), tuple._2()));

        javaPairDStream.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
我正在学习 Apache Spark Streaming,并尝试从 JavaStreamingContext 生成 JavaPairInputDStream。下面是我的代码:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
.......
.......
// Question's original snippet: the enclosing class/method and part of the imports are elided.
// NOTE(review): Tuple2 is used below; presumably scala.Tuple2 is imported in the elided part.
// Local Spark context wrapped in a streaming context with a 3-second batch interval.
SparkConf sc = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");;
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(3));
// First data set: String -> String key/value pairs.
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "ABC"));
data1.add(new Tuple2<String, String>("K2", "DE"));
data1.add(new Tuple2<String, String>("K1", "F"));
data1.add(new Tuple2<String, String>("K3", "GHI"));
JavaPairRDD<String, String> pairs1 = jssc.sparkContext().parallelizePairs(data1);
// Second data set: String -> Integer pairs (a different value type from data1).
List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
data2.add(new Tuple2<String, Integer>("K1", 123));
data2.add(new Tuple2<String, Integer>("K2", 456));
data2.add(new Tuple2<String, Integer>("K7", 0));
// NOTE(review): this parallelizes data1 a second time, so data2 is never used.
// Passing data2 here would not compile either: it yields JavaPairRDD<String, Integer>.
JavaPairRDD<String, String> pairs2 = jssc.sparkContext().parallelizePairs(data1);
Queue<JavaPairRDD<String, String>> inputQueue = new LinkedList<>(Arrays.asList(pairs1, pairs2));
// Compile error on this line: queueStream expects a Queue<JavaRDD<T>> (not a
// Queue<JavaPairRDD<K, V>>) and returns a JavaInputDStream<T>, not a JavaPairInputDStream<K, V>.
JavaPairInputDStream<String, String> lines = jssc.queueStream(inputQueue, true);
但是代码的最后一行无法通过编译,报出如下错误:
The method queueStream(Queue<JavaRDD<T>>, boolean) in the type JavaStreamingContext is not applicable for the arguments (Queue<JavaPairRDD<String,String>>, boolean)
我不知道如何使用 JavaStreamingContext 生成 JavaPairInputDStream。
如果查看 JavaStreamingContext 类中 queueStream 方法的 API,会发现它接受 java.util.Queue<JavaRDD<T>> 作为队列参数,因此不能直接传入 Queue<JavaPairRDD<K,V>>。我修改了您的程序,改为使用 Queue<JavaRDD<Tuple2<String,String>>> 类型的队列。queueStream 方法返回 JavaInputDStream<T> 类型,下面演示如何用 mapToPair 将其转换为 JavaPairDStream<String,String>。JavaPairDStream 类是 JavaPairInputDStream 类的超类,所以转换得到的 JavaPairDStream 同样可以使用各种按键(key)操作。希望这有帮助。
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
/**
 * Demonstrates obtaining a key/value stream from a {@link JavaStreamingContext}.
 *
 * <p>{@code JavaStreamingContext.queueStream} accepts only a {@code Queue<JavaRDD<T>>} and returns a
 * {@code JavaInputDStream<T>}. The pairs are therefore queued as plain {@code JavaRDD<Tuple2<K, V>>}
 * and then converted with {@code mapToPair} into a {@link JavaPairDStream}.
 */
public class SparkStreamTest {

    /**
     * Queues two RDDs of string pairs, streams them through a 5-second batch interval, lower-cases
     * every key and prints each batch until the streaming context terminates.
     *
     * @param args unused
     * @throws Exception if Spark fails or the wait in {@code awaitTermination} is interrupted
     */
    public static void main(String[] args) throws Exception {
        SparkConf sc = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5));

        // First data list -> javaRDD1.
        List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
        data1.add(new Tuple2<String, String>("K1", "ABC"));
        data1.add(new Tuple2<String, String>("K2", "DE"));
        data1.add(new Tuple2<String, String>("K1", "F"));
        data1.add(new Tuple2<String, String>("K3", "GHI"));
        JavaRDD<Tuple2<String, String>> javaRDD1 = jssc.sparkContext().parallelize(data1);

        // Second data list -> javaRDD2.
        List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
        data2.add(new Tuple2<String, String>("K1", "123"));
        data2.add(new Tuple2<String, String>("K2", "256"));
        data2.add(new Tuple2<String, String>("K7", "0"));
        JavaRDD<Tuple2<String, String>> javaRDD2 = jssc.sparkContext().parallelize(data2);

        // queueStream takes Queue<JavaRDD<T>>, not Queue<JavaPairRDD<K, V>>, so queue RDDs of tuples.
        Queue<JavaRDD<Tuple2<String, String>>> inputQueue =
                new LinkedList<JavaRDD<Tuple2<String, String>>>();
        inputQueue.add(javaRDD1);
        inputQueue.add(javaRDD2);

        // The second argument (true) asks Spark to consume one queued RDD per batch.
        JavaInputDStream<Tuple2<String, String>> javaDStream = jssc.queueStream(inputQueue, true);

        // Convert to a pair stream. The explicitly typed Tuple2 avoids a raw-type unchecked
        // conversion. Locale.ROOT keeps the key lower-casing independent of the JVM default locale
        // (e.g. Turkish dotted/dotless i).
        JavaPairDStream<String, String> javaPairDStream =
                javaDStream.mapToPair(
                        tuple -> new Tuple2<String, String>(
                                tuple._1().toLowerCase(java.util.Locale.ROOT), tuple._2()));

        javaPairDStream.print();

        jssc.start();
        jssc.awaitTermination();
    }
}