在连接配置的最大连接数后,如何防止传入连接的 Akka TCP 流连接?

How to prevent Akka TCP Stream of incoming connections from connecting after a configured max number of connections have connected?

[编辑]:问题已解决,Artur 提供的解决方案已作为编辑添加到最后。

我想实现的想法是,一个TCP服务器允许n个连接,如果我得到一个n+1个连接则不允许连接。

因此,我需要以某种方式取消连接,然后将该特定流连接到 Sink.cancelled()。

我拥有的是连接到自定义流的 IncomingConnection,该流根据连接计数对 IncomingConnection 进行分区。一旦超过最大连接数,分区逻辑会将其定向到连接到 Sink.cancelled.

的插座

本以为是立即取消连接,但它允许客户端连接,然后在一段时间后断开连接。

也许我 运行 遇到了与 的答案中提到的相同问题,其中未找到要处理的流程,并且它一直存在并断开连接。

我在找

  1. 一个干净的解决方案,在超过最大值时不允许传入连接。
  2. Sink.cancelled() 在做什么(如果它确实在做某事)。
package com.example;

import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
import akka.util.ByteString;


public class SimpleStream03 {
    private static int connectionCount = 0;
    private static int maxConnectioCount = 2;

    public static void runServer() {

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");

        Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
                8888); 

        Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
            
            System.out.println("Handler Sink Connection Count " + connectionCount);
            System.out.println("Handler Sink Client connected from: " + conn.remoteAddress());

            conn.handleWith(Flow.of(ByteString.class), actorSystem);

        });

        Flow<IncomingConnection, IncomingConnection, NotUsed> connectioncountFlow = Flow
                .fromGraph(GraphDSL.create(builder -> {

                    SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
                    FlowShape<IncomingConnection, IncomingConnection> inFlowShape = builder
                            .add(Flow.of(IncomingConnection.class).map(conn -> {
                                connectionCount++;
                                return conn;
                            }));
                    UniformFanOutShape<IncomingConnection, IncomingConnection> partition = builder
                            .add(Partition.create(IncomingConnection.class, 2, param -> {
                                if (connectionCount > maxConnectioCount) {
                                    connectionCount = maxConnectioCount;
                                    System.out.println("Outlet 0 -> Sink.cancelled");
                                    return 0;
                                }
                                System.out.println("Outlet 1 -> forward to handler");
                                return 1;
                            }));

                    builder.from(inFlowShape).toFanOut(partition);
                    builder.from(partition.out(0)).to(sinkCancelled);
                    return new FlowShape<>(inFlowShape.in(), partition.out(1));

                }));

        CompletionStage<ServerBinding> bindingFuture = source.via(connectioncountFlow).to(handler).run(actorSystem);

        bindingFuture.handle((binding, throwable) -> {
            if (binding != null) {
                System.out.println("Server started, listening on: " + binding.localAddress());

            } else {
                System.err.println("Server could not bind to  : " + throwable.getMessage());
                actorSystem.terminate();
            }
            return NotUsed.getInstance();
        });

    }

    public static void main(String[] args) throws InterruptedException {
        SimpleStream03.runServer();

    }

}

输出确认分区正在工作并且 2 个连接正在到达主接收器处理程序。

Server started, listening on: /127.0.0.1:8888
Outlet 1 -> forward to handler
Handler Sink Connection Count 1
Handler Sink Client connected from: /127.0.0.1:60327
Outlet 1 -> forward to handler
Handler Sink Connection Count 2
Handler Sink Client connected from: /127.0.0.1:60330
Outlet 0 -> Sink.cancelled

编辑: 实施已接受的答案,以下更改可防止违反阈值后的传入连接。客户端看到 连接被对等方重置

        +---------------------------------------------------------+
        |                                                         |
        |                                         Fail Flow       |
        |                                        +-------------+  |
        |                                    +-->+Sink  |Source|  |
        |                                    |   |cancel|fail  |  |
        |                                    |   +-------------+  |
        |                      +----------+  |                    |
        |                      |          |  |                    |
        |  +----------+        |        O0+--+                    |
connections|FLOW      |        |          |  O0:count > threshold |
+-------+-->          +------->+ Partition|                       |
        |  |count++   |        |        O1+----------------------------->
        |  +----------+        |          |                       |
        |                      |          |  O1:count <= threshold|
        |                      +----------+                       |
        |                                                         |
        +---------------------------------------------------------+

替换

SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());

Sink<IncomingConnection, CompletionStage<Done>> connectionCancellingSink = Sink.foreach(ic -> ic
                            .handleWith(Flow.fromSinkAndSource(Sink.cancelled(), Source.failed(new Throwable("killed"))),
                                    actorSystem));// Sink.ignore and Sink.cancel give me the same expected result
SinkShape<IncomingConnection> sinkCancelledShape = builder.add(connectionCancellingSink);
                    

Sink.cancelled()立即取消上行(https://doc.akka.io/docs/akka/current/stream/operators/Sink/cancelled.html)。

然而,您的 Partition 是在将 eagerCancel 设置为 false

的情况下创建的

  /**
   * Create a new `Partition` operator with the specified input type, `eagerCancel` is `false`.
   *
   * @param clazz a type hint for this method
   * @param outputCount number of output ports
   * @param partitioner function deciding which output each element will be targeted
   */
  def create[T](
      @unused clazz: Class[T],
      outputCount: Int,
      partitioner: function.Function[T, Integer]): Graph[UniformFanOutShape[T, T], NotUsed] =
    new scaladsl.Partition(outputCount, partitioner.apply, eagerCancel = false)

这意味着 Partition 只有在其所有下游连接都取消时才会取消。这不是你想要的。但是你也不想要 eagerCancel=true,因为这意味着超过限制的第一个连接将破坏整个 Partition,因此你所有的连接..基本上破坏了整个服务器。

也许从嵌套流的角度考虑这里的情况是有用的。顶层 Source<IncomingConnection> 表示已接受的 TCP 连接流。您不想取消该流。如果你这样做,你只是杀死了你的服务器。每个 IncomingConnection 代表一个单独的 TCP 连接。在这种连接上发生的字节交换也表示为流。您希望为超过阈值的每个连接取消此流。

为此,您可以像这样定义一个取消连接 Sink

Sink<IncomingConnection, CompletionStage<Done>>
      connectionCancellingSink =
      Sink.foreach(
        ic ->
          ic.handleWith(
            Flow.fromSinkAndSource(Sink.cancelled(), Source.empty()),
            actorSystem));

IncomingConnection 允许您使用 handleWith 方法附加处理程序。为此,您需要一个 Flow,因为您既从客户端消耗字节,也可能向客户端发送字节(传入的字节进入 Flow 以及您想要发送回客户端的任何内容,您需要在Flow) 的输出。在我们的例子中,我们只想立即取消该流。您可以使用 Flow.fromSinkAndSource 从... SinkSource 中得到 Flow。您可以利用它来插入 Sink.cancelledSource.empty。所以 Source.empty 意味着我们不会向连接发送任何字节并且 Sink.cancelled 将立即取消流并希望底层 TCP 连接。让我们试一试。

最后要做的是将我们新的取消 Sink 插入 Partition

SinkShape<IncomingConnection> sinkCancelled =
            builder.add(connectionCancellingSink);
//...the rest stays the same
 builder.from(partition.out(0))
              .to(sinkCancelled);
         

如果这样做,在第三次连接时您将看到以下消息:

Not aborting connection from 127.0.0.1:49874 because downstream cancelled stream without failure

所以 Sink.cancelled() 并没有真正触发你想要的。让我们重新定义取消 Flow:

Sink<IncomingConnection, CompletionStage<Done>>
      connectionCancellingSink =
      Sink.foreach(
        ic ->
          ic.handleWith(
            Flow.fromSinkAndSource(Sink.ignore(),
                                   Source.failed(new Throwable("killed"))),
            actorSystem));

现在,这是使用 Sink.ignore() 来忽略传入的字节,但通过 Source.failed(...) 使流失败。这将导致连接立即终止,并且 stracktrace 将打印在服务器输出上。如果你想保持安静,你可以创建没有堆栈跟踪的异常:

public static class TerminatedException extends Exception
  {
    public TerminatedException(String message)
    {
      super(message, null, false, false);
    }
  }

然后用它来使你的连接流失败

Flow.fromSinkAndSource(Sink.ignore(),
                       Source.failed(
                         new TerminatedException(("killed"))))
   

这样你会得到更干净的日志。