当我从 ReplaySubject 使用 Observable 时阻止 ChannelHandlerContext

Block ChannelHandlerContext when I use Observable from ReplaySubject

我正在编写客户端-服务器应用程序。我从数据库中获取数据并将其放入 rxjava2 的 ReplaySubject(ReplaySubject 是必需的,因为我需要保证每个客户端上的数据相同)当客户端连接订阅它时,我想将此数据发送给他但是当我尝试它时我的头 "possible way ^_^" 它阻塞了。所谓块,我的意思是它不发送数据,但是当我关闭服务器时,数据会立即显示在客户端。

我尝试在客户端和服务器端事件循环中添加一些线程(我在想可能是线程阻塞,因为我使用 'Infinite' 源所以要接收这个我需要另一个线程或类似的东西)。

服务器端渠道代码:

public
    class ClientHandler
        extends SimpleChannelInboundHandler<DataWrapper> {


    private final Observable<DataWrapper> data;

    public ClientHandler(Observable<DataWrapper> data) {
        this.data = data;
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // super.channelRegistered(ctx);
        final Channel channel = ctx.channel();
        Server
            .INSTANCE
            .appendToChannelGroup(channel);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // super.channelActive(ctx);
        // i believe there is something wrong
        data.subscribe(ctx::writeAndFlush);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    // rest skip
}

客户端:

public
    class DirectNetworkCommunicator
        extends SimpleChannelInboundHandler<DataWrapper> {


    private Observable<DataWrapper> generatedData;
    private ExecutorService fallbackThread;


    DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {
        this.fallbackThread = Executors.newSingleThreadExecutor();
        this.generatedData = generatedData;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        DataWrapper inComingData = (DataWrapper) msg;
        Adapter
            .INSTANCE
            .appendFromNettworkData(inComingData);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // super.channelReadComplete(ctx);
        ctx.flush();
    }
    // rest skip
}

所以我之前提到过,我希望它能在服务器关闭时接收数据,而不是在服务器关闭时接收数据^_^。如果这对 netty 4.1.37 最终版本有帮助。

好的,所以未来的人们会面临同样的问题,我自己找到了答案。 来自客户端的 Netty 使用后台线程作为通信的主要线程 意味着我已经等待主线程释放,然后它才能对 observable 进行操作。 希望它能帮助别人。