在 Java 中使用 ExecutorService 执行第一个可运行程序时是否可以阻止其他可运行程序?
Is it possible to block other runnables while executing the first one, using ExecutorService in Java?
我正在尝试使用 ExecutorService
在多线程中处理相对较大的 Stream
of List
。该方法看起来像这样。
public void initMigration() {
ExecutorService executorService = Executors.newCachedThreadPool();
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
streamOfLists.forEach(record4List -> {
Runnable runnable = () -> {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
};
executorService.submit(runnable);
});
}
LOGGER.info("Shutting down the ExecutorService");
executorService.shutdown();
}
基本上,我在这里要做的是,对于 Stream
中的每个 List
,正在创建一个 Runnable
并提交给 ExecutorService
.它似乎工作正常。但是,我现在真正想做的是,看看是否有任何方法可以使 ExecutorService
运行 从 List
中的第一个 List
获得第一个 Runnable
12=] 同时阻止其他 Runnables
直到
它的执行,然后继续 运行ning other Runnables
(并行)。真的需要一些帮助。
你可以先拿Runnable,执行它,然后再提交其他Runnable。
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
if (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
runnable.run();
}
while (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
executorService.submit(runnable);
}
}
哪里
class MyRunnable implements Runnable {
Record4<Integer, Integer, String, byte[]> record4List;
MyRunnable(Record4<Integer, Integer, String, byte[]> record4List) {
this.record4List = record4List;
}
@Override
public void run() {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
}
}
是(IMO)解决这个问题的正确方法。不要阻止可运行程序。当 运行 的先决条件已经满足时提交可运行程序。
让一个可运行块阻塞其他块的问题在于,您很可能会用阻塞等待另一任务完成的任务来阻塞执行程序的线程池。事实上,如果线程池是有界的,您甚至可能会遇到这样一种情况,即所有线程都处于这种状态,而执行程序无法启动将解除所有线程阻塞的任务。结果:死锁!
如果您仍然想要阻止可运行程序(尽管有上述情况),那么您可以使用 CountDownLatch
.
来实现它
在实例化 Runnable
之前,创建一个初始计数器为 1
的 CountDownLatch
实例。此实例必须由所有 Runnable
共享。
代码一 Runnable
以便它获取 List
,处理它,然后调用 latch.count()
;
编码一秒钟 Runnable
调用 latch.await()
然后获取并处理一个 List
.
使用第一个 Runnable
提交一个任务,使用第二个提交其余任务。
我正在尝试使用 ExecutorService
在多线程中处理相对较大的 Stream
of List
。该方法看起来像这样。
public void initMigration() {
ExecutorService executorService = Executors.newCachedThreadPool();
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
streamOfLists.forEach(record4List -> {
Runnable runnable = () -> {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
};
executorService.submit(runnable);
});
}
LOGGER.info("Shutting down the ExecutorService");
executorService.shutdown();
}
基本上,我在这里要做的是,对于 Stream
中的每个 List
,正在创建一个 Runnable
并提交给 ExecutorService
.它似乎工作正常。但是,我现在真正想做的是,看看是否有任何方法可以使 ExecutorService
运行 从 List
中的第一个 List
获得第一个 Runnable
12=] 同时阻止其他 Runnables
直到
它的执行,然后继续 运行ning other Runnables
(并行)。真的需要一些帮助。
你可以先拿Runnable,执行它,然后再提交其他Runnable。
try (Stream<List<Record4<Integer, Integer, String, byte[]>>> streamOfLists = getStreamOfLists()) {
Iterator<List<Record4<Integer, Integer, String, byte[]>>> it = streamOfLists.iterator();
if (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
runnable.run();
}
while (it.hasNext()) {
List<Record4<Integer, Integer, String, byte[]>> list = it.next();
Runnable runnable = new MyRunnable(record4List);
executorService.submit(runnable);
}
}
哪里
class MyRunnable implements Runnable {
Record4<Integer, Integer, String, byte[]> record4List;
MyRunnable(Record4<Integer, Integer, String, byte[]> record4List) {
this.record4List = record4List;
}
@Override
public void run() {
try {
final List<Attachment> attachments = RecordProcessor.prepareAttachmentsToPost(record4List);
LOGGER.info("Invoking POST with payload {}", attachments);
Collection<UploadLink> uploadLinks = restClient.postAttachments(attachments);
restClient.processUploadLinksAndUpload(RecordProcessor.recordsIntoPojo(record4List), uploadLinks);
} catch (ExceptionA | ExceptionB e) {
e.printStackTrace();
}
}
}
让一个可运行块阻塞其他块的问题在于,您很可能会用阻塞等待另一任务完成的任务来阻塞执行程序的线程池。事实上,如果线程池是有界的,您甚至可能会遇到这样一种情况,即所有线程都处于这种状态,而执行程序无法启动将解除所有线程阻塞的任务。结果:死锁!
如果您仍然想要阻止可运行程序(尽管有上述情况),那么您可以使用 CountDownLatch
.
在实例化
Runnable
之前,创建一个初始计数器为1
的CountDownLatch
实例。此实例必须由所有Runnable
共享。代码一
Runnable
以便它获取List
,处理它,然后调用latch.count()
;编码一秒钟
Runnable
调用latch.await()
然后获取并处理一个List
.使用第一个
Runnable
提交一个任务,使用第二个提交其余任务。