在 Scala 中实现不使用 Akka 的 Actor 模型
Implement Actor model without Akka in Scala
我正在做我的小研究,在没有 Akka 的情况下实现 Actor
我在 Scala 中找到了 Actor 的一种实现。 (How to implement actor model without Akka?)
很简单。因为我没有足够的声誉来添加评论,所以我创建了这个问题。
我想知道我是否像下面这样使用 Actor。
1/ 如何从主线程中关闭该 actor?
2/ 如何添加类似Akka的功能,如parent actor、kill request、become method?
import scala.concurrent._
trait Actor[T] {
implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
def receive: T => Unit
def !(m: T) = Future { receive(m) }
}
这是我自己尝试改编上述代码片段时的示例
import scala.concurrent._
/**
* Created by hminle on 10/21/2016.
*/
trait Message
case class HelloMessage(hello: String) extends Message
case class GoodByeMessage(goodBye: String) extends Message
object State extends Enumeration {
type State = Value
val Waiting, Running, Terminating = Value
}
trait Actor[T] {
implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
private var state: State.State = State.Waiting
def handleMessage: T => Unit ={
if(state == State.Waiting) handleMessageWhenWaiting
else if(state == State.Running) handleMessageWhenRunning
else handleMessageWhenTerminating
}
def !(m: T) = Future {handleMessage(m)}
def handleMessageWhenWaiting: T => Unit
def handleMessageWhenRunning: T => Unit
def handleMessageWhenTerminating: T => Unit
def transitionTo(destinationState: State.State): Unit = {
this.state = destinationState
}
}
class Component1 extends Actor[Message]{
def handleMessageWhenRunning = {
case HelloMessage(hello) => {
println(Thread.currentThread().getName + hello)
}
case GoodByeMessage(goodBye) => {
println(Thread.currentThread().getName + goodBye)
transitionTo(State.Terminating)
}
}
def handleMessageWhenWaiting = {
case m => {
println(Thread.currentThread().getName + " I am waiting, I am not ready to run")
transitionTo(State.Running)
}
}
def handleMessageWhenTerminating = {
case m => {
println(Thread.currentThread().getName + " I am terminating, I cannot handle any message")
//need to shutdown here
}
}
}
class Component2(component1: Actor[Message]) extends Actor[Message]{
def handleMessageWhenRunning = {
case HelloMessage(hello) => {
println(Thread.currentThread().getName + hello)
component1 ! HelloMessage("hello 1")
}
case GoodByeMessage(goodBye) => {
println(Thread.currentThread().getName + goodBye)
component1 ! GoodByeMessage("goodbye 1")
transitionTo(State.Terminating)
}
}
def handleMessageWhenWaiting = {
case m => {
println(Thread.currentThread().getName + " I am waiting, I am not ready to run")
transitionTo(State.Running)
}
}
def handleMessageWhenTerminating = {
case m => {
println(Thread.currentThread().getName + " I am terminating, I cannot handle any message")
//need to shutdown here
}
}
}
object ActorExample extends App {
val a = new Component1
val b = new Component2(a)
b ! HelloMessage("hello World 2")
b ! HelloMessage("hello World 2, 2nd")
b ! GoodByeMessage("Good bye 2")
println(Thread.currentThread().getName)
}
您可以查看 scalaz
中的 Actor model
实现并从中汲取灵感,scalaz actor 中的源代码比 akka
中的源代码更容易理解。您可以自由选择架构:您可以像在 Akka 中一样使用基于 ConcurrentLinkedQueue 的邮箱,像在 scalaz 中一样对 AtomicReffernce 使用 CAS,在您的情况下您使用 Future 机制。 IMO,你必须写一个你的演员系统的上下文,所以解决你问题中的第一和第二项它是 ActorContext 的变体:
val contextStack = new ThreadLocal[List[ActorContext]]
关机看起来像这样:
1.
case Kill ⇒ throw new ActorKilledException("Kill")
case PoisonPill ⇒ self.stop()
2。为了存储父 actor 和类似任务,您必须在它们上存储引用:
def parent: ActorRef
很难说每种技术(CAS、邮箱)的优势,它可能是您研究的变体。
我正在做我的小研究,在没有 Akka 的情况下实现 Actor 我在 Scala 中找到了 Actor 的一种实现。 (How to implement actor model without Akka?)
很简单。因为我没有足够的声誉来添加评论,所以我创建了这个问题。 我想知道我是否像下面这样使用 Actor。
1/ 如何从主线程中关闭该 actor?
2/ 如何添加类似Akka的功能,如parent actor、kill request、become method?
import scala.concurrent._
trait Actor[T] {
implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
def receive: T => Unit
def !(m: T) = Future { receive(m) }
}
这是我自己尝试改编上述代码片段时的示例
import scala.concurrent._
/**
* Created by hminle on 10/21/2016.
*/
trait Message
case class HelloMessage(hello: String) extends Message
case class GoodByeMessage(goodBye: String) extends Message
object State extends Enumeration {
type State = Value
val Waiting, Running, Terminating = Value
}
trait Actor[T] {
implicit val context = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))
private var state: State.State = State.Waiting
def handleMessage: T => Unit ={
if(state == State.Waiting) handleMessageWhenWaiting
else if(state == State.Running) handleMessageWhenRunning
else handleMessageWhenTerminating
}
def !(m: T) = Future {handleMessage(m)}
def handleMessageWhenWaiting: T => Unit
def handleMessageWhenRunning: T => Unit
def handleMessageWhenTerminating: T => Unit
def transitionTo(destinationState: State.State): Unit = {
this.state = destinationState
}
}
class Component1 extends Actor[Message]{
def handleMessageWhenRunning = {
case HelloMessage(hello) => {
println(Thread.currentThread().getName + hello)
}
case GoodByeMessage(goodBye) => {
println(Thread.currentThread().getName + goodBye)
transitionTo(State.Terminating)
}
}
def handleMessageWhenWaiting = {
case m => {
println(Thread.currentThread().getName + " I am waiting, I am not ready to run")
transitionTo(State.Running)
}
}
def handleMessageWhenTerminating = {
case m => {
println(Thread.currentThread().getName + " I am terminating, I cannot handle any message")
//need to shutdown here
}
}
}
class Component2(component1: Actor[Message]) extends Actor[Message]{
def handleMessageWhenRunning = {
case HelloMessage(hello) => {
println(Thread.currentThread().getName + hello)
component1 ! HelloMessage("hello 1")
}
case GoodByeMessage(goodBye) => {
println(Thread.currentThread().getName + goodBye)
component1 ! GoodByeMessage("goodbye 1")
transitionTo(State.Terminating)
}
}
def handleMessageWhenWaiting = {
case m => {
println(Thread.currentThread().getName + " I am waiting, I am not ready to run")
transitionTo(State.Running)
}
}
def handleMessageWhenTerminating = {
case m => {
println(Thread.currentThread().getName + " I am terminating, I cannot handle any message")
//need to shutdown here
}
}
}
object ActorExample extends App {
val a = new Component1
val b = new Component2(a)
b ! HelloMessage("hello World 2")
b ! HelloMessage("hello World 2, 2nd")
b ! GoodByeMessage("Good bye 2")
println(Thread.currentThread().getName)
}
您可以查看 scalaz
中的 Actor model
实现并从中汲取灵感,scalaz actor 中的源代码比 akka
中的源代码更容易理解。您可以自由选择架构:您可以像在 Akka 中一样使用基于 ConcurrentLinkedQueue 的邮箱,像在 scalaz 中一样对 AtomicReffernce 使用 CAS,在您的情况下您使用 Future 机制。 IMO,你必须写一个你的演员系统的上下文,所以解决你问题中的第一和第二项它是 ActorContext 的变体:
val contextStack = new ThreadLocal[List[ActorContext]]
关机看起来像这样:
1.
case Kill ⇒ throw new ActorKilledException("Kill")
case PoisonPill ⇒ self.stop()
2。为了存储父 actor 和类似任务,您必须在它们上存储引用:
def parent: ActorRef
很难说每种技术(CAS、邮箱)的优势,它可能是您研究的变体。