mapMaterializedValue 对 Source.actorRef 什么都不做
mapMaterializedValue doing nothing with Source.actorRef
我正在尝试使用 Source.actorRef 向 actor bindet 发送消息,但这部分代码:
println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
println(s"Mapping $ref")
ref ! letter.text
}
println(s"After mapping $src")
只打印这样的东西:
Before mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
After mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
所以。不知何故 mapMaterializedValue 没有做任何事情。肯定没有消息发送给演员。 ref - None 是出于某种原因吗?
另外,我post所有的代码。它是 websockets 上的简单信使(一对一消息)之类的情节。我现在只是研究 Akka 流,所以这段代码真的不完美。我准备好听取任何批评或建议。
主要服务器对象:
package treplol.server
import treplol.common._
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import scala.io.StdIn
import java.util.UUID
object WsServer extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
def createSource(uuid: UUID): Source[String, ActorRef] = {
val src = Source.actorRef[String](0, OverflowStrategy.fail)
sources(uuid) = src
src
}
val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] =
collection.mutable.HashMap[UUID, Source[String, ActorRef]]()
val userSources: collection.mutable.HashMap[String, UUID] =
collection.mutable.HashMap[String, UUID]()
def flow: Flow[Message, Message, Any] = {
val uuid: UUID = UUID.randomUUID()
val incomingSource: Source[String, ActorRef] = createSource(uuid)
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2))
val mapMsgToLttr = b.add(
Flow[Message].collect { case TextMessage.Strict(txt) => txt }
.map[Letter] { txt =>
WsSerializer.decode(txt) match {
case Auth(from) =>
userSources(from) = uuid
Letter("0", from, "Authorized!")
case ltr: Letter => ltr
}
}
)
val processLttr = b.add(
Flow[Letter].map[String] { letter =>
userSources.get(letter.to) flatMap sources.get match {
case Some(src) =>
println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
println(s"Mapping $ref")
ref ! letter.text
}
println(s"After mapping $src")
""
case None => "Not authorized!"
}
}
)
val mapStrToMsg = b.add(
Flow[String].map[TextMessage] (str => TextMessage.Strict(str))
)
mapMsgToLttr ~> processLttr ~> merge
incomingSource ~> merge ~> mapStrToMsg
FlowShape(mapMsgToLttr.in, mapStrToMsg.out)
})
}
val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => system.terminate())
}
常用套餐:
package treplol
package object common {
trait WsMessage
case class Letter(from: String, to: String, text: String) extends WsMessage
case class Auth(from: String) extends WsMessage
object WsSerializer {
import org.json4s.{Extraction, _}
import org.json4s.jackson.JsonMethods.{compact, parse}
import org.json4s.jackson.Serialization
implicit val formats = {
Serialization.formats(NoTypeHints)
}
case class WsData(typeOf: String, data: String)
object WsDataType {
val LETTER = "letter"
val AUTH = "auth"
}
class WrongIncomingData extends Throwable
def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match {
case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter]
case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth]
case _ => throw new WrongIncomingData
}
def encode(wsMessage: WsMessage): String = {
val typeOf = wsMessage match {
case _: Letter => WsDataType.LETTER
case _: Auth => WsDataType.AUTH
case _ => throw new WrongIncomingData
}
compact(Extraction.decompose(
WsData(typeOf, compact(Extraction.decompose(wsMessage)))
))
}
}
}
build.sbt
name := "treplol"
version := "0.0"
scalaVersion := "2.12.1"
resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.16",
"com.typesafe.akka" %% "akka-stream" % "2.4.16",
"com.typesafe.akka" %% "akka-http" % "10.0.3",
"org.json4s" %% "json4s-jackson" % "3.5.0"
)
提前谢谢大家!
根据文档,mapMaterializedValue
组合器
Transform only the materialized value of this Source, leaving all
other properties as they were.
物化值仅在任何图形阶段(在本例中为源)为 运行 后可用。您永远不会 运行在您的代码中使用您的源代码。
请注意,您用来处理 WebSocket 消息的 Flow[Message, Message, Any]
实际上是由 Akka-HTTP 基础设施 运行 提供的,因此您无需手动执行。但是,您在 processLttr
正文中创建的 Source
未附加到图表的其余部分,因此不是 运行.
请参阅 docs 了解有关 运行 宁图和实体化的更多信息。
感谢 Stefano!
但似乎无法通过这种方式实现我想要的。但我更深入地挖掘并使用了custom stream processing and integration with actors。使用这种技术,我可以将消息从外部推送到某个流。 (此功能仍处于实验阶段!)
我正在尝试使用 Source.actorRef 向 actor bindet 发送消息,但这部分代码:
println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
println(s"Mapping $ref")
ref ! letter.text
}
println(s"After mapping $src")
只打印这样的东西:
Before mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
After mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
所以。不知何故 mapMaterializedValue 没有做任何事情。肯定没有消息发送给演员。 ref - None 是出于某种原因吗?
另外,我post所有的代码。它是 websockets 上的简单信使(一对一消息)之类的情节。我现在只是研究 Akka 流,所以这段代码真的不完美。我准备好听取任何批评或建议。
主要服务器对象:
package treplol.server
import treplol.common._
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import scala.io.StdIn
import java.util.UUID
object WsServer extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
def createSource(uuid: UUID): Source[String, ActorRef] = {
val src = Source.actorRef[String](0, OverflowStrategy.fail)
sources(uuid) = src
src
}
val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] =
collection.mutable.HashMap[UUID, Source[String, ActorRef]]()
val userSources: collection.mutable.HashMap[String, UUID] =
collection.mutable.HashMap[String, UUID]()
def flow: Flow[Message, Message, Any] = {
val uuid: UUID = UUID.randomUUID()
val incomingSource: Source[String, ActorRef] = createSource(uuid)
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2))
val mapMsgToLttr = b.add(
Flow[Message].collect { case TextMessage.Strict(txt) => txt }
.map[Letter] { txt =>
WsSerializer.decode(txt) match {
case Auth(from) =>
userSources(from) = uuid
Letter("0", from, "Authorized!")
case ltr: Letter => ltr
}
}
)
val processLttr = b.add(
Flow[Letter].map[String] { letter =>
userSources.get(letter.to) flatMap sources.get match {
case Some(src) =>
println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
println(s"Mapping $ref")
ref ! letter.text
}
println(s"After mapping $src")
""
case None => "Not authorized!"
}
}
)
val mapStrToMsg = b.add(
Flow[String].map[TextMessage] (str => TextMessage.Strict(str))
)
mapMsgToLttr ~> processLttr ~> merge
incomingSource ~> merge ~> mapStrToMsg
FlowShape(mapMsgToLttr.in, mapStrToMsg.out)
})
}
val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => system.terminate())
}
常用套餐:
package treplol
package object common {
trait WsMessage
case class Letter(from: String, to: String, text: String) extends WsMessage
case class Auth(from: String) extends WsMessage
object WsSerializer {
import org.json4s.{Extraction, _}
import org.json4s.jackson.JsonMethods.{compact, parse}
import org.json4s.jackson.Serialization
implicit val formats = {
Serialization.formats(NoTypeHints)
}
case class WsData(typeOf: String, data: String)
object WsDataType {
val LETTER = "letter"
val AUTH = "auth"
}
class WrongIncomingData extends Throwable
def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match {
case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter]
case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth]
case _ => throw new WrongIncomingData
}
def encode(wsMessage: WsMessage): String = {
val typeOf = wsMessage match {
case _: Letter => WsDataType.LETTER
case _: Auth => WsDataType.AUTH
case _ => throw new WrongIncomingData
}
compact(Extraction.decompose(
WsData(typeOf, compact(Extraction.decompose(wsMessage)))
))
}
}
}
build.sbt
name := "treplol"
version := "0.0"
scalaVersion := "2.12.1"
resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.16",
"com.typesafe.akka" %% "akka-stream" % "2.4.16",
"com.typesafe.akka" %% "akka-http" % "10.0.3",
"org.json4s" %% "json4s-jackson" % "3.5.0"
)
提前谢谢大家!
根据文档,mapMaterializedValue
组合器
Transform only the materialized value of this Source, leaving all other properties as they were.
物化值仅在任何图形阶段(在本例中为源)为 运行 后可用。您永远不会 运行在您的代码中使用您的源代码。
请注意,您用来处理 WebSocket 消息的 Flow[Message, Message, Any]
实际上是由 Akka-HTTP 基础设施 运行 提供的,因此您无需手动执行。但是,您在 processLttr
正文中创建的 Source
未附加到图表的其余部分,因此不是 运行.
请参阅 docs 了解有关 运行 宁图和实体化的更多信息。
感谢 Stefano!
但似乎无法通过这种方式实现我想要的。但我更深入地挖掘并使用了custom stream processing and integration with actors。使用这种技术,我可以将消息从外部推送到某个流。 (此功能仍处于实验阶段!)