使用 blockingQueue Java 触发 SheduledExecutor
Trigger SheduledExecutor with blockingQueue Java
我目前正在开发 java 应用程序,该应用程序具有多个生产者将任务添加到队列的场景,只要队列不为空,任务就应以预定义的速率执行。 (使用多线程来保持执行率)在执行可用任务后,执行者必须等待队列中的任务再次可用。
我知道 blockingQueue 可用于触发此处的部分,而 ScheduledExecutorService 可用于以固定速率执行任务。但是我找不到一种方法来满足我的需要 link 这两种能力。因此,如果您能给我任何建议来实现这一目标,我将不胜感激。
您需要生产者线程和消费者线程都可以访问任务队列。我已经编写了一个基本程序来演示这一点,但我会让您根据需要使用 BlockingQueue
API 和 ScheduledExecutor
:
import java.util.concurrent.*;
public class ProducerConsumer {
private static final BlockingQueue<Integer> taskQueue = new LinkedBlockingQueue<>();
public static void main(String[] args) {
ExecutorService consumers = Executors.newFixedThreadPool(3);
consumers.submit(new Consumer());
consumers.submit(new Consumer());
consumers.submit(new Consumer());
ExecutorService producers = Executors.newFixedThreadPool(2);
producers.submit(new Producer(1));
producers.submit(new Producer(2));
}
private static class Producer implements Runnable {
private final int task;
Producer(int task) {
this.task = task;
}
@Override
public void run() {
System.out.println("Adding task: " + task);
taskQueue.add(task); // put is better, since it will block if queue is full
}
}
private static class Consumer implements Runnable {
@Override
public void run() {
try {
Integer task = taskQueue.take(); // block if there is no task available
System.out.println("Executing task: " + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这是我想出的解决方法。它看起来有点生锈,但我已经对此进行了测试并且代码可以正常工作。
package test;
import java.util.concurrent.*;
public class FixedRateConsumer {
private BlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);
private boolean continueRunning = true;
public void executeInBackGraound() throws InterruptedException, ExecutionException {
while (continueRunning) {
String s = queue.take();
Worker w = new Worker(s);
ScheduledFuture future = executorService.scheduleAtFixedRate(w, 0, 1, TimeUnit.SECONDS);
w.future = future;
try {
if (!future.isDone()) {
future.get();
}
} catch (CancellationException e) {
// Skipping
}
}
}
public void setContinueRunning(boolean state) {
continueRunning = state;
}
public void addConsumableObject(String s) throws InterruptedException {
queue.put(s);
}
private void consumeString(String s) {
System.out.println("Consumed -> " + s + ", ... @ -> " + System.currentTimeMillis() + " ms");
}
private class Worker implements Runnable {
String consumableObject;
ScheduledFuture future;
public Worker(String initialConsumableObject) {
this.consumableObject = initialConsumableObject;
}
@Override
public void run() {
try {
if (consumableObject == null) {
consumableObject = queue.take();
}
consumeString(consumableObject);
consumableObject = null;
if (queue.isEmpty()) {
if (future == null) {
while (future == null) {
Thread.sleep(50);
}
}
future.cancel(false);
}
} catch (Exception e) {
System.out.println("Exception : " + e);
}
}
}
}
我目前正在开发 java 应用程序,该应用程序具有多个生产者将任务添加到队列的场景,只要队列不为空,任务就应以预定义的速率执行。 (使用多线程来保持执行率)在执行可用任务后,执行者必须等待队列中的任务再次可用。
我知道 blockingQueue 可用于触发此处的部分,而 ScheduledExecutorService 可用于以固定速率执行任务。但是我找不到一种方法来满足我的需要 link 这两种能力。因此,如果您能给我任何建议来实现这一目标,我将不胜感激。
您需要生产者线程和消费者线程都可以访问任务队列。我已经编写了一个基本程序来演示这一点,但我会让您根据需要使用 BlockingQueue
API 和 ScheduledExecutor
:
import java.util.concurrent.*;
public class ProducerConsumer {
private static final BlockingQueue<Integer> taskQueue = new LinkedBlockingQueue<>();
public static void main(String[] args) {
ExecutorService consumers = Executors.newFixedThreadPool(3);
consumers.submit(new Consumer());
consumers.submit(new Consumer());
consumers.submit(new Consumer());
ExecutorService producers = Executors.newFixedThreadPool(2);
producers.submit(new Producer(1));
producers.submit(new Producer(2));
}
private static class Producer implements Runnable {
private final int task;
Producer(int task) {
this.task = task;
}
@Override
public void run() {
System.out.println("Adding task: " + task);
taskQueue.add(task); // put is better, since it will block if queue is full
}
}
private static class Consumer implements Runnable {
@Override
public void run() {
try {
Integer task = taskQueue.take(); // block if there is no task available
System.out.println("Executing task: " + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这是我想出的解决方法。它看起来有点生锈,但我已经对此进行了测试并且代码可以正常工作。
package test;
import java.util.concurrent.*;
public class FixedRateConsumer {
private BlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);
private boolean continueRunning = true;
public void executeInBackGraound() throws InterruptedException, ExecutionException {
while (continueRunning) {
String s = queue.take();
Worker w = new Worker(s);
ScheduledFuture future = executorService.scheduleAtFixedRate(w, 0, 1, TimeUnit.SECONDS);
w.future = future;
try {
if (!future.isDone()) {
future.get();
}
} catch (CancellationException e) {
// Skipping
}
}
}
public void setContinueRunning(boolean state) {
continueRunning = state;
}
public void addConsumableObject(String s) throws InterruptedException {
queue.put(s);
}
private void consumeString(String s) {
System.out.println("Consumed -> " + s + ", ... @ -> " + System.currentTimeMillis() + " ms");
}
private class Worker implements Runnable {
String consumableObject;
ScheduledFuture future;
public Worker(String initialConsumableObject) {
this.consumableObject = initialConsumableObject;
}
@Override
public void run() {
try {
if (consumableObject == null) {
consumableObject = queue.take();
}
consumeString(consumableObject);
consumableObject = null;
if (queue.isEmpty()) {
if (future == null) {
while (future == null) {
Thread.sleep(50);
}
}
future.cancel(false);
}
} catch (Exception e) {
System.out.println("Exception : " + e);
}
}
}
}