如何通过 Java 库在 Akka 中创建自定义均匀扇出形状

How to create a custom Uniform Fan Out Shape in Akka through the Java library

所以我正在构建一个 Akka 自定义组件来学习框架。我需要创建一个 UnifromFanOut 形状自定义组件。所以我尝试扩展形状,但问题是如何创建具有 1 个输入和 3 个输出的形状。 class 需要一个 seq 对象,它是 Scala 的一部分,但我对整个形状本身是否正确感到有点困惑。我刚开始学习 Akka,所以我可以理解我的方法是否错误。此外,我练习的目的是创建一个自定义组件,我知道我可以通过 GraphDSL 完成逻辑,但它需要是一个自定义组件。我的问题是如何正确创建此形状。 (文档不是自定义组件的最佳文档)

    public final Inlet<DeviceInfo> in = Inlet.create("Map.in");
    public final Outlet<DeviceInfo> temp_out = Outlet.create("Map.out");
    public final Outlet<DeviceInfo> humidity_out = Outlet.create("Map.out");
    public final Outlet<DeviceInfo> illumination_out = Outlet.create("Map.out");
    //Does not work
    private final UniformFanOutShape<DeviceInfo, DeviceInfo> shape = UniformFanOutShape.apply(in, Arrays.asList(temp_out, humidity_out));

    @Override
    public UniformFanOutShape<DeviceInfo, DeviceInfo> shape() {     
        return shape;
    }

[编辑] 长话短说 您基本上需要解决从 Java 调用 Scala vararg 方法的问题。您可以使用 scala.collection.mutable.ArrayBuffer

这样做
    ArrayBuffer<Outlet<DeviceInfo>> arrayBuffer = new ArrayBuffer<>();
    arrayBuffer.append(temp_out);
    arrayBuffer.append(humidity_out);
    arrayBuffer.append(illumination_out);

    final UniformFanOutShape<DeviceInfo, DeviceInfo> shape =
      UniformFanOutShape.apply(in, arrayBuffer.toSeq());

来源:How to use Scala varargs from Java code

[/编辑]

(the documentation isn't the best for custom components)

我不同意。文档非常全面 (https://doc.akka.io/docs/akka/current/stream/stream-customize.html)

您不应扩展 UniformFanOutShape<DeviceInfo, DeviceInfo>,而应扩展 GraphStage<UniformFanOutShape<DeviceInfo, DeviceInfo>>

然后您需要覆盖两个方法:

 public UniformFanOutShape<DeviceInfo, DeviceInfo> shape() {
    //implement
 }

 @Override
 public GraphStageLogic createLogic(Attributes inheritedAttributes) {
   //implement
 }

文档向您展示了如何创建形状以及如何实现 GraphStageLogic 以及有关处理背压等的详细信息。

文档显示了 SourceShapeSinkShapeFlowShape 的示例。如果您了解这些示例,但需要更多指导,您可以查看源代码。在 akka.stream.scaladsl.Graph.scala 中,您可以找到 Akka Streams 提供的所有内置 GraphStage 的实现。例如 BroadcastUniformFanOutShape 阶段的一个例子,它的代码是这样开始的:

final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]]

实施不到 100 行代码,并遵循与文档显示相同的模式。代码是 Scala,因为 Akka 是用该语言实现的,带有薄层 Java 层,旨在从 Java 代码中使用。因此,例如还有 akka.stream.javadsl.Graph.scala,但您会看到那里的代码委托给 scaladsl 包中的实现。你会在整个 Akka 代码中看到这种模式。所以简而言之,您需要了解一些 Scala 才能理解实现,但在这种情况下,它与 Java 实现的外观非常相似。