如何发送多条消息作为我的来源

How do I send multiple messages as my source

我只是在尝试当前以单个 TextMessage 作为源的示例流:

 // print each incoming strict text message
    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    val helloSource: Source[Message, NotUsed] =
      Source.single(TextMessage("hello world!"))

    // the Future[Done] is the materialized value of Sink.foreach
    // and it is completed when the stream completes
    val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

我想发送 2 条消息,所以我尝试了这个:

val source1 = Source.single(TextMessage("hello"))
val source2 = Source.single(TextMessage("world"))

val helloSource: Source[Message, NotUsed] =
  Source.combine(source2)

但是我得到这个错误:

 polymorphic expression cannot be instantiated to expected type;
[error]  found   : [U](strategy: Int => akka.stream.Graph[akka.stream.UniformFanInShape[akka.http.scaladsl.model.ws.TextMessage.Strict,U],akka.NotUsed]): akka.stream.scaladsl.Source[U,akka.NotUsed]
[error]  required: akka.stream.scaladsl.Source[akka.http.scaladsl.model.ws.Message,akka.NotUsed]
[error]       Source.combine(source1, source2)
[error]                     ^
[error] one error found

我到底应该做什么?

Source.combine 是组合多个源的灵活方式,您需要指定组合它们的策略,如链接文档中所述。

在这种情况下,如果您希望一个有限源后跟另一个有限源,您可以使用 Concat 策略。

val helloSource: Source[Message, NotUsed] =
  Source.combine(source1, source2)(Concat(_))

作为更简单的替代方法,您可以在第一个来源上使用 concat 方法:

val helloSource: Source[Message, NotUsed] =
  source1.concat(source2)

但是,对于此示例,如果您有一组固定的硬编码元素,则避免创建多个源并仅从具有 Source.applyIterable 创建单个源会更简单:

val helloSource: Source[Message, NotUsed] =
  Source(Seq(TextMessage("hello"), TextMessage("world")))