Flume+Spark - Storing DStream in HDFS

I have a Flume stream that I want to store in HDFS via Spark. Below is the Spark code I am running:

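import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePull {
  def main(args: Array[String]): Unit = {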
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }

    val batchInterval = Milliseconds(60000)
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)

    stream.map(x => x + "!!!!")
          .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")

    ssc.start()
    ssc.awaitTermination()
  }
}

When I start my Spark Streaming job, it does store output in HDFS, but the output looks like this:

[root@sandbox ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000
org.apache.spark.streaming.flume.SparkFlumeEvent@1b9bd2c9!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@33fd3a48!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@35fd67a2!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@f9ed85f!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@58f4cfc!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@307373e!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@4ebbc8ff!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@a8905bb!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@29d73d64!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@71ff85b1!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@3ea261ef!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@16cbb209!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@17157890!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@29e41c7!!!!

It is storing the Flume event objects rather than the data coming from Flume. How do I get the data out of them?

Thanks

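Concatenating a SparkFlumeEvent with a String calls its default toString, which is why the files contain object references instead of the payload. You need to extract the underlying buffer from the SparkFlumeEvent and save that. For example, if your event body is a String: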

stream.map(x => new String(x.event.getBody.array) + "!!!!")
      .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")
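
One caveat with calling array directly on the body: it returns the whole backing array and decodes it with the platform default charset. The following is a minimal sketch of a more defensive decoder, not part of the original answer. The object FlumeBodyDecoding and the helper bodyToString are hypothetical names, and UTF-8 is an assumption about how the events were encoded.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object FlumeBodyDecoding {
  // Hypothetical helper: copies only the readable bytes of the buffer
  // (position..limit) and decodes them as UTF-8. The charset is an
  // assumption; use whatever encoding the Flume source actually writes.
  def bodyToString(body: ByteBuffer): String = {
    val view = body.duplicate()                    // keep the caller's position untouched
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for x.event.getBody, which is a java.nio.ByteBuffer.
    val body = ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8))
    println(bodyToString(body) + "!!!!")
  }
}
```

In the streaming job this would be used as stream.map(x => FlumeBodyDecoding.bodyToString(x.event.getBody) + "!!!!") before the call to saveAsTextFiles.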