多个延迟消费者、并发句柄、BlockingQueue

Multiple Delayed Consumer, Concurrence handle, BlockingQueue

注意:我确实尝试过简化这段代码

我有多个进程(不同类型)由多个 Runnable 执行。

我试图用图表简化这种情况。

我有一个 RunnableProducer,它会随时间不断生产数据;生产出的数据被传递给一个 RunnableWorker,由它通过 ProcessorDown(蓝色箭头)进行处理,并分发给它的接收者(同一个 class 的其他实例)。 如果某个 RunnableWorker 带有标记(code 不为 null),它必须执行一种特殊的处理(Processor),并把结果返回(return)给把数据传给它的"parent" RunnableWorker。 也就是说,父节点会从它的各个接收者那里收集到多份结果,并对每一份再执行一次额外的 ProcessorUp(绿色箭头;请注意绿色箭头的数量)。 最初的 RunnableWorker 在同一 class 的中间节点的帮助下,把所有数据互不混淆地传输给 RunnableConsumer,后者再执行另一项任务(在这个问题中只是 print)。

RunnableProducer 只有在 RunnableConsumer 已经准备好接收/收集所有生产出来的数据(经由各个 RunnableWorker 传递)时才应该开始生产。 RunnableProducer 可以被单独关闭。 但是,只要 RunnableProducer 还在生产,RunnableConsumer 就必须保持运行,直到它消费完所有数据(及其各个变体)为止。

注意:您可以复制、粘贴、编译和 运行

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class TestCollectorRunnable1 {

  /**
   * Entry point: assembles the producer, the worker tree and the consumer, then lets them run.
   *
   * <p>Every component submits its own task(s) to the shared executor from inside its constructor,
   * so the order of the {@code new} expressions below is also the order in which work is scheduled.
   * The tree is {@code rw0 -> (rw11 -> rw211, rw212, rw213), (rw12 -> rw221, rw222)}; only the five
   * leaves are given a non-null code (1 to 5). A timer calls {@code stop()} on the producer once,
   * after a random delay of at least 100 ms and less than 1000 ms.
   *
   * @param args ignored
   */
  public static void main(String... args) {
    ExecutorService pool = Executors.newCachedThreadPool();
    Counter producedCounter = new Counter();
    LifeCycle consumerLife = new LifeCycle();
    LifeCycle producerLife = new LifeCycle();

    RunnableConsumer consumer = new RunnableConsumer("rc", consumerLife, producerLife, pool, producedCounter);
    RunnableProducer producer = new RunnableProducer("rp", producerLife, consumerLife, pool, producedCounter);
    RunnableWorker root = new RunnableWorker("rw0", pool, producedCounter, null, null,
        producer.getOutBlockingQueue(), consumer.getInBlockingQueue());

    // Level 1: two relay workers (no code) hanging off the root.
    RunnableWorker left = new RunnableWorker("rw11", pool, producedCounter, null, root);
    RunnableWorker right = new RunnableWorker("rw12", pool, producedCounter, null, root);
    for (RunnableWorker relay : Arrays.asList(left, right)) {
      root.addBlockingQueue(relay.getInputBlockingQueue());
    }

    // Level 2, left branch: leaves with codes 1..3.
    List<RunnableWorker> leftLeaves = Arrays.asList(
        new RunnableWorker("rw211", pool, producedCounter, 1, left),
        new RunnableWorker("rw212", pool, producedCounter, 2, left),
        new RunnableWorker("rw213", pool, producedCounter, 3, left));
    for (RunnableWorker leaf : leftLeaves) {
      left.addBlockingQueue(leaf.getInputBlockingQueue());
    }

    // Level 2, right branch: leaves with codes 4..5.
    List<RunnableWorker> rightLeaves = Arrays.asList(
        new RunnableWorker("rw221", pool, producedCounter, 4, right),
        new RunnableWorker("rw222", pool, producedCounter, 5, right));
    for (RunnableWorker leaf : rightLeaves) {
      right.addBlockingQueue(leaf.getInputBlockingQueue());
    }

    // Simulate a "turn off" request arriving at some random moment.
    Timer stopTimer = new Timer();
    TimerTask stopProducer = new TimerTask() {
      @Override
      public void run() {
        producer.stop();
      }
    };
    stopTimer.schedule(stopProducer, ThreadLocalRandom.current().nextLong(100L, 1000L));
  }

  /**
   * Builds a random string of lowercase ASCII letters.
   *
   * @param size number of characters to generate; zero or a negative value yields an empty string
   * @return a string of {@code size} characters, each drawn from {@code 'a'} to {@code 'z'}
   */
  public static String getRandomString(int size) {
    StringBuilder sb = new StringBuilder(Math.max(size, 0));
    // One generator for the whole string instead of a fresh Random per character.
    ThreadLocalRandom random = ThreadLocalRandom.current();
    for (int i = 0; i < size; i++) {
      // The bound is exclusive, so 26 is needed to make 'z' reachable.
      sb.append((char) ('a' + random.nextInt(26)));
    }
    return sb.toString();
  }

  /**
   * Produces a fixed number of random chunks and hands them over through a blocking queue.
   *
   * <p>The constructor marks the producer's own life cycle as created and immediately submits the
   * instance to the executor. Once running, the task first waits until the consumer's life cycle
   * reports "running", then emits five chunks of {@code bufferSize} random bytes, pausing 10-99 ms
   * after each one, and publishes the running total through the shared {@link Counter}.
   *
   * <p>Interrupting the task ends it early; the interrupt flag is left set for the executor.
   */
  public static class RunnableProducer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final ExecutorService executorService;
    private final Counter counter;
    private final int bufferSize;
    private final BlockingQueue<ChunkDTO> outBlockingQueue;
    private volatile boolean isRunning = false;

    /**
     * Creates a producer that writes to a freshly created {@link SynchronousQueue}.
     *
     * @param name            label used in console output
     * @param ownLifeCycle    life cycle flags describing this producer
     * @param outLifeCycle    life cycle flags of the consumer this producer waits for
     * @param executorService executor that runs the producer task
     * @param counter         shared counter receiving the number of chunks sent so far
     */
    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
      this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
    }

    /**
     * Creates a producer that writes to the given queue and schedules it right away.
     *
     * @param name             label used in console output
     * @param ownLifeCycle     life cycle flags describing this producer
     * @param outLifeCycle     life cycle flags of the consumer this producer waits for
     * @param executorService  executor that runs the producer task
     * @param counter          shared counter receiving the number of chunks sent so far
     * @param outBlockingQueue queue the produced chunks are put into
     */
    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.executorService = executorService;
      this.counter = counter;
      this.bufferSize = 8;
      this.outBlockingQueue = outBlockingQueue;
      this.ownLifeCycle.setCreated(true);
      this.executorService.execute(this);
    }

    @Override
    public void run() {
      long quantity = 0;
      isRunning = true;

      // Poll until the consumer reports that it is running, so that nothing produced is lost.
      while (!outLifeCycle.isRunning()) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // Cancelled before production started: give up instead of spinning forever.
          Thread.currentThread().interrupt();
          return;
        }
      }

      // NOTE(review): the loop is bounded by a fixed count; the isRunning flag cleared by stop()
      // is not consulted here.
      while (quantity < 5) {
        ownLifeCycle.setRunning(true);
        boolean handedOff = false;
        boolean interrupted = false;
        try {
          byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
          ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
          outBlockingQueue.put(chunkDTO);
          handedOff = true;
          System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
          int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
          Thread.sleep(timeSleeping);
        } catch (InterruptedException e) {
          interrupted = true;
        }
        // Count only chunks that really reached the queue, so the counter never overstates.
        if (handedOff) {
          quantity++;
          counter.setValue(quantity);
        }
        if (interrupted) {
          Thread.currentThread().interrupt();
          break;
        }
      }
      System.out.println(name + "\tSent:" + quantity);
    }

    /** @return the queue this producer puts its chunks into */
    public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
      return outBlockingQueue;
    }

    /** Clears the {@code isRunning} flag. */
    public void stop() {
      isRunning = false;
    }

  }

  /**
   * Takes chunks from a blocking queue and prints them.
   *
   * <p>The constructor marks the consumer's own life cycle as created and immediately submits the
   * instance to the executor. The task announces itself as running before it looks at the
   * producer, because the producer polls that flag before it starts producing. It then keeps
   * taking chunks until the producer has been observed running and later observed not running.
   *
   * <p>Interrupting the task ends it; the interrupt flag is left set for the executor.
   */
  public static class RunnableConsumer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final ExecutorService executorService;
    private final Counter counter;
    private final BlockingQueue<ChunkDTO> inBlockingQueue;

    /**
     * Creates a consumer that reads from a freshly created {@link SynchronousQueue}.
     *
     * @param name            label used in console output
     * @param ownLifeCycle    life cycle flags describing this consumer
     * @param outLifeCycle    life cycle flags of the producer feeding this consumer
     * @param executorService executor that runs the consumer task
     * @param counter         shared counter (stored, not read by this class)
     */
    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
      this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
    }

    /**
     * Creates a consumer that reads from the given queue and schedules it right away.
     *
     * @param name            label used in console output
     * @param ownLifeCycle    life cycle flags describing this consumer
     * @param outLifeCycle    life cycle flags of the producer feeding this consumer
     * @param executorService executor that runs the consumer task
     * @param counter         shared counter (stored, not read by this class)
     * @param inBlockingQueue queue the chunks are taken from; if null the task does nothing
     */
    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.executorService = executorService;
      this.counter = counter;
      this.inBlockingQueue = inBlockingQueue;
      this.ownLifeCycle.setCreated(true);
      this.executorService.execute(this);
    }

    @Override
    public void run() {
      if (inBlockingQueue == null) {
        return;
      }
      // Announce readiness unconditionally. Doing this only inside the loop meant that a consumer
      // which found the producer "created but not yet running" skipped the loop and never set the
      // flag, while the producer kept waiting for it.
      ownLifeCycle.setRunning(true);
      int quantity = 0;
      boolean producerSeenRunning = false;
      try {
        while (true) {
          boolean producerRunning = outLifeCycle.isRunning();
          producerSeenRunning |= producerRunning;
          // Stop only after the producer has actually run and then stopped; a producer that has
          // not started yet is not a reason to quit.
          if (producerSeenRunning && !producerRunning) {
            break;
          }
          ChunkDTO chunkDTO = inBlockingQueue.take();
          System.out.println(name + ".Collected " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t pitch:" + chunkDTO.getPitch());
          quantity++;
        }
        System.out.println(name + "\tReceived:" + quantity);
      } catch (InterruptedException e) {
        // Preserve the interruption for whoever owns this thread.
        Thread.currentThread().interrupt();
      }
    }

    /** @return the queue this consumer takes its chunks from */
    public BlockingQueue<ChunkDTO> getInBlockingQueue() {
      return inBlockingQueue;
    }

  }

  /**
   * A node of the worker tree. It is not a Runnable itself; its constructor submits up to two
   * tasks to the executor:
   * <ul>
   *   <li>a "distributor" that takes chunks from the input queue and either fans them out to the
   *       queues in {@code downList} (when {@code code} is null) or tags them with {@code code}
   *       and sends them to the parent's {@code upList};</li>
   *   <li>a "collector" (only when {@code code} is null) that takes chunks from the first queue of
   *       {@code upList} and either puts them into the output queue (no parent) or forwards them
   *       to the parent's {@code upList}.</li>
   * </ul>
   *
   * <p>NOTE(review): {@code downList} and {@code codes} are a plain ArrayList / HashSet that are
   * modified through {@link #addBlockingQueue}, {@link #delBlockingQueue} and {@link #addCodeSon}
   * while the tasks started by the constructor may already be reading them; no synchronization is
   * visible in this class - confirm whether that is intended.
   */
  public static class RunnableWorker {

    private final ExecutorService executorService;
    // Parent node, or null for the root of the tree.
    private final RunnableWorker parent;
    // Queue the distributor task reads from.
    private final BlockingQueue<ChunkDTO> inputBlockingQueue;
    // Queue the collector task writes to when this node has no parent.
    private final BlockingQueue<ChunkDTO> outputBlockingQueue;
    // Queues the distributor fans chunks out to (filled through addBlockingQueue).
    private final List<BlockingQueue<ChunkDTO>> downList;
    // Holds the single queue on which results coming back up are received.
    private final List<BlockingQueue<ChunkDTO>> upList;
    // Codes reported through addCodeSon.
    private final Set<Integer> codes;

    /**
     * Creates the node and immediately submits its task(s) to the executor.
     *
     * @param name                label used in console output
     * @param executorService     executor running the tasks and the Processor instances they create
     * @param counter             shared counter read by the collector's loop condition
     * @param code                tag applied to returned chunks; null makes this a relay node
     * @param parent              parent node, or null for the root
     * @param inputBlockingQueue  queue to take incoming chunks from; if null the distributor does nothing
     * @param outputBlockingQueue queue the collector writes to when {@code parent} is null
     */
    public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
      this.executorService = executorService;
      this.parent = parent;
      this.inputBlockingQueue = inputBlockingQueue;
      this.outputBlockingQueue = outputBlockingQueue;
      this.downList = new ArrayList<>();
      this.upList = new ArrayList<>(Arrays.asList(new SynchronousQueue/*LinkedBlockingQueue*/<>()));
      this.codes = new HashSet<>();

      //RUNNABLE DISTRIBUTOR
      // Loops on take() with no exit condition; it only ends when an exception is thrown, and that
      // exception is discarded by the empty catch below.
      this.executorService.execute(() -> {
        if (inputBlockingQueue != null) {
          try {
            while (true) {
              ChunkDTO chunkDTO = inputBlockingQueue.take();
              /*
              if (codes.size() > 0) {
                System.out.println(name + " codes.length:" + codes.size());
              }
              if (parent == null) {
                System.out.println(name + ".Worked " + new String(chunkDTO.getChunk()) + "\tindex:" + chunkDTO.getIndex());
              }
              // */
              if (code == null) {
                // Relay node: a ProcessorDown task forwards the chunk to every queue in downList.
                new ProcessorDown(executorService, chunkDTO, downList);
              } else {
                // Tagged node: copy the chunk with this node's code and send it back to the parent.
                ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
                System.out.println("\t\t" + name + ".Returned " + returned.toString());
                if (parent != null) {
                  new Processor(executorService, returned, parent.getUpList());
                  // Registers the code with the parent and, through it, with every ancestor.
                  parent.addCodeSon(code);
                }
              }
            }
          } catch (Exception e) {
          }
        }
      });

      //RUNNABLE COLLECTOR
      if (code == null) {
        this.executorService.execute(() -> {
          int quantity = 0;
          // The outer loop repeats only while nothing has been collected yet.
          while (quantity == 0) {
            BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
            if (outBlockingQueue != null) {
              try {
                // Keeps taking while fewer than codes.size() * counter.getValue() chunks were seen.
                // NOTE(review): both operands are updated by other threads, so if they lag behind
                // when the first chunk arrives (e.g. the counter is still 0) this condition is
                // false, the collector ends, and later senders on this queue would block - this
                // looks like the cause of the lost chunks described in the question; confirm.
                while (quantity == 0 || (quantity > 0 && quantity < codes.size() * (counter.getValue()))) {
                  ChunkDTO chunkDTO = outBlockingQueue.take();
                  /*
                  System.out.println("\t" + name + ".quantity: " + quantity + ", codes.size():" + codes.size() + ", counter.getValue():" + counter.getValue() + ", total:" + (codes.size() * counter.getValue())
                      + "\r\t\tcchunk:" + chunkDTO
                      + "\r\t\tcodes:" + codes.stream().map(i -> i.toString()).collect(Collectors.joining(",")));
                  // */
                  if (chunkDTO != null) {
                    if (parent == null) {
                      // Root: hand the chunk to the output queue.
                      outputBlockingQueue.put(chunkDTO);
                      System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
                    } else {
                      // Intermediate node: pass the chunk one level up via a ProcessorUp task.
                      new ProcessorUp(executorService, chunkDTO, parent.getUpList());
                    }
                    quantity++;
                  }
                }
                /*
                if (quantity != 0) {
                  String codesString = codes.stream().map(i -> i.toString()).collect(Collectors.joining(","));
                  System.out.println("\t" + name + "\tWorked:" + quantity + ", \tcodes:" + codesString);
                }
                // */
              } catch (InterruptedException e) {
              }
            }
          }
        });
      }
    }

    /**
     * Records a code on this node after first recording it on the parent (recursively up to the root).
     *
     * @param code code to record
     */
    public void addCodeSon(Integer code) {
      if (parent != null) {
        parent.addCodeSon(code);
      }
      codes.add(code);
    }

    /**
     * Creates the node with freshly created {@link SynchronousQueue}s for input and output.
     *
     * @param name            label used in console output
     * @param executorService executor running the tasks
     * @param counter         shared counter read by the collector's loop condition
     * @param code            tag applied to returned chunks; null makes this a relay node
     * @param parent          parent node, or null for the root
     */
    public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent) {
      this(name, executorService, counter, code, parent, new SynchronousQueue/*LinkedBlockingQueue*/<>(), new SynchronousQueue/*LinkedBlockingQueue*/<>());
    }

    /** @return the queue this node's distributor takes chunks from */
    public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
      return inputBlockingQueue;
    }

    /**
     * Adds a queue to the list the distributor fans chunks out to.
     *
     * @param blockingQueue queue to add
     */
    public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
      downList.add(blockingQueue);
    }

    /**
     * Removes a queue from the list the distributor fans chunks out to.
     *
     * @param blockingQueue queue to remove
     */
    public void delBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
      downList.remove(blockingQueue);
    }

    /** @return the live list holding this node's upward queue (not a copy) */
    public List<BlockingQueue<ChunkDTO>> getUpList() {
      return upList;
    }

  }

  /**
   * One-shot task that processes a single chunk and forwards the result to every queue of a list.
   *
   * <p>The constructor submits the new instance to the executor, so creating a Processor is all a
   * caller has to do to schedule the work.
   */
  public static class Processor implements Runnable {

    private final ExecutorService executorService;
    private final List<BlockingQueue<ChunkDTO>> listOutput;
    private final ChunkDTO inChunkDTO;

    /**
     * Creates the task and immediately hands it to the executor.
     *
     * @param executorService executor that runs the task
     * @param inChunkDTO      chunk to process; if null the task does nothing
     * @param listOutput      queues that each receive the processed chunk; if null nothing is forwarded
     */
    public Processor(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      this.executorService = executorService;
      this.listOutput = listOutput;
      this.inChunkDTO = inChunkDTO;
      this.executorService.execute(this);
    }

    @Override
    public void run() {
      if (inChunkDTO == null) {
        return;
      }
      try {
        byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
        ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
        if (listOutput == null) {
          return;
        }
        listOutput.forEach(output -> {
          try {
            output.put(outChunkDTO);
          } catch (InterruptedException e) {
            // Keep the interruption visible instead of discarding it. With the flag set, the
            // remaining put() calls are expected to fail quickly rather than block.
            Thread.currentThread().interrupt();
          } catch (RuntimeException e) {
            // One bad queue must not prevent delivery to the others, but it must not be silent.
            System.err.println("Processor: could not forward chunk index " + outChunkDTO.getIndex() + ": " + e);
          }
        });
      } catch (RuntimeException e) {
        // Report instead of silently dropping the chunk.
        System.err.println("Processor: failed on chunk index " + inChunkDTO.getIndex() + ": " + e);
      }
    }
  }

  /**
   * Marker subclass of {@link Processor} that adds no behavior of its own.
   *
   * <p>In this file it is created by a relay worker's distributor task, with the worker's list of
   * downstream queues as the output list.
   */
  public static class ProcessorDown extends Processor {

    /**
     * Delegates straight to {@link Processor}, which schedules the task on the executor.
     *
     * @param executorService executor that runs the task
     * @param inChunkDTO      chunk to process
     * @param listOutput      queues that each receive the processed chunk
     */
    public ProcessorDown(final ExecutorService executorService, final ChunkDTO inChunkDTO, final List<BlockingQueue<ChunkDTO>> listOutput) {
      super(executorService, inChunkDTO, listOutput);
    }
  }

  /**
   * Marker subclass of {@link Processor} that adds no behavior of its own.
   *
   * <p>In this file it is created by an intermediate worker's collector task, with the parent
   * worker's upward queue list as the output list.
   */
  public static class ProcessorUp extends Processor {

    /**
     * Delegates straight to {@link Processor}, which schedules the task on the executor.
     *
     * @param executorService executor that runs the task
     * @param inChunkDTO      chunk to process
     * @param listOutput      queues that each receive the processed chunk
     */
    public ProcessorUp(final ExecutorService executorService, final ChunkDTO inChunkDTO, final List<BlockingQueue<ChunkDTO>> listOutput) {
      super(executorService, inChunkDTO, listOutput);
    }
  }

  /**
   * Simulates a processing step: pauses for about 10 ms and returns the input unchanged.
   *
   * <p>If the thread is interrupted during the pause, the pause ends early and the thread's
   * interrupt flag is set again so that the caller can still notice the interruption.
   *
   * @param in bytes to "process"
   * @return the very same array that was passed in (no copy is made)
   */
  private static byte[] internalProcessing(byte[] in) {
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      // sleep() clears the flag when it throws; restore it rather than losing the request.
      Thread.currentThread().interrupt();
    }
    return in;
  }

  /**
   * Value holder passed through the queues: a byte payload, its sequence index and an optional
   * pitch/code tag.
   *
   * <p>All fields are final. The byte array is stored and returned as-is, without copying.
   */
  public static class ChunkDTO {

    private final byte[] chunk;
    private final long index;
    private final Integer pitch;

    /**
     * @param chunk payload bytes
     * @param index position of the chunk in the produced sequence
     * @param pitch tag attached to the chunk; may be null
     */
    public ChunkDTO(byte[] chunk, long index, Integer pitch) {
      this.chunk = chunk;
      this.index = index;
      this.pitch = pitch;
    }

    /** @return the payload array held by this object */
    public byte[] getChunk() {
      return this.chunk;
    }

    /** @return the sequence index */
    public long getIndex() {
      return this.index;
    }

    /** @return the tag, or null when none was given */
    public Integer getPitch() {
      return this.pitch;
    }

    /** Renders the payload decoded as text, followed by the index and the pitch. */
    @Override
    public String toString() {
      StringBuilder text = new StringBuilder("ChunkDTO{");
      text.append("chunk=").append(new String(chunk));
      text.append(", index=").append(index);
      text.append(", pitch=").append(pitch);
      return text.append('}').toString();
    }

  }

  /**
   * Holder for a single {@code Long} whose reads and writes are guarded by a read/write lock.
   * The value starts at zero.
   */
  public static class Counter {

    private final Lock readLock;
    private final Lock writeLock;
    private Long value = 0L;

    /** Creates a counter holding zero. */
    public Counter() {
      ReadWriteLock guard = new ReentrantReadWriteLock();
      this.readLock = guard.readLock();
      this.writeLock = guard.writeLock();
    }

    /** @return the value most recently stored (read under the read lock) */
    public Long getValue() {
      readLock.lock();
      try {
        return value;
      } finally {
        readLock.unlock();
      }
    }

    /**
     * Replaces the stored value (under the write lock).
     *
     * @param value new value; stored as given
     */
    public void setValue(Long value) {
      writeLock.lock();
      try {
        this.value = value;
      } finally {
        writeLock.unlock();
      }
    }

  }

  /**
   * Three independent boolean flags (created, running, finished) describing a component's state.
   * Every read takes the read lock and every write takes the write lock of one shared
   * read/write lock. All flags start as {@code false}.
   */
  public static class LifeCycle {

    private final Lock readLock;
    private final Lock writeLock;
    private boolean created;
    private boolean running;
    private boolean finished;

    /** Creates a life cycle with all flags cleared. */
    public LifeCycle() {
      ReadWriteLock guard = new ReentrantReadWriteLock();
      this.readLock = guard.readLock();
      this.writeLock = guard.writeLock();
    }

    /** @return the current "created" flag */
    public boolean isCreated() {
      readLock.lock();
      try {
        return this.created;
      } finally {
        readLock.unlock();
      }
    }

    /** @param created new value of the "created" flag */
    public void setCreated(boolean created) {
      writeLock.lock();
      try {
        this.created = created;
      } finally {
        writeLock.unlock();
      }
    }

    /** @return the current "running" flag */
    public boolean isRunning() {
      readLock.lock();
      try {
        return this.running;
      } finally {
        readLock.unlock();
      }
    }

    /** @param running new value of the "running" flag */
    public void setRunning(boolean running) {
      writeLock.lock();
      try {
        this.running = running;
      } finally {
        writeLock.unlock();
      }
    }

    /** @return the current "finished" flag */
    public boolean isFinished() {
      readLock.lock();
      try {
        return this.finished;
      } finally {
        readLock.unlock();
      }
    }

    /** @param finished new value of the "finished" flag */
    public void setFinished(boolean finished) {
      writeLock.lock();
      try {
        this.finished = finished;
      } finally {
        writeLock.unlock();
      }
    }

  }

}

ChunkDTO class 包含数据、索引(位置)和代码(code,便于 RunnableConsumer 对其进行分类)。

Counter class 以便控制 RunnableConsumer/RunnableWorker 应该期待什么。

如果 RunnableProducer 产生 7 个,并且有 5 个代码,RunnableConsumer 最终应该收集到 35 个。 rw11 RunnableWorker 应该收集 3*7=21 个,rw12 RunnableWorker 应该收集 2*7=14 个。

LifeCycle class 是为了控制 LifeCycle Producer 和 Consumer 而创建的,我仍然没有 RunnableWorker 的逻辑。

RunnableWorker有两个Runnable,用来处理Transfer their children(//RUNNABLE DISTRIBUTOR) and parents(//RUNNABLE COLLECTOR).

输出

rp.Produced tpwqomrt     index:0
rw0.Worked tpwqomrt index:0
        rw221.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
        rw222.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
        rw212.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
        rw213.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
        rw211.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rp.Produced xwnlpkju     index:1
rw0 codes.length:5
rw0.Worked xwnlpkju index:1
rw11 codes.length:3
rw12 codes.length:2
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rc.Collected tpwqomrt    index:0     pitch:2
rc.Collected tpwqomrt    index:0     pitch:4
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rc.Collected tpwqomrt    index:0     pitch:1
rc.Collected tpwqomrt    index:0     pitch:3
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rc.Collected tpwqomrt    index:0     pitch:5
        rw212.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=2}
        rw221.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=4}
        rw222.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=5}
        rw213.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=3}
        rw211.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=1}
rp.Produced xmdfcmmo     index:2
rw0 codes.length:5
rw0.Worked xmdfcmmo index:2
rw12 codes.length:2
rw11 codes.length:3
        rw221.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=4}
        rw212.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=2}
        rw222.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=5}
        rw213.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=3}
        rw211.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=1}
rw0 codes.length:5
rp.Produced syqpyxuk     index:3
rw0.Worked syqpyxuk index:3
rw11 codes.length:3
rw0 codes.length:5
rw0.Worked linlkasp index:4
rp.Produced linlkasp     index:4
rw12 codes.length:2
        rw211.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=1}
        rw213.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=3}
        rw212.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=2}
rw11 codes.length:3
        rw222.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=5}
        rw221.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=4}
rw12 codes.length:2
        rw211.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=1}
        rw213.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=3}
        rw222.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=5}
        rw212.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=2}
        rw221.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=4}
rp  Sent:5

如您所见,Producer 发送了 5 个块,但我失去了大部分,我需要接收 25 个(对于此示例)。消费者只收集了很少的物品。 我的逻辑有什么问题?

我不知道在 RunnableWorker class 中创建两个 Runnable 是否是一个好的解决方案。 有没有更好的实现方式?

我知道我有一个可怕的方法来阻止生产者等待消费者。 您推荐什么解决方案?

我自己的回答

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TestCollectorRunnable7a {

  /**
   * Entry point: assembles the producer, the worker tree and the consumer, then lets them run.
   *
   * <p>All components receive the same future list, executor, counter and code list. The worker
   * tree is {@code rw0 -> (rw11 -> rw211, rw212, rw213), (rw12 -> rw221, rw222)}; only the five
   * leaves are given a non-null code (1 to 5). The {@code new} expressions are kept in their
   * original order. A non-daemon timer calls {@code stop()} on the producer once, after a random
   * delay of at least 100 ms and less than 1000 ms.
   *
   * @param args ignored
   */
  public static void main(String... args) {
    List<Future<?>> futures = new ArrayList<>();
    ExecutorService pool = Executors.newCachedThreadPool();

    Counter producedCounter = new Counter();
    CodesList knownCodes = new CodesList();
    LifeCycle consumerLife = new LifeCycle();
    LifeCycle producerLife = new LifeCycle();

    RunnableConsumer consumer = new RunnableConsumer("rc", consumerLife, producerLife, futures, pool, producedCounter, knownCodes);
    RunnableProducer producer = new RunnableProducer("rp", producerLife, consumerLife, futures, pool, producedCounter);
    RunnableWorker root = new RunnableWorker("rw0", consumerLife, futures, pool, producedCounter, knownCodes, null, null,
        producer.getOutBlockingQueue(), consumer.getInBlockingQueue());

    // Level 1: two relay workers (no code) hanging off the root.
    RunnableWorker left = new RunnableWorker("rw11", consumerLife, futures, pool, producedCounter, knownCodes, null, root);
    RunnableWorker right = new RunnableWorker("rw12", consumerLife, futures, pool, producedCounter, knownCodes, null, root);
    for (RunnableWorker relay : Arrays.asList(left, right)) {
      root.addBlockingQueue(relay.getInputBlockingQueue());
    }

    // Level 2, left branch: leaves with codes 1..3.
    List<RunnableWorker> leftLeaves = Arrays.asList(
        new RunnableWorker("rw211", consumerLife, futures, pool, producedCounter, knownCodes, 1, left),
        new RunnableWorker("rw212", consumerLife, futures, pool, producedCounter, knownCodes, 2, left),
        new RunnableWorker("rw213", consumerLife, futures, pool, producedCounter, knownCodes, 3, left));
    for (RunnableWorker leaf : leftLeaves) {
      left.addBlockingQueue(leaf.getInputBlockingQueue());
    }

    // Level 2, right branch: leaves with codes 4..5.
    List<RunnableWorker> rightLeaves = Arrays.asList(
        new RunnableWorker("rw221", consumerLife, futures, pool, producedCounter, knownCodes, 4, right),
        new RunnableWorker("rw222", consumerLife, futures, pool, producedCounter, knownCodes, 5, right));
    for (RunnableWorker leaf : rightLeaves) {
      right.addBlockingQueue(leaf.getInputBlockingQueue());
    }

    // Simulate a "turn off" request arriving at some random moment.
    Timer stopTimer = new Timer(false);
    TimerTask stopProducer = new TimerTask() {
      @Override
      public void run() {
        producer.stop();
      }
    };
    stopTimer.schedule(stopProducer, ThreadLocalRandom.current().nextLong(100L, 1000L));
  }

  /**
   * Builds a random string of lowercase ASCII letters.
   *
   * @param size number of characters to generate; zero or a negative value yields an empty string
   * @return a string of {@code size} characters, each drawn from {@code 'a'} to {@code 'z'}
   */
  public static String getRandomString(int size) {
    StringBuilder sb = new StringBuilder(Math.max(size, 0));
    // One generator for the whole string instead of a fresh Random per character.
    ThreadLocalRandom random = ThreadLocalRandom.current();
    for (int i = 0; i < size; i++) {
      // The bound is exclusive, so 26 is needed to make 'z' reachable.
      sb.append((char) ('a' + random.nextInt(26)));
    }
    return sb.toString();
  }

  /**
   * Produces random chunks until it is asked to stop and hands them over through a blocking queue.
   *
   * <p>The constructor marks the producer's own life cycle as created, submits the instance to the
   * executor and records the resulting future. Once running, the task waits until the consumer's
   * life cycle reports "running", then emits chunks of {@code bufferSize} random bytes, pausing
   * 10-99 ms after each one, and publishes the running total through the shared {@link Counter}.
   * When production ends it clears its "running" flag and sets its "finished" flag.
   *
   * <p>A stop request is remembered even if it arrives before production has started, and an
   * interrupt is treated as a stop request.
   */
  public static class RunnableProducer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final List<Future<?>> futureList;
    private final ExecutorService executorService;
    private final Counter counter;
    private final int bufferSize;
    private final BlockingQueue<ChunkDTO> outBlockingQueue;
    private volatile boolean isRunning = false;
    // Set by stop() (or by an interrupt) and never cleared, so a stop issued early is not lost.
    private volatile boolean stopRequested = false;

    /**
     * Creates a producer that writes to a freshly created {@link LinkedBlockingQueue}.
     *
     * @param name            label used in console output
     * @param ownLifeCycle    life cycle flags describing this producer
     * @param outLifeCycle    life cycle flags of the consumer this producer waits for
     * @param futureList      list that receives the future of the submitted task
     * @param executorService executor that runs the producer task
     * @param counter         shared counter receiving the number of chunks sent so far
     */
    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter) {
      this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue<>());
    }

    /**
     * Creates a producer that writes to the given queue and schedules it right away.
     *
     * @param name             label used in console output
     * @param ownLifeCycle     life cycle flags describing this producer
     * @param outLifeCycle     life cycle flags of the consumer this producer waits for
     * @param futureList       list that receives the future of the submitted task
     * @param executorService  executor that runs the producer task
     * @param counter          shared counter receiving the number of chunks sent so far
     * @param outBlockingQueue queue the produced chunks are put into
     */
    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.futureList = futureList;
      this.executorService = executorService;
      this.counter = counter;
      this.bufferSize = 8;
      this.outBlockingQueue = outBlockingQueue;
      this.ownLifeCycle.setCreated(true);
      this.futureList.add(this.executorService.submit(this));

    }

    @Override
    public void run() {
      long quantity = 0;
      // Poll until the consumer is running, unless a stop has already been requested.
      while (!stopRequested && !outLifeCycle.isRunning()) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
          stopRequested = true;
        }
      }

      // Do not overwrite a stop() that arrived while we were still waiting.
      isRunning = !stopRequested;
      ownLifeCycle.setRunning(true);
      while (!stopRequested) {
        boolean handedOff = false;
        try {
          byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
          ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
          outBlockingQueue.put(chunkDTO);
          handedOff = true;
          System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
          int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
          Thread.sleep(timeSleeping);
        } catch (InterruptedException ex) {
          // Treat an interrupt as a request to stop and keep the flag for the executor.
          Thread.currentThread().interrupt();
          stopRequested = true;
        } catch (RuntimeException ex) {
          Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
        }
        // Count only chunks that really reached the queue: the consumer side compares its own
        // tally against this counter, so an overstated value would keep it waiting.
        if (handedOff) {
          quantity++;
          counter.setValue(quantity);
        }
      }
      isRunning = false;
      ownLifeCycle.setRunning(false);
      System.out.println(name + "\tSent:" + quantity);
      ownLifeCycle.setFinished(true);
    }

    /** @return the queue this producer puts its chunks into */
    public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
      return outBlockingQueue;
    }

    /** Asks the producer to stop; production ends after the chunk currently being handled. */
    public void stop() {
      stopRequested = true;
      isRunning = false;
    }

  }

  /**
   * Takes chunks from a blocking queue and prints them until a {@code ChunkStopper} arrives.
   *
   * <p>The constructor submits two tasks and records their futures: first a watcher that polls
   * every 100 ms until the producer's life cycle is finished and this consumer's own tally has
   * reached {@code counter * codesList.size()}, and then adds a {@code ChunkStopper} to the input
   * queue; second the consumer itself.
   */
  public static class RunnableConsumer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final List<Future<?>> futureList;
    private final ExecutorService executorService;
    private final Counter counter;
    private final Counter intCounter;
    private final CodesList codesList;
    private final BlockingQueue<ChunkDTO> inBlockingQueue;

    /**
     * Creates a consumer that reads from a freshly created {@link LinkedBlockingQueue}.
     *
     * @param name            label used in console output
     * @param ownLifeCycle    life cycle flags describing this consumer
     * @param outLifeCycle    life cycle flags of the producer feeding this consumer
     * @param futureList      list that receives the futures of the submitted tasks
     * @param executorService executor that runs the tasks
     * @param counter         shared counter holding the number of chunks produced
     * @param codesList       collection of codes; its size multiplies the expected chunk count
     */
    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList) {
      this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue/*SynchronousQueue LinkedBlockingQueue*/<>(), codesList);
    }

    /**
     * Creates a consumer that reads from the given queue and schedules its tasks right away.
     *
     * @param name            label used in console output
     * @param ownLifeCycle    life cycle flags describing this consumer
     * @param outLifeCycle    life cycle flags of the producer feeding this consumer
     * @param futureList      list that receives the futures of the submitted tasks
     * @param executorService executor that runs the tasks
     * @param counter         shared counter holding the number of chunks produced
     * @param inBlockingQueue queue the chunks are taken from
     * @param codesList       collection of codes; its size multiplies the expected chunk count
     */
    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue, CodesList codesList) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.futureList = futureList;
      this.executorService = executorService;
      this.counter = counter;
      this.inBlockingQueue = inBlockingQueue;
      this.intCounter = new Counter();
      this.codesList = codesList;
      this.ownLifeCycle.setCreated(true);

      // The watcher goes first, then the consumer itself, as two separate tasks.
      Runnable stopWatcher = this::postStopperWhenDrained;
      this.futureList.add(this.executorService.submit(stopWatcher));
      this.futureList.add(this.executorService.submit(this));
    }

    /**
     * Watcher task: sleeps in 100 ms steps while the producer is unfinished or fewer chunks were
     * consumed than {@code counter * codesList.size()}, then enqueues the stop marker.
     */
    private void postStopperWhenDrained() {
      while (!outLifeCycle.isFinished() || intCounter.getValue() < counter.getValue() * codesList.size()) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException ex) {
          Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
        }
      }
      inBlockingQueue.add(new ChunkStopper(null, -1, null));
    }

    @Override
    public void run() {
      if (inBlockingQueue == null) {
        return;
      }
      try {
        long received = 0;
        ownLifeCycle.setRunning(true);
        // Consume until the stop marker shows up; the marker itself is neither printed nor counted.
        for (ChunkDTO next = inBlockingQueue.take(); !(next instanceof ChunkStopper); next = inBlockingQueue.take()) {
          System.out.println(name + ".Consumed " + new String(next.getChunk()) + "\t index:" + next.getIndex() + "\t code:" + next.getPitch() + ", \tquantity:" + received);
          received++;
          intCounter.setValue(received);
        }
        ownLifeCycle.setRunning(false);
        System.out.println(name + "\tReceived:" + received);
        ownLifeCycle.setFinished(true);
      } catch (InterruptedException ex) {
        Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
      }
    }

    /** @return the queue this consumer takes its chunks from */
    public BlockingQueue<ChunkDTO> getInBlockingQueue() {
      return inBlockingQueue;
    }

  }

  public static class RunnableWorker {

    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final List<Future<?>> futureList;
    private final ExecutorService executorService;
    private final Counter intCounter;
    private final CodesList codesList;
    private final RunnableWorker parent;
    private final BlockingQueue<ChunkDTO> inputBlockingQueue;
    private final BlockingQueue<ChunkDTO> outputBlockingQueue;
    private final List<BlockingQueue<ChunkDTO>> downList;
    private final List<BlockingQueue<ChunkDTO>> upList;
    private final Set<Integer> codes;

    public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
      this.ownLifeCycle = new LifeCycle();
      this.outLifeCycle = outLifeCycle;
      this.futureList = futureList;
      this.executorService = executorService;
      this.intCounter = new Counter();
      this.codesList = codesList;
      this.parent = parent;
      this.inputBlockingQueue = inputBlockingQueue;
      this.outputBlockingQueue = outputBlockingQueue;
      this.downList = new ArrayList<>();
      this.upList = new ArrayList<>(Arrays.asList(new LinkedBlockingQueue<>()));
      this.codes = new HashSet<>();

      if (code != null) {
        this.codesList.addCode(code);
      }

      this.futureList.add(this.executorService.submit(() -> {
        while (!outLifeCycle.isFinished()) {
          try {
            Thread.sleep(200);
          } catch (InterruptedException ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
          }
        }
        //System.out.println(name + " -> Consumer Finished!");
        while (true) {
          try {
            BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
            outBlockingQueue.add(new ChunkStopper(null, -1, null));
            break;
          } catch (Exception ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
          }
        }
        while (true) {
          try {
            inputBlockingQueue.add(new ChunkStopper(null, -1, null));
            break;
          } catch (Exception ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
          }
        }
      }));

      //RUNNABLE DISTRIBUTOR
      this.futureList.add(this.executorService.submit(() -> {
        long quantity = 0;
        if (inputBlockingQueue != null) {
          try {
            ownLifeCycle.setRunning(true);
            while (true) {
              ChunkDTO chunkDTO = inputBlockingQueue.take();
              if (chunkDTO instanceof ChunkStopper) {
                break;
              }
              if (code == null) {
                new ProcessorDown(futureList, executorService, chunkDTO, downList);
              } else {
                ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
                //System.out.println("\t\t" + name + ".Returned " + returned.toString());
                if (parent != null) {
                  new Processor(this.futureList, executorService, returned, parent.getUpList());
                  parent.addCodeSon(code);
                }
              }
              quantity++;
              intCounter.setValue(quantity);
            }
            ownLifeCycle.setRunning(false);
          } catch (Exception ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
          }
        }
        //System.out.println(name + ". DISTRIBUTOR Finished");
      }));

      //RUNNABLE COLLECTOR
      if (code == null) {
        this.futureList.add(this.executorService.submit(() -> {
          int quantity = 0;
          while (quantity == 0) {
            BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
            if (outBlockingQueue != null) {
              try {
                while (true) {
                  ChunkDTO chunkDTO = outBlockingQueue.take();
                  if (chunkDTO instanceof ChunkStopper) {
                    break;
                  }
                  if (parent == null) {
                    outputBlockingQueue.put(chunkDTO);
                    //System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
                  } else {
                    new ProcessorUp(futureList, executorService, chunkDTO, parent.getUpList());
                  }
                  quantity++;
                }
              } catch (InterruptedException ex) {
                Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
              }
            }
          }
          //System.out.println(name + ". COLLECTOR Finished");
        }));
      }
    }

    /**
     * Gives access to the life-cycle object stored in this worker's {@code ownLifeCycle} field.
     *
     * @return the stored {@code ownLifeCycle} reference (no copy is made)
     */
    public LifeCycle getOwnLifeCycle() {
      return this.ownLifeCycle;
    }

    /**
     * Registers {@code code} on this worker and on every ancestor reachable through {@code parent}.
     *
     * <p>The call is forwarded to the parent before the local set is touched, so the
     * top-most ancestor records the code first and this worker records it last.
     *
     * @param code the code to add to each {@code codes} set along the parent chain
     */
    public void addCodeSon(Integer code) {
      // Walk up first: the recursion ends at the worker whose parent is null.
      if (this.parent != null) {
        this.parent.addCodeSon(code);
      }
      this.codes.add(code);
    }

    /**
     * Convenience constructor: forwards all eight arguments unchanged to the ten-argument
     * constructor and supplies two newly created {@link LinkedBlockingQueue} instances for
     * its two trailing queue parameters.
     */
    public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent) {
      this(
          name,
          outLifeCycle,
          futureList,
          executorService,
          counter,
          codesList,
          code,
          parent,
          new LinkedBlockingQueue<>(),
          new LinkedBlockingQueue<>());
    }

    /**
     * Gives access to the queue stored in {@code inputBlockingQueue}.
     *
     * @return the stored queue reference itself, not a copy
     */
    public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
      return this.inputBlockingQueue;
    }

    /**
     * Appends a queue to this worker's {@code downList}.
     *
     * @param blockingQueue the queue to append; stored as given
     */
    public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
      this.downList.add(blockingQueue);
    }

    /**
     * Gives access to this worker's {@code upList}.
     *
     * @return the internal list instance itself; changes made by the caller are visible here
     */
    public List<BlockingQueue<ChunkDTO>> getUpList() {
      return this.upList;
    }

  }

  /**
   * Task that passes one chunk through {@code internalProcessing} and then puts the resulting
   * {@link ChunkDTO} on every queue of an output list.
   *
   * <p>Constructing an instance is enough to schedule it: the constructor submits the new
   * object to the supplied executor and stores the returned future in {@code futureList}.
   */
  public static class Processor implements Runnable {

    private final ExecutorService executorService;
    private final List<BlockingQueue<ChunkDTO>> listOutput;
    private final ChunkDTO inChunkDTO;

    /**
     * Stores the arguments and immediately submits this task.
     *
     * @param futureList      list that receives the future returned by the executor
     * @param executorService executor that will run this task
     * @param inChunkDTO      chunk to process; when {@code null}, {@link #run()} does nothing
     * @param listOutput      queues that receive the processed chunk; may be {@code null}
     */
    public Processor(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      this.executorService = executorService;
      this.listOutput = listOutput;
      this.inChunkDTO = inChunkDTO;
      // Submit last, after every field has been assigned.
      futureList.add(this.executorService.submit(this));
    }

    /**
     * Processes the input chunk and puts the result on each output queue in list order.
     *
     * <p>If the thread is interrupted while blocked in {@code put}, the interruption is logged,
     * the thread's interrupt status is restored and the remaining queues are skipped. Any other
     * exception raised for a single queue is logged and the next queue is tried.
     */
    @Override
    public void run() {
      if (inChunkDTO == null) {
        return;
      }
      try {
        byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
        ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
        if (listOutput == null) {
          return;
        }
        for (BlockingQueue<ChunkDTO> output : listOutput) {
          try {
            output.put(outChunkDTO);
          } catch (InterruptedException ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
            // Catching the exception cleared the flag; set it again so the executor and any
            // later blocking call can still observe the interruption, then stop distributing.
            Thread.currentThread().interrupt();
            return;
          } catch (Exception ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
          }
        }
      } catch (Exception ex) {
        Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
      }
    }
  }

  /**
   * {@link Processor} subtype with no behaviour of its own; it only gives the task a distinct
   * class name.
   */
  public static class ProcessorDown extends Processor {

    /** Passes every argument straight through to {@link Processor}'s constructor. */
    public ProcessorDown(
        List<Future<?>> futureList,
        ExecutorService executorService,
        ChunkDTO inChunkDTO,
        List<BlockingQueue<ChunkDTO>> listOutput) {
      super(futureList, executorService, inChunkDTO, listOutput);
    }
  }

  /**
   * {@link Processor} subtype with no behaviour of its own; it only gives the task a distinct
   * class name.
   */
  public static class ProcessorUp extends Processor {

    /** Passes every argument straight through to {@link Processor}'s constructor. */
    public ProcessorUp(
        List<Future<?>> futureList,
        ExecutorService executorService,
        ChunkDTO inChunkDTO,
        List<BlockingQueue<ChunkDTO>> listOutput) {
      super(futureList, executorService, inChunkDTO, listOutput);
    }
  }

  /**
   * Stand-in for the per-chunk work: pauses the calling thread for 10 ms and hands the input
   * back untouched.
   *
   * <p>If the thread is interrupted during the pause, the method returns early and leaves the
   * thread's interrupt status set so the caller can react to it.
   *
   * @param in the chunk bytes; may be {@code null}
   * @return the same array reference that was passed in
   */
  private static byte[] internalProcessing(byte[] in) {
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      // sleep() clears the flag when it throws; restore it instead of silently dropping it.
      Thread.currentThread().interrupt();
    }
    return in;
  }

  /**
   * Marker subtype of {@link ChunkDTO}. It adds no state or behaviour; code that needs to tell
   * it apart from an ordinary chunk can do so with {@code instanceof}.
   */
  public static class ChunkStopper extends ChunkDTO {

    /** Passes all three values straight through to {@link ChunkDTO}'s constructor. */
    public ChunkStopper(
        byte[] chunk,
        long index,
        Integer pitch) {
      super(chunk, index, pitch);
    }
  }

  /**
   * Immutable carrier for one chunk of bytes together with its index and an optional pitch.
   *
   * <p>The byte array is stored and returned by reference; no defensive copy is made.
   */
  public static class ChunkDTO {

    private final byte[] chunk;
    private final long index;
    private final Integer pitch;

    /**
     * @param chunk the payload bytes; may be {@code null}
     * @param index the index assigned to this chunk
     * @param pitch the pitch value; may be {@code null}
     */
    public ChunkDTO(byte[] chunk, long index, Integer pitch) {
      this.chunk = chunk;
      this.index = index;
      this.pitch = pitch;
    }

    /** @return the payload array as stored (possibly {@code null}) */
    public byte[] getChunk() {
      return chunk;
    }

    /** @return the index given at construction */
    public long getIndex() {
      return index;
    }

    /** @return the pitch given at construction (possibly {@code null}) */
    public Integer getPitch() {
      return pitch;
    }

    /**
     * Renders the three fields as text. The payload is decoded with the platform default
     * charset; a {@code null} payload is rendered as {@code null} instead of throwing.
     */
    @Override
    public String toString() {
      // Instances with a null payload exist (e.g. stoppers), so guard before decoding.
      String chunkText = (chunk == null) ? "null" : new String(chunk);
      return "ChunkDTO{" + "chunk=" + chunkText + ", index=" + index + ", pitch=" + pitch + '}';
    }

  }

  /**
   * Holder for a single {@code Long}. Reads take the read lock and writes take the write lock
   * of one {@link ReentrantReadWriteLock}.
   */
  public static class Counter {

    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private Long value = 0L;

    /** Creates a counter whose value starts at zero. */
    public Counter() {
    }

    /**
     * Reads the current value under the read lock.
     *
     * @return the most recently stored value
     */
    public Long getValue() {
      rwLock.readLock().lock();
      try {
        return this.value;
      } finally {
        rwLock.readLock().unlock();
      }
    }

    /**
     * Replaces the current value under the write lock.
     *
     * @param value the new value; stored as given
     */
    public void setValue(Long value) {
      rwLock.writeLock().lock();
      try {
        this.value = value;
      } finally {
        rwLock.writeLock().unlock();
      }
    }

  }

  /**
   * Growable list of integer codes. Additions take the write lock and size queries take the
   * read lock of one {@link ReentrantReadWriteLock}.
   */
  public static class CodesList {

    private final List<Integer> codes;
    private final ReadWriteLock rwLock;

    /** Creates an empty list with its own lock. */
    public CodesList() {
      this.rwLock = new ReentrantReadWriteLock();
      this.codes = new ArrayList<>();
    }

    /**
     * Appends a code under the write lock. Duplicates are kept.
     *
     * @param code the code to append
     */
    public void addCode(Integer code) {
      final Lock guard = rwLock.writeLock();
      guard.lock();
      try {
        this.codes.add(code);
      } finally {
        guard.unlock();
      }
    }

    /**
     * Reports how many codes have been appended so far, under the read lock.
     *
     * @return the number of stored entries
     */
    public int size() {
      final Lock guard = rwLock.readLock();
      guard.lock();
      try {
        final int count = this.codes.size();
        return count;
      } finally {
        guard.unlock();
      }
    }

  }

  /**
   * Three independent boolean flags ({@code created}, {@code running}, {@code finished}), all
   * initially {@code false}. Every getter takes the read lock and every setter takes the write
   * lock of one shared {@link ReentrantReadWriteLock}.
   */
  public static class LifeCycle {

    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private boolean created;
    private boolean running;
    private boolean finished;

    /** Creates a life cycle with all three flags unset. */
    public LifeCycle() {
    }

    /** @return the current {@code created} flag */
    public boolean isCreated() {
      rwLock.readLock().lock();
      try {
        return this.created;
      } finally {
        rwLock.readLock().unlock();
      }
    }

    /** @param created new value for the {@code created} flag */
    public void setCreated(boolean created) {
      rwLock.writeLock().lock();
      try {
        this.created = created;
      } finally {
        rwLock.writeLock().unlock();
      }
    }

    /** @return the current {@code running} flag */
    public boolean isRunning() {
      rwLock.readLock().lock();
      try {
        return this.running;
      } finally {
        rwLock.readLock().unlock();
      }
    }

    /** @param running new value for the {@code running} flag */
    public void setRunning(boolean running) {
      rwLock.writeLock().lock();
      try {
        this.running = running;
      } finally {
        rwLock.writeLock().unlock();
      }
    }

    /** @return the current {@code finished} flag */
    public boolean isFinished() {
      rwLock.readLock().lock();
      try {
        return this.finished;
      } finally {
        rwLock.readLock().unlock();
      }
    }

    /** @param finished new value for the {@code finished} flag */
    public void setFinished(boolean finished) {
      rwLock.writeLock().lock();
      try {
        this.finished = finished;
      } finally {
        rwLock.writeLock().unlock();
      }
    }

  }

}

我的测试

rp.Produced egxjthjr     index:0
rp.Produced pdiutqkt     index:1
rc.Consumed egxjthjr     index:0     code:2,    quantity:0
rc.Consumed egxjthjr     index:0     code:1,    quantity:1
rc.Consumed egxjthjr     index:0     code:5,    quantity:2
rc.Consumed egxjthjr     index:0     code:3,    quantity:3
rc.Consumed egxjthjr     index:0     code:4,    quantity:4
rc.Consumed pdiutqkt     index:1     code:4,    quantity:5
rp.Produced dwqtvoun     index:2
rc.Consumed pdiutqkt     index:1     code:2,    quantity:6
rc.Consumed pdiutqkt     index:1     code:5,    quantity:7
rc.Consumed pdiutqkt     index:1     code:1,    quantity:8
rc.Consumed pdiutqkt     index:1     code:3,    quantity:9
rp.Produced ydwqheks     index:3
rc.Consumed dwqtvoun     index:2     code:4,    quantity:10
rc.Consumed dwqtvoun     index:2     code:5,    quantity:11
rc.Consumed dwqtvoun     index:2     code:1,    quantity:12
rc.Consumed dwqtvoun     index:2     code:2,    quantity:13
rc.Consumed dwqtvoun     index:2     code:3,    quantity:14
rc.Consumed ydwqheks     index:3     code:1,    quantity:15
rc.Consumed ydwqheks     index:3     code:3,    quantity:16
rc.Consumed ydwqheks     index:3     code:2,    quantity:17
rc.Consumed ydwqheks     index:3     code:5,    quantity:18
rc.Consumed ydwqheks     index:3     code:4,    quantity:19
rp.Produced tamvejvq     index:4
rp.Produced tpqjkgqd     index:5
rc.Consumed tamvejvq     index:4     code:4,    quantity:20
rc.Consumed tamvejvq     index:4     code:5,    quantity:21
rc.Consumed tamvejvq     index:4     code:2,    quantity:22
rc.Consumed tamvejvq     index:4     code:3,    quantity:23
rc.Consumed tamvejvq     index:4     code:1,    quantity:24
rp.Produced quchekol     index:6
rc.Consumed tpqjkgqd     index:5     code:4,    quantity:25
rc.Consumed tpqjkgqd     index:5     code:2,    quantity:26
rc.Consumed tpqjkgqd     index:5     code:5,    quantity:27
rc.Consumed tpqjkgqd     index:5     code:3,    quantity:28
rc.Consumed tpqjkgqd     index:5     code:1,    quantity:29
rc.Consumed quchekol     index:6     code:4,    quantity:30
rc.Consumed quchekol     index:6     code:1,    quantity:31
rc.Consumed quchekol     index:6     code:5,    quantity:32
rc.Consumed quchekol     index:6     code:2,    quantity:33
rc.Consumed quchekol     index:6     code:3,    quantity:34
rp  Sent:7
rc  Received:35

另一个测试

rp.Produced iufalvxu     index:0
rp.Produced ammjynnm     index:1
rc.Consumed iufalvxu     index:0     code:4,    quantity:0
rc.Consumed iufalvxu     index:0     code:2,    quantity:1
rc.Consumed iufalvxu     index:0     code:1,    quantity:2
rc.Consumed iufalvxu     index:0     code:5,    quantity:3
rc.Consumed iufalvxu     index:0     code:3,    quantity:4
rc.Consumed ammjynnm     index:1     code:1,    quantity:5
rc.Consumed ammjynnm     index:1     code:3,    quantity:6
rc.Consumed ammjynnm     index:1     code:4,    quantity:7
rc.Consumed ammjynnm     index:1     code:5,    quantity:8
rc.Consumed ammjynnm     index:1     code:2,    quantity:9
rp.Produced clbecbge     index:2
rc.Consumed clbecbge     index:2     code:1,    quantity:10
rc.Consumed clbecbge     index:2     code:4,    quantity:11
rc.Consumed clbecbge     index:2     code:3,    quantity:12
rc.Consumed clbecbge     index:2     code:5,    quantity:13
rc.Consumed clbecbge     index:2     code:2,    quantity:14
rp.Produced sletiovo     index:3
rc.Consumed sletiovo     index:3     code:5,    quantity:15
rc.Consumed sletiovo     index:3     code:1,    quantity:16
rc.Consumed sletiovo     index:3     code:2,    quantity:17
rc.Consumed sletiovo     index:3     code:4,    quantity:18
rc.Consumed sletiovo     index:3     code:3,    quantity:19
rp  Sent:4
rc  Received:20