在 Java 中获取计划的非阻塞操作的结果
Get results of scheduled non-blocking operations in Java
我正在尝试以预定的非阻塞方式执行一些阻塞操作(比如 HTTP 请求)。假设我有 10 个请求,一个请求需要 3 秒,但我不想等待 3 秒,而是等待 1 秒,然后发送下一个请求。所有执行完成后,我想将所有结果收集在一个列表中,并 return 给用户。
下面是我的场景原型(线程休眠用作阻塞操作,而不是 HTTP 请求。)
public static List<Integer> getResults(List<Integer> inputs) throws InterruptedException, ExecutionException {
List<Integer> results = new LinkedList<Integer>();
Queue<Callable<Integer>> tasks = new LinkedList<Callable<Integer>>();
List<Future<Integer>> futures = new LinkedList<Future<Integer>>();
for (Integer input : inputs) {
Callable<Integer> task = new Callable<Integer>() {
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return input + 1000;
}
};
tasks.add(task);
}
ExecutorService es = Executors.newCachedThreadPool();
ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Callable<Integer> task = tasks.poll();
if (task == null) {
ses.shutdown();
es.shutdown();
return;
}
futures.add(es.submit(task));
}
}, 0, 1000, TimeUnit.MILLISECONDS);
while(true) {
if(futures.size() == inputs.size()) {
for (Future<Integer> future : futures) {
Integer result = future.get();
results.add(result);
}
return results;
}
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Integer> results = getResults(new LinkedList<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
System.out.println(Arrays.toString(results.toArray()));
}
我在 while 循环中等待,直到所有任务 return 得到正确的结果。但它永远不会进入中断条件,它会无限循环。每当我放置一个 I/O 操作,如记录器甚至一个断点时,它都会打破 while 循环,一切都会变得正常。
我对 Java 并发比较陌生,我想了解正在发生的事情以及这是否是正确的做法。我想 I/O 操作会触发线程调度程序上的某些内容并使其检查集合的大小。
您需要同步线程。您有两个不同的线程(主线程和执行服务线程)访问 futures
列表,并且由于 LinkedList
不同步,这两个线程看到 futures
.[= 的两个不同值19=]
while(true) {
synchronized(futures) {
if(futures.size() == inputs.size()) {
...
}
}
}
发生这种情况是因为 java 中的线程使用 cpu 缓存来提高性能。所以每个线程在同步之前可以有不同的变量值。
这个 SO question 有更多关于这个的信息。
也来自答案:
It's all about memory. Threads communicate through shared memory, but when there are multiple CPUs in a system, all trying to access the same memory system, then the memory system becomes a bottleneck. Therefore, the CPUs in a typical multi-CPU computer are allowed to delay, re-order, and cache memory operations in order to speed things up.
That works great when threads are not interacting with one another, but it causes problems when they actually do want to interact: If thread A stores a value into an ordinary variable, Java makes no guarantee about when (or even if) thread B will see the value change.
In order to overcome that problem when it's important, Java gives you certain means of synchronizing threads. That is, getting the threads to agree on the state of the program's memory. The volatile keyword and the synchronized keyword are two means of establishing synchronization between threads.
最后,futures
列表不会在您的代码中更新,因为无限 while
块导致主线程不断被占用。在 while 循环中执行任何 I/O 操作都会使 cpu 有足够的呼吸 space 来更新其本地缓存。
无限 while 循环通常不是一个好主意,因为它非常耗费资源。在下一次迭代之前添加一个小的延迟可以让它变得更好一点(尽管仍然效率低下)。
我正在尝试以预定的非阻塞方式执行一些阻塞操作(比如 HTTP 请求)。假设我有 10 个请求,一个请求需要 3 秒,但我不想等待 3 秒,而是等待 1 秒,然后发送下一个请求。所有执行完成后,我想将所有结果收集在一个列表中,并 return 给用户。
下面是我的场景原型(线程休眠用作阻塞操作,而不是 HTTP 请求。)
public static List<Integer> getResults(List<Integer> inputs) throws InterruptedException, ExecutionException {
List<Integer> results = new LinkedList<Integer>();
Queue<Callable<Integer>> tasks = new LinkedList<Callable<Integer>>();
List<Future<Integer>> futures = new LinkedList<Future<Integer>>();
for (Integer input : inputs) {
Callable<Integer> task = new Callable<Integer>() {
public Integer call() throws InterruptedException {
Thread.sleep(3000);
return input + 1000;
}
};
tasks.add(task);
}
ExecutorService es = Executors.newCachedThreadPool();
ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Callable<Integer> task = tasks.poll();
if (task == null) {
ses.shutdown();
es.shutdown();
return;
}
futures.add(es.submit(task));
}
}, 0, 1000, TimeUnit.MILLISECONDS);
while(true) {
if(futures.size() == inputs.size()) {
for (Future<Integer> future : futures) {
Integer result = future.get();
results.add(result);
}
return results;
}
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Integer> results = getResults(new LinkedList<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
System.out.println(Arrays.toString(results.toArray()));
}
我在 while 循环中等待,直到所有任务 return 得到正确的结果。但它永远不会进入中断条件,它会无限循环。每当我放置一个 I/O 操作,如记录器甚至一个断点时,它都会打破 while 循环,一切都会变得正常。
我对 Java 并发比较陌生,我想了解正在发生的事情以及这是否是正确的做法。我想 I/O 操作会触发线程调度程序上的某些内容并使其检查集合的大小。
您需要同步线程。您有两个不同的线程(主线程和执行服务线程)访问 futures
列表,并且由于 LinkedList
不同步,这两个线程看到 futures
.[= 的两个不同值19=]
while(true) {
synchronized(futures) {
if(futures.size() == inputs.size()) {
...
}
}
}
发生这种情况是因为 java 中的线程使用 cpu 缓存来提高性能。所以每个线程在同步之前可以有不同的变量值。 这个 SO question 有更多关于这个的信息。
也来自
It's all about memory. Threads communicate through shared memory, but when there are multiple CPUs in a system, all trying to access the same memory system, then the memory system becomes a bottleneck. Therefore, the CPUs in a typical multi-CPU computer are allowed to delay, re-order, and cache memory operations in order to speed things up.
That works great when threads are not interacting with one another, but it causes problems when they actually do want to interact: If thread A stores a value into an ordinary variable, Java makes no guarantee about when (or even if) thread B will see the value change.
In order to overcome that problem when it's important, Java gives you certain means of synchronizing threads. That is, getting the threads to agree on the state of the program's memory. The volatile keyword and the synchronized keyword are two means of establishing synchronization between threads.
最后,futures
列表不会在您的代码中更新,因为无限 while
块导致主线程不断被占用。在 while 循环中执行任何 I/O 操作都会使 cpu 有足够的呼吸 space 来更新其本地缓存。
无限 while 循环通常不是一个好主意,因为它非常耗费资源。在下一次迭代之前添加一个小的延迟可以让它变得更好一点(尽管仍然效率低下)。