Akka Camel:消息路由到不正确的 SEDA 路由
Akka Camel: Messages routed to incorrect SEDA route
我的简化 Akka Camel
应用程序设置如下:
AppleProducer -> seda:appleRoute -> AppleConsumer
OrangeProducer -> seda:orangeRoute -> OrangeConsumer
不过我看到的是 Apple
事件间歇性地被 OrangeConsumer
消耗,反之亦然。
运行 下面的这个例子(可能有几次)重新创建了它。
我不明白这怎么会间歇性发生。我做错了什么?
object TestApp extends App {
implicit val system = ActorSystem()
val camel = CamelExtension(system)
val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
system.actorOf(Props(classOf[MyAppleConsumer], "seda:appleRoute"), "AppleConsumer")
val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
system.actorOf(Props(classOf[MyOrangeConsumer], "seda:orangeRoute"), "OrangeConsumer")
appleProducer ! new Apple("1")
orangeProducer ! new Orange("1")
appleProducer ! new Apple("2")
orangeProducer ! new Orange("2")
appleProducer ! new Apple("3")
orangeProducer ! new Orange("3")
appleProducer ! new Apple("4")
orangeProducer ! new Orange("4")
appleProducer ! new Apple("5")
orangeProducer ! new Orange("5")
appleProducer ! new Apple("6")
orangeProducer ! new Orange("6")
}
class MyProducer(route: String) extends Actor with ActorLogging {
def receive = {
case payload: Any =>
val template = CamelExtension(context.system).template
template.setDefaultEndpointUri(route)
template.sendBody(payload)
}
}
class MyAppleConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case event: CamelMessage if event.body.isInstanceOf[Apple] =>
log.info("Received event {}", event.body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}
class MyOrangeConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case event: CamelMessage if event.body.isInstanceOf[Orange] =>
log.info("Received event {}", event.body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}
class Apple(id: String)
class Orange(id: String)
我想我最终设法解决了这个问题。
该问题与SEDA无关。相反,似乎多个 MyProducer
实例返回相同的 DefaultProducerTemplate
。
因此,设置 defaultEndpointUri
时偶尔会出现竞争条件
对我来说,解决方案是只创建一个 MyProducer
actor 实例,以确保我们不会遇到这种竞争条件
我建议扩展特征 Producer
而不是为 MyProducer
使用模板,就像为 MyAppleConsumer
和 [=16 使用 Consumer
一样=].
class MyProducer(route: String) extends Producer with OneWay {
def endpointUri = route
}
可在此处找到更多信息:http://doc.akka.io/docs/akka/snapshot/scala/camel.html
我相信你应该能够像这样简化你的代码(免责声明:未编译或测试!):
case class Apple(id: String)
case class Orange(id: String)
object TestApp extends App {
implicit val system = ActorSystem()
val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
system.actorOf(Props(classOf[MyConsumer], "seda:appleRoute"), "AppleConsumer")
val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
system.actorOf(Props(classOf[MyConsumer], "seda:orangeRoute"), "OrangeConsumer")
appleProducer ! Apple("1")
orangeProducer ! Orange("1")
appleProducer ! Apple("2")
orangeProducer ! Orange("2")
appleProducer ! Apple("3")
orangeProducer ! Orange("3")
appleProducer ! Apple("4")
orangeProducer ! Orange("4")
appleProducer ! Apple("5")
orangeProducer ! Orange("5")
appleProducer ! Apple("6")
orangeProducer ! Orange("6")
}
class MyProducer(route: String) extends Producer with OneWay with ActorLogging {
def endpointUri = route
}
class MyConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case CamelMessage(body : Apple, headers) =>
log.info("Received event {}", body)
case CamelMessage(body : Orange, headers) =>
log.info("Received event {}", body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}
我的简化 Akka Camel
应用程序设置如下:
AppleProducer -> seda:appleRoute -> AppleConsumer
OrangeProducer -> seda:orangeRoute -> OrangeConsumer
不过我看到的是 Apple
事件间歇性地被 OrangeConsumer
消耗,反之亦然。
运行 下面的这个例子(可能有几次)重新创建了它。
我不明白这怎么会间歇性发生。我做错了什么?
object TestApp extends App {
implicit val system = ActorSystem()
val camel = CamelExtension(system)
val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
system.actorOf(Props(classOf[MyAppleConsumer], "seda:appleRoute"), "AppleConsumer")
val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
system.actorOf(Props(classOf[MyOrangeConsumer], "seda:orangeRoute"), "OrangeConsumer")
appleProducer ! new Apple("1")
orangeProducer ! new Orange("1")
appleProducer ! new Apple("2")
orangeProducer ! new Orange("2")
appleProducer ! new Apple("3")
orangeProducer ! new Orange("3")
appleProducer ! new Apple("4")
orangeProducer ! new Orange("4")
appleProducer ! new Apple("5")
orangeProducer ! new Orange("5")
appleProducer ! new Apple("6")
orangeProducer ! new Orange("6")
}
class MyProducer(route: String) extends Actor with ActorLogging {
def receive = {
case payload: Any =>
val template = CamelExtension(context.system).template
template.setDefaultEndpointUri(route)
template.sendBody(payload)
}
}
class MyAppleConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case event: CamelMessage if event.body.isInstanceOf[Apple] =>
log.info("Received event {}", event.body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}
class MyOrangeConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case event: CamelMessage if event.body.isInstanceOf[Orange] =>
log.info("Received event {}", event.body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}
class Apple(id: String)
class Orange(id: String)
我想我最终设法解决了这个问题。
该问题与SEDA无关。相反,似乎多个 MyProducer
实例返回相同的 DefaultProducerTemplate
。
因此,设置 defaultEndpointUri
对我来说,解决方案是只创建一个 MyProducer
actor 实例,以确保我们不会遇到这种竞争条件
我建议扩展特征 Producer
而不是为 MyProducer
使用模板,就像为 MyAppleConsumer
和 [=16 使用 Consumer
一样=].
class MyProducer(route: String) extends Producer with OneWay {
def endpointUri = route
}
可在此处找到更多信息:http://doc.akka.io/docs/akka/snapshot/scala/camel.html
我相信你应该能够像这样简化你的代码(免责声明:未编译或测试!):
case class Apple(id: String)
case class Orange(id: String)
object TestApp extends App {
implicit val system = ActorSystem()
val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer")
system.actorOf(Props(classOf[MyConsumer], "seda:appleRoute"), "AppleConsumer")
val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer")
system.actorOf(Props(classOf[MyConsumer], "seda:orangeRoute"), "OrangeConsumer")
appleProducer ! Apple("1")
orangeProducer ! Orange("1")
appleProducer ! Apple("2")
orangeProducer ! Orange("2")
appleProducer ! Apple("3")
orangeProducer ! Orange("3")
appleProducer ! Apple("4")
orangeProducer ! Orange("4")
appleProducer ! Apple("5")
orangeProducer ! Orange("5")
appleProducer ! Apple("6")
orangeProducer ! Orange("6")
}
class MyProducer(route: String) extends Producer with OneWay with ActorLogging {
def endpointUri = route
}
class MyConsumer(route: String) extends Consumer with ActorLogging {
override def endpointUri: String = route
override def receive: Receive = {
case CamelMessage(body : Apple, headers) =>
log.info("Received event {}", body)
case CamelMessage(body : Orange, headers) =>
log.info("Received event {}", body)
case _ => throw new IllegalArgumentException("Invalid entity")
}
}