Java:具有多个队列的执行器服务

Java: Executor Service with multiple queues

要求:

  1. 我将消息分为不同的类型,例如 Type1, Type2 ... Type100
  2. 我想并行执行不同类型的消息。比方说在 10 个线程中,但是所有相同类型的消息必须一个接一个地执行。执行顺序无关紧要。
  3. 一旦线程完成 TypeX 的所有消息。它应该开始处理另一种类型。

我浏览了不同的答案: 他们中的大多数建议执行程序服务来处理多线程。 假设我们创建执行程序服务

ExecutorService executorService = Executors.newFixedThreadPool(10);

但是一旦我们使用 executorService.submit(runnableMessage);

提交消息

我们无法控制将特定类型的消息分配给特定线程。

解法:

创建单线程执行器数组

ExecutorService[] pools = new ExecutorService[10];

并初步传递Type1,Type2 ... Type10的消息 然后,如果任何执行程序已完成执行,则将 Type11 分配给它并继续这样做,直到所有类型都得到处理。

有什么更好的方法吗?

类似于具有多个队列的执行程序服务,我可以在其中将每种类型的消息推送到不同的队列?

我建议看一下 Akka。他们提供了一个更适合这种用例的 Actor 框架。除了定义您自己的 ExecutorService 接口实现之外,JDK 提供的默认实现并没有提供对调度的太多控制。

创建一组硬编码的 ExecutionServices 不会非常动态或健壮,尤其是因为每个 ExecutionService 都有一个线程池。可以用散列映射替换数组,然后将其放在 ExecutionService 的自定义实现之后,这样的优势是对调用者隐藏这些细节,但它不会解决拥有这么多线程池的线程浪费问题。

在 Akka 中,每个 Actor 都有自己的与之关联的消息队列。每个 Actor 在其自己的线程中有效 运行s,一次处理其队列中的每条消息。 Akka 将管理跨多个 Actor 的线程共享。因此,如果您要为每种消息类型创建一个 Actor,然后将这些消息与这些 Actor 一起排队,那么您将达到这样的目标,即每种消息类型一次最多由一个线程处理,同时仅由一个池支持线程数。

技术演示:

Maven 对 Akka 的依赖。

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.4.17</version>
    </dependency>

Java8个代码。复制并粘贴到 Java 文件中,然后 运行 您的 IDE.

中的主要方法
package com.softwaremosaic.demos.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;


public class ActorDemo {

    public static void main( String[] args ) throws InterruptedException {
        // The following partitioner will spread the requests over
        // multiple actors, which I chose to demonstrate the technique.
        // You will need to change it to one that better maps the the
        // jobs to your use case.   Remember that jobs that get mapped
        // to the same key, will get executed in serial (probably
        // but not necessarily) by the same thread.
        ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );

        for ( int i=0; i<100; i++ ) {
            int id = i;
            exectorService.submit( () -> System.out.println("JOB " + id) );
        }

        exectorService.shutdown();
        exectorService.awaitTermination( 1, TimeUnit.MINUTES );

        System.out.println( "DONE" );
    }

}


class ActorExecutionService extends AbstractExecutorService {

    private final ActorSystem                              actorSystem;
    private final Function<Runnable, String>               partitioner;
    private final ConcurrentHashMap<String,ActorRef>       actors = new ConcurrentHashMap<>();

    public ActorExecutionService( Function<Runnable,String> partitioner ) {
        this.actorSystem = ActorSystem.create("demo");
        this.partitioner = partitioner;
    }


    public void execute( Runnable command ) {
        String partitionKey = partitioner.apply( command );

        ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );

        actorRef.tell( command, actorRef );
    }

    private ActorRef createNewActor( String partitionKey ) {
        return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
    }


    public void shutdown() {
        actorSystem.terminate();
    }

    public List<Runnable> shutdownNow() {
        actorSystem.terminate();

        try {
            awaitTermination( 1, TimeUnit.MINUTES );
        } catch ( InterruptedException e ) {
            throw new RuntimeException( e );
        }

        return Collections.emptyList();
    }

    public boolean isShutdown() {
        return actorSystem.isTerminated();
    }

    public boolean isTerminated() {
        return actorSystem.isTerminated();
    }

    public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
        actorSystem.awaitTermination();

        return actorSystem.isTerminated();
    }
}

 class ExecutionServiceActor extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof Runnable) {
            ((Runnable) message).run();
        } else {
            unhandled(message);
        }
    }
}

注意上面的代码将以未定义的顺序打印 1-100。由于批处理(Akka 这样做是为了获得额外的性能优势),订单看起来主要是串行的。但是,随着不同线程穿插在工作中,您会看到数字有些随机性。每个作业花费 运行 的时间越长,分配给 Akka 线程池的线程越多,使用的分区键越多,底层 CPU 核心越多,序列可能变得越随机。

这是我的非常基本的示例,说明它的外观。 您创建了一个包含 10 个 ArrayDeques 的 Map,它们的地址是 "Typ"。 您还启动了 10 个 ScheduledExecutors。 每个最初等待 5 秒,然后每 200 毫秒轮询一次其队列。 在当前示例中,输出将始终为 "current message queue of TypeX: null",因为队列都是空的。

但是您现在可以启动它并将您的消息传递到匹配的队列中。该服务将每 200 毫秒获取一次,并用它做任何你想做的事。 当您使用队列时,消息的处理方式也会自动排序。

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Messages {

    public static void main(String[] args) {

        Map<String, ArrayDeque<String>> messages = new HashMap<String, ArrayDeque<String>>();
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        long initialDelay = 5000;
        long period = 200;

        // create 10 Queues, indexed by the type
        // create 10 executor-services, focused on their message queue
        for(int i=1; i<11; i++) {
            String type = "Type" + i;

            Runnable task = () -> System.out.println(
                     "current message of " + type + ": " + messages.get(type).poll()
            );

            messages.put(type, new ArrayDeque<String>());
            service.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
        }

    }
}

一个更简单的解决方案可能是:

而不是让每条消息都可以运行。 我们可以根据类型创建群消息:

例如我们为 type1

的所有消息创建 Group1
class MessageGroup implements Runnable {
    String type;
    String List<Message> messageList;

    @Override
    public void run() {
      for(Message message : MessageList) {
         message.process();
      }
    }
} 

并且我们可以创建具有固定线程的常规执行程序服务,例如

ExecutorService executorService = Executors.newFixedThreadPool(10); 

我们可以提交一组消息,例如

,而不是提交单个消息
executorService.submit(runnableGroup);

并且每组会在同一个线程中依次执行同类型的消息。

每种消息类型保留一个执行程序服务

是的,您在问题末尾提到的一系列执行程序服务是正确的想法。

但让我们让它变得更易于管理Map

Map < MessageType , ExecutorService >

让我们为您的消息类型定义一个枚举。

enum MessageType { Type01, Type02, Type03 } 

还有地图。

Map < MessageType , ExecutorService > messageTypeExecutorMap =  new EnumMap<>( MessageType.class ) ;

使用每种消息类型的执行程序服务填充地图。您希望每个消息类型集合一个一个地处理,因此使用 single-threaded 执行程序服务。

for( MessageType messageType : MessageType.values )
{
    messageTypeExecutorMap.put( messageType , Executors.newSingleThreadExecutor() ) ;
}

要提交消息进行处理,请按消息类型检索执行程序服务。

messageTypeExecutorMap
.get( task.getMessageType() )
.submit( task ) ;

保持多余的执行程序服务并准备工作并没有错。如果它们不执行任务,则它们在内存或 CPU 方面的开销很小。只需确保您的部署机器上有足够的核心来支持您希望同时执行的最大数量的消息类型。