Spring Executors - 在具有特定索引的线程中执行方法
Spring Executors - Executing method in thread with specific index
在我的应用程序中,我有一个接收器,它应该处理具有多个线程的消息。发送方模块将消息放入栈中,接收方模块从栈中取出消息。一切都与两个线程一起工作(一个用于发送,一个用于接收)。现在我想接收多线程的消息。
消息有某种组号。我需要在一个线程中处理一组消息(消息组比线程多)。我该如何处理?
我想做这样的事情:
public class ReceiverExample {
@Autowired
Receiver receiver;
@Autowired
TaskExecutor executor;
public void receive(){
Msg msg = receiver.receive();
int index = msg.getIndex();
executor.execute(
//put index ? ,
msg
);
}
}
我正在使用 Spring 框架。我知道我必须使用某种执行者,但我找不到任何很好的例子来说明我的问题。
因此您需要确保具有相同 "message group id" 的两个 Runnable
永远不会 运行 并行。
"executor"接口没有"messageGroupId"的概念,所以你可能需要写另一个class来协调这个(不确定第三个中是否存在这样的东西派对图书馆或其他任何东西,也许有人可以 post 提出建议??)
下面的 class 为每个线程维护一个队列,并将确保具有相同消息组 ID 的任务始终在同一队列中。为这些队列中的每一个创建一个线程来处理任务......(它并不完美,但它可能是一个很好的起点)
package stack;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageGroupTaskExecutor {
private static final Logger LOG = LoggerFactory.getLogger(MessageGroupTaskExecutor.class);
private static final long POLL_TIME = 10;
private BlockingQueue<Runnable>[] queues;
private Thread[] workerThreads;
private volatile boolean stopped;
private ThreadFactory threadFactory;
public MessageGroupTaskExecutor(int numberOfThreads, ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.queues = new BlockingQueue[numberOfThreads];
this.workerThreads = new Thread[numberOfThreads];
}
public void init() {
for (int k = 0; k < workerThreads.length; k++) {
queues[k] = new LinkedBlockingQueue<Runnable>();
workerThreads[k] = threadFactory.newThread(new WorkerThread(queues[k]));
workerThreads[k].start();
}
}
/**
* Not a great impl - no way to "shutdown after all tasks finished" (at the moment...)
*/
public void shutdown() {
stopped = true;
for (Thread thread : workerThreads) {
try {
thread.join();
} catch (InterruptedException e) {
LOG.warn("Interrupted waiting for thread to finish", e);
}
}
}
public void execute(int messageGroup, Runnable command) {
queues[messageGroup % queues.length].add(command);
LOG.debug("Submitted {} for message group {}", command, messageGroup);
}
private class WorkerThread implements Runnable {
private BlockingQueue<Runnable> queue;
public WorkerThread(BlockingQueue<Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!stopped) {
Runnable nextJob = null;
try {
nextJob = queue.poll(POLL_TIME, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Interrupted when polling queue", e);
}
if (nextJob != null) {
try {
LOG.debug("Got job " + nextJob);
nextJob.run();
LOG.debug("Completed " + nextJob);
} catch (Exception ex) {
LOG.debug("Exception thrown by " + nextJob, ex);
}
}
}
}
}
}
如您所见,当通过 execute
方法提交作业时,消息组 ID 用于 select 将作业放在哪个队列上。
工作线程将处理这些队列。
这是使用它的基本 class...
package stack;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestIt {
private static final Logger LOG = LoggerFactory.getLogger(TestIt.class);
public static void main(String[] args) {
MessageGroupTaskExecutor exe = new MessageGroupTaskExecutor(5, Executors.defaultThreadFactory());
exe.init();
for (int k = 0; k < 100; k++) {
int messageGroup = (int) (Math.random() * 15.0);
exe.execute(messageGroup, new MyTask(messageGroup, k));
}
}
private static class MyTask implements Runnable {
private int mg;
private int taskNumber;
public MyTask(int mg, int taskNumber) {
this.mg = mg;
this.taskNumber = taskNumber;
}
@Override
public void run() {
LOG.info("Doing task {} for message group {}", taskNumber, mg);
try {
Thread.sleep((long) (Math.random() * 500));
} catch (InterruptedException e) {
}
LOG.info("Done task {} for message group {}", taskNumber, mg);
}
}
}
希望对您有所帮助
在我的应用程序中,我有一个接收器,它应该处理具有多个线程的消息。发送方模块将消息放入栈中,接收方模块从栈中取出消息。一切都与两个线程一起工作(一个用于发送,一个用于接收)。现在我想接收多线程的消息。
消息有某种组号。我需要在一个线程中处理一组消息(消息组比线程多)。我该如何处理?
我想做这样的事情:
public class ReceiverExample {
@Autowired
Receiver receiver;
@Autowired
TaskExecutor executor;
public void receive(){
Msg msg = receiver.receive();
int index = msg.getIndex();
executor.execute(
//put index ? ,
msg
);
}
}
我正在使用 Spring 框架。我知道我必须使用某种执行者,但我找不到任何很好的例子来说明我的问题。
因此您需要确保具有相同 "message group id" 的两个 Runnable
永远不会 运行 并行。
"executor"接口没有"messageGroupId"的概念,所以你可能需要写另一个class来协调这个(不确定第三个中是否存在这样的东西派对图书馆或其他任何东西,也许有人可以 post 提出建议??)
下面的 class 为每个线程维护一个队列,并将确保具有相同消息组 ID 的任务始终在同一队列中。为这些队列中的每一个创建一个线程来处理任务......(它并不完美,但它可能是一个很好的起点)
package stack;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageGroupTaskExecutor {
private static final Logger LOG = LoggerFactory.getLogger(MessageGroupTaskExecutor.class);
private static final long POLL_TIME = 10;
private BlockingQueue<Runnable>[] queues;
private Thread[] workerThreads;
private volatile boolean stopped;
private ThreadFactory threadFactory;
public MessageGroupTaskExecutor(int numberOfThreads, ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.queues = new BlockingQueue[numberOfThreads];
this.workerThreads = new Thread[numberOfThreads];
}
public void init() {
for (int k = 0; k < workerThreads.length; k++) {
queues[k] = new LinkedBlockingQueue<Runnable>();
workerThreads[k] = threadFactory.newThread(new WorkerThread(queues[k]));
workerThreads[k].start();
}
}
/**
* Not a great impl - no way to "shutdown after all tasks finished" (at the moment...)
*/
public void shutdown() {
stopped = true;
for (Thread thread : workerThreads) {
try {
thread.join();
} catch (InterruptedException e) {
LOG.warn("Interrupted waiting for thread to finish", e);
}
}
}
public void execute(int messageGroup, Runnable command) {
queues[messageGroup % queues.length].add(command);
LOG.debug("Submitted {} for message group {}", command, messageGroup);
}
private class WorkerThread implements Runnable {
private BlockingQueue<Runnable> queue;
public WorkerThread(BlockingQueue<Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!stopped) {
Runnable nextJob = null;
try {
nextJob = queue.poll(POLL_TIME, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Interrupted when polling queue", e);
}
if (nextJob != null) {
try {
LOG.debug("Got job " + nextJob);
nextJob.run();
LOG.debug("Completed " + nextJob);
} catch (Exception ex) {
LOG.debug("Exception thrown by " + nextJob, ex);
}
}
}
}
}
}
如您所见,当通过 execute
方法提交作业时,消息组 ID 用于 select 将作业放在哪个队列上。
工作线程将处理这些队列。
这是使用它的基本 class...
package stack;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestIt {
private static final Logger LOG = LoggerFactory.getLogger(TestIt.class);
public static void main(String[] args) {
MessageGroupTaskExecutor exe = new MessageGroupTaskExecutor(5, Executors.defaultThreadFactory());
exe.init();
for (int k = 0; k < 100; k++) {
int messageGroup = (int) (Math.random() * 15.0);
exe.execute(messageGroup, new MyTask(messageGroup, k));
}
}
private static class MyTask implements Runnable {
private int mg;
private int taskNumber;
public MyTask(int mg, int taskNumber) {
this.mg = mg;
this.taskNumber = taskNumber;
}
@Override
public void run() {
LOG.info("Doing task {} for message group {}", taskNumber, mg);
try {
Thread.sleep((long) (Math.random() * 500));
} catch (InterruptedException e) {
}
LOG.info("Done task {} for message group {}", taskNumber, mg);
}
}
}
希望对您有所帮助