spring 引导和执行程序服务
spring boot and Executor Service
我正在使用 spring 启动
public interface StringConsume extends Consumer<String> {
default public void strHandel(String str) {
accept(str);
}
}
实施
@Component("StrImpl")
public class StringConsumeImpl implements StringConsume {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(500);
final ExecutorService exService = Executors.newSingleThreadExecutor();
Future<?> future = CompletableFuture.completedFuture(true);
@Override
public void accept(String t) {
try {
queue.put(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (null != queue.peek()) {
if (future.isDone()) {
future = exService.submit(() -> queue.take());
}
}
}
}
Class
@Component
public class Test {
@Resource(name="StrImpl")
private @Autowired StringConsume handler;
public void insertIntoQueue(String str) {
handler.accept(str);
}
}
在 StringConsumeImpl 中,我需要同步 while 循环吗?假设 StringConsumeImpl class 调用了五次,那么 while 循环将创建 5 个进程还是仅创建 1 个进程?如果有的话,什么是 StringConsumeImpl 中 while 循环的最佳替代品?
该代码存在一些问题。
首先,消费者实际上并没有"consume"任何东西,它只是将字符串添加到队列中然后将其取回。假设为了论证它也通过将它打印到控制台或其他东西来 "consumes" 它。
其次,由于循环,消费者只会被调用一次,除非它 运行 在自己的线程中。例如,如果你这样做
public static void main(String[]args) {
StringConsume consumer = new StringConsumeImpl();
consumer.accept("hello");
}
消费者会把"hello"放入队列,马上取出来然后留在循环中,等待更多的元素取出;但是,没有人实际添加任何内容。
做你想做的事情的通常概念是 "producer/consumer"。这意味着有一个 "producer" 将项目放入队列和一个 "consumer" 将它们取出并用它们做事。
所以在你的情况下,你的 class 所做的是 "consume" 将字符串放入队列,使其成为 "producer",然后 "consuming" 字符串将其从队列中取出。当然,还有字符串的 "actual" 生产者,即 class 调用它。
所以一般来说你会做这样的事情:
/** Produces random Strings */
class RandomStringProducer {
Random random = new Random();
public String produceString() {
return Double.toString(random.nextDouble());
}
}
/** Prints a String */
class PrintConsumer implements StringConsume {
public void accept(String s) { System.out.println(s); }
}
/** Consumes String by putting it into a queue */
class QueueProducer implements StringConsume {
BlockingQueue<String> queue;
public QueueProducer(BlockingQueue<String> q) { queue = q; }
public void accept(String s) {
queue.put(s);
}
}
public static void main(String[] args) {
// the producer
RandomStringProducer producer = new RandomStringProducer();
// the end consumer
StringConsume printConsumer = new PrintConsumer();
// the queue that links producer and consumer
BlockingQueue<String> queue = new ArrayBlockingQueue<>();
// the consumer putting strings into the queue
QueueProducer queuePutter = new QueueProducer(queue);
// now, let's tie them together
// one thread to produce strings and put them into the queue
ScheduledExecutorService producerService = Executors.newScheduledThreadPool(1);
Runnable createStringAndPutIntoQueue = () -> {
String created = producer.createString();
queuePutter.consume(created);
};
// put string into queue every 100ms
producerService.scheduleAtFixedRate(createStringAndPutIntoQueue, 100, TimeUnit.MILLISECONDS);
// one thread to consume strings
Runnable takeStringFromQueueAndPrint = () -> {
while(true) {
String takenFromQueue = queue.take(); // this will block until a string is available
printConsumer.consume(takenFromQueue);
}
};
// let it run in a different thread
ExecutorService consumerService = Executors.newSingleThreadExecutor();
consumerService.submit(takeStringFromQueueAndPrint);
// this will be printed; we are in the main thread and code is still being executed
System.out.println("the produce/consume has started");
}
所以当你运行这个的时候,就会出现三个线程:主线程,生产者线程和消费者线程。生产者和消费者将同时做他们的事情,主线程也将继续运行(如最后一行的System.out.println
所示)。
我正在使用 spring 启动
public interface StringConsume extends Consumer<String> {
default public void strHandel(String str) {
accept(str);
}
}
实施
@Component("StrImpl")
public class StringConsumeImpl implements StringConsume {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(500);
final ExecutorService exService = Executors.newSingleThreadExecutor();
Future<?> future = CompletableFuture.completedFuture(true);
@Override
public void accept(String t) {
try {
queue.put(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (null != queue.peek()) {
if (future.isDone()) {
future = exService.submit(() -> queue.take());
}
}
}
}
Class
@Component
public class Test {
@Resource(name="StrImpl")
private @Autowired StringConsume handler;
public void insertIntoQueue(String str) {
handler.accept(str);
}
}
在 StringConsumeImpl 中,我需要同步 while 循环吗?假设 StringConsumeImpl class 调用了五次,那么 while 循环将创建 5 个进程还是仅创建 1 个进程?如果有的话,什么是 StringConsumeImpl 中 while 循环的最佳替代品?
该代码存在一些问题。
首先,消费者实际上并没有"consume"任何东西,它只是将字符串添加到队列中然后将其取回。假设为了论证它也通过将它打印到控制台或其他东西来 "consumes" 它。
其次,由于循环,消费者只会被调用一次,除非它 运行 在自己的线程中。例如,如果你这样做
public static void main(String[]args) {
StringConsume consumer = new StringConsumeImpl();
consumer.accept("hello");
}
消费者会把"hello"放入队列,马上取出来然后留在循环中,等待更多的元素取出;但是,没有人实际添加任何内容。
做你想做的事情的通常概念是 "producer/consumer"。这意味着有一个 "producer" 将项目放入队列和一个 "consumer" 将它们取出并用它们做事。
所以在你的情况下,你的 class 所做的是 "consume" 将字符串放入队列,使其成为 "producer",然后 "consuming" 字符串将其从队列中取出。当然,还有字符串的 "actual" 生产者,即 class 调用它。
所以一般来说你会做这样的事情:
/** Produces random Strings */
class RandomStringProducer {
Random random = new Random();
public String produceString() {
return Double.toString(random.nextDouble());
}
}
/** Prints a String */
class PrintConsumer implements StringConsume {
public void accept(String s) { System.out.println(s); }
}
/** Consumes String by putting it into a queue */
class QueueProducer implements StringConsume {
BlockingQueue<String> queue;
public QueueProducer(BlockingQueue<String> q) { queue = q; }
public void accept(String s) {
queue.put(s);
}
}
public static void main(String[] args) {
// the producer
RandomStringProducer producer = new RandomStringProducer();
// the end consumer
StringConsume printConsumer = new PrintConsumer();
// the queue that links producer and consumer
BlockingQueue<String> queue = new ArrayBlockingQueue<>();
// the consumer putting strings into the queue
QueueProducer queuePutter = new QueueProducer(queue);
// now, let's tie them together
// one thread to produce strings and put them into the queue
ScheduledExecutorService producerService = Executors.newScheduledThreadPool(1);
Runnable createStringAndPutIntoQueue = () -> {
String created = producer.createString();
queuePutter.consume(created);
};
// put string into queue every 100ms
producerService.scheduleAtFixedRate(createStringAndPutIntoQueue, 100, TimeUnit.MILLISECONDS);
// one thread to consume strings
Runnable takeStringFromQueueAndPrint = () -> {
while(true) {
String takenFromQueue = queue.take(); // this will block until a string is available
printConsumer.consume(takenFromQueue);
}
};
// let it run in a different thread
ExecutorService consumerService = Executors.newSingleThreadExecutor();
consumerService.submit(takeStringFromQueueAndPrint);
// this will be printed; we are in the main thread and code is still being executed
System.out.println("the produce/consume has started");
}
所以当你运行这个的时候,就会出现三个线程:主线程,生产者线程和消费者线程。生产者和消费者将同时做他们的事情,主线程也将继续运行(如最后一行的System.out.println
所示)。