如何使用 spark 将流数据从 apache flume 过滤和转换为 rdd/data freame 以将其写入 table
How to filter and convert Stream data from apache flume to rdd/data freame using spark to write it to a table
嗨,我是 flume/Spark/Spark 流媒体的新手。我已经配置 flume 并使用 netcat 成功地将数据流式传输到 Spark。
我的要求是检查日志文件中的流数据(flume 流)中的错误并获取错误行(流中出现的行中的单词 "ERROR")和将其设为DF写入oracle。
我在下面的过滤器中遇到异常并转换为 DF 代码。请帮我解决这个问题
import org.apache.spark.streaming.flume.FlumeUtils
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext,Seconds}
import org.apache.spark.streaming.flume._
import org.apache.spark._
import org.apache.spark.streaming._
import spark.implicits._
val hostName = "10.90.3.78"
val port = 9999.toInt
val sparkStreamingContext = new StreamingContext(sc,Seconds(10))
val stream = FlumeUtils.createPollingStream(sparkStreamingContext,hostName,port)
val mappedlines = stream.map( e => new String(e.event.getBody.array()))
.filter(rec => rec.contains("ERROR"))
.map(line => line.split("ERROR"))
val arr = mappedlines.foreachRDD({status=>val DF = status.toDF()})
println(arr)
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()
我已经使用 Foreach 解决了它并将 RDD 转换为 DF。
它起作用了,我已经成功地将错误行插入到数据库中。
嗨,我是 flume/Spark/Spark 流媒体的新手。我已经配置 flume 并使用 netcat 成功地将数据流式传输到 Spark。
我的要求是检查日志文件中的流数据(flume 流)中的错误并获取错误行(流中出现的行中的单词 "ERROR")和将其设为DF写入oracle。
我在下面的过滤器中遇到异常并转换为 DF 代码。请帮我解决这个问题
import org.apache.spark.streaming.flume.FlumeUtils
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext,Seconds}
import org.apache.spark.streaming.flume._
import org.apache.spark._
import org.apache.spark.streaming._
import spark.implicits._
val hostName = "10.90.3.78"
val port = 9999.toInt
val sparkStreamingContext = new StreamingContext(sc,Seconds(10))
val stream = FlumeUtils.createPollingStream(sparkStreamingContext,hostName,port)
val mappedlines = stream.map( e => new String(e.event.getBody.array()))
.filter(rec => rec.contains("ERROR"))
.map(line => line.split("ERROR"))
val arr = mappedlines.foreachRDD({status=>val DF = status.toDF()})
println(arr)
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()
我已经使用 Foreach 解决了它并将 RDD 转换为 DF。 它起作用了,我已经成功地将错误行插入到数据库中。