Scala - Return value from Callbacks
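I am new to Scala programming. Could someone help me get the return value out of a callback? How can I return the callback's value as a JsObject from the calling method? I am using the Play 2 framework with an actor system. Please also let me know whether my return type is wrong, i.e. whether SendToKafka should return a Future rather than a JsObject.
I have the following code: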
override def SendToKafka(data: JsValue): Option[JsObject] = {
  val props: Map[String, AnyRef] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "group.id" -> "CountryCounter",
    "key.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" -> "http://localhost:8081"
  )
  val schema: Schema = new Parser().parse(Source.fromURL(getClass.getResource("/test.avsc")).mkString)
  val gRecord: GenericRecord = new GenericData.Record(schema)
  gRecord.put("emp_id", request.emp_id)
  val producer = new KafkaProducer[Int, GenericRecord](props.asJava)
  val record = new ProducerRecord("Emp", 1, gRecord)
  val promise = Promise[RecordMetadata]()
  producer.send(record, producerCallback(promise))
  val f = promise.future
  val returnValue : Some[JsObject] =null
  val con = Future {
    f onComplete {
      case Success(r) => accessLogger.info("r" + r.offset())
      case Failure(e) => accessLogger.info("e "+ e)
    }
    // I would like to return offset as JsObject or exception ( if any )
  }
}

private def producerCallback(promise: Promise[RecordMetadata]): Callback = {
  new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      val result = if (exception == null) {
        //accessLogger.info("offset - " + metadata.offset())
        // I would like to return this offset as JsObject
        Success(metadata)
      }
      else {
        accessLogger.error(exception.printStackTrace().toString)
        Failure(exception)
        // I would like to return exception (if any ) as JsObject
      }
      promise.complete(result)
    }
  }
}
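Because promise is of type Promise[RecordMetadata] and f is promise.future, f is of type Future[RecordMetadata]. When promise.complete(result) runs, the future takes on whatever state result holds.
The Future may end up holding a failure (i.e. the Failure(exception) from your callback), so that case needs to be handled (match/case is used below).
Await.ready can be used to wait until the future holds a Success or a Failure. Without a blocking call like this, the future will probably not be completed yet by the time the same method reads it.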
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
...
// Arbitrary timeout: set an appropriate wait time for your application.
// Await.ready blocks for *up to* this duration (4s here) and throws a
// TimeoutException if f has not completed by then.
val fReady: Future[RecordMetadata] = Await.ready(f, 4.seconds)
// Once Await.ready has returned, the future holds a result (a Success or a Failure).
// You probably need to change the return type to Either if you use this approach,
// or change this to Option and ignore the failure, assuming the exception is already logged.
val result: Either[Throwable, Long] = fReady.value match {
  case Some(Success(a)) => Right(a.offset()) // you can edit this to compute a JsValue from `a` if you want
  case Some(Failure(b)) => Left(b)
  case None             => Left(new RuntimeException("Unexpected")) // not expected after Await.ready returns
}
// Return this value, or convert it to the return type you need.
result
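On the question of whether SendToKafka should return a Future: a non-blocking alternative is to transform the Future[RecordMetadata] and return it, rather than waiting on it. The following is only a minimal sketch of that idea, written against the Scala standard library so that it runs on its own. Meta, SendCallback, fakeSend and the Map result are hypothetical stand-ins for RecordMetadata, Kafka's Callback, producer.send and JsObject; none of them is a real Kafka or Play API.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal

object NonBlockingSend {
  // Hypothetical stand-ins so the sketch runs without Kafka or Play on the classpath.
  final case class Meta(offset: Long)   // plays the role of RecordMetadata
  trait SendCallback {                  // plays the role of Kafka's Callback
    def onCompletion(metadata: Meta, exception: Exception): Unit
  }

  // Same bridge as producerCallback in the question: the callback completes the promise.
  def promiseCallback(promise: Promise[Meta]): SendCallback = new SendCallback {
    override def onCompletion(metadata: Meta, exception: Exception): Unit =
      if (exception == null) promise.success(metadata)
      else promise.failure(exception)
  }

  // Hypothetical producer: calls back with either metadata or an exception.
  def fakeSend(fail: Boolean, callback: SendCallback): Unit =
    if (fail) callback.onCompletion(null, new RuntimeException("broker unavailable"))
    else callback.onCompletion(Meta(42L), null)

  // Returns a Future instead of blocking. A Map stands in for JsObject here.
  def sendToKafka(fail: Boolean)(implicit ec: ExecutionContext): Future[Map[String, String]] = {
    val promise = Promise[Meta]()
    fakeSend(fail, promiseCallback(promise))
    promise.future
      .map(meta => Map("offset" -> meta.offset.toString))          // success: expose the offset
      .recover { case NonFatal(e) => Map("error" -> e.getMessage) } // failure: expose the message
  }
}
```

With this shape the caller decides what to do with the Future. In a Play controller, for example, a Future of a result can be handed to Action.async, so no thread has to block in Await.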