如何创建具有不同输入和输出类型的流以在图形内部使用?

How do I create a Flow with a different input and output types for use inside of a graph?

我正在通过在内部构建图表来制作自定义水槽。这是我的代码的广泛简化,以证明我的问题:

def mySink: Sink[Int, Unit] = Sink() { implicit builder =>

    val entrance = builder.add(Flow[Int].buffer(500, OverflowStrategy.backpressure))
    val toString = builder.add(Flow[Int, String, Unit].map(_.toString))
    val printSink = builder.add(Sink.foreach(elem => println(elem)))

    builder.addEdge(entrance.out, toString.in)
    builder.addEdge(toString.out, printSink.in)

    entrance.in
}

我遇到的问题是,虽然创建具有相同 input/output 类型且只有一个类型参数且没有值参数的 Flow 是有效的,例如:Flow[Int](这就是全部在文档中)仅提供两个类型参数和零值参数是无效的。

根据 reference documentation for the Flow object,我要查找的 apply 方法定义为

def apply[I, O]()(block: (Builder[Unit]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit]

并说

Creates a Flow by passing a FlowGraph.Builder to the given create function.

The create function is expected to return a pair of Inlet and Outlet which correspond to the created Flows input and output ports.

当我试图制作我认为非常简单的流程时,我似乎需要处理另一个级别的图形构建器。是否有一种更简单、更简洁的方法来创建一个 Flow 来改变它的输入和输出类型,而不需要弄乱它的内部端口?如果这是解决此问题的正确方法,那么解决方案会是什么样子?

奖励:为什么制作一个不改变其输出输入类型的流很容易?

如果你想指定流的输入和输出类型,你确实需要使用你在文档中找到的应用方法。不过,使用它与您已经完成的几乎完全相同。

Flow[String, Message]() { implicit b =>
  import FlowGraph.Implicits._

  val reverseString = b.add(Flow[String].map[String] { msg => msg.reverse })
  val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))

  // connect the graph
  reverseString ~> mapStringToMsg

  // expose ports
  (reverseString.inlet, mapStringToMsg.outlet)
}

您 return 一个包含入口和出口的元组,而不是仅仅 return 入口。现在我们可以将此流程用于特定的 Source 或 Sink(例如在另一个构建器中,或直接与 runWith 一起使用)。