Apache Kafka: KafkaProducerActor throws an ask timeout exception
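I am using the cakesolutions Akka client for Scala and Kafka. I create a KafkaProducerActor and try to send it a message using the ask pattern, so that I get a Future back and can perform some operations on the result. Every time, however, I get an ask timeout exception. Below is my code: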
    import akka.actor.ActorSystem
    import akka.pattern.ask
    import akka.util.Timeout
    import cakesolutions.kafka.KafkaProducer
    import cakesolutions.kafka.akka.{KafkaProducerActor, ProducerRecords}
    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.kafka.common.serialization.StringSerializer
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    class SimpleAkkaProducer(config: Config, system: ActorSystem) {
      // Execution context for the Future callbacks below
      import system.dispatcher

      private val producerConf = KafkaProducer.Conf(
        config,
        keySerializer = new StringSerializer,
        valueSerializer = new StringSerializer)

      val actorRef = system.actorOf(KafkaProducerActor.props(producerConf))

      // Fire-and-forget send
      def sendMessageWayOne(record: ProducerRecords[String, String]) = {
        actorRef ! record
      }

      // Ask-pattern send: expects a reply from the actor within the timeout
      def sendMessageWayTwo(record: ProducerRecords[String, String]) = {
        implicit val timeout = Timeout(100.seconds)
        val future = (actorRef ? record).mapTo[String]
        future onComplete {
          case Success(data) => println(s" >>>>>>>>>>>> ${data}")
          case Failure(ex) => ex.printStackTrace()
        }
      }
    }

    object SimpleAkkaProducer {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("KafkaProducerActor")
        val config = ConfigFactory.defaultApplication()
        val simpleAkkaProducer = new SimpleAkkaProducer(config, system)
        val topic = config.getString("akka.topic")
        val messageOne = ProducerRecords.fromKeyValues[String, String](topic,
          Seq((Some("Topics"), "First Message")), None, None)
        simpleAkkaProducer.sendMessageWayOne(messageOne)
        simpleAkkaProducer.sendMessageWayTwo(messageOne)
      }
    }
Here is the exception:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://KafkaProducerActor/user/$a#-1520717141]] after [100000 ms]. Sender[null] sent message of type "cakesolutions.kafka.akka.ProducerRecords".
at akka.pattern.PromiseActorRef$.$anonfun$apply(AskSupport.scala:604)
at akka.actor.Scheduler$$anon.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:864)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:862)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon.executeBucket(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:745)
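The producer actor only replies to the sender if you set the successResponse and failureResponse values in ProducerRecords to something other than None. When the write to Kafka succeeds, the successResponse value is sent back to the sender; when the write fails, the failureResponse value is sent back. In your code both are None, so the actor never replies and the ask times out.

Example: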
    val record = ProducerRecords.fromKeyValues[String, String](
      topic = topic,
      keyValues = Seq((Some("Topics"), "First Message")),
      successResponse = Some("success"),
      failureResponse = Some("failure")
    )

    val future = (actorRef ? record).mapTo[String]
    future onComplete {
      case Success("success") => println("Send succeeded!")
      case Success("failure") => println("Send failed!")
      case Success(data) => println(s"Send result: $data")
      case Failure(ex) => ex.printStackTrace()
    }
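To make the mechanism concrete, here is a minimal sketch that uses only the Scala standard library, with no Akka or Kafka involved. The `AskReplySketch` object and its `send` and `sendAndWait` methods are hypothetical stand-ins invented for illustration; they are not the library's code. They only model the behaviour described above: a reply is produced when a response value is supplied, and otherwise the caller's future never completes and the wait times out.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object AskReplySketch {
  // Hypothetical stand-in for the producer actor (an assumption, not the
  // library's implementation): the caller's promise is completed only when
  // a response value was supplied with the record.
  def send(successResponse: Option[Any]): Future[Any] = {
    val reply = Promise[Any]()
    successResponse.foreach(value => reply.success(value))
    reply.future
  }

  // Wait for the reply with a short timeout; a timeout is captured as a Failure.
  def sendAndWait(successResponse: Option[Any],
                  timeout: FiniteDuration = 200.millis): Try[Any] =
    Try(Await.result(send(successResponse), timeout))

  def main(args: Array[String]): Unit = {
    // A response value is supplied, so a reply arrives before the timeout.
    println(sendAndWait(Some("success")))
    // No response value, so nothing ever replies and the wait times out.
    println(sendAndWait(None).isFailure)
  }
}
```

In this model the first call yields a successful result carrying the supplied value, while the second yields a failure wrapping a timeout, which mirrors the AskTimeoutException in the question.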