找到 java.util.concurrent.Future 需要 scala.concurrent.Future
Found java.util.concurrent.Future Required scala.concurrent.Future
相关:scala.concurrent.Future wrapper for java.util.concurrent.Future
这来自我的另一个问题:
我有一个 AKKA HTTP 应用程序,我想在路由的 onComplete 函数中向 Kafka 发送一个 message/ProducerRecord,如下所示:
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1","some message")
onComplete(producer.send(producerRecord)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}
但是,onComplete(producer send producerRecord) 正在生成以下类型不匹配错误:
[错误] 发现:Future[org.apache.kafka.clients.producer.RecordMetadata](在 java.util.concurrent 中)
[错误]需要:未来[org.apache.kafka.clients.producer.RecordMetadata](在scala.concurrent中)
[错误] onCompleteRecordMetadata { _ =>
有什么办法可以解决这个问题,也许可以使用生产者作为接收器 (http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) 而不是 java producer.send功能?
您可以利用 Cake's Scala based Kafka client,它将完成 运行 Java 期货的工作,并为您提供 Scala 期货。一旦你确定你创建的是 cakesolutions.kafka.KafkaProducer
而不是 org.apache.kafka.clients.producer.KafkaProducer
,你的代码的其余部分实际上应该保持不变。
或者,您可以利用 Reactive Kafka 解决这个问题,同时继续使用高级 Akka HTTP DSL。您可以通过 运行 您的生产者记录到 Kafka Sink 来做到这一点,这样:
val producerSink = Producer.plainSink(producerSettings)
...
// inside the route
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1", "some message")
onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
为了回答您的具体问题,scala-java8-compat 库提供了 java8 和 Scala Futures 之间的转换器。
具体来说,您可以使用FutureConverters.toScala(producer.send(producerRecord))
将java.util.concurrent.Future
转换为scala.concurrent.Future
但是,使用本身对 Scala 友好 API 的客户端库(如上面 Stefano 所建议的)可能会得到最好的结果。
相关:scala.concurrent.Future wrapper for java.util.concurrent.Future
这来自我的另一个问题:
我有一个 AKKA HTTP 应用程序,我想在路由的 onComplete 函数中向 Kafka 发送一个 message/ProducerRecord,如下所示:
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1","some message")
onComplete(producer.send(producerRecord)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}
但是,onComplete(producer send producerRecord) 正在生成以下类型不匹配错误:
[错误] 发现:Future[org.apache.kafka.clients.producer.RecordMetadata](在 java.util.concurrent 中) [错误]需要:未来[org.apache.kafka.clients.producer.RecordMetadata](在scala.concurrent中) [错误] onCompleteRecordMetadata { _ =>
有什么办法可以解决这个问题,也许可以使用生产者作为接收器 (http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) 而不是 java producer.send功能?
您可以利用 Cake's Scala based Kafka client,它将完成 运行 Java 期货的工作,并为您提供 Scala 期货。一旦你确定你创建的是 cakesolutions.kafka.KafkaProducer
而不是 org.apache.kafka.clients.producer.KafkaProducer
,你的代码的其余部分实际上应该保持不变。
或者,您可以利用 Reactive Kafka 解决这个问题,同时继续使用高级 Akka HTTP DSL。您可以通过 运行 您的生产者记录到 Kafka Sink 来做到这一点,这样:
val producerSink = Producer.plainSink(producerSettings)
...
// inside the route
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1", "some message")
onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
为了回答您的具体问题,scala-java8-compat 库提供了 java8 和 Scala Futures 之间的转换器。
具体来说,您可以使用FutureConverters.toScala(producer.send(producerRecord))
将java.util.concurrent.Future
转换为scala.concurrent.Future
但是,使用本身对 Scala 友好 API 的客户端库(如上面 Stefano 所建议的)可能会得到最好的结果。