如何发送多条消息作为我的来源
How do I send multiple messages as my source
我只是在尝试当前以单个 TextMessage 作为源的示例流:
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("hello world!"))
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
我想发送 2 条消息,所以我尝试了这个:
val source1 = Source.single(TextMessage("hello"))
val source2 = Source.single(TextMessage("world"))
val helloSource: Source[Message, NotUsed] =
Source.combine(source2)
但是我得到这个错误:
polymorphic expression cannot be instantiated to expected type;
[error] found : [U](strategy: Int => akka.stream.Graph[akka.stream.UniformFanInShape[akka.http.scaladsl.model.ws.TextMessage.Strict,U],akka.NotUsed]): akka.stream.scaladsl.Source[U,akka.NotUsed]
[error] required: akka.stream.scaladsl.Source[akka.http.scaladsl.model.ws.Message,akka.NotUsed]
[error] Source.combine(source1, source2)
[error] ^
[error] one error found
我到底应该做什么?
Source.combine
是组合多个源的灵活方式,您需要指定组合它们的策略,如链接文档中所述。
在这种情况下,如果您希望一个有限源后跟另一个有限源,您可以使用 Concat
策略。
val helloSource: Source[Message, NotUsed] =
Source.combine(source1, source2)(Concat(_))
作为更简单的替代方法,您可以在第一个来源上使用 concat
方法:
val helloSource: Source[Message, NotUsed] =
source1.concat(source2)
但是,对于此示例,如果您有一组固定的硬编码元素,则避免创建多个源并仅从具有 Source.apply
的 Iterable
创建单个源会更简单:
val helloSource: Source[Message, NotUsed] =
Source(Seq(TextMessage("hello"), TextMessage("world")))
我只是在尝试当前以单个 TextMessage 作为源的示例流:
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("hello world!"))
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
我想发送 2 条消息,所以我尝试了这个:
val source1 = Source.single(TextMessage("hello"))
val source2 = Source.single(TextMessage("world"))
val helloSource: Source[Message, NotUsed] =
Source.combine(source2)
但是我得到这个错误:
polymorphic expression cannot be instantiated to expected type;
[error] found : [U](strategy: Int => akka.stream.Graph[akka.stream.UniformFanInShape[akka.http.scaladsl.model.ws.TextMessage.Strict,U],akka.NotUsed]): akka.stream.scaladsl.Source[U,akka.NotUsed]
[error] required: akka.stream.scaladsl.Source[akka.http.scaladsl.model.ws.Message,akka.NotUsed]
[error] Source.combine(source1, source2)
[error] ^
[error] one error found
我到底应该做什么?
Source.combine
是组合多个源的灵活方式,您需要指定组合它们的策略,如链接文档中所述。
在这种情况下,如果您希望一个有限源后跟另一个有限源,您可以使用 Concat
策略。
val helloSource: Source[Message, NotUsed] =
Source.combine(source1, source2)(Concat(_))
作为更简单的替代方法,您可以在第一个来源上使用 concat
方法:
val helloSource: Source[Message, NotUsed] =
source1.concat(source2)
但是,对于此示例,如果您有一组固定的硬编码元素,则避免创建多个源并仅从具有 Source.apply
的 Iterable
创建单个源会更简单:
val helloSource: Source[Message, NotUsed] =
Source(Seq(TextMessage("hello"), TextMessage("world")))