Akka 是否原生支持集成模式?
Does Akka natively support Integration Patterns?
我是 Akka 的新手,我想弄清楚它是否内置了对 Enterprise Integration Patterns (EIP) or whether I need to delegate this type of routing/integration out to a framework like Camel 的支持。
在我的用例中,我有一个从源(文件)读取二进制样本的参与者;这个演员叫做Sampler
。 Sampler
然后将 Sample
个实例(消息)传递给一个名为 SampleProcessors
的参与者字段。每个样本处理器对给定的 Sample
做一些不同的事情。根据处理器处理 Sample
的结果,它可能需要路由到 1+ 其他 SampleProcessor
,或者可能所有处理都已结束。根据确切的 SampleProcessor
和给定 Sample
的确切性质,可能需要将 Sample
多播到其他收件人 SampleProcessors
的列表。
这对我来说就像骆驼。
所以我问:
- Akka 是否内置了对路由、广播、多播和其他 EIP 的支持(如果有,它们是什么,记录在何处)?
- 或者,我应该尝试将 actor 系统与 Camel 集成,在这种情况下,它会是什么样子?我知道有一个 Camel-Akka 组件,但我相信这只是为了将 Camel 总线与演员系统集成(而我想要一个服务总线 inside 我的演员系统)
- 或者,我应该在这里做我自己的 EIP/actor 布线吗?
Akka 本身不支持 EIP,但有多种实现方式。
无论如何,如果你想要一些方便的 DSL,有一个比 EIP 更好的主意 - 正如 GoF-patterns 所做的那样,你可以用功能组合 + 仿函数替换(实现)大部分 EIP 模式(map
) 和单子 (flatMap
)。换句话说,您可以将输入流视为无限集合。所以,
- 处理器变成函数
- 管道成为仿函数,例如
val output1 = input.map(processor1).map(processor2)
路由器和过滤器成为...单子(filter
基于flatMap
):
val fork1 = output1.filter(routingCondition1).map(...)
val fork2 = output1.filter(routingCondition2).map(...)
拆分为flatMap
:input.flatMap(x => Stream(x.submsg1, x.submsg2))
- 聚合器成为变形,又名
fold
(累加器通常应由一些存储支持)
Akka 已经实现了这种基于流的工作流,它被称为 Akka Streams, which is an implementation of Reactive Streams, see also this
和 that 篇文章。
另一种选择是按原样使用 Akka,actor 保证顺序处理,因此您可以通过创建 actor 链来实现管道:
class Processor1(next: ActorRef) extends Actor {
def receive = {
case x if filterCondition =>
case x => next ! process(x)
}
}
val processor2 = system.actorOf(Props[Processor2])
val processor1 = system.actorOf(Props[Processor1], processor2)
如果您需要路由 - 只需两个 "next"
class Router(next1: ActorRef, next2: ActorRef) extends Actor {
def receive = {
case x if filterCondition =>
case x if cond1 => next1 ! process(x)
case x if cond2 => next2 ! process(x)
}
}
如果您需要保证路线之间没有竞争 - 请参阅 。当然,你放弃了整个 DSL 的想法,直接使用演员。
P.S。是的,您仍然可以使用 Camel 作为端点——Akka 有一些 support 用于此。你可以使用 Akka 作为服务激活器。
我是 Akka 的新手,我想弄清楚它是否内置了对 Enterprise Integration Patterns (EIP) or whether I need to delegate this type of routing/integration out to a framework like Camel 的支持。
在我的用例中,我有一个从源(文件)读取二进制样本的参与者;这个演员叫做Sampler
。 Sampler
然后将 Sample
个实例(消息)传递给一个名为 SampleProcessors
的参与者字段。每个样本处理器对给定的 Sample
做一些不同的事情。根据处理器处理 Sample
的结果,它可能需要路由到 1+ 其他 SampleProcessor
,或者可能所有处理都已结束。根据确切的 SampleProcessor
和给定 Sample
的确切性质,可能需要将 Sample
多播到其他收件人 SampleProcessors
的列表。
这对我来说就像骆驼。
所以我问:
- Akka 是否内置了对路由、广播、多播和其他 EIP 的支持(如果有,它们是什么,记录在何处)?
- 或者,我应该尝试将 actor 系统与 Camel 集成,在这种情况下,它会是什么样子?我知道有一个 Camel-Akka 组件,但我相信这只是为了将 Camel 总线与演员系统集成(而我想要一个服务总线 inside 我的演员系统)
- 或者,我应该在这里做我自己的 EIP/actor 布线吗?
Akka 本身不支持 EIP,但有多种实现方式。
无论如何,如果你想要一些方便的 DSL,有一个比 EIP 更好的主意 - 正如 GoF-patterns 所做的那样,你可以用功能组合 + 仿函数替换(实现)大部分 EIP 模式(map
) 和单子 (flatMap
)。换句话说,您可以将输入流视为无限集合。所以,
- 处理器变成函数
- 管道成为仿函数,例如
val output1 = input.map(processor1).map(processor2)
路由器和过滤器成为...单子(
filter
基于flatMap
):val fork1 = output1.filter(routingCondition1).map(...)
val fork2 = output1.filter(routingCondition2).map(...)
拆分为
flatMap
:input.flatMap(x => Stream(x.submsg1, x.submsg2))
- 聚合器成为变形,又名
fold
(累加器通常应由一些存储支持)
Akka 已经实现了这种基于流的工作流,它被称为 Akka Streams, which is an implementation of Reactive Streams, see also this 和 that 篇文章。
另一种选择是按原样使用 Akka,actor 保证顺序处理,因此您可以通过创建 actor 链来实现管道:
class Processor1(next: ActorRef) extends Actor {
def receive = {
case x if filterCondition =>
case x => next ! process(x)
}
}
val processor2 = system.actorOf(Props[Processor2])
val processor1 = system.actorOf(Props[Processor1], processor2)
如果您需要路由 - 只需两个 "next"
class Router(next1: ActorRef, next2: ActorRef) extends Actor {
def receive = {
case x if filterCondition =>
case x if cond1 => next1 ! process(x)
case x if cond2 => next2 ! process(x)
}
}
如果您需要保证路线之间没有竞争 - 请参阅
P.S。是的,您仍然可以使用 Camel 作为端点——Akka 有一些 support 用于此。你可以使用 Akka 作为服务激活器。