等待守护线程使用执行程序服务完成迭代
Wait for daemon threads to complete an iteration using an executor service
我必须并行化现有的后台任务,这样它就不会连续消耗 'x' 资源,而是仅使用 'y' 个线程 (y << x) 并行完成手头的工作。此任务在后台不断运行并不断处理一些资源。
代码结构如下:
class BaseBackground implements Runnable {
@Override
public void run() {
int[] resources = findResources(...);
for (int resource : resources) {
processResource(resource);
}
stopProcessing();
}
public abstract void processResource(final int resource);
public void void stopProcessing() {
// Override by subclass as needed
}
}
class ChildBackground extends BaseBackground {
@Override
public abstract void processResource(final int resource) {
// does some work here
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
}
我按以下方式修改了 ChildBackground
:
class ChildBackground extends BaseBackground {
private final BlockingQueue<Integer> resourcesToBeProcessed;
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource) {
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
// does some work
}
}
}
}
我不是每次都创建和拆除 ExecutorService,因为垃圾收集在我的服务中有点问题。虽然,我不明白它会有多糟糕,因为我不会在每次迭代中产生超过 10 个线程。
我无法理解如何等待所有 ResourceProcessor
完成一次迭代的资源处理,以便我可以重置一些计数并在 stopProcessing
中发出指标。我考虑了以下选项:
1) executorService.awaitTermination(超时)。这不会真正起作用,因为它会一直阻塞直到超时,因为 ResourceProcessor
线程永远不会真正完成它们的工作
2) 我可以找出 findResources
之后的资源数量,并将其提供给 child class 并让每个 ResourceProcessor
增加资源数量处理。在重置计数之前,我将不得不等待 stopProcessing
中处理完所有资源。我需要类似 CountDownLatch 的东西,但它应该计数 UP
。这个选项会有很多状态管理,我不是特别喜欢。
3) 我可以更新 public abstract void processResource(final int resource)
以包括总资源计数并让 child 进程等待所有线程处理完总资源。在这种情况下也会有一些状态管理,但仅限于 child class.
在这两种情况中的任何一种情况下,我都必须添加 wait() 和 notify() 逻辑,但我对我的方法没有信心。这就是我的:
class ChildBackground extends BaseBackground {
private static final int UNSET_TOTAL_RESOURCES = -1;
private final BlockingQueue<Integer> resourcesToBeProcessed;
private int totalResources = UNSET_TOTAL_RESOURCES;
private final AtomicInteger resourcesProcessed = new AtomicInteger(0);
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource, final int totalResources) {
if (this.totalResources == UNSET_TOTAL_RESOURCES) {
this.totalResources = totalResources;
} else {
Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
}
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
try {
waitForAllResourcesToBeProcessed();
} catch (InterruptedException e) {
e.printStackTrace();
}
resourcesProcessed.set(0);
totalResources = UNSET_TOTAL_RESOURCES;
// reset some counts and emit metrics
}
private void incrementProcessedResources() {
synchronized (resourcesProcessed) {
resourcesProcessed.getAndIncrement();
resourcesProcessed.notify();
}
}
private void waitForAllResourcesToBeProcessed() throws InterruptedException {
synchronized (resourcesProcessed) {
while (resourcesProcessed.get() != totalResources) {
resourcesProcessed.wait();
}
}
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
try {
// does some work
} finally {
incrementProcessedResources();
}
}
}
}
}
我不确定使用 AtomicInteger
是否是正确的方法,如果是这样,我是否需要调用 wait() 和 notify()。如果我不使用 wait() 和 notify() 我什至不必执行同步块中的所有内容。
如果我应该为每次迭代简单地创建和关闭 ExecutorService 或者是否有第四种方法我应该追求,请告诉我您对这种方法的想法。
您的代码似乎不必要地复杂。 ExecutorService
中已有一个队列,为什么要有自己的队列?当我认为您可以让股票 ExecutorService
为您处理时,您必须做一大堆管理工作。
我将您的工作定义为:
public static class ResourceProcessor implements Runnable {
private final int resource;
public ResourceProcessor(int resource) {
this.resource = resource;
}
public void run() {
try {
// does some work
} finally {
// if this is still necessary then you should use a `Future` instead
incrementProcessedResources();
}
}
}
然后您可以像这样提交它们:
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < totalResources; ++i) {
executorService.submit(new ResourceProcessor(i));
}
// shutdown the thread pool after the last submit
executorService.shutdown();
executorService.awaitTermination(timeout)
. This won't really work as it will always block until the timeout because the ResourceProcessor threads will never really finish their jobs
现在可以了。
2) I can find out the number of resources [have finished].
如果可以打电话还需要这个吗awaitTermination(...)
?
3) I could update the public abstract void processResource(final int resource) to include count of total resources and have the child process wait until all threads have processed total resources...
同样的问题。需要这个吗?
如果您确实需要知道已处理请求的列表,那么您可以像@ScaryWombat 提到的那样使用 Future<Integer>
和 Callable<Integer>
或使用 ExecutorCompletionService
.
Futures aren't an option because the executor threads run within a tight loop that stops only when the service is deactivated.
你能再解释一下吗?
希望这对您有所帮助。
我必须并行化现有的后台任务,这样它就不会连续消耗 'x' 资源,而是仅使用 'y' 个线程 (y << x) 并行完成手头的工作。此任务在后台不断运行并不断处理一些资源。
代码结构如下:
class BaseBackground implements Runnable {
@Override
public void run() {
int[] resources = findResources(...);
for (int resource : resources) {
processResource(resource);
}
stopProcessing();
}
public abstract void processResource(final int resource);
public void void stopProcessing() {
// Override by subclass as needed
}
}
class ChildBackground extends BaseBackground {
@Override
public abstract void processResource(final int resource) {
// does some work here
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
}
我按以下方式修改了 ChildBackground
:
class ChildBackground extends BaseBackground {
private final BlockingQueue<Integer> resourcesToBeProcessed;
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource) {
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
// does some work
}
}
}
}
我不是每次都创建和拆除 ExecutorService,因为垃圾收集在我的服务中有点问题。虽然,我不明白它会有多糟糕,因为我不会在每次迭代中产生超过 10 个线程。
我无法理解如何等待所有 ResourceProcessor
完成一次迭代的资源处理,以便我可以重置一些计数并在 stopProcessing
中发出指标。我考虑了以下选项:
1) executorService.awaitTermination(超时)。这不会真正起作用,因为它会一直阻塞直到超时,因为 ResourceProcessor
线程永远不会真正完成它们的工作
2) 我可以找出 findResources
之后的资源数量,并将其提供给 child class 并让每个 ResourceProcessor
增加资源数量处理。在重置计数之前,我将不得不等待 stopProcessing
中处理完所有资源。我需要类似 CountDownLatch 的东西,但它应该计数 UP
。这个选项会有很多状态管理,我不是特别喜欢。
3) 我可以更新 public abstract void processResource(final int resource)
以包括总资源计数并让 child 进程等待所有线程处理完总资源。在这种情况下也会有一些状态管理,但仅限于 child class.
在这两种情况中的任何一种情况下,我都必须添加 wait() 和 notify() 逻辑,但我对我的方法没有信心。这就是我的:
class ChildBackground extends BaseBackground {
private static final int UNSET_TOTAL_RESOURCES = -1;
private final BlockingQueue<Integer> resourcesToBeProcessed;
private int totalResources = UNSET_TOTAL_RESOURCES;
private final AtomicInteger resourcesProcessed = new AtomicInteger(0);
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource, final int totalResources) {
if (this.totalResources == UNSET_TOTAL_RESOURCES) {
this.totalResources = totalResources;
} else {
Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
}
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
try {
waitForAllResourcesToBeProcessed();
} catch (InterruptedException e) {
e.printStackTrace();
}
resourcesProcessed.set(0);
totalResources = UNSET_TOTAL_RESOURCES;
// reset some counts and emit metrics
}
private void incrementProcessedResources() {
synchronized (resourcesProcessed) {
resourcesProcessed.getAndIncrement();
resourcesProcessed.notify();
}
}
private void waitForAllResourcesToBeProcessed() throws InterruptedException {
synchronized (resourcesProcessed) {
while (resourcesProcessed.get() != totalResources) {
resourcesProcessed.wait();
}
}
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
try {
// does some work
} finally {
incrementProcessedResources();
}
}
}
}
}
我不确定使用 AtomicInteger
是否是正确的方法,如果是这样,我是否需要调用 wait() 和 notify()。如果我不使用 wait() 和 notify() 我什至不必执行同步块中的所有内容。
如果我应该为每次迭代简单地创建和关闭 ExecutorService 或者是否有第四种方法我应该追求,请告诉我您对这种方法的想法。
您的代码似乎不必要地复杂。 ExecutorService
中已有一个队列,为什么要有自己的队列?当我认为您可以让股票 ExecutorService
为您处理时,您必须做一大堆管理工作。
我将您的工作定义为:
public static class ResourceProcessor implements Runnable {
private final int resource;
public ResourceProcessor(int resource) {
this.resource = resource;
}
public void run() {
try {
// does some work
} finally {
// if this is still necessary then you should use a `Future` instead
incrementProcessedResources();
}
}
}
然后您可以像这样提交它们:
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < totalResources; ++i) {
executorService.submit(new ResourceProcessor(i));
}
// shutdown the thread pool after the last submit
executorService.shutdown();
executorService.awaitTermination(timeout)
. This won't really work as it will always block until the timeout because the ResourceProcessor threads will never really finish their jobs
现在可以了。
2) I can find out the number of resources [have finished].
如果可以打电话还需要这个吗awaitTermination(...)
?
3) I could update the public abstract void processResource(final int resource) to include count of total resources and have the child process wait until all threads have processed total resources...
同样的问题。需要这个吗?
如果您确实需要知道已处理请求的列表,那么您可以像@ScaryWombat 提到的那样使用 Future<Integer>
和 Callable<Integer>
或使用 ExecutorCompletionService
.
Futures aren't an option because the executor threads run within a tight loop that stops only when the service is deactivated.
你能再解释一下吗?
希望这对您有所帮助。