以编程方式在 apache spark 中创建 dstream
Programatically creating dstreams in apache spark
我正在围绕 Apache Spark Streaming 编写一些独立的集成测试。
我想测试我的代码是否可以在我的模拟测试数据中提取各种边缘情况。
当我使用常规 RDD(不是流式传输)执行此操作时。我可以使用我的内联数据并对其调用 "parallelize" 以将其转换为 spark RDD。
但是,我找不到这种创建流媒体的方法。理想情况下,我想偶尔调用一些 "push" 函数,让 tupple 神奇地出现在我的 dstream 中。
ATM 我通过使用 Apache Kafka 来做到这一点:我创建了一个临时队列,然后写入它。但这似乎有点矫枉过正。我宁愿直接从我的测试数据创建 test-dstream,而不必使用 Kafka 作为中介。
这里的关键是调用"store"命令。用你想要的任何东西替换商店的内容。
出于测试目的,您可以从 RDD 队列创建输入流。
在队列中推送更多 RDD 将模拟在批处理间隔内处理更多事件。
val sc = SparkContextHolder.sc
val ssc = new StreamingContext(sc, Seconds(1))
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
val inputStream: InputDStream[Int] = ssc.queueStream(inputData)
inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
// etc
val result = inputStream.map(x => x*x)
result.foreachRDD(rdd => assertSomething(rdd))
ssc.start() // Don't forget to start the streaming context
除了 Raphael 解决方案之外,我认为您还喜欢可以一次处理一批或一切可用的方法。您需要在 queustream 的可选方法参数上相应地设置 oneAtATime 标志,如下所示:
val slideDuration = Milliseconds(100)
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[8]")
val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val queueOfRDDs = mutable.Queue[RDD[String]]()
val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration)
val rddOneQueuesAtATimeDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = true)
val rddFloodOfQueuesDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false)
rddOneQueuesAtATimeDS.print(120)
rddFloodOfQueuesDS.print(120)
streamingContext.start()
for (i <- (1 to 10)) {
queueOfRDDs += sparkContext.makeRDD(simplePurchase(i))
queueOfRDDs += sparkContext.makeRDD(simplePurchase((i + 3) * (i + 3)))
Thread.sleep(slideDuration.milliseconds)
}
Thread.sleep(1000L)
我正在围绕 Apache Spark Streaming 编写一些独立的集成测试。 我想测试我的代码是否可以在我的模拟测试数据中提取各种边缘情况。 当我使用常规 RDD(不是流式传输)执行此操作时。我可以使用我的内联数据并对其调用 "parallelize" 以将其转换为 spark RDD。 但是,我找不到这种创建流媒体的方法。理想情况下,我想偶尔调用一些 "push" 函数,让 tupple 神奇地出现在我的 dstream 中。 ATM 我通过使用 Apache Kafka 来做到这一点:我创建了一个临时队列,然后写入它。但这似乎有点矫枉过正。我宁愿直接从我的测试数据创建 test-dstream,而不必使用 Kafka 作为中介。
这里的关键是调用"store"命令。用你想要的任何东西替换商店的内容。
出于测试目的,您可以从 RDD 队列创建输入流。 在队列中推送更多 RDD 将模拟在批处理间隔内处理更多事件。
val sc = SparkContextHolder.sc
val ssc = new StreamingContext(sc, Seconds(1))
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
val inputStream: InputDStream[Int] = ssc.queueStream(inputData)
inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
// etc
val result = inputStream.map(x => x*x)
result.foreachRDD(rdd => assertSomething(rdd))
ssc.start() // Don't forget to start the streaming context
除了 Raphael 解决方案之外,我认为您还喜欢可以一次处理一批或一切可用的方法。您需要在 queustream 的可选方法参数上相应地设置 oneAtATime 标志,如下所示:
val slideDuration = Milliseconds(100)
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[8]")
val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val queueOfRDDs = mutable.Queue[RDD[String]]()
val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration)
val rddOneQueuesAtATimeDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = true)
val rddFloodOfQueuesDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false)
rddOneQueuesAtATimeDS.print(120)
rddFloodOfQueuesDS.print(120)
streamingContext.start()
for (i <- (1 to 10)) {
queueOfRDDs += sparkContext.makeRDD(simplePurchase(i))
queueOfRDDs += sparkContext.makeRDD(simplePurchase((i + 3) * (i + 3)))
Thread.sleep(slideDuration.milliseconds)
}
Thread.sleep(1000L)