在连接配置的最大连接数后,如何防止传入连接的 Akka TCP 流连接?
How to prevent Akka TCP Stream of incoming connections from connecting after a configured max number of connections have connected?
[编辑]:问题已解决,Artur 提供的解决方案已作为编辑添加到最后。
我想实现的想法是,一个TCP服务器允许n个连接,如果我得到一个n+1个连接则不允许连接。
因此,我需要以某种方式取消连接,然后将该特定流连接到 Sink.cancelled()。
我拥有的是连接到自定义流的 IncomingConnection,该流根据连接计数对 IncomingConnection 进行分区。一旦超过最大连接数,分区逻辑会将其定向到连接到 Sink.cancelled.
的插座
本以为是立即取消连接,但它允许客户端连接,然后在一段时间后断开连接。
也许我 运行 遇到了与 的答案中提到的相同问题,其中未找到要处理的流程,并且它一直存在并断开连接。
我在找
- 一个干净的解决方案,在超过最大值时不允许传入连接。
- Sink.cancelled() 在做什么(如果它确实在做某事)。
package com.example;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
import akka.util.ByteString;
public class SimpleStream03 {
private static int connectionCount = 0;
private static int maxConnectioCount = 2;
public static void runServer() {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
8888);
Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Handler Sink Connection Count " + connectionCount);
System.out.println("Handler Sink Client connected from: " + conn.remoteAddress());
conn.handleWith(Flow.of(ByteString.class), actorSystem);
});
Flow<IncomingConnection, IncomingConnection, NotUsed> connectioncountFlow = Flow
.fromGraph(GraphDSL.create(builder -> {
SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
FlowShape<IncomingConnection, IncomingConnection> inFlowShape = builder
.add(Flow.of(IncomingConnection.class).map(conn -> {
connectionCount++;
return conn;
}));
UniformFanOutShape<IncomingConnection, IncomingConnection> partition = builder
.add(Partition.create(IncomingConnection.class, 2, param -> {
if (connectionCount > maxConnectioCount) {
connectionCount = maxConnectioCount;
System.out.println("Outlet 0 -> Sink.cancelled");
return 0;
}
System.out.println("Outlet 1 -> forward to handler");
return 1;
}));
builder.from(inFlowShape).toFanOut(partition);
builder.from(partition.out(0)).to(sinkCancelled);
return new FlowShape<>(inFlowShape.in(), partition.out(1));
}));
CompletionStage<ServerBinding> bindingFuture = source.via(connectioncountFlow).to(handler).run(actorSystem);
bindingFuture.handle((binding, throwable) -> {
if (binding != null) {
System.out.println("Server started, listening on: " + binding.localAddress());
} else {
System.err.println("Server could not bind to : " + throwable.getMessage());
actorSystem.terminate();
}
return NotUsed.getInstance();
});
}
public static void main(String[] args) throws InterruptedException {
SimpleStream03.runServer();
}
}
输出确认分区正在工作并且 2 个连接正在到达主接收器处理程序。
Server started, listening on: /127.0.0.1:8888
Outlet 1 -> forward to handler
Handler Sink Connection Count 1
Handler Sink Client connected from: /127.0.0.1:60327
Outlet 1 -> forward to handler
Handler Sink Connection Count 2
Handler Sink Client connected from: /127.0.0.1:60330
Outlet 0 -> Sink.cancelled
编辑:
实施已接受的答案,以下更改可防止违反阈值后的传入连接。客户端看到 连接被对等方重置
+---------------------------------------------------------+
| |
| Fail Flow |
| +-------------+ |
| +-->+Sink |Source| |
| | |cancel|fail | |
| | +-------------+ |
| +----------+ | |
| | | | |
| +----------+ | O0+--+ |
connections|FLOW | | | O0:count > threshold |
+-------+--> +------->+ Partition| |
| |count++ | | O1+----------------------------->
| +----------+ | | |
| | | O1:count <= threshold|
| +----------+ |
| |
+---------------------------------------------------------+
替换
SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
有
Sink<IncomingConnection, CompletionStage<Done>> connectionCancellingSink = Sink.foreach(ic -> ic
.handleWith(Flow.fromSinkAndSource(Sink.cancelled(), Source.failed(new Throwable("killed"))),
actorSystem));// Sink.ignore and Sink.cancel give me the same expected result
SinkShape<IncomingConnection> sinkCancelledShape = builder.add(connectionCancellingSink);
Sink.cancelled()
立即取消上行(https://doc.akka.io/docs/akka/current/stream/operators/Sink/cancelled.html)。
然而,您的 Partition
是在将 eagerCancel 设置为 false
的情况下创建的
/**
* Create a new `Partition` operator with the specified input type, `eagerCancel` is `false`.
*
* @param clazz a type hint for this method
* @param outputCount number of output ports
* @param partitioner function deciding which output each element will be targeted
*/
def create[T](
@unused clazz: Class[T],
outputCount: Int,
partitioner: function.Function[T, Integer]): Graph[UniformFanOutShape[T, T], NotUsed] =
new scaladsl.Partition(outputCount, partitioner.apply, eagerCancel = false)
这意味着 Partition
只有在其所有下游连接都取消时才会取消。这不是你想要的。但是你也不想要 eagerCancel=true
,因为这意味着超过限制的第一个连接将破坏整个 Partition
,因此你所有的连接..基本上破坏了整个服务器。
也许从嵌套流的角度考虑这里的情况是有用的。顶层 Source<IncomingConnection>
表示已接受的 TCP 连接流。您不想取消该流。如果你这样做,你只是杀死了你的服务器。每个 IncomingConnection
代表一个单独的 TCP 连接。在这种连接上发生的字节交换也表示为流。您希望为超过阈值的每个连接取消此流。
为此,您可以像这样定义一个取消连接 Sink
:
Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.cancelled(), Source.empty()),
actorSystem));
IncomingConnection
允许您使用 handleWith
方法附加处理程序。为此,您需要一个 Flow
,因为您既从客户端消耗字节,也可能向客户端发送字节(传入的字节进入 Flow
以及您想要发送回客户端的任何内容,您需要在Flow
) 的输出。在我们的例子中,我们只想立即取消该流。您可以使用 Flow.fromSinkAndSource
从... Sink
和 Source
中得到 Flow
。您可以利用它来插入 Sink.cancelled
和 Source.empty
。所以 Source.empty
意味着我们不会向连接发送任何字节并且 Sink.cancelled
将立即取消流并希望底层 TCP 连接。让我们试一试。
最后要做的是将我们新的取消 Sink
插入 Partition
SinkShape<IncomingConnection> sinkCancelled =
builder.add(connectionCancellingSink);
//...the rest stays the same
builder.from(partition.out(0))
.to(sinkCancelled);
如果这样做,在第三次连接时您将看到以下消息:
Not aborting connection from 127.0.0.1:49874 because downstream cancelled stream without failure
所以 Sink.cancelled()
并没有真正触发你想要的。让我们重新定义取消 Flow
:
Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(new Throwable("killed"))),
actorSystem));
现在,这是使用 Sink.ignore()
来忽略传入的字节,但通过 Source.failed(...)
使流失败。这将导致连接立即终止,并且 stracktrace 将打印在服务器输出上。如果你想保持安静,你可以创建没有堆栈跟踪的异常:
public static class TerminatedException extends Exception
{
public TerminatedException(String message)
{
super(message, null, false, false);
}
}
然后用它来使你的连接流失败
Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(
new TerminatedException(("killed"))))
这样你会得到更干净的日志。
[编辑]:问题已解决,Artur 提供的解决方案已作为编辑添加到最后。
我想实现的想法是,一个TCP服务器允许n个连接,如果我得到一个n+1个连接则不允许连接。
因此,我需要以某种方式取消连接,然后将该特定流连接到 Sink.cancelled()。
我拥有的是连接到自定义流的 IncomingConnection,该流根据连接计数对 IncomingConnection 进行分区。一旦超过最大连接数,分区逻辑会将其定向到连接到 Sink.cancelled.
的插座本以为是立即取消连接,但它允许客户端连接,然后在一段时间后断开连接。
也许我 运行 遇到了与
我在找
- 一个干净的解决方案,在超过最大值时不允许传入连接。
- Sink.cancelled() 在做什么(如果它确实在做某事)。
package com.example;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
import akka.util.ByteString;
public class SimpleStream03 {
private static int connectionCount = 0;
private static int maxConnectioCount = 2;
public static void runServer() {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
8888);
Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Handler Sink Connection Count " + connectionCount);
System.out.println("Handler Sink Client connected from: " + conn.remoteAddress());
conn.handleWith(Flow.of(ByteString.class), actorSystem);
});
Flow<IncomingConnection, IncomingConnection, NotUsed> connectioncountFlow = Flow
.fromGraph(GraphDSL.create(builder -> {
SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
FlowShape<IncomingConnection, IncomingConnection> inFlowShape = builder
.add(Flow.of(IncomingConnection.class).map(conn -> {
connectionCount++;
return conn;
}));
UniformFanOutShape<IncomingConnection, IncomingConnection> partition = builder
.add(Partition.create(IncomingConnection.class, 2, param -> {
if (connectionCount > maxConnectioCount) {
connectionCount = maxConnectioCount;
System.out.println("Outlet 0 -> Sink.cancelled");
return 0;
}
System.out.println("Outlet 1 -> forward to handler");
return 1;
}));
builder.from(inFlowShape).toFanOut(partition);
builder.from(partition.out(0)).to(sinkCancelled);
return new FlowShape<>(inFlowShape.in(), partition.out(1));
}));
CompletionStage<ServerBinding> bindingFuture = source.via(connectioncountFlow).to(handler).run(actorSystem);
bindingFuture.handle((binding, throwable) -> {
if (binding != null) {
System.out.println("Server started, listening on: " + binding.localAddress());
} else {
System.err.println("Server could not bind to : " + throwable.getMessage());
actorSystem.terminate();
}
return NotUsed.getInstance();
});
}
public static void main(String[] args) throws InterruptedException {
SimpleStream03.runServer();
}
}
输出确认分区正在工作并且 2 个连接正在到达主接收器处理程序。
Server started, listening on: /127.0.0.1:8888
Outlet 1 -> forward to handler
Handler Sink Connection Count 1
Handler Sink Client connected from: /127.0.0.1:60327
Outlet 1 -> forward to handler
Handler Sink Connection Count 2
Handler Sink Client connected from: /127.0.0.1:60330
Outlet 0 -> Sink.cancelled
编辑: 实施已接受的答案,以下更改可防止违反阈值后的传入连接。客户端看到 连接被对等方重置
+---------------------------------------------------------+
| |
| Fail Flow |
| +-------------+ |
| +-->+Sink |Source| |
| | |cancel|fail | |
| | +-------------+ |
| +----------+ | |
| | | | |
| +----------+ | O0+--+ |
connections|FLOW | | | O0:count > threshold |
+-------+--> +------->+ Partition| |
| |count++ | | O1+----------------------------->
| +----------+ | | |
| | | O1:count <= threshold|
| +----------+ |
| |
+---------------------------------------------------------+
替换
SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
有
Sink<IncomingConnection, CompletionStage<Done>> connectionCancellingSink = Sink.foreach(ic -> ic
.handleWith(Flow.fromSinkAndSource(Sink.cancelled(), Source.failed(new Throwable("killed"))),
actorSystem));// Sink.ignore and Sink.cancel give me the same expected result
SinkShape<IncomingConnection> sinkCancelledShape = builder.add(connectionCancellingSink);
Sink.cancelled()
立即取消上行(https://doc.akka.io/docs/akka/current/stream/operators/Sink/cancelled.html)。
然而,您的 Partition
是在将 eagerCancel 设置为 false
/**
* Create a new `Partition` operator with the specified input type, `eagerCancel` is `false`.
*
* @param clazz a type hint for this method
* @param outputCount number of output ports
* @param partitioner function deciding which output each element will be targeted
*/
def create[T](
@unused clazz: Class[T],
outputCount: Int,
partitioner: function.Function[T, Integer]): Graph[UniformFanOutShape[T, T], NotUsed] =
new scaladsl.Partition(outputCount, partitioner.apply, eagerCancel = false)
这意味着 Partition
只有在其所有下游连接都取消时才会取消。这不是你想要的。但是你也不想要 eagerCancel=true
,因为这意味着超过限制的第一个连接将破坏整个 Partition
,因此你所有的连接..基本上破坏了整个服务器。
也许从嵌套流的角度考虑这里的情况是有用的。顶层 Source<IncomingConnection>
表示已接受的 TCP 连接流。您不想取消该流。如果你这样做,你只是杀死了你的服务器。每个 IncomingConnection
代表一个单独的 TCP 连接。在这种连接上发生的字节交换也表示为流。您希望为超过阈值的每个连接取消此流。
为此,您可以像这样定义一个取消连接 Sink
:
Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.cancelled(), Source.empty()),
actorSystem));
IncomingConnection
允许您使用 handleWith
方法附加处理程序。为此,您需要一个 Flow
,因为您既从客户端消耗字节,也可能向客户端发送字节(传入的字节进入 Flow
以及您想要发送回客户端的任何内容,您需要在Flow
) 的输出。在我们的例子中,我们只想立即取消该流。您可以使用 Flow.fromSinkAndSource
从... Sink
和 Source
中得到 Flow
。您可以利用它来插入 Sink.cancelled
和 Source.empty
。所以 Source.empty
意味着我们不会向连接发送任何字节并且 Sink.cancelled
将立即取消流并希望底层 TCP 连接。让我们试一试。
最后要做的是将我们新的取消 Sink
插入 Partition
SinkShape<IncomingConnection> sinkCancelled =
builder.add(connectionCancellingSink);
//...the rest stays the same
builder.from(partition.out(0))
.to(sinkCancelled);
如果这样做,在第三次连接时您将看到以下消息:
Not aborting connection from 127.0.0.1:49874 because downstream cancelled stream without failure
所以 Sink.cancelled()
并没有真正触发你想要的。让我们重新定义取消 Flow
:
Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(new Throwable("killed"))),
actorSystem));
现在,这是使用 Sink.ignore()
来忽略传入的字节,但通过 Source.failed(...)
使流失败。这将导致连接立即终止,并且 stracktrace 将打印在服务器输出上。如果你想保持安静,你可以创建没有堆栈跟踪的异常:
public static class TerminatedException extends Exception
{
public TerminatedException(String message)
{
super(message, null, false, false);
}
}
然后用它来使你的连接流失败
Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(
new TerminatedException(("killed"))))
这样你会得到更干净的日志。