Flume + Spark - Storing DStream in HDFS
I have a Flume stream that I want to store in HDFS via Spark. Below is the Spark code I am running:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePull {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }
    val batchInterval = Milliseconds(60000)
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)
    stream.map(x => x + "!!!!")
      .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")
    ssc.start()
    ssc.awaitTermination()
  }
}
When I start my Spark Streaming job, it does store output in HDFS, but the output looks like this:
[root@sandbox ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000
org.apache.spark.streaming.flume.SparkFlumeEvent@1b9bd2c9!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@33fd3a48!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@35fd67a2!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@f9ed85f!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@58f4cfc!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@307373e!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@4ebbc8ff!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@a8905bb!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@29d73d64!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@71ff85b1!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@3ea261ef!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@16cbb209!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@17157890!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@29e41c7!!!!
It is storing the Flume events themselves rather than the data coming from Flume. How do I extract the actual data from them?
Thanks.
You need to extract the underlying buffer from the SparkFlumeEvent and save that. For example, if your event body is a String:
stream.map(x => new String(x.event.getBody.array) + "!!!!")
  .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")
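If you also want to keep the Flume headers, or want to be explicit about the character encoding, a minimal sketch along the same lines could look like this (this is only an illustration, assuming UTF-8 text bodies; it uses getBody and getHeaders from the AvroFlumeEvent wrapped by SparkFlumeEvent):

// Sketch: assumes event bodies are UTF-8 text and that the body ByteBuffer is
// array-backed, as it is for events delivered by the polling receiver.
stream.map { x =>
  val body = new String(x.event.getBody.array, java.nio.charset.StandardCharsets.UTF_8)
  val headers = x.event.getHeaders // Flume headers from the underlying AvroFlumeEvent
  s"$headers\t$body!!!!"
}.saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")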