服务器在 RSocket Fire and Forget 情况下未接收数据
Server not receiving data in RSocket Fire and Forget case
我有以下 RSocket 服务器
@Log4j2
public class NativeRsocketServerFnF {
public static void main(String[] args) {
RSocketFactory.receive()
.frameDecoder(ZERO_COPY)
.errorConsumer(log::error)
.acceptor((setup, clientHandleRsocket) -> {
return Mono.just(
new AbstractRSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
payload.release();
log.info("> from client: {}", message);
return Mono.empty();
}
}
);
})
.transport(TcpServerTransport.create(8000))
.start()
.block()
.onClose()
.block();
}
}
和以下 RSocket 客户端
@Log4j2
public class NativeRsocketClientFnF {
public static void main(String[] args) {
RSocketFactory.connect()
.frameDecoder(ZERO_COPY)
.errorConsumer(log::error)
.transport(TcpClientTransport.create(8000))
.start()
.flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
.block();
}
}
如您所见,我正在尝试将 "ping" 作为负载数据从客户端发送到服务器
当我第一次启动服务器和客户端时,我看到 > from client: ping
如果我再次重启客户端,我在服务器上看不到任何消息。服务器连断点都打不中
我的理解是 Fire and Forget 只是简单地发送数据并且不会费心等待并查看服务器是否成功处理了数据,但在我的情况下,服务器本身在后续运行时不会接收数据客户(和新客户一样好)
有什么我遗漏的吗?
我正在使用 rsocket-core
& rsocket-transport-netty
的版本 1.0.0-RC5
OS: Ubuntu 16.04
您对即发即弃模式的理解是正确的。
问题是 io.rsocket.RSocketRequester#fireAndForget
在数据发送到网络之前成功完成。当您阻止它并退出您的客户端应用程序时,连接将关闭。
见io.rsocket.RSocketRequester#handleFireAndForget
return emptyUnicastMono()
.doOnSubscribe(
new OnceConsumer<Subscription>() {
@Override
public void acceptOnce(@Nonnull Subscription subscription) {
ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encode(
allocator,
streamId,
false,
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
payload.sliceData().retain());
payload.release();
sendProcessor.onNext(requestFrame);
}
});
sendProcessor
是中级通量处理器。实际连接使用,向网络发送请求帧可能会有延迟。
您可以在客户端调用 .block()
之前添加 .then(Mono.delay(Duration.ofMillis(100)))
以进行测试。我认为大多数真正的应用程序在调用 fireAndForget 后不会立即退出。
我有以下 RSocket 服务器
@Log4j2
public class NativeRsocketServerFnF {
public static void main(String[] args) {
RSocketFactory.receive()
.frameDecoder(ZERO_COPY)
.errorConsumer(log::error)
.acceptor((setup, clientHandleRsocket) -> {
return Mono.just(
new AbstractRSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
payload.release();
log.info("> from client: {}", message);
return Mono.empty();
}
}
);
})
.transport(TcpServerTransport.create(8000))
.start()
.block()
.onClose()
.block();
}
}
和以下 RSocket 客户端
@Log4j2
public class NativeRsocketClientFnF {
public static void main(String[] args) {
RSocketFactory.connect()
.frameDecoder(ZERO_COPY)
.errorConsumer(log::error)
.transport(TcpClientTransport.create(8000))
.start()
.flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
.block();
}
}
如您所见,我正在尝试将 "ping" 作为负载数据从客户端发送到服务器
当我第一次启动服务器和客户端时,我看到 > from client: ping
如果我再次重启客户端,我在服务器上看不到任何消息。服务器连断点都打不中
我的理解是 Fire and Forget 只是简单地发送数据并且不会费心等待并查看服务器是否成功处理了数据,但在我的情况下,服务器本身在后续运行时不会接收数据客户(和新客户一样好)
有什么我遗漏的吗?
我正在使用 rsocket-core
& rsocket-transport-netty
1.0.0-RC5
OS: Ubuntu 16.04
您对即发即弃模式的理解是正确的。
问题是 io.rsocket.RSocketRequester#fireAndForget
在数据发送到网络之前成功完成。当您阻止它并退出您的客户端应用程序时,连接将关闭。
见io.rsocket.RSocketRequester#handleFireAndForget
return emptyUnicastMono()
.doOnSubscribe(
new OnceConsumer<Subscription>() {
@Override
public void acceptOnce(@Nonnull Subscription subscription) {
ByteBuf requestFrame =
RequestFireAndForgetFrameFlyweight.encode(
allocator,
streamId,
false,
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
payload.sliceData().retain());
payload.release();
sendProcessor.onNext(requestFrame);
}
});
sendProcessor
是中级通量处理器。实际连接使用,向网络发送请求帧可能会有延迟。
您可以在客户端调用 .block()
之前添加 .then(Mono.delay(Duration.ofMillis(100)))
以进行测试。我认为大多数真正的应用程序在调用 fireAndForget 后不会立即退出。