在 Scala 中成功和失败时如何传递数据

How to pass data in case of success and failure in Scala

我开发了一个代码,用预定义的格式解析传入的 JSON 数据。所以,它按预期工作。现在我的目标是将数据发送到各自的方法 RightLeft,它们被另一个 Process 函数使用,我正在调用 DB 函数。

   package KafkaAsSource

import KafkaAsSource.JSONParsingExample.{sampleJsonString, schemaJsonString}
import com.fasterxml.jackson.databind.{DeserializationFeature, MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

case class Premium(id: String, premium: Long, eventTime: String)

class Splitter extends ProcessFunction[String,Premium] {
  val outputTag = new OutputTag[String]("failed")

  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(sampleJsonString)
      val validationMessages = schema.validate(parsedJson).asScala
      validationMessages.foreach(msg => println(msg.getMessage))
    } match {
      case Success(x) => {
        println(" Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }

  override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
    fromJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}

这给了我这样的错误:

type mismatch;
 found   : x.type (with underlying type Unit)
 required: T
        Right(x)  

在这种情况下,无论数据是否正确,总是调用Success部分。我哪里错了?

Try 只会在抛出异常时将其视为失败。如果传递给 Try 的块没有抛出,则根据定义它是 Success.

因此,假设您的 schema.validate(parsedJson) 没有抛出异常,而是 returns 验证失败的集合,有两种方法:

  • 您可以留在 Try 并在失败时抛出。实现此目的的一种简单方法是使用 require:虽然它通常用于表达先决条件,但它是一种快速且可读的方式来抛出
Try {
  // as before
  val validationMessages = schema.validate(parsedJson).asScala

  require(validationMessages.isEmpty)
  parsedJson
} match {
  // as before
}
  • 您可以省去 Try(尤其是当您确定该块中其他代码的 none 会抛出异常时)而直接转到 Either,使用类似于:
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala

if (validationMessages.isEmpty) Right(parsedJson)
else Left(json)

编辑:请注意,在您编辑的代码中,您添加了

validationMessages.foreach(msg => println(msg.getMessage))

替换您之前的 parsedJson,将您的 TryTry[T] 更改为 Try[Unit],在 match.