我可以将传入的数据流转换为数组吗?
Can I convert an incoming stream of data into an array?
我正在尝试学习流数据并使用提供的电信客户流失数据集对其进行操作 here。我写了一个方法来批量计算:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD, LogisticRegressionWithLBFGS, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
object batchChurn{
def main(args: Array[String]): Unit = {
//setting spark context
val conf = new SparkConf().setAppName("churn")
val sc = new SparkContext(conf)
//loading and mapping data into RDD
val csv = sc.textFile("file://filename.csv")
val data = csv.map {line =>
val parts = line.split(",").map(_.trim)
val stringvec = Array(parts(1)) ++ parts.slice(4,20)
val label = parts(20).toDouble
val vec = stringvec.map(_.toDouble)
LabeledPoint(label, Vectors.dense(vec))
}
val splits = data.randomSplit(Array(0.7,0.3))
val (training, testing) = (splits(0),splits(1))
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 6
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 7
val maxBins = 32
val model = RandomForest.trainClassifier(training, numClasses, categoricalFeaturesInfo,numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
val labelAndPreds = testing.map {point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
}
}
我对此没有任何问题。现在,我查看了 spark 网站上提供的 NetworkWordCount 示例,并稍微更改了代码以查看其行为方式。
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("127.0.0.1", 9999)
val data = lines.flatMap(_.split(","))
我的问题是:是否可以将此 DStream 转换为我可以输入到我的分析代码中的数组?目前,当我尝试使用 val data = lines.flatMap(_.split(","))
转换为数组时,它清楚地表明:error: value toArray is not a member of org.apache.spark.streaming.dstream.DStream[String]
您的 DStream 包含许多 RDD,您可以使用 foreachRDD
函数访问 RDD。
然后可以使用collect
函数将每个RDD转换为数组。
这已经显示在这里
For each RDD in a DStream how do I convert this to an array or some other typical Java data type?
DStream.foreachRDD 为每个时间间隔提供一个 RDD[String]
当然,你可以收集在一个数组中
val arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
另外请记住,由于 DStream 可能很大,您最终可能会在驱动程序中拥有比您想要的更多的数据。
为了限制您分析的数据,我会这样做
data.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
您不能将 DStream
的所有元素都放在一个数组中,因为这些元素将通过网络不断读取,并且您的数组必须是无限可扩展的。
由于算法原因,将此决策树模型适应流模式(其中训练和测试数据不断到达)并非微不足道——虽然提到收集的答案在技术上是正确的,但它们不是解决问题的合适解决方案你正在尝试做。
如果您想 运行 Spark 中流上的决策树,您可能需要查看 Hoeffding trees。
我正在尝试学习流数据并使用提供的电信客户流失数据集对其进行操作 here。我写了一个方法来批量计算:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD, LogisticRegressionWithLBFGS, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
object batchChurn{
def main(args: Array[String]): Unit = {
//setting spark context
val conf = new SparkConf().setAppName("churn")
val sc = new SparkContext(conf)
//loading and mapping data into RDD
val csv = sc.textFile("file://filename.csv")
val data = csv.map {line =>
val parts = line.split(",").map(_.trim)
val stringvec = Array(parts(1)) ++ parts.slice(4,20)
val label = parts(20).toDouble
val vec = stringvec.map(_.toDouble)
LabeledPoint(label, Vectors.dense(vec))
}
val splits = data.randomSplit(Array(0.7,0.3))
val (training, testing) = (splits(0),splits(1))
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 6
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 7
val maxBins = 32
val model = RandomForest.trainClassifier(training, numClasses, categoricalFeaturesInfo,numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
val labelAndPreds = testing.map {point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
}
}
我对此没有任何问题。现在,我查看了 spark 网站上提供的 NetworkWordCount 示例,并稍微更改了代码以查看其行为方式。
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("127.0.0.1", 9999)
val data = lines.flatMap(_.split(","))
我的问题是:是否可以将此 DStream 转换为我可以输入到我的分析代码中的数组?目前,当我尝试使用 val data = lines.flatMap(_.split(","))
转换为数组时,它清楚地表明:error: value toArray is not a member of org.apache.spark.streaming.dstream.DStream[String]
您的 DStream 包含许多 RDD,您可以使用 foreachRDD
函数访问 RDD。
然后可以使用collect
函数将每个RDD转换为数组。
这已经显示在这里
For each RDD in a DStream how do I convert this to an array or some other typical Java data type?
DStream.foreachRDD 为每个时间间隔提供一个 RDD[String] 当然,你可以收集在一个数组中
val arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
另外请记住,由于 DStream 可能很大,您最终可能会在驱动程序中拥有比您想要的更多的数据。
为了限制您分析的数据,我会这样做
data.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
您不能将 DStream
的所有元素都放在一个数组中,因为这些元素将通过网络不断读取,并且您的数组必须是无限可扩展的。
由于算法原因,将此决策树模型适应流模式(其中训练和测试数据不断到达)并非微不足道——虽然提到收集的答案在技术上是正确的,但它们不是解决问题的合适解决方案你正在尝试做。
如果您想 运行 Spark 中流上的决策树,您可能需要查看 Hoeffding trees。