服务器在 RSocket Fire and Forget 情况下未接收数据

Server not receiving data in RSocket Fire and Forget case

我有以下 RSocket 服务器

@Log4j2
public class NativeRsocketServerFnF {
  public static void main(String[] args) {
    RSocketFactory.receive()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .acceptor((setup, clientHandleRsocket) -> {
          return Mono.just(
              new AbstractRSocket() {
                @Override
                public Mono<Void> fireAndForget(Payload payload) {
                  CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
                  payload.release();
                  log.info("> from client: {}", message);
                  return Mono.empty();
                }
              }
          );
        })
        .transport(TcpServerTransport.create(8000))
        .start()
        .block()
        .onClose()
        .block();
  }
}

和以下 RSocket 客户端

@Log4j2
public class NativeRsocketClientFnF {
  public static void main(String[] args) {
    RSocketFactory.connect()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .transport(TcpClientTransport.create(8000))
        .start()
        .flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
        .block();
  }
}

如您所见,我正在尝试将 "ping" 作为负载数据从客户端发送到服务器

当我第一次启动服务器和客户端时,我看到 > from client: ping

如果我再次重启客户端,我在服务器上看不到任何消息。服务器连断点都打不中

我的理解是 Fire and Forget 只是简单地发送数据并且不会费心等待并查看服务器是否成功处理了数据,但在我的情况下,服务器本身在后续运行时不会接收数据客户(和新客户一样好)

有什么我遗漏的吗?

我正在使用 rsocket-core & rsocket-transport-netty

的版本 1.0.0-RC5

OS: Ubuntu 16.04

您对即发即弃模式的理解是正确的。

问题是 io.rsocket.RSocketRequester#fireAndForget 在数据发送到网络之前成功完成。当您阻止它并退出您的客户端应用程序时,连接将关闭。

io.rsocket.RSocketRequester#handleFireAndForget

    return emptyUnicastMono()
        .doOnSubscribe(
            new OnceConsumer<Subscription>() {
              @Override
              public void acceptOnce(@Nonnull Subscription subscription) {
                ByteBuf requestFrame =
                    RequestFireAndForgetFrameFlyweight.encode(
                        allocator,
                        streamId,
                        false,
                        payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
                        payload.sliceData().retain());
                payload.release();

                sendProcessor.onNext(requestFrame);
              }
            });

sendProcessor 是中级通量处理器。实际连接使用,向网络发送请求帧可能会有延迟。

您可以在客户端调用 .block() 之前添加 .then(Mono.delay(Duration.ofMillis(100))) 以进行测试。我认为大多数真正的应用程序在调用 fireAndForget 后不会立即退出。