Spark 没有注册输出操作,所以没有什么可执行的,但我正在写入一个文件
Spark No output operations registered, so nothing to execute but I'm writing to a file
val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(1))
val log = Logger.getLogger("sqsLog")
val sqs = streamContext.receiverStream(new SQSReceiver("queue")
.at(Regions.US_EAST_1)
.withTimeout(5))
val jsonRows = sqs.mapPartitions(partitions => {
val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))
val txfm = new LogLine2Json
val log = Logger.getLogger("parseLog")
val sqlSession = SparkSession
.builder()
.getOrCreate()
val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
val parsedDate = parsedFormat.format(new java.util.Date())
val outputPath = "/tmp/spark/presto"
partitions.map(messages => {
val sqsMsg = Json.parse(messages)
System.out.println(sqsMsg)
val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
System.out.println(bucketName)
System.out.println(key)
val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
val stream = obj.getObjectContent()
scala.io.Source.fromInputStream(stream).getLines().map(line => {
try{
val str = txfm.parseLine(line)
val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str)
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
}
catch {
case e: Throwable => {log.info(line); "";}
}
}).filter(line => line != "{}")
})
})
streamContext.start()
streamContext.awaitTermination()
我的工作真的很简单,我们从 SQS 获取 S3 密钥。该文件的内容是 nginx 日志,我们使用我们的解析器解析它,它是工作文件。 LogLine2Json
它将日志转换为 JSON 格式,然后我们将其写入 orc
格式。
但是我遇到了这个错误
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at SparrowOrc$.main(sparrowOrc.scala:159)
at SparrowOrc.main(sparrowOrc.scala)
我知道 Spark 需要一个动作,否则它将无法运行。但是我有这段代码可以写入 orc 文件。我不确定是否还需要做其他事情?
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
首先map
不是动作。这是一种转变。 Spark 没有理由执行这段代码。
接下来,您应该避免转换中的副作用,并且如果要求输出的正确性,则永远不要使用这些副作用。
最后,在分布式系统中使用标准 io
函数通常毫无意义。
总的来说,您应该查看 DStream
接收器的现有选项,如果 none 这些选项适合您的场景,请使用操作编写您自己的选项(foreach
、foreachPartition
).
val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(1))
val log = Logger.getLogger("sqsLog")
val sqs = streamContext.receiverStream(new SQSReceiver("queue")
.at(Regions.US_EAST_1)
.withTimeout(5))
val jsonRows = sqs.mapPartitions(partitions => {
val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))
val txfm = new LogLine2Json
val log = Logger.getLogger("parseLog")
val sqlSession = SparkSession
.builder()
.getOrCreate()
val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
val parsedDate = parsedFormat.format(new java.util.Date())
val outputPath = "/tmp/spark/presto"
partitions.map(messages => {
val sqsMsg = Json.parse(messages)
System.out.println(sqsMsg)
val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
System.out.println(bucketName)
System.out.println(key)
val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
val stream = obj.getObjectContent()
scala.io.Source.fromInputStream(stream).getLines().map(line => {
try{
val str = txfm.parseLine(line)
val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str)
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
}
catch {
case e: Throwable => {log.info(line); "";}
}
}).filter(line => line != "{}")
})
})
streamContext.start()
streamContext.awaitTermination()
我的工作真的很简单,我们从 SQS 获取 S3 密钥。该文件的内容是 nginx 日志,我们使用我们的解析器解析它,它是工作文件。 LogLine2Json
它将日志转换为 JSON 格式,然后我们将其写入 orc
格式。
但是我遇到了这个错误
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at SparrowOrc$.main(sparrowOrc.scala:159)
at SparrowOrc.main(sparrowOrc.scala)
我知道 Spark 需要一个动作,否则它将无法运行。但是我有这段代码可以写入 orc 文件。我不确定是否还需要做其他事情?
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
首先map
不是动作。这是一种转变。 Spark 没有理由执行这段代码。
接下来,您应该避免转换中的副作用,并且如果要求输出的正确性,则永远不要使用这些副作用。
最后,在分布式系统中使用标准 io
函数通常毫无意义。
总的来说,您应该查看 DStream
接收器的现有选项,如果 none 这些选项适合您的场景,请使用操作编写您自己的选项(foreach
、foreachPartition
).