Spark 没有注册输出操作,所以没有什么可执行的,但我正在写入一个文件

Spark No output operations registered, so nothing to execute but I'm writing to a file

val sc = new SparkContext(conf)

val streamContext = new StreamingContext(sc, Seconds(1))

val log = Logger.getLogger("sqsLog")
val sqs = streamContext.receiverStream(new SQSReceiver("queue")
  .at(Regions.US_EAST_1)
  .withTimeout(5))


val jsonRows = sqs.mapPartitions(partitions => {
  val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))

  val txfm = new LogLine2Json
  val log = Logger.getLogger("parseLog")
  val sqlSession = SparkSession
    .builder()
    .getOrCreate()

  val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
  val parsedDate = parsedFormat.format(new java.util.Date())
  val outputPath = "/tmp/spark/presto"

  partitions.map(messages => {
    val sqsMsg = Json.parse(messages)
    System.out.println(sqsMsg)

    val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
    val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
    System.out.println(bucketName)
    System.out.println(key)
    val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
    val stream = obj.getObjectContent()
    scala.io.Source.fromInputStream(stream).getLines().map(line => {
        try{
          val str = txfm.parseLine(line)
          val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str)
          jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
        }
        catch {
          case e: Throwable => {log.info(line); "";}
        }
      }).filter(line => line != "{}")
    })
})

streamContext.start()
streamContext.awaitTermination()

我的工作真的很简单,我们从 SQS 获取 S3 密钥。该文件的内容是 nginx 日志,我们使用我们的解析器解析它,它是工作文件。 LogLine2Json 它将日志转换为 JSON 格式,然后我们将其写入 orc 格式。

但是我遇到了这个错误

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
    at org.apache.spark.streaming.StreamingContext.liftedTree1(StreamingContext.scala:573)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at SparrowOrc$.main(sparrowOrc.scala:159)
    at SparrowOrc.main(sparrowOrc.scala)

我知道 Spark 需要一个动作,否则它将无法运行。但是我有这段代码可以写入 orc 文件。我不确定是否还需要做其他事情?

jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)

首先map不是动作。这是一种转变。 Spark 没有理由执行这段代码。

接下来,您应该避免转换中的副作用,并且如果要求输出的正确性,则永远不要使用这些副作用。

最后,在分布式系统中使用标准 io 函数通常毫无意义。

总的来说,您应该查看 DStream 接收器的现有选项,如果 none 这些选项适合您的场景,请使用操作编写您自己的选项(foreachforeachPartition).