Akka超时异常但消息实际发送
Akka Timeout exception but messages actually sent
我正在使用具有以下技术的 Scala 2.13 堆栈:
- 玩!框架 2.8
- akka 键入 2.6.3
- 阿尔帕卡卡夫卡 2.0.3
Akka-stream 作业从 Kafka 读取事件,要求 actor 计算一些东西,并根据给定的响应生成新事件返回给 Kafka。
问题是使用询问模式发送的消息似乎被 QuestionActor
(下图)仅当其邮箱至少收集了两条消息并且每条消息仅收集了一条消息时才被消耗收到.
奇怪的行为是:
t1
ref ? Question("tr1", 1, None, actorRef)
> AskTimeoutException(tr1)
t2
ref ? Question("tr2", 1, None, actorRef)
> [INFO] - Question request for tr1-1. Processing.
> AskTimeoutException(tr2)
t3
ref ? Question("tr3", 1, None, actorRef)
> [INFO] - Question request for tr2-1. Processing.
> AskTimeoutException(tr3)
我正在尝试理解 为什么我观察到这种行为以及我做错了什么。
akka-stream Kafka 管道是:
Consumer
.plainSource(consumerSettings, subscription)
.map(DeserializeEvents.fromService)
.filter(_.eventType == classOf[Item].getName)
.via(askFlowExplicit)
.withAttributes(ActorAttributes.supervisionStrategy(decider()))
.map(
response =>
new ProducerRecord[String, OutputItem](
topics,
OutputItem(response.getClass.getName, response)
)
)
.log("Kafka Pipeline")
.runWith(Producer.plainSink(producerSettings))
决策者是一种监督策略,在 Serialisation
和 Timeout
异常时恢复工作; askFlowExplicit
向外部参与者声明一个询问请求,并且 - 特此 - 我碰到了我的问题。
val askFlowExplicit =
ActorFlow.ask[OutputItem, Question, Answer](askTarget) {
case (envelope, replyTo) =>
val item = Serdes.deserialize[Item](envelope.payload)
Question(item.trID, item.id, item.user, replyTo)
}
管道在 Play 上启动!申请 bootstrap
@Singleton
class ApplicationStart @Inject()(
configuration: Configuration,
questionActor: ActorRef[QuestionActor.Question]
) {
private implicit val logger = Logger.apply(getClass)
implicit val mat = context
AlpakkaPipeline.run(configuration, questionActor)
}
actor 是属于同一个 actor 系统的简单类型的 actor,并且 - 现在 - 它只是将来自流的请求转发到另一个服务。
class QuestionActor(
configuration: Configuration,
context: ActorContext[Question],
itemService: ItemService
) extends AbstractBehavior[Question](context) {
import QuestionActor._
implicit val ec: ExecutionContextExecutor = context.executionContext
private implicit val timeout: Timeout = ...
override def onMessage(msg: Question): Behavior[Question] = Behaviors.receive[Question] {
case (context, Question(trID, id, user, sender)) =>
log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
itemService
.action(id, user)
.onComplete {
case Success(result) if result.isEmpty =>
log.info("Action executed")
msg.replyTo ! NothingHappened(trID, id)
case Failure(e) =>
log.error("Action failed.", e)
msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
}
Behaviors.same
}
}
object QuestionActor {
final case class Question(
trID: String,
id: Int,
user: Option[UUID],
replyTo: ActorRef[Answer]
)
def apply(itemService: ItemService, configuration: Configuration): Behavior[Question] =
Behaviors.setup { context =>
context.setLoggerName(classOf[QuestionActor])
implicit val log: Logger = context.log
new QuestionActor(configuration, context)
}
}
它是使用 运行time DI 和 Play 构建的!
class BootstrapModule(environment: Environment, configuration: Configuration)
extends AbstractModule
with AkkaGuiceSupport {
override def configure(): Unit = {
bind(new TypeLiteral[ActorRef[CloneWithSender]]() {})
.toProvider(classOf[QuestionActorProvider])
.asEagerSingleton()
bind(classOf[ApplicationStart]).asEagerSingleton()
}
}
private class Question @Inject()(
actorSystem: ActorSystem,
itemService: ItemService,
configuration: Configuration
) extends Provider[ActorRef[Question]] {
def get(): ActorRef[Question] = {
val behavior = QuestionActor(itemService, configuration)
actorSystem.spawn(behavior, "question-actor")
}
}
我试过的
- 将调度程序更改为
QuestionActor
- 将邮箱更改为
QuestionActor
- 运行 来自
QuestionActor
的管道
- 从 actor 构造函数发送相同的消息(给自己),观察到相同的行为:另一个消息将触发 actor 使用前者,请求后者超时。
我没有的
- 将调度程序更改为 Akka 流管道
在我看来,它现在是一个线程问题,但我不知道从这里该何去何从。
非常感谢任何帮助。提前谢谢你。
问题是您正在组合提供 onMessage
的 AbstractBehavior
并在其中定义新的 Behaviors.receive[Question]
行为。您必须使用其中之一。
删除 Behaviors.receive
如下
override def onMessage(msg: Question): Behavior[Question] = {
log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
itemService
.action(id, user)
.onComplete {
case Success(result) if result.isEmpty =>
log.info("Action executed")
msg.replyTo ! NothingHappened(trID, id)
case Failure(e) =>
log.error("Action failed.", e)
msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
}
Behaviors.same
}
}
AbstractBehavior.onMessage
是行为的实现。因此,您通过方法参数收到一条消息,您应该处理它并 return 在您的情况下返回一个新的 Behaviour
、Behaviours.same
。
但是您没有处理消息,而是使用 Behaviors.receive
创建了一个新的 Behaviour
并将 Future 的回调注册到原始的第一条消息。因此,您会在第二条消息到达时看到日志语句,这会触发新行为。
如果你想使用 FP 样式定义,你必须只坚持 Behaviors.xxx
辅助方法。如果您选择 OOP 样式,那么您将扩展 AbstractBehavior
。但你不应该两者都做。
我正在使用具有以下技术的 Scala 2.13 堆栈:
- 玩!框架 2.8
- akka 键入 2.6.3
- 阿尔帕卡卡夫卡 2.0.3
Akka-stream 作业从 Kafka 读取事件,要求 actor 计算一些东西,并根据给定的响应生成新事件返回给 Kafka。
问题是使用询问模式发送的消息似乎被 QuestionActor
(下图)仅当其邮箱至少收集了两条消息并且每条消息仅收集了一条消息时才被消耗收到.
奇怪的行为是:
t1
ref ? Question("tr1", 1, None, actorRef)
> AskTimeoutException(tr1)
t2
ref ? Question("tr2", 1, None, actorRef)
> [INFO] - Question request for tr1-1. Processing.
> AskTimeoutException(tr2)
t3
ref ? Question("tr3", 1, None, actorRef)
> [INFO] - Question request for tr2-1. Processing.
> AskTimeoutException(tr3)
我正在尝试理解 为什么我观察到这种行为以及我做错了什么。
akka-stream Kafka 管道是:
Consumer
.plainSource(consumerSettings, subscription)
.map(DeserializeEvents.fromService)
.filter(_.eventType == classOf[Item].getName)
.via(askFlowExplicit)
.withAttributes(ActorAttributes.supervisionStrategy(decider()))
.map(
response =>
new ProducerRecord[String, OutputItem](
topics,
OutputItem(response.getClass.getName, response)
)
)
.log("Kafka Pipeline")
.runWith(Producer.plainSink(producerSettings))
决策者是一种监督策略,在 Serialisation
和 Timeout
异常时恢复工作; askFlowExplicit
向外部参与者声明一个询问请求,并且 - 特此 - 我碰到了我的问题。
val askFlowExplicit =
ActorFlow.ask[OutputItem, Question, Answer](askTarget) {
case (envelope, replyTo) =>
val item = Serdes.deserialize[Item](envelope.payload)
Question(item.trID, item.id, item.user, replyTo)
}
管道在 Play 上启动!申请 bootstrap
@Singleton
class ApplicationStart @Inject()(
configuration: Configuration,
questionActor: ActorRef[QuestionActor.Question]
) {
private implicit val logger = Logger.apply(getClass)
implicit val mat = context
AlpakkaPipeline.run(configuration, questionActor)
}
actor 是属于同一个 actor 系统的简单类型的 actor,并且 - 现在 - 它只是将来自流的请求转发到另一个服务。
class QuestionActor(
configuration: Configuration,
context: ActorContext[Question],
itemService: ItemService
) extends AbstractBehavior[Question](context) {
import QuestionActor._
implicit val ec: ExecutionContextExecutor = context.executionContext
private implicit val timeout: Timeout = ...
override def onMessage(msg: Question): Behavior[Question] = Behaviors.receive[Question] {
case (context, Question(trID, id, user, sender)) =>
log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
itemService
.action(id, user)
.onComplete {
case Success(result) if result.isEmpty =>
log.info("Action executed")
msg.replyTo ! NothingHappened(trID, id)
case Failure(e) =>
log.error("Action failed.", e)
msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
}
Behaviors.same
}
}
object QuestionActor {
final case class Question(
trID: String,
id: Int,
user: Option[UUID],
replyTo: ActorRef[Answer]
)
def apply(itemService: ItemService, configuration: Configuration): Behavior[Question] =
Behaviors.setup { context =>
context.setLoggerName(classOf[QuestionActor])
implicit val log: Logger = context.log
new QuestionActor(configuration, context)
}
}
它是使用 运行time DI 和 Play 构建的!
class BootstrapModule(environment: Environment, configuration: Configuration)
extends AbstractModule
with AkkaGuiceSupport {
override def configure(): Unit = {
bind(new TypeLiteral[ActorRef[CloneWithSender]]() {})
.toProvider(classOf[QuestionActorProvider])
.asEagerSingleton()
bind(classOf[ApplicationStart]).asEagerSingleton()
}
}
private class Question @Inject()(
actorSystem: ActorSystem,
itemService: ItemService,
configuration: Configuration
) extends Provider[ActorRef[Question]] {
def get(): ActorRef[Question] = {
val behavior = QuestionActor(itemService, configuration)
actorSystem.spawn(behavior, "question-actor")
}
}
我试过的
- 将调度程序更改为
QuestionActor
- 将邮箱更改为
QuestionActor
- 运行 来自
QuestionActor
的管道
- 从 actor 构造函数发送相同的消息(给自己),观察到相同的行为:另一个消息将触发 actor 使用前者,请求后者超时。
我没有的
- 将调度程序更改为 Akka 流管道
在我看来,它现在是一个线程问题,但我不知道从这里该何去何从。 非常感谢任何帮助。提前谢谢你。
问题是您正在组合提供 onMessage
的 AbstractBehavior
并在其中定义新的 Behaviors.receive[Question]
行为。您必须使用其中之一。
删除 Behaviors.receive
如下
override def onMessage(msg: Question): Behavior[Question] = {
log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
itemService
.action(id, user)
.onComplete {
case Success(result) if result.isEmpty =>
log.info("Action executed")
msg.replyTo ! NothingHappened(trID, id)
case Failure(e) =>
log.error("Action failed.", e)
msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
}
Behaviors.same
}
}
AbstractBehavior.onMessage
是行为的实现。因此,您通过方法参数收到一条消息,您应该处理它并 return 在您的情况下返回一个新的 Behaviour
、Behaviours.same
。
但是您没有处理消息,而是使用 Behaviors.receive
创建了一个新的 Behaviour
并将 Future 的回调注册到原始的第一条消息。因此,您会在第二条消息到达时看到日志语句,这会触发新行为。
如果你想使用 FP 样式定义,你必须只坚持 Behaviors.xxx
辅助方法。如果您选择 OOP 样式,那么您将扩展 AbstractBehavior
。但你不应该两者都做。