在 Scala 中成功和失败时如何传递数据
How to pass data in case of success and failure in Scala
我开发了一个代码,用预定义的格式解析传入的 JSON
数据。所以,它按预期工作。现在我的目标是将数据发送到各自的方法 Right
和 Left
,它们被另一个 Process
函数使用,我正在调用 DB
函数。
package KafkaAsSource
import KafkaAsSource.JSONParsingExample.{sampleJsonString, schemaJsonString}
import com.fasterxml.jackson.databind.{DeserializationFeature, MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
case class Premium(id: String, premium: Long, eventTime: String)
class Splitter extends ProcessFunction[String,Premium] {
val outputTag = new OutputTag[String]("failed")
def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
Try {
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
validationMessages.foreach(msg => println(msg.getMessage))
} match {
case Success(x) => {
println(" Good: " + x)
Right(x)
}
case Failure(err) => {
println("Bad: " + json)
Left(json)
}
}
}
override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
fromJson(i) match {
case Right(data) => {
collector.collect(data)
println("Good Records: " + data)
}
case Left(json) => {
context.output(outputTag, json)
println("Bad Records: " + json)
}
}
}
}
这给了我这样的错误:
type mismatch;
found : x.type (with underlying type Unit)
required: T
Right(x)
在这种情况下,无论数据是否正确,总是调用Success
部分。我哪里错了?
Try
只会在抛出异常时将其视为失败。如果传递给 Try
的块没有抛出,则根据定义它是 Success
.
因此,假设您的 schema.validate(parsedJson)
没有抛出异常,而是 returns 验证失败的集合,有两种方法:
- 您可以留在
Try
并在失败时抛出。实现此目的的一种简单方法是使用 require
:虽然它通常用于表达先决条件,但它是一种快速且可读的方式来抛出
Try {
// as before
val validationMessages = schema.validate(parsedJson).asScala
require(validationMessages.isEmpty)
parsedJson
} match {
// as before
}
- 您可以省去
Try
(尤其是当您确定该块中其他代码的 none 会抛出异常时)而直接转到 Either
,使用类似于:
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
if (validationMessages.isEmpty) Right(parsedJson)
else Left(json)
编辑:请注意,在您编辑的代码中,您添加了
validationMessages.foreach(msg => println(msg.getMessage))
替换您之前的 parsedJson
,将您的 Try
从 Try[T]
更改为 Try[Unit]
,在 match
.
我开发了一个代码,用预定义的格式解析传入的 JSON
数据。所以,它按预期工作。现在我的目标是将数据发送到各自的方法 Right
和 Left
,它们被另一个 Process
函数使用,我正在调用 DB
函数。
package KafkaAsSource
import KafkaAsSource.JSONParsingExample.{sampleJsonString, schemaJsonString}
import com.fasterxml.jackson.databind.{DeserializationFeature, MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
case class Premium(id: String, premium: Long, eventTime: String)
class Splitter extends ProcessFunction[String,Premium] {
val outputTag = new OutputTag[String]("failed")
def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
Try {
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
validationMessages.foreach(msg => println(msg.getMessage))
} match {
case Success(x) => {
println(" Good: " + x)
Right(x)
}
case Failure(err) => {
println("Bad: " + json)
Left(json)
}
}
}
override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
fromJson(i) match {
case Right(data) => {
collector.collect(data)
println("Good Records: " + data)
}
case Left(json) => {
context.output(outputTag, json)
println("Bad Records: " + json)
}
}
}
}
这给了我这样的错误:
type mismatch;
found : x.type (with underlying type Unit)
required: T
Right(x)
在这种情况下,无论数据是否正确,总是调用Success
部分。我哪里错了?
Try
只会在抛出异常时将其视为失败。如果传递给 Try
的块没有抛出,则根据定义它是 Success
.
因此,假设您的 schema.validate(parsedJson)
没有抛出异常,而是 returns 验证失败的集合,有两种方法:
- 您可以留在
Try
并在失败时抛出。实现此目的的一种简单方法是使用require
:虽然它通常用于表达先决条件,但它是一种快速且可读的方式来抛出
Try {
// as before
val validationMessages = schema.validate(parsedJson).asScala
require(validationMessages.isEmpty)
parsedJson
} match {
// as before
}
- 您可以省去
Try
(尤其是当您确定该块中其他代码的 none 会抛出异常时)而直接转到Either
,使用类似于:
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
if (validationMessages.isEmpty) Right(parsedJson)
else Left(json)
编辑:请注意,在您编辑的代码中,您添加了
validationMessages.foreach(msg => println(msg.getMessage))
替换您之前的 parsedJson
,将您的 Try
从 Try[T]
更改为 Try[Unit]
,在 match
.