How to convert Seq to RDD when working with Spark Streaming context
I am using TestSuiteBase to create some tests with spark-streaming (using the Spark streaming context ssc). Then I create dummy data with runStreams, which returns output: Seq[Seq[(Double, Double)]]. Finally, I want to apply some function to output, but that function accepts an RDD[(Double, Double)], not a Seq[Seq[(Double, Double)]].

To solve this, I am considering val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten), but how and where do I get the Spark context sc from ssc? Alternatively, is there a way to create the dummy data directly as an RDD, without going through Seq?
class StreamingTestLR extends SparkFunSuite
  with TestSuiteBase {

  // use longer wait time to ensure job completion
  override def maxWaitTimeMillis: Int = 20000

  var ssc: StreamingContext = _

  override def afterFunction() {
    super.afterFunction()
    if (ssc != null) {
      ssc.stop()
    }
  }

  //...

  val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)

  // THE PROBLEM IS HERE!!!
  // val metrics = new SomeFuncThatAcceptsRDD(rdd)
}
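For reference, the reason output.flatten is the right shape for parallelize: Seq[Seq[T]].flatten concatenates the inner per-batch sequences in order into a single flat Seq[T]. A minimal plain-Scala illustration (no Spark needed; the values are made up):

```scala
// Each inner Seq stands for the output of one streaming batch.
val output: Seq[Seq[(Double, Double)]] =
  Seq(Seq((1.0, 1.1), (2.0, 2.1)), Seq((3.0, 3.3)))

// flatten concatenates the per-batch Seqs in order.
val flat: Seq[(Double, Double)] = output.flatten
// flat == Seq((1.0, 1.1), (2.0, 2.1), (3.0, 3.3))
```

The resulting flat Seq[(Double, Double)] is exactly what sc.parallelize expects.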
UPDATE
// Test if the prediction accuracy increases when using hyper-parameter optimization
// in order to learn Y = 10*X1 + 10*X2 on streaming data
test("Test 1") {
  // create model initialized with zero weights
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.dense(0.0, 0.0))
    .setStepSize(0.2)
    .setNumIterations(25)

  // generate sequence of simulated data for testing
  val numBatches = 10
  val nPoints = 100
  val testInput = (0 until numBatches).map { i =>
    LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
  }

  // setupStreams wires the simulated input into a DStream[LabeledPoint];
  // training and prediction happen inside the operation passed to it
  val operation = (inputDStream: DStream[LabeledPoint]) => {
    model.trainOn(inputDStream)
    model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
  }

  withStreamingContext(setupStreams(testInput, operation)) { ssc =>
    val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
    val rdd: RDD[(Double, Double)] = ssc.sparkContext.parallelize(output.flatten)

    // Instantiate metrics object
    val metrics = new RegressionMetrics(rdd)

    // Squared error
    println(s"MSE = ${metrics.meanSquaredError}")
    println(s"RMSE = ${metrics.rootMeanSquaredError}")
    // R-squared
    println(s"R-squared = ${metrics.r2}")
    // Mean absolute error
    println(s"MAE = ${metrics.meanAbsoluteError}")
    // Explained variance
    println(s"Explained variance = ${metrics.explainedVariance}")
  }
}
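Since this runs inside a test, the printlns could also be turned into assertions so the suite actually fails on a bad fit. A sketch; the thresholds below are arbitrary choices, not values from the original test:

```scala
// Fail the test instead of just printing; thresholds are illustrative only.
assert(metrics.meanSquaredError < 0.1,
  s"MSE too high: ${metrics.meanSquaredError}")
assert(metrics.r2 > 0.99,
  s"R-squared too low: ${metrics.r2}")
```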
Try this:
class MyTestSuite extends TestSuiteBase with BeforeAndAfter {

  test("my test") {
    withTestServer(new TestServer()) { testServer =>
      // Start the server
      testServer.start()
      // Set up the streaming context and input streams
      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
        // `conf` and `batchDuration` come from TestSuiteBase;
        // `output` is the Seq[Seq[(Double, Double)]] returned by runStreams
        val rdd = ssc.sparkContext.parallelize(output.flatten)
        // your code here
        testServer.stop()
        ssc.stop()
      }
    }
  }
}
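As for the second part of the question, creating the dummy data directly as an RDD: the streaming machinery can be skipped entirely by generating points and parallelizing them in one step. A minimal sketch, assuming it runs inside the suite where ssc is in scope, and reusing LinearDataGenerator from the question; the identity "prediction" is a stand-in, since in a real test the second element would come from a model:

```scala
import org.apache.spark.mllib.util.LinearDataGenerator
import org.apache.spark.rdd.RDD

// Generate labeled points directly (same generator as in the question).
val points = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42)

// Build (prediction, label) pairs; here prediction == label as a placeholder.
val pairs: Seq[(Double, Double)] = points.map(p => (p.label, p.label))

// Parallelize straight into the RDD shape the metrics function expects.
val rdd: RDD[(Double, Double)] = ssc.sparkContext.parallelize(pairs)
```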