使用 akka 流时的事件顺序
order of events when using akka streams
阅读 akka-streams 的文档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置问题的上下文。
def flowShape(user: User) = GraphDSL
.create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) {
implicit builder =>
implicit chatSource =>
import GraphDSL.Implicits._
val messageFromOutside = builder.add(Flow[String].map {
case msg: String => UserTextMessage(user, msg)
case _ => InvalidMessage
})
val merge = builder.add(Merge[ChatMessage](2))
// UPDATE --> this is where the change comes
// val merge = builder.add(Concat[ChatMessage](2))
// val channelActorSink = Sink.actorRefWithAck(channelActor, ActorInitMessage, AckMessage, UserLeft(user))
val channelActorSink = Sink.actorRef(channelActor, UserLeft(user))
val actorAsSource = builder.materializedValue.map { actor => UserJoined(user, actor) }
actorAsSource ~> merge.in(0)
messageFromOutside.out ~> merge.in(1)
merge ~> channelActorSink
FlowShape(messageFromOutside.in, chatSource.out)
}
为了让事情简单化,我使用了带有非常简单的源和汇的流形状。像这样 --
val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice"))
val sink = Sink.foreach[ChatMessage] {
case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}")
case ul: UserLeft => println(s"${ul.user.username} just left the channel")
case uj: UserJoined => println(s"${uj.user.username} just joined the channel")
case _ => println(s"do not know what I just received")
}
val mychatchannel = new Channel(420, myactorsystem)
source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink)
现在,我的担忧来了。终端中打印的事件顺序根本不正确。而且我不确定如何解决它。这是我得到的输出 --
[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
输出中缺少第一条消息 hi
。 hi
消息似乎在打印 UserJoin message
之前发送。
我尝试使用 actorRefWithAck
(我在上面的代码中注释掉了它)来修复它(并在消息传递方面增加了一些安全性)。它给出了类似的输出。
[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message
很明显,似乎正在发生的事情是源在发送 UserJoin
消息之前发送消息。我怎样才能解决这个问题?从概念上讲,我想我希望 UserJoin message
在源具体化但实际发送第一条消息之前立即发送。这可能吗?
谢谢
把溪流想象成水管:有水就会流动。合并运算符不关心来自哪个边元素。如果你想对这些输入进行排序,你需要使用 Concat 告诉 Akka。
阅读 akka-streams 的文档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置问题的上下文。
def flowShape(user: User) = GraphDSL
.create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) {
implicit builder =>
implicit chatSource =>
import GraphDSL.Implicits._
val messageFromOutside = builder.add(Flow[String].map {
case msg: String => UserTextMessage(user, msg)
case _ => InvalidMessage
})
val merge = builder.add(Merge[ChatMessage](2))
// UPDATE --> this is where the change comes
// val merge = builder.add(Concat[ChatMessage](2))
// val channelActorSink = Sink.actorRefWithAck(channelActor, ActorInitMessage, AckMessage, UserLeft(user))
val channelActorSink = Sink.actorRef(channelActor, UserLeft(user))
val actorAsSource = builder.materializedValue.map { actor => UserJoined(user, actor) }
actorAsSource ~> merge.in(0)
messageFromOutside.out ~> merge.in(1)
merge ~> channelActorSink
FlowShape(messageFromOutside.in, chatSource.out)
}
为了让事情简单化,我使用了带有非常简单的源和汇的流形状。像这样 --
val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice"))
val sink = Sink.foreach[ChatMessage] {
case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}")
case ul: UserLeft => println(s"${ul.user.username} just left the channel")
case uj: UserJoined => println(s"${uj.user.username} just joined the channel")
case _ => println(s"do not know what I just received")
}
val mychatchannel = new Channel(420, myactorsystem)
source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink)
现在,我的担忧来了。终端中打印的事件顺序根本不正确。而且我不确定如何解决它。这是我得到的输出 --
[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
输出中缺少第一条消息 hi
。 hi
消息似乎在打印 UserJoin message
之前发送。
我尝试使用 actorRefWithAck
(我在上面的代码中注释掉了它)来修复它(并在消息传递方面增加了一些安全性)。它给出了类似的输出。
[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message
很明显,似乎正在发生的事情是源在发送 UserJoin
消息之前发送消息。我怎样才能解决这个问题?从概念上讲,我想我希望 UserJoin message
在源具体化但实际发送第一条消息之前立即发送。这可能吗?
谢谢
把溪流想象成水管:有水就会流动。合并运算符不关心来自哪个边元素。如果你想对这些输入进行排序,你需要使用 Concat 告诉 Akka。