Java 线程 - 等待数据返回,但不要阻塞其他线程
Java Threading - Wait for data to be returned, but don't block other threads
我在 ExecutorPool 中有 7 个线程 运行 处理数据,偶尔需要来自另一个线程上的侦听器实例 运行 的数据。侦听器通过套接字向服务器发送请求,稍后,当结果为 returned 时,侦听器会将数据 return 发送到调用它的工作线程。我想阻塞工作线程,直到请求的数据被 returned,但我不想阻止侦听器从其他工作线程发出其他请求。我该怎么做?
如果一个线程将工作交给另一个线程,然后简单地等待结果,则不需要另一个线程来完成工作。您可能需要一个 class 来完成这项工作,但它是在同一个线程上调用的。如果多个线程使用同一个实例,则可能需要一些同步。但最重要的是:
您不需要侦听器线程。换成处理请求的组件,同步调用。
编辑
根据你自己的回答,你的问题比较清楚了。正如@JimN 建议的那样,您可能希望将 Future
分发给工作线程,并使其成为 CompletableFuture
Listener 保持在 Map
中,该 Map
由请求 ID 键入,直到响应 returns.
示例代码:
public class WorkUnitProcessor implements Runnable {
// ...
@Override
public void run() {
while(true) {
WorkUnit work = master.getNextWorkUnit();
if(work == null) return;
doWork(work);
}
}
public void doWork(WorkUnit work) {
//Do some work...
try {
DataRequest dataRequest = createRequest(work);
Future<Response> future = server.getData(dataRequest);
Response response = future.get(); // this call blocks until the Response is available.
//finish doing work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// handle e.getCause()
}
}
// ...
}
public class Server implements DataSourceDrivenCallback {
private final DataSource dataSource;
private Map<Integer, CompletableFuture<Response>> openRequests = new ConcurrentHashMap<>();
public Server(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public void incomingDataCallback(int requestId, ChunkOfData requestedData) {
CompletableFuture<Response> responseHolder = openRequests.remove(requestId); // get the responseHolder
if (responseHolder != null) {
responseHolder.complete(toResponse(requestedData)); // make the response available.
}
}
public Future<Response> getData(DataRequest datarequest) {
int requestId = dataSource.submitRequest(serializeAndTranslateRequest(datarequest));
CompletableFuture<Response> future = new CompletableFuture<>();
openRequests.put(requestId, future);
return future;
}
// ...
}
我认为这可能有效。此处描述了我正在寻找的内容:
https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
这是使线程休眠直到它被线程通知它正在等待的能力。看起来好用。
public class DataProcessor {
private List<WorkUnit> work;
private Server server;
public DataProcessor(List<WorkUnit> work, int numprocessors) {
this.work = work;
setupProcessors(numprocessors);
Server server = new Server();
}
private void setupProcessors(int numprocessors) {
for(int i = 0; i < numprocessors; i++) {
WorkUnitProcessor worker = new WorkUnitProcessor(this, server);
worker.start();
}
}
public synchronized WorkUnit getNextWorkUnit() {
if(work.isEmpty()) return null;
return work.remove(0);
}
}
public class WorkUnitProcessor(Server server) {
private DataProcessor master;
private Server server;
public WorkUnitProcessor(DataProcessor master) {
this.master = master;
}
@Override
public void run() {
while(true) {
WorkUnit work = master.getNextWorkUnit();
if(work == null) return;
doWork(work);
}
}
public void doWork(WorkUnit work) {
//Do some work...
server.getData(datarequest, this);
while(!datarequest.filled) {
try {
wait();
} catch (InterruptedException e) {}
}
//finish doing work
}
}
public class Server implements DataSourceDrivenCallback {
private DataSource ds;
private Map<Integer, OpenRequest> openrequests;
public Server() {
//setup socket and establish communication with server through DataSource object
DataSource ds = new DataSource(<ID>, <Socket>);
}
public synchronized void getData(DataRequest datarequest, WorkUnitProcessor workerthread) {
int requestid = ds.submitRequest(serializeAndTranslateRequest(datarequest));
openrequests.add(new OpenRequest(workerthread, datarequest));
}
@Override
public void incomingDataCallback(int requestid, ChunkOfData requesteddata) {
OpenRequest request = openrequests.get(requestid);
request.datarequest.storeData(requesteddata);
request.workerthread.notify();
}
}
public class OpenRequest {
private WorkUnitProcessor workerthread;
private DataRequest datarequest;
//other details about request
}
我在 ExecutorPool 中有 7 个线程 运行 处理数据,偶尔需要来自另一个线程上的侦听器实例 运行 的数据。侦听器通过套接字向服务器发送请求,稍后,当结果为 returned 时,侦听器会将数据 return 发送到调用它的工作线程。我想阻塞工作线程,直到请求的数据被 returned,但我不想阻止侦听器从其他工作线程发出其他请求。我该怎么做?
如果一个线程将工作交给另一个线程,然后简单地等待结果,则不需要另一个线程来完成工作。您可能需要一个 class 来完成这项工作,但它是在同一个线程上调用的。如果多个线程使用同一个实例,则可能需要一些同步。但最重要的是:
您不需要侦听器线程。换成处理请求的组件,同步调用。
编辑
根据你自己的回答,你的问题比较清楚了。正如@JimN 建议的那样,您可能希望将 Future
分发给工作线程,并使其成为 CompletableFuture
Listener 保持在 Map
中,该 Map
由请求 ID 键入,直到响应 returns.
示例代码:
public class WorkUnitProcessor implements Runnable {
// ...
@Override
public void run() {
while(true) {
WorkUnit work = master.getNextWorkUnit();
if(work == null) return;
doWork(work);
}
}
public void doWork(WorkUnit work) {
//Do some work...
try {
DataRequest dataRequest = createRequest(work);
Future<Response> future = server.getData(dataRequest);
Response response = future.get(); // this call blocks until the Response is available.
//finish doing work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// handle e.getCause()
}
}
// ...
}
public class Server implements DataSourceDrivenCallback {
private final DataSource dataSource;
private Map<Integer, CompletableFuture<Response>> openRequests = new ConcurrentHashMap<>();
public Server(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public void incomingDataCallback(int requestId, ChunkOfData requestedData) {
CompletableFuture<Response> responseHolder = openRequests.remove(requestId); // get the responseHolder
if (responseHolder != null) {
responseHolder.complete(toResponse(requestedData)); // make the response available.
}
}
public Future<Response> getData(DataRequest datarequest) {
int requestId = dataSource.submitRequest(serializeAndTranslateRequest(datarequest));
CompletableFuture<Response> future = new CompletableFuture<>();
openRequests.put(requestId, future);
return future;
}
// ...
}
我认为这可能有效。此处描述了我正在寻找的内容:
https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
这是使线程休眠直到它被线程通知它正在等待的能力。看起来好用。
public class DataProcessor {
private List<WorkUnit> work;
private Server server;
public DataProcessor(List<WorkUnit> work, int numprocessors) {
this.work = work;
setupProcessors(numprocessors);
Server server = new Server();
}
private void setupProcessors(int numprocessors) {
for(int i = 0; i < numprocessors; i++) {
WorkUnitProcessor worker = new WorkUnitProcessor(this, server);
worker.start();
}
}
public synchronized WorkUnit getNextWorkUnit() {
if(work.isEmpty()) return null;
return work.remove(0);
}
}
public class WorkUnitProcessor(Server server) {
private DataProcessor master;
private Server server;
public WorkUnitProcessor(DataProcessor master) {
this.master = master;
}
@Override
public void run() {
while(true) {
WorkUnit work = master.getNextWorkUnit();
if(work == null) return;
doWork(work);
}
}
public void doWork(WorkUnit work) {
//Do some work...
server.getData(datarequest, this);
while(!datarequest.filled) {
try {
wait();
} catch (InterruptedException e) {}
}
//finish doing work
}
}
public class Server implements DataSourceDrivenCallback {
private DataSource ds;
private Map<Integer, OpenRequest> openrequests;
public Server() {
//setup socket and establish communication with server through DataSource object
DataSource ds = new DataSource(<ID>, <Socket>);
}
public synchronized void getData(DataRequest datarequest, WorkUnitProcessor workerthread) {
int requestid = ds.submitRequest(serializeAndTranslateRequest(datarequest));
openrequests.add(new OpenRequest(workerthread, datarequest));
}
@Override
public void incomingDataCallback(int requestid, ChunkOfData requesteddata) {
OpenRequest request = openrequests.get(requestid);
request.datarequest.storeData(requesteddata);
request.workerthread.notify();
}
}
public class OpenRequest {
private WorkUnitProcessor workerthread;
private DataRequest datarequest;
//other details about request
}