使用 blockingQueue Java 触发 SheduledExecutor

Trigger SheduledExecutor with blockingQueue Java

我目前正在开发 java 应用程序,该应用程序具有多个生产者将任务添加到队列的场景,只要队列不为空,任务就应以预定义的速率执行。 (使用多线程来保持执行率)在执行可用任务后,执行者必须等待队列中的任务再次可用。

我知道 blockingQueue 可用于触发此处的部分,而 ScheduledExecutorService 可用于以固定速率执行任务。但是我找不到一种方法来满足我的需要 link 这两种能力。因此,如果您能给我任何建议来实现这一目标,我将不胜感激。

您需要生产者线程和消费者线程都可以访问任务队列。我已经编写了一个基本程序来演示这一点,但我会让您根据需要使用 BlockingQueue API 和 ScheduledExecutor

import java.util.concurrent.*;


public class ProducerConsumer {
    private static final BlockingQueue<Integer> taskQueue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ExecutorService consumers = Executors.newFixedThreadPool(3);
        consumers.submit(new Consumer());
        consumers.submit(new Consumer());
        consumers.submit(new Consumer());

        ExecutorService producers = Executors.newFixedThreadPool(2);
        producers.submit(new Producer(1));
        producers.submit(new Producer(2));
    }

    private static class Producer implements Runnable {
        private final int task;

        Producer(int task) {
            this.task = task;
        }

        @Override
        public void run() {
            System.out.println("Adding task: " + task);
            taskQueue.add(task); // put is better, since it will block if queue is full
        }
    }

    private static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                Integer task = taskQueue.take(); // block if there is no task available
                System.out.println("Executing task: " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

这是我想出的解决方法。它看起来有点生锈,但我已经对此进行了测试并且代码可以正常工作。

package test;

import java.util.concurrent.*;

public class FixedRateConsumer {

private BlockingQueue<String> queue = new ArrayBlockingQueue<>(20);

private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);

private boolean continueRunning = true;

public void executeInBackGraound() throws InterruptedException, ExecutionException {
    while (continueRunning) {
        String s = queue.take();
        Worker w = new Worker(s);
        ScheduledFuture future = executorService.scheduleAtFixedRate(w, 0, 1, TimeUnit.SECONDS);
        w.future = future;

        try {
            if (!future.isDone()) {
                future.get();
            }
        } catch (CancellationException e) {
            // Skipping
        }
    }
}

public void setContinueRunning(boolean state) {
    continueRunning = state;
}

public void addConsumableObject(String s) throws InterruptedException {
    queue.put(s);
}

private void consumeString(String s) {
    System.out.println("Consumed -> " + s + ", ... @ -> "  + System.currentTimeMillis() + " ms");
}

private class Worker implements Runnable {
    String consumableObject;
    ScheduledFuture future;

    public Worker(String initialConsumableObject) {
        this.consumableObject = initialConsumableObject;
    }

    @Override
    public void run() {
        try {
            if (consumableObject == null) {
                consumableObject = queue.take();
            }

            consumeString(consumableObject);

            consumableObject = null;
            if (queue.isEmpty()) {
                if (future == null) {
                    while (future == null) {
                        Thread.sleep(50);
                    }
                }

                future.cancel(false);
            }

        } catch (Exception e) {
            System.out.println("Exception : " + e);
        }
    }
}
}