按分隔符拆分 Spark 流
Splitting Spark stream by delimiter
我正在尝试根据分隔符拆分我的 Spark 流并将这些块中的每一个保存到一个新文件中。
我的每个 RDD 似乎都根据分隔符进行了分区。
我很难为每个 RDD 配置一个分隔符消息,或者无法将每个分区单独保存到新的 part-000...
文件。
如有任何帮助,我们将不胜感激。谢谢
val sparkConf = new SparkConf().setAppName("DataSink").setMaster("local[8]").set("spark.files.overwrite","false")
val ssc = new StreamingContext(sparkConf, Seconds(2))
class RouteConsumer extends Actor with ActorHelper with Consumer {
def endpointUri = "rabbitmq://server:5672/myexc?declare=false&queue=in_hl7_q"
def receive = {
case msg: CamelMessage =>
val m = msg.withBodyAs[String]
store(m.body)
}
}
val dstream = ssc.actorStream[String](Props(new RouteConsumer()), "SparkReceiverActor")
val splitStream = dstream.flatMap(_.split("MSH|^~\&"))
splitStream.foreachRDD( rdd => rdd.saveAsTextFile("file:///home/user/spark/data") )
ssc.start()
ssc.awaitTermination()
您无法控制哪个 part-NNNNN
(分区)文件获得哪个输出,但您可以写入不同的目录。 "easiest" 进行这种列拆分的方法是使用单独的 map 语句(如 SELECT
语句),类似这样,假设拆分后您将拥有 n
个数组元素:
...
val dstream2 = dstream.map(_.split("...")) // like above, but with map
dstream2.cache() // very important for what follows, repeated reads of this...
val dstreams = new Array[DStream[String]](n)
for (i <- 0 to n-1) {
dstreams[i] = dstream2.map(array => array[i] /* or similar */)
dstreams[i].saveAsTextFiles(rootDir+"/"+i)
}
ssc.start()
ssc.awaitTermination()
我正在尝试根据分隔符拆分我的 Spark 流并将这些块中的每一个保存到一个新文件中。
我的每个 RDD 似乎都根据分隔符进行了分区。
我很难为每个 RDD 配置一个分隔符消息,或者无法将每个分区单独保存到新的 part-000...
文件。
如有任何帮助,我们将不胜感激。谢谢
val sparkConf = new SparkConf().setAppName("DataSink").setMaster("local[8]").set("spark.files.overwrite","false")
val ssc = new StreamingContext(sparkConf, Seconds(2))
class RouteConsumer extends Actor with ActorHelper with Consumer {
def endpointUri = "rabbitmq://server:5672/myexc?declare=false&queue=in_hl7_q"
def receive = {
case msg: CamelMessage =>
val m = msg.withBodyAs[String]
store(m.body)
}
}
val dstream = ssc.actorStream[String](Props(new RouteConsumer()), "SparkReceiverActor")
val splitStream = dstream.flatMap(_.split("MSH|^~\&"))
splitStream.foreachRDD( rdd => rdd.saveAsTextFile("file:///home/user/spark/data") )
ssc.start()
ssc.awaitTermination()
您无法控制哪个 part-NNNNN
(分区)文件获得哪个输出,但您可以写入不同的目录。 "easiest" 进行这种列拆分的方法是使用单独的 map 语句(如 SELECT
语句),类似这样,假设拆分后您将拥有 n
个数组元素:
...
val dstream2 = dstream.map(_.split("...")) // like above, but with map
dstream2.cache() // very important for what follows, repeated reads of this...
val dstreams = new Array[DStream[String]](n)
for (i <- 0 to n-1) {
dstreams[i] = dstream2.map(array => array[i] /* or similar */)
dstreams[i].saveAsTextFiles(rootDir+"/"+i)
}
ssc.start()
ssc.awaitTermination()