
Multiple Delayed Consumer, Concurrence handle, BlockingQueue


我有多个进程(不同类型)由多个 Runnable 执行。


我有一个 RunnableProducer 随着时间的推移生产,生产的产品被转移到一个 RunnableWorker 执行一些操作 ProcessorDown蓝色箭头) 执行一个进程,并将其分发给它的接收者(class 的同类)。 如果RunnableWorker被标记(code不是null),它必须执行一个特殊类型的过程Processor和return它到"parent"RunnableWorker,谁转的。 也就是说,您的接收器收集了许多执行另一个额外的 ProcessorUp绿色箭头注意绿色箭头的数量。 初始的RunnableWorker将所有的数据(在同class的中介的帮助下)传输到RunnableConsumer而不混合它们,谁将执行另一个任务(对于这个问题,print).

RunnableProducer 只有在 RunnableConsumer 最终可以接收/收集所有生产的东西(由 RunnableWorker's 转移)时才应该生产。 RunnableProducer 可以单独关闭。 但是,RunnableConsumer 必须 运行 而 RunnableProducer 正在生产,直到他消耗完所有东西(及其变体)。

注意:您可以复制、粘贴、编译和 运行

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestCollectorRunnable1 {

  public static void main(String... args) {

    ExecutorService executorService = Executors.newCachedThreadPool();

    Counter counter = new Counter();
    LifeCycle rcLifeCycle = new LifeCycle();
    LifeCycle rpLifeCycle = new LifeCycle();

    RunnableConsumer rc = new RunnableConsumer("rc", rcLifeCycle, rpLifeCycle, executorService, counter);
    RunnableProducer rp = new RunnableProducer("rp", rpLifeCycle, rcLifeCycle, executorService, counter);
    RunnableWorker rw0 = new RunnableWorker("rw0", executorService, counter, null, null, rp.getOutBlockingQueue(), rc.getInBlockingQueue());

    RunnableWorker rw11 = new RunnableWorker("rw11", executorService, counter, null, rw0);
    RunnableWorker rw12 = new RunnableWorker("rw12", executorService, counter, null, rw0);

    RunnableWorker rw211 = new RunnableWorker("rw211", executorService, counter, 1, rw11);
    RunnableWorker rw212 = new RunnableWorker("rw212", executorService, counter, 2, rw11);
    RunnableWorker rw213 = new RunnableWorker("rw213", executorService, counter, 3, rw11);

    RunnableWorker rw221 = new RunnableWorker("rw221", executorService, counter, 4, rw12);
    RunnableWorker rw222 = new RunnableWorker("rw222", executorService, counter, 5, rw12);

    //Simulate Turn off
    new Timer().schedule(new TimerTask() {
      public void run() {
    }, ThreadLocalRandom.current().nextLong(100L, 1000L));


  public static String getRandomString(int size) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < size; i++) {
      char c = (char) (new Random().nextInt(25) + 'a');
    return sb.toString();

  public static class RunnableProducer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final ExecutorService executorService;
    private final Counter counter;
    private final int bufferSize;
    private final BlockingQueue<ChunkDTO> outBlockingQueue;
    private volatile boolean isRunning = false;

    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
      this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());

    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.executorService = executorService;
      this.counter = counter;
      this.bufferSize = 8;
      this.outBlockingQueue = outBlockingQueue;

    public void run() {
      long quantity = 0;
      isRunning = true;

      //Blocking Wait (not very elegant) 
      block until the consumer can consume without losing what is produced and processed
      while (!outLifeCycle.isRunning()) {
        try {
        } catch (Exception e) {

      while (/*isRunning*/quantity < 5) {
        try {
          byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
          ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
          System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
          int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
        } catch (Exception e) {
      System.out.println(name + "\tSent:" + quantity);

    public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
      return outBlockingQueue;

    public void stop() {
      isRunning = false;


  public static class RunnableConsumer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final ExecutorService executorService;
    private final Counter counter;
    private final BlockingQueue<ChunkDTO> inBlockingQueue;

    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
      this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());

    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.executorService = executorService;
      this.counter = counter;
      this.inBlockingQueue = inBlockingQueue;

    public void run() {
      if (inBlockingQueue != null) {
        try {
          int quantity = 0;
          while (!outLifeCycle.isCreated() || outLifeCycle.isRunning()/*haya recolectado lo que tiene que recolectar*/) {
            ChunkDTO chunkDTO = inBlockingQueue.take();
            System.out.println(name + ".Collected " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t pitch:" + chunkDTO.getPitch());
          System.out.println(name + "\tReceived:" + quantity);
        } catch (InterruptedException e) {

    public BlockingQueue<ChunkDTO> getInBlockingQueue() {
      return inBlockingQueue;


  public static class RunnableWorker {

    private final ExecutorService executorService;
    private final RunnableWorker parent;
    private final BlockingQueue<ChunkDTO> inputBlockingQueue;
    private final BlockingQueue<ChunkDTO> outputBlockingQueue;
    private final List<BlockingQueue<ChunkDTO>> downList;
    private final List<BlockingQueue<ChunkDTO>> upList;
    private final Set<Integer> codes;

    public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
      this.executorService = executorService;
      this.parent = parent;
      this.inputBlockingQueue = inputBlockingQueue;
      this.outputBlockingQueue = outputBlockingQueue;
      this.downList = new ArrayList<>();
      this.upList = new ArrayList<>(Arrays.asList(new SynchronousQueue/*LinkedBlockingQueue*/<>()));
      this.codes = new HashSet<>();

      this.executorService.execute(() -> {
        if (inputBlockingQueue != null) {
          try {
            while (true) {
              ChunkDTO chunkDTO = inputBlockingQueue.take();
              if (codes.size() > 0) {
                System.out.println(name + " codes.length:" + codes.size());
              if (parent == null) {
                System.out.println(name + ".Worked " + new String(chunkDTO.getChunk()) + "\tindex:" + chunkDTO.getIndex());
              // */
              if (code == null) {
                new ProcessorDown(executorService, chunkDTO, downList);
              } else {
                ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
                System.out.println("\t\t" + name + ".Returned " + returned.toString());
                if (parent != null) {
                  new Processor(executorService, returned, parent.getUpList());
          } catch (Exception e) {

      if (code == null) {
        this.executorService.execute(() -> {
          int quantity = 0;
          while (quantity == 0) {
            BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
            if (outBlockingQueue != null) {
              try {
                while (quantity == 0 || (quantity > 0 && quantity < codes.size() * (counter.getValue()))) {
                  ChunkDTO chunkDTO = outBlockingQueue.take();
                  System.out.println("\t" + name + ".quantity: " + quantity + ", codes.size():" + codes.size() + ", counter.getValue():" + counter.getValue() + ", total:" + (codes.size() * counter.getValue())
                      + "\r\t\tcchunk:" + chunkDTO
                      + "\r\t\tcodes:" + codes.stream().map(i -> i.toString()).collect(Collectors.joining(",")));
                  // */
                  if (chunkDTO != null) {
                    if (parent == null) {
                      System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
                    } else {
                      new ProcessorUp(executorService, chunkDTO, parent.getUpList());
                if (quantity != 0) {
                  String codesString = codes.stream().map(i -> i.toString()).collect(Collectors.joining(","));
                  System.out.println("\t" + name + "\tWorked:" + quantity + ", \tcodes:" + codesString);
                // */
              } catch (InterruptedException e) {

    public void addCodeSon(Integer code) {
      if (parent != null) {

    public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent) {
      this(name, executorService, counter, code, parent, new SynchronousQueue/*LinkedBlockingQueue*/<>(), new SynchronousQueue/*LinkedBlockingQueue*/<>());

    public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
      return inputBlockingQueue;

    public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {

    public void delBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {

    public List<BlockingQueue<ChunkDTO>> getUpList() {
      return upList;


  public static class Processor implements Runnable {

    private final ExecutorService executorService;
    private final List<BlockingQueue<ChunkDTO>> listOutput;
    private final ChunkDTO inChunkDTO;

    public Processor(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      this.executorService = executorService;
      this.listOutput = listOutput;
      this.inChunkDTO = inChunkDTO;

    public void run() {
      if (inChunkDTO != null) {
        try {
          byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
          ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
          if (listOutput != null) {
            listOutput.forEach(output -> {
              try {
              } catch (Exception e) {
        } catch (Exception e) {


  public static class ProcessorDown extends Processor {

    public ProcessorDown(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      super(executorService, inChunkDTO, listOutput);

  public static class ProcessorUp extends Processor {

    public ProcessorUp(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      super(executorService, inChunkDTO, listOutput);

  private static byte[] internalProcessing(byte[] in) {
    byte[] out = in;
    try {
    } catch (InterruptedException e) {
    return out;

  public static class ChunkDTO {

    private final byte[] chunk;
    private final long index;
    private final Integer pitch;

    public ChunkDTO(byte[] chunk, long index, Integer pitch) {
      this.chunk = chunk;
      this.index = index;
      this.pitch = pitch;

    public byte[] getChunk() {
      return chunk;

    public long getIndex() {
      return index;

    public Integer getPitch() {
      return pitch;

    public String toString() {
      return "ChunkDTO{" + "chunk=" + new String(chunk) + ", index=" + index + ", pitch=" + pitch + '}';


  public static class Counter {

    private final ReadWriteLock rwLock;
    private Long value;

    public Counter() {
      this.rwLock = new ReentrantReadWriteLock();
      this.value = 0L;

    public Long getValue() {
      Lock readLock = rwLock.readLock();
      try {
        return value;
      } finally {


    public void setValue(Long value) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.value = value;
      } finally {


  public static class LifeCycle {

    private final ReadWriteLock rwLock;
    private boolean created;
    private boolean running;
    private boolean finished;

    public LifeCycle() {
      this.rwLock = new ReentrantReadWriteLock();

    public boolean isCreated() {
      Lock readLock = rwLock.readLock();
      try {
        return created;
      } finally {

    public void setCreated(boolean created) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.created = created;
      } finally {

    public boolean isRunning() {
      Lock readLock = rwLock.readLock();
      try {
        return running;
      } finally {

    public void setRunning(boolean running) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.running = running;
      } finally {

    public boolean isFinished() {
      Lock readLock = rwLock.readLock();
      try {
        return finished;
      } finally {

    public void setFinished(boolean finished) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.finished = finished;
      } finally {



class ChunkDTO 包含数据、索引(位置)和代码( 以促进其被 RunnableConsumer)class化。

Counter class 以便控制 RunnableConsumer/RunnableWorker 应该期待什么。

如果RunnableProducer产生7个,有5个代码,RunnableConsumer最终应该收集到35个。 rw11 RunnableWorker 应该收集 3*7=21rw12 RunnableWorker 应该收集 2*7=14

LifeCycle class 是为了控制 LifeCycle Producer 和 Consumer 而创建的,我仍然没有 RunnableWorker 的逻辑。

RunnableWorker有两个Runnable,用来处理Transfer their children(//RUNNABLE DISTRIBUTOR) and parents(//RUNNABLE COLLECTOR).


rp.Produced tpwqomrt     index:0
rw0.Worked tpwqomrt index:0
        rw221.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
        rw222.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
        rw212.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
        rw213.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
        rw211.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rp.Produced xwnlpkju     index:1
rw0 codes.length:5
rw0.Worked xwnlpkju index:1
rw11 codes.length:3
rw12 codes.length:2
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rc.Collected tpwqomrt    index:0     pitch:2
rc.Collected tpwqomrt    index:0     pitch:4
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rc.Collected tpwqomrt    index:0     pitch:1
rc.Collected tpwqomrt    index:0     pitch:3
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
        rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rc.Collected tpwqomrt    index:0     pitch:5
        rw212.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=2}
        rw221.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=4}
        rw222.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=5}
        rw213.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=3}
        rw211.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=1}
rp.Produced xmdfcmmo     index:2
rw0 codes.length:5
rw0.Worked xmdfcmmo index:2
rw12 codes.length:2
rw11 codes.length:3
        rw221.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=4}
        rw212.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=2}
        rw222.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=5}
        rw213.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=3}
        rw211.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=1}
rw0 codes.length:5
rp.Produced syqpyxuk     index:3
rw0.Worked syqpyxuk index:3
rw11 codes.length:3
rw0 codes.length:5
rw0.Worked linlkasp index:4
rp.Produced linlkasp     index:4
rw12 codes.length:2
        rw211.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=1}
        rw213.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=3}
        rw212.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=2}
rw11 codes.length:3
        rw222.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=5}
        rw221.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=4}
rw12 codes.length:2
        rw211.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=1}
        rw213.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=3}
        rw222.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=5}
        rw212.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=2}
        rw221.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=4}
rp  Sent:5

如您所见,Producer 发送了 5 个块,但我失去了大部分,我需要接收 25 个(对于此示例)。消费者只收集了很少的物品。 我的逻辑有什么问题?

我不知道创建两个 运行nable 是否是 RunnableWorker class 的一个很好的解决方案。 有没有更好的实现方式?

我知道我有一个可怕的方法来阻止生产者等待消费者。 您推荐什么解决方案?


import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TestCollectorRunnable7a {

  public static void main(String... args) {

    List<Future<?>> futureList = new ArrayList<>();
    ExecutorService executorService = Executors.newCachedThreadPool();

    Counter counter = new Counter();
    CodesList codesList = new CodesList();
    LifeCycle rcLifeCycle = new LifeCycle();
    LifeCycle rpLifeCycle = new LifeCycle();

    RunnableConsumer rc = new RunnableConsumer("rc", rcLifeCycle, rpLifeCycle, futureList, executorService, counter, codesList);
    RunnableProducer rp = new RunnableProducer("rp", rpLifeCycle, rcLifeCycle, futureList, executorService, counter);
    RunnableWorker rw0 = new RunnableWorker("rw0", rcLifeCycle, futureList, executorService, counter, codesList, null, null, rp.getOutBlockingQueue(), rc.getInBlockingQueue());

    RunnableWorker rw11 = new RunnableWorker("rw11", rcLifeCycle, futureList, executorService, counter, codesList, null, rw0);
    RunnableWorker rw12 = new RunnableWorker("rw12", rcLifeCycle, futureList, executorService, counter, codesList, null, rw0);

    RunnableWorker rw211 = new RunnableWorker("rw211", rcLifeCycle, futureList, executorService, counter, codesList, 1, rw11);
    RunnableWorker rw212 = new RunnableWorker("rw212", rcLifeCycle, futureList, executorService, counter, codesList, 2, rw11);
    RunnableWorker rw213 = new RunnableWorker("rw213", rcLifeCycle, futureList, executorService, counter, codesList, 3, rw11);

    RunnableWorker rw221 = new RunnableWorker("rw221", rcLifeCycle, futureList, executorService, counter, codesList, 4, rw12);
    RunnableWorker rw222 = new RunnableWorker("rw222", rcLifeCycle, futureList, executorService, counter, codesList, 5, rw12);

    //Simulate Turn off
    new Timer(false).schedule(new TimerTask() {
      public void run() {
    }, ThreadLocalRandom.current().nextLong(100L, 1000L));

  public static String getRandomString(int size) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < size; i++) {
      char c = (char) (new Random().nextInt(25) + 'a');
    return sb.toString();

  public static class RunnableProducer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final List<Future<?>> futureList;
    private final ExecutorService executorService;
    private final Counter counter;
    private final int bufferSize;
    private final BlockingQueue<ChunkDTO> outBlockingQueue;
    private volatile boolean isRunning = false;

    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter) {
      this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue<>());

    public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.futureList = futureList;
      this.executorService = executorService;
      this.counter = counter;
      this.bufferSize = 8;
      this.outBlockingQueue = outBlockingQueue;


    public void run() {
      long quantity = 0;
      while (!outLifeCycle.isRunning()) {
        try {
        } catch (Exception ex) {
          Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);

      isRunning = true;
      while (isRunning) {
        try {
          byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
          ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
          System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
          int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
        } catch (Exception ex) {
          Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
      System.out.println(name + "\tSent:" + quantity);

    public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
      return outBlockingQueue;

    public void stop() {
      isRunning = false;


  public static class RunnableConsumer implements Runnable {

    private final String name;
    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final List<Future<?>> futureList;
    private final ExecutorService executorService;
    private final Counter counter;
    private final Counter intCounter;
    private final CodesList codesList;
    private final BlockingQueue<ChunkDTO> inBlockingQueue;

    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList) {
      this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue/*SynchronousQueue LinkedBlockingQueue*/<>(), codesList);

    public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue, CodesList codesList) {
      this.name = name;
      this.ownLifeCycle = ownLifeCycle;
      this.outLifeCycle = outLifeCycle;
      this.futureList = futureList;
      this.executorService = executorService;
      this.counter = counter;
      this.inBlockingQueue = inBlockingQueue;
      this.intCounter = new Counter();
      this.codesList = codesList;

      this.futureList.add(this.executorService.submit(() -> {
        while (!this.outLifeCycle.isFinished() || intCounter.getValue() < counter.getValue() * codesList.size()) {
          try {
          } catch (InterruptedException ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
        inBlockingQueue.add(new ChunkStopper(null, -1, null));

    public void run() {
      if (inBlockingQueue != null) {
        try {
          long quantity = 0;
          while (true) {
            ChunkDTO chunkDTO = inBlockingQueue.take();
            if (chunkDTO instanceof ChunkStopper) {
            System.out.println(name + ".Consumed " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t code:" + chunkDTO.getPitch() + ", \tquantity:" + quantity);
          System.out.println(name + "\tReceived:" + quantity);
        } catch (InterruptedException ex) {
          Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);

    public BlockingQueue<ChunkDTO> getInBlockingQueue() {
      return inBlockingQueue;


  public static class RunnableWorker {

    private final LifeCycle ownLifeCycle;
    private final LifeCycle outLifeCycle;
    private final List<Future<?>> futureList;
    private final ExecutorService executorService;
    private final Counter intCounter;
    private final CodesList codesList;
    private final RunnableWorker parent;
    private final BlockingQueue<ChunkDTO> inputBlockingQueue;
    private final BlockingQueue<ChunkDTO> outputBlockingQueue;
    private final List<BlockingQueue<ChunkDTO>> downList;
    private final List<BlockingQueue<ChunkDTO>> upList;
    private final Set<Integer> codes;

    public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
      this.ownLifeCycle = new LifeCycle();
      this.outLifeCycle = outLifeCycle;
      this.futureList = futureList;
      this.executorService = executorService;
      this.intCounter = new Counter();
      this.codesList = codesList;
      this.parent = parent;
      this.inputBlockingQueue = inputBlockingQueue;
      this.outputBlockingQueue = outputBlockingQueue;
      this.downList = new ArrayList<>();
      this.upList = new ArrayList<>(Arrays.asList(new LinkedBlockingQueue<>()));
      this.codes = new HashSet<>();

      if (code != null) {

      this.futureList.add(this.executorService.submit(() -> {
        while (!outLifeCycle.isFinished()) {
          try {
          } catch (InterruptedException ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
        //System.out.println(name + " -> Consumer Finished!");
        while (true) {
          try {
            BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
            outBlockingQueue.add(new ChunkStopper(null, -1, null));
          } catch (Exception ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
        while (true) {
          try {
            inputBlockingQueue.add(new ChunkStopper(null, -1, null));
          } catch (Exception ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);

      this.futureList.add(this.executorService.submit(() -> {
        long quantity = 0;
        if (inputBlockingQueue != null) {
          try {
            while (true) {
              ChunkDTO chunkDTO = inputBlockingQueue.take();
              if (chunkDTO instanceof ChunkStopper) {
              if (code == null) {
                new ProcessorDown(futureList, executorService, chunkDTO, downList);
              } else {
                ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
                //System.out.println("\t\t" + name + ".Returned " + returned.toString());
                if (parent != null) {
                  new Processor(this.futureList, executorService, returned, parent.getUpList());
          } catch (Exception ex) {
            Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
        //System.out.println(name + ". DISTRIBUTOR Finished");

      if (code == null) {
        this.futureList.add(this.executorService.submit(() -> {
          int quantity = 0;
          while (quantity == 0) {
            BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
            if (outBlockingQueue != null) {
              try {
                while (true) {
                  ChunkDTO chunkDTO = outBlockingQueue.take();
                  if (chunkDTO instanceof ChunkStopper) {
                  if (parent == null) {
                    //System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
                  } else {
                    new ProcessorUp(futureList, executorService, chunkDTO, parent.getUpList());
              } catch (InterruptedException ex) {
                Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
          //System.out.println(name + ". COLLECTOR Finished");

    public LifeCycle getOwnLifeCycle() {
      return ownLifeCycle;

    public void addCodeSon(Integer code) {
      if (parent != null) {

    public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent) {
      this(name, outLifeCycle, futureList, executorService, counter, codesList, code, parent, new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>());

    public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
      return inputBlockingQueue;

    public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {

    public List<BlockingQueue<ChunkDTO>> getUpList() {
      return upList;


  public static class Processor implements Runnable {

    private final ExecutorService executorService;
    private final List<BlockingQueue<ChunkDTO>> listOutput;
    private final ChunkDTO inChunkDTO;

    public Processor(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      this.executorService = executorService;
      this.listOutput = listOutput;
      this.inChunkDTO = inChunkDTO;

    public void run() {
      if (inChunkDTO != null) {
        try {
          byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
          ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
          if (listOutput != null) {
            listOutput.forEach(output -> {
              try {
              } catch (Exception ex) {
                Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
        } catch (Exception ex) {
          Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);


  public static class ProcessorDown extends Processor {

    public ProcessorDown(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      super(futureList, executorService, inChunkDTO, listOutput);

  public static class ProcessorUp extends Processor {

    public ProcessorUp(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
      super(futureList, executorService, inChunkDTO, listOutput);

  private static byte[] internalProcessing(byte[] in) {
    byte[] out = in;
    try {
    } catch (InterruptedException e) {
    return out;

  public static class ChunkStopper extends ChunkDTO {

    public ChunkStopper(byte[] chunk, long index, Integer pitch) {
      super(chunk, index, pitch);

  public static class ChunkDTO {

    private final byte[] chunk;
    private final long index;
    private final Integer pitch;

    public ChunkDTO(byte[] chunk, long index, Integer pitch) {
      this.chunk = chunk;
      this.index = index;
      this.pitch = pitch;

    public byte[] getChunk() {
      return chunk;

    public long getIndex() {
      return index;

    public Integer getPitch() {
      return pitch;

    public String toString() {
      return "ChunkDTO{" + "chunk=" + new String(chunk) + ", index=" + index + ", pitch=" + pitch + '}';


  public static class Counter {

    private final ReadWriteLock rwLock;
    private Long value;

    public Counter() {
      this.rwLock = new ReentrantReadWriteLock();
      this.value = 0L;

    public Long getValue() {
      Lock readLock = rwLock.readLock();
      try {
        return value;
      } finally {


    public void setValue(Long value) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.value = value;
      } finally {


  public static class CodesList {

    private final List<Integer> codes;
    private final ReadWriteLock rwLock;

    public CodesList() {
      this.codes = new ArrayList<>();
      this.rwLock = new ReentrantReadWriteLock();

    public void addCode(Integer code) {
      Lock writeLock = rwLock.writeLock();

      try {
      } finally {

    public int size() {
      Lock readLock = rwLock.readLock();
      try {
        return codes.size();
      } finally {


  public static class LifeCycle {

    private final ReadWriteLock rwLock;
    private boolean created;
    private boolean running;
    private boolean finished;

    public LifeCycle() {
      this.rwLock = new ReentrantReadWriteLock();

    public boolean isCreated() {
      Lock readLock = rwLock.readLock();
      try {
        return created;
      } finally {

    public void setCreated(boolean created) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.created = created;
      } finally {

    public boolean isRunning() {
      Lock readLock = rwLock.readLock();
      try {
        return running;
      } finally {

    public void setRunning(boolean running) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.running = running;
      } finally {

    public boolean isFinished() {
      Lock readLock = rwLock.readLock();
      try {
        return finished;
      } finally {

    public void setFinished(boolean finished) {
      Lock writeLock = rwLock.writeLock();
      try {
        this.finished = finished;
      } finally {




rp.Produced egxjthjr     index:0
rp.Produced pdiutqkt     index:1
rc.Consumed egxjthjr     index:0     code:2,    quantity:0
rc.Consumed egxjthjr     index:0     code:1,    quantity:1
rc.Consumed egxjthjr     index:0     code:5,    quantity:2
rc.Consumed egxjthjr     index:0     code:3,    quantity:3
rc.Consumed egxjthjr     index:0     code:4,    quantity:4
rc.Consumed pdiutqkt     index:1     code:4,    quantity:5
rp.Produced dwqtvoun     index:2
rc.Consumed pdiutqkt     index:1     code:2,    quantity:6
rc.Consumed pdiutqkt     index:1     code:5,    quantity:7
rc.Consumed pdiutqkt     index:1     code:1,    quantity:8
rc.Consumed pdiutqkt     index:1     code:3,    quantity:9
rp.Produced ydwqheks     index:3
rc.Consumed dwqtvoun     index:2     code:4,    quantity:10
rc.Consumed dwqtvoun     index:2     code:5,    quantity:11
rc.Consumed dwqtvoun     index:2     code:1,    quantity:12
rc.Consumed dwqtvoun     index:2     code:2,    quantity:13
rc.Consumed dwqtvoun     index:2     code:3,    quantity:14
rc.Consumed ydwqheks     index:3     code:1,    quantity:15
rc.Consumed ydwqheks     index:3     code:3,    quantity:16
rc.Consumed ydwqheks     index:3     code:2,    quantity:17
rc.Consumed ydwqheks     index:3     code:5,    quantity:18
rc.Consumed ydwqheks     index:3     code:4,    quantity:19
rp.Produced tamvejvq     index:4
rp.Produced tpqjkgqd     index:5
rc.Consumed tamvejvq     index:4     code:4,    quantity:20
rc.Consumed tamvejvq     index:4     code:5,    quantity:21
rc.Consumed tamvejvq     index:4     code:2,    quantity:22
rc.Consumed tamvejvq     index:4     code:3,    quantity:23
rc.Consumed tamvejvq     index:4     code:1,    quantity:24
rp.Produced quchekol     index:6
rc.Consumed tpqjkgqd     index:5     code:4,    quantity:25
rc.Consumed tpqjkgqd     index:5     code:2,    quantity:26
rc.Consumed tpqjkgqd     index:5     code:5,    quantity:27
rc.Consumed tpqjkgqd     index:5     code:3,    quantity:28
rc.Consumed tpqjkgqd     index:5     code:1,    quantity:29
rc.Consumed quchekol     index:6     code:4,    quantity:30
rc.Consumed quchekol     index:6     code:1,    quantity:31
rc.Consumed quchekol     index:6     code:5,    quantity:32
rc.Consumed quchekol     index:6     code:2,    quantity:33
rc.Consumed quchekol     index:6     code:3,    quantity:34
rp  Sent:7
rc  Received:35


rp.Produced iufalvxu     index:0
rp.Produced ammjynnm     index:1
rc.Consumed iufalvxu     index:0     code:4,    quantity:0
rc.Consumed iufalvxu     index:0     code:2,    quantity:1
rc.Consumed iufalvxu     index:0     code:1,    quantity:2
rc.Consumed iufalvxu     index:0     code:5,    quantity:3
rc.Consumed iufalvxu     index:0     code:3,    quantity:4
rc.Consumed ammjynnm     index:1     code:1,    quantity:5
rc.Consumed ammjynnm     index:1     code:3,    quantity:6
rc.Consumed ammjynnm     index:1     code:4,    quantity:7
rc.Consumed ammjynnm     index:1     code:5,    quantity:8
rc.Consumed ammjynnm     index:1     code:2,    quantity:9
rp.Produced clbecbge     index:2
rc.Consumed clbecbge     index:2     code:1,    quantity:10
rc.Consumed clbecbge     index:2     code:4,    quantity:11
rc.Consumed clbecbge     index:2     code:3,    quantity:12
rc.Consumed clbecbge     index:2     code:5,    quantity:13
rc.Consumed clbecbge     index:2     code:2,    quantity:14
rp.Produced sletiovo     index:3
rc.Consumed sletiovo     index:3     code:5,    quantity:15
rc.Consumed sletiovo     index:3     code:1,    quantity:16
rc.Consumed sletiovo     index:3     code:2,    quantity:17
rc.Consumed sletiovo     index:3     code:4,    quantity:18
rc.Consumed sletiovo     index:3     code:3,    quantity:19
rp  Sent:4
rc  Received:20