Java 线程 - 等待数据返回,但不要阻塞其他线程

Java Threading - Wait for data to be returned, but don't block other threads

我在 ExecutorPool 中有 7 个线程 运行 处理数据,偶尔需要来自另一个线程上的侦听器实例 运行 的数据。侦听器通过套接字向服务器发送请求,稍后,当结果为 returned 时,侦听器会将数据 return 发送到调用它的工作线程。我想阻塞工作线程,直到请求的数据被 returned,但我不想阻止侦听器从其他工作线程发出其他请求。我该怎么做?

如果一个线程将工作交给另一个线程,然后简单地等待结果,则不需要另一个线程来完成工作。您可能需要一个 class 来完成这项工作,但它是在同一个线程上调用的。如果多个线程使用同一个实例,则可能需要一些同步。但最重要的是:

您不需要侦听器线程。换成处理请求的组件,同步调用。

编辑

根据你自己的回答,你的问题比较清楚了。正如@JimN 建议的那样,您可能希望将 Future 分发给工作线程,并使其成为 CompletableFuture Listener 保持在 Map 中,该 Map 由请求 ID 键入,直到响应 returns.

示例代码:

public class WorkUnitProcessor implements Runnable {

    // ...

    @Override
    public void run() {
        while(true) {
            WorkUnit work = master.getNextWorkUnit();
            if(work == null) return;
            doWork(work);
        }
    }

    public void doWork(WorkUnit work) {

        //Do some work...

        try {
            DataRequest dataRequest = createRequest(work);

            Future<Response> future = server.getData(dataRequest);
            Response response = future.get();                       // this call blocks until the Response is available.

            //finish doing work

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // handle e.getCause()
        }

    }

    // ...
}

public class Server implements DataSourceDrivenCallback {

    private final DataSource dataSource;

    private Map<Integer, CompletableFuture<Response>> openRequests = new ConcurrentHashMap<>();

    public Server(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public void incomingDataCallback(int requestId, ChunkOfData requestedData) {
        CompletableFuture<Response> responseHolder = openRequests.remove(requestId);  // get the responseHolder
        if (responseHolder != null) {
            responseHolder.complete(toResponse(requestedData));                     // make the response available.
        }
    }

    public Future<Response> getData(DataRequest datarequest) {
        int requestId = dataSource.submitRequest(serializeAndTranslateRequest(datarequest));
        CompletableFuture<Response> future = new CompletableFuture<>();
        openRequests.put(requestId, future);
        return future;
    }

    // ...
}

我认为这可能有效。此处描述了我正在寻找的内容:

https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html

这是使线程休眠直到它被线程通知它正在等待的能力。看起来好用。

public class DataProcessor {
    private List<WorkUnit> work;
    private Server server;

    public DataProcessor(List<WorkUnit> work, int numprocessors) {
        this.work = work;
        setupProcessors(numprocessors);
        Server server = new Server();
    }

    private void setupProcessors(int numprocessors) {
        for(int i = 0; i < numprocessors; i++) {
            WorkUnitProcessor worker = new WorkUnitProcessor(this, server);
            worker.start();
        }
    }

    public synchronized WorkUnit getNextWorkUnit() {
        if(work.isEmpty()) return null;
        return work.remove(0);
    }
}

public class WorkUnitProcessor(Server server) {
    private DataProcessor master;
    private Server server;

    public WorkUnitProcessor(DataProcessor master) {
        this.master = master;
    }

    @Override
    public void run() {
        while(true) {
            WorkUnit work = master.getNextWorkUnit();
            if(work == null) return;
            doWork(work);
        }
    }

    public void doWork(WorkUnit work) {
        //Do some work...
        server.getData(datarequest, this);
        while(!datarequest.filled) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        //finish doing work
    }   
}

public class Server implements DataSourceDrivenCallback {
    private DataSource ds;
    private Map<Integer, OpenRequest> openrequests;


    public Server() {
        //setup socket and establish communication with server through DataSource object
        DataSource ds = new DataSource(<ID>, <Socket>);
    }

    public synchronized void getData(DataRequest datarequest, WorkUnitProcessor workerthread) {
        int requestid = ds.submitRequest(serializeAndTranslateRequest(datarequest));
        openrequests.add(new OpenRequest(workerthread, datarequest));
    }

    @Override
    public void incomingDataCallback(int requestid, ChunkOfData requesteddata) {
        OpenRequest request = openrequests.get(requestid);
        request.datarequest.storeData(requesteddata);
        request.workerthread.notify();
    }
}

public class OpenRequest {
    private WorkUnitProcessor workerthread;
    private DataRequest datarequest;
    //other details about request
}