为什么Akka TCP流服务器在connection.handlewith没有流时断开客户端?
Why does Akka TCP stream server disconnect client when there is no flow for the connection.handlewith?
我正在寻找对我在以下代码中看到的行为的解释。当
conn.handleWith 被注释掉,我用 netcat 建立的 TCP 客户端连接,连接,并在几秒钟内显示被对等方断开连接(即服务器断开连接)。当 conn.handleWith 出现在代码中时,我看不到断开连接。我最初虽然它与为服务器设置的空闲超时有关,但事实并非如此。
那么为什么服务器在没有流量处理连接的情况下断开客户端呢?
package com.example;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
public class SimpleStream00 {
public static void main(String[] args) throws InterruptedException {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Client connected from: " + conn.remoteAddress());
// conn.handleWith(Flow.of(ByteString.class), actorSystem);
// Server does not drop the connection when previous line is uncommented
});
Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
8888); // .idleTimeout(Duration.ofSeconds(60));
CompletionStage<ServerBinding> bindingFuture = source.to(handler).run(actorSystem);
bindingFuture.handle((binding, throwable) -> {
if (binding != null) {
System.out.println("Server started, listening on: " + binding.localAddress());
} else {
System.err.println("Server could not bind to : " + throwable.getMessage());
actorSystem.terminate();
}
return NotUsed.getInstance();
});
}
}
Akka Streams 的一般原则是,如果没有需求,流应该消耗尽可能少的资源。由于没有 handleWith
,您的流永远不会从连接发出对 ByteString
的需求信号,Akka 的 TCP 层断开连接以节省资源。
我正在寻找对我在以下代码中看到的行为的解释。当
conn.handleWith 被注释掉,我用 netcat 建立的 TCP 客户端连接,连接,并在几秒钟内显示被对等方断开连接(即服务器断开连接)。当 conn.handleWith 出现在代码中时,我看不到断开连接。我最初虽然它与为服务器设置的空闲超时有关,但事实并非如此。
那么为什么服务器在没有流量处理连接的情况下断开客户端呢?
package com.example;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
public class SimpleStream00 {
public static void main(String[] args) throws InterruptedException {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Client connected from: " + conn.remoteAddress());
// conn.handleWith(Flow.of(ByteString.class), actorSystem);
// Server does not drop the connection when previous line is uncommented
});
Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
8888); // .idleTimeout(Duration.ofSeconds(60));
CompletionStage<ServerBinding> bindingFuture = source.to(handler).run(actorSystem);
bindingFuture.handle((binding, throwable) -> {
if (binding != null) {
System.out.println("Server started, listening on: " + binding.localAddress());
} else {
System.err.println("Server could not bind to : " + throwable.getMessage());
actorSystem.terminate();
}
return NotUsed.getInstance();
});
}
}
Akka Streams 的一般原则是,如果没有需求,流应该消耗尽可能少的资源。由于没有 handleWith
,您的流永远不会从连接发出对 ByteString
的需求信号,Akka 的 TCP 层断开连接以节省资源。