多个延迟消费者、并发句柄、BlockingQueue
Multiple Delayed Consumer, Concurrence handle, BlockingQueue
注意:我确实尝试过简化这段代码
我有多个进程(不同类型)由多个 Runnable 执行。
我试图用图表简化这种情况。
我有一个 RunnableProducer
随着时间的推移生产,生产的产品被转移到一个 RunnableWorker
执行一些操作 ProcessorDown
(蓝色箭头) 执行一个进程,并将其分发给它的接收者(class 的同类)。
如果RunnableWorker
被标记(code
不是null),它必须执行一个特殊类型的过程Processor
和return它到"parent"RunnableWorker
,谁转的。
也就是说,您的接收器收集了许多执行另一个额外的 ProcessorUp
(绿色箭头)注意绿色箭头的数量。
初始的RunnableWorker
将所有的数据(在同class的中介的帮助下)传输到RunnableConsumer
而不混合它们,谁将执行另一个任务(对于这个问题,print
).
RunnableProducer
只有在 RunnableConsumer
最终可以接收/收集所有生产的东西(由 RunnableWorker's
转移)时才应该生产。
RunnableProducer
可以单独关闭。
但是,RunnableConsumer
必须 运行 而 RunnableProducer
正在生产,直到他消耗完所有东西(及其变体)。
注意:您可以复制、粘贴、编译和 运行
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestCollectorRunnable1 {
public static void main(String... args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Counter counter = new Counter();
LifeCycle rcLifeCycle = new LifeCycle();
LifeCycle rpLifeCycle = new LifeCycle();
RunnableConsumer rc = new RunnableConsumer("rc", rcLifeCycle, rpLifeCycle, executorService, counter);
RunnableProducer rp = new RunnableProducer("rp", rpLifeCycle, rcLifeCycle, executorService, counter);
RunnableWorker rw0 = new RunnableWorker("rw0", executorService, counter, null, null, rp.getOutBlockingQueue(), rc.getInBlockingQueue());
RunnableWorker rw11 = new RunnableWorker("rw11", executorService, counter, null, rw0);
RunnableWorker rw12 = new RunnableWorker("rw12", executorService, counter, null, rw0);
rw0.addBlockingQueue(rw11.getInputBlockingQueue());
rw0.addBlockingQueue(rw12.getInputBlockingQueue());
RunnableWorker rw211 = new RunnableWorker("rw211", executorService, counter, 1, rw11);
RunnableWorker rw212 = new RunnableWorker("rw212", executorService, counter, 2, rw11);
RunnableWorker rw213 = new RunnableWorker("rw213", executorService, counter, 3, rw11);
rw11.addBlockingQueue(rw211.getInputBlockingQueue());
rw11.addBlockingQueue(rw212.getInputBlockingQueue());
rw11.addBlockingQueue(rw213.getInputBlockingQueue());
RunnableWorker rw221 = new RunnableWorker("rw221", executorService, counter, 4, rw12);
RunnableWorker rw222 = new RunnableWorker("rw222", executorService, counter, 5, rw12);
rw12.addBlockingQueue(rw221.getInputBlockingQueue());
rw12.addBlockingQueue(rw222.getInputBlockingQueue());
//Simulate Turn off
new Timer().schedule(new TimerTask() {
@Override
public void run() {
rp.stop();
}
}, ThreadLocalRandom.current().nextLong(100L, 1000L));
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class RunnableProducer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final ExecutorService executorService;
private final Counter counter;
private final int bufferSize;
private final BlockingQueue<ChunkDTO> outBlockingQueue;
private volatile boolean isRunning = false;
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.executorService = executorService;
this.counter = counter;
this.bufferSize = 8;
this.outBlockingQueue = outBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.executorService.execute(this);
}
@Override
public void run() {
long quantity = 0;
isRunning = true;
//Blocking Wait (not very elegant)
/*
block until the consumer can consume without losing what is produced and processed
*/
while (!outLifeCycle.isRunning()) {
try {
Thread.sleep(10);
} catch (Exception e) {
}
}
while (/*isRunning*/quantity < 5) {
ownLifeCycle.setRunning(true);
try {
byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
outBlockingQueue.put(chunkDTO);
System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
Thread.sleep(timeSleeping);
} catch (Exception e) {
}
quantity++;
counter.setValue(quantity);
}
System.out.println(name + "\tSent:" + quantity);
}
public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
return outBlockingQueue;
}
public void stop() {
isRunning = false;
}
}
public static class RunnableConsumer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final ExecutorService executorService;
private final Counter counter;
private final BlockingQueue<ChunkDTO> inBlockingQueue;
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.executorService = executorService;
this.counter = counter;
this.inBlockingQueue = inBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.executorService.execute(this);
}
@Override
public void run() {
if (inBlockingQueue != null) {
try {
int quantity = 0;
while (!outLifeCycle.isCreated() || outLifeCycle.isRunning()/*haya recolectado lo que tiene que recolectar*/) {
ownLifeCycle.setRunning(true);
ChunkDTO chunkDTO = inBlockingQueue.take();
System.out.println(name + ".Collected " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t pitch:" + chunkDTO.getPitch());
quantity++;
}
System.out.println(name + "\tReceived:" + quantity);
} catch (InterruptedException e) {
}
}
}
public BlockingQueue<ChunkDTO> getInBlockingQueue() {
return inBlockingQueue;
}
}
public static class RunnableWorker {
private final ExecutorService executorService;
private final RunnableWorker parent;
private final BlockingQueue<ChunkDTO> inputBlockingQueue;
private final BlockingQueue<ChunkDTO> outputBlockingQueue;
private final List<BlockingQueue<ChunkDTO>> downList;
private final List<BlockingQueue<ChunkDTO>> upList;
private final Set<Integer> codes;
public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
this.executorService = executorService;
this.parent = parent;
this.inputBlockingQueue = inputBlockingQueue;
this.outputBlockingQueue = outputBlockingQueue;
this.downList = new ArrayList<>();
this.upList = new ArrayList<>(Arrays.asList(new SynchronousQueue/*LinkedBlockingQueue*/<>()));
this.codes = new HashSet<>();
//RUNNABLE DISTRIBUTOR
this.executorService.execute(() -> {
if (inputBlockingQueue != null) {
try {
while (true) {
ChunkDTO chunkDTO = inputBlockingQueue.take();
/*
if (codes.size() > 0) {
System.out.println(name + " codes.length:" + codes.size());
}
if (parent == null) {
System.out.println(name + ".Worked " + new String(chunkDTO.getChunk()) + "\tindex:" + chunkDTO.getIndex());
}
// */
if (code == null) {
new ProcessorDown(executorService, chunkDTO, downList);
} else {
ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
System.out.println("\t\t" + name + ".Returned " + returned.toString());
if (parent != null) {
new Processor(executorService, returned, parent.getUpList());
parent.addCodeSon(code);
}
}
}
} catch (Exception e) {
}
}
});
//RUNNABLE COLLECTOR
if (code == null) {
this.executorService.execute(() -> {
int quantity = 0;
while (quantity == 0) {
BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
if (outBlockingQueue != null) {
try {
while (quantity == 0 || (quantity > 0 && quantity < codes.size() * (counter.getValue()))) {
ChunkDTO chunkDTO = outBlockingQueue.take();
/*
System.out.println("\t" + name + ".quantity: " + quantity + ", codes.size():" + codes.size() + ", counter.getValue():" + counter.getValue() + ", total:" + (codes.size() * counter.getValue())
+ "\r\t\tcchunk:" + chunkDTO
+ "\r\t\tcodes:" + codes.stream().map(i -> i.toString()).collect(Collectors.joining(",")));
// */
if (chunkDTO != null) {
if (parent == null) {
outputBlockingQueue.put(chunkDTO);
System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
} else {
new ProcessorUp(executorService, chunkDTO, parent.getUpList());
}
quantity++;
}
}
/*
if (quantity != 0) {
String codesString = codes.stream().map(i -> i.toString()).collect(Collectors.joining(","));
System.out.println("\t" + name + "\tWorked:" + quantity + ", \tcodes:" + codesString);
}
// */
} catch (InterruptedException e) {
}
}
}
});
}
}
public void addCodeSon(Integer code) {
if (parent != null) {
parent.addCodeSon(code);
}
codes.add(code);
}
public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent) {
this(name, executorService, counter, code, parent, new SynchronousQueue/*LinkedBlockingQueue*/<>(), new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
return inputBlockingQueue;
}
public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.add(blockingQueue);
}
public void delBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.remove(blockingQueue);
}
public List<BlockingQueue<ChunkDTO>> getUpList() {
return upList;
}
}
public static class Processor implements Runnable {
private final ExecutorService executorService;
private final List<BlockingQueue<ChunkDTO>> listOutput;
private final ChunkDTO inChunkDTO;
public Processor(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
this.executorService = executorService;
this.listOutput = listOutput;
this.inChunkDTO = inChunkDTO;
this.executorService.execute(this);
}
@Override
public void run() {
if (inChunkDTO != null) {
try {
byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
if (listOutput != null) {
listOutput.forEach(output -> {
try {
output.put(outChunkDTO);
} catch (Exception e) {
}
});
}
} catch (Exception e) {
}
}
}
}
public static class ProcessorDown extends Processor {
public ProcessorDown(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(executorService, inChunkDTO, listOutput);
}
}
public static class ProcessorUp extends Processor {
public ProcessorUp(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(executorService, inChunkDTO, listOutput);
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return out;
}
public static class ChunkDTO {
private final byte[] chunk;
private final long index;
private final Integer pitch;
public ChunkDTO(byte[] chunk, long index, Integer pitch) {
this.chunk = chunk;
this.index = index;
this.pitch = pitch;
}
public byte[] getChunk() {
return chunk;
}
public long getIndex() {
return index;
}
public Integer getPitch() {
return pitch;
}
@Override
public String toString() {
return "ChunkDTO{" + "chunk=" + new String(chunk) + ", index=" + index + ", pitch=" + pitch + '}';
}
}
public static class Counter {
private final ReadWriteLock rwLock;
private Long value;
public Counter() {
this.rwLock = new ReentrantReadWriteLock();
this.value = 0L;
}
public Long getValue() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void setValue(Long value) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.value = value;
} finally {
writeLock.unlock();
}
}
}
public static class LifeCycle {
private final ReadWriteLock rwLock;
private boolean created;
private boolean running;
private boolean finished;
public LifeCycle() {
this.rwLock = new ReentrantReadWriteLock();
}
public boolean isCreated() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return created;
} finally {
readLock.unlock();
}
}
public void setCreated(boolean created) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.created = created;
} finally {
writeLock.unlock();
}
}
public boolean isRunning() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return running;
} finally {
readLock.unlock();
}
}
public void setRunning(boolean running) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.running = running;
} finally {
writeLock.unlock();
}
}
public boolean isFinished() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return finished;
} finally {
readLock.unlock();
}
}
public void setFinished(boolean finished) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.finished = finished;
} finally {
writeLock.unlock();
}
}
}
}
class ChunkDTO
包含数据、索引(位置)和代码(
以促进其被 RunnableConsumer
)class化。
Counter
class 以便控制 RunnableConsumer/RunnableWorker
应该期待什么。
如果RunnableProducer
产生7个,有5个代码,RunnableConsumer
最终应该收集到35个。 rw11 RunnableWorker
应该收集 3*7=21
而 rw12 RunnableWorker
应该收集 2*7=14
。
LifeCycle
class 是为了控制 LifeCycle Producer 和 Consumer 而创建的,我仍然没有 RunnableWorker
的逻辑。
RunnableWorker
有两个Runnable,用来处理Transfer their children(//RUNNABLE DISTRIBUTOR
) and parents(//RUNNABLE COLLECTOR
).
输出
rp.Produced tpwqomrt index:0
rw0.Worked tpwqomrt index:0
rw221.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
rw222.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rw212.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rw213.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
rw211.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rp.Produced xwnlpkju index:1
rw0 codes.length:5
rw0.Worked xwnlpkju index:1
rw11 codes.length:3
rw12 codes.length:2
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rc.Collected tpwqomrt index:0 pitch:2
rc.Collected tpwqomrt index:0 pitch:4
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rc.Collected tpwqomrt index:0 pitch:1
rc.Collected tpwqomrt index:0 pitch:3
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rc.Collected tpwqomrt index:0 pitch:5
rw212.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=2}
rw221.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=4}
rw222.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=5}
rw213.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=3}
rw211.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=1}
rp.Produced xmdfcmmo index:2
rw0 codes.length:5
rw0.Worked xmdfcmmo index:2
rw12 codes.length:2
rw11 codes.length:3
rw221.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=4}
rw212.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=2}
rw222.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=5}
rw213.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=3}
rw211.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=1}
rw0 codes.length:5
rp.Produced syqpyxuk index:3
rw0.Worked syqpyxuk index:3
rw11 codes.length:3
rw0 codes.length:5
rw0.Worked linlkasp index:4
rp.Produced linlkasp index:4
rw12 codes.length:2
rw211.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=1}
rw213.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=3}
rw212.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=2}
rw11 codes.length:3
rw222.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=5}
rw221.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=4}
rw12 codes.length:2
rw211.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=1}
rw213.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=3}
rw222.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=5}
rw212.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=2}
rw221.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=4}
rp Sent:5
如您所见,Producer 发送了 5 个块,但我失去了大部分,我需要接收 25 个(对于此示例)。消费者只收集了很少的物品。 我的逻辑有什么问题?
我不知道创建两个 运行nable 是否是 RunnableWorker class 的一个很好的解决方案。 有没有更好的实现方式?
我知道我有一个可怕的方法来阻止生产者等待消费者。 您推荐什么解决方案?
我自己的回答
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TestCollectorRunnable7a {
public static void main(String... args) {
List<Future<?>> futureList = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
Counter counter = new Counter();
CodesList codesList = new CodesList();
LifeCycle rcLifeCycle = new LifeCycle();
LifeCycle rpLifeCycle = new LifeCycle();
RunnableConsumer rc = new RunnableConsumer("rc", rcLifeCycle, rpLifeCycle, futureList, executorService, counter, codesList);
RunnableProducer rp = new RunnableProducer("rp", rpLifeCycle, rcLifeCycle, futureList, executorService, counter);
RunnableWorker rw0 = new RunnableWorker("rw0", rcLifeCycle, futureList, executorService, counter, codesList, null, null, rp.getOutBlockingQueue(), rc.getInBlockingQueue());
RunnableWorker rw11 = new RunnableWorker("rw11", rcLifeCycle, futureList, executorService, counter, codesList, null, rw0);
RunnableWorker rw12 = new RunnableWorker("rw12", rcLifeCycle, futureList, executorService, counter, codesList, null, rw0);
rw0.addBlockingQueue(rw11.getInputBlockingQueue());
rw0.addBlockingQueue(rw12.getInputBlockingQueue());
RunnableWorker rw211 = new RunnableWorker("rw211", rcLifeCycle, futureList, executorService, counter, codesList, 1, rw11);
RunnableWorker rw212 = new RunnableWorker("rw212", rcLifeCycle, futureList, executorService, counter, codesList, 2, rw11);
RunnableWorker rw213 = new RunnableWorker("rw213", rcLifeCycle, futureList, executorService, counter, codesList, 3, rw11);
rw11.addBlockingQueue(rw211.getInputBlockingQueue());
rw11.addBlockingQueue(rw212.getInputBlockingQueue());
rw11.addBlockingQueue(rw213.getInputBlockingQueue());
RunnableWorker rw221 = new RunnableWorker("rw221", rcLifeCycle, futureList, executorService, counter, codesList, 4, rw12);
RunnableWorker rw222 = new RunnableWorker("rw222", rcLifeCycle, futureList, executorService, counter, codesList, 5, rw12);
rw12.addBlockingQueue(rw221.getInputBlockingQueue());
rw12.addBlockingQueue(rw222.getInputBlockingQueue());
//Simulate Turn off
new Timer(false).schedule(new TimerTask() {
@Override
public void run() {
rp.stop();
}
}, ThreadLocalRandom.current().nextLong(100L, 1000L));
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class RunnableProducer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final List<Future<?>> futureList;
private final ExecutorService executorService;
private final Counter counter;
private final int bufferSize;
private final BlockingQueue<ChunkDTO> outBlockingQueue;
private volatile boolean isRunning = false;
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue<>());
}
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.futureList = futureList;
this.executorService = executorService;
this.counter = counter;
this.bufferSize = 8;
this.outBlockingQueue = outBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.futureList.add(this.executorService.submit(this));
}
@Override
public void run() {
long quantity = 0;
while (!outLifeCycle.isRunning()) {
try {
Thread.sleep(100);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
isRunning = true;
ownLifeCycle.setRunning(true);
while (isRunning) {
try {
byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
outBlockingQueue.put(chunkDTO);
System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
Thread.sleep(timeSleeping);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
quantity++;
counter.setValue(quantity);
}
ownLifeCycle.setRunning(false);
System.out.println(name + "\tSent:" + quantity);
ownLifeCycle.setFinished(true);
}
public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
return outBlockingQueue;
}
public void stop() {
isRunning = false;
}
}
public static class RunnableConsumer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final List<Future<?>> futureList;
private final ExecutorService executorService;
private final Counter counter;
private final Counter intCounter;
private final CodesList codesList;
private final BlockingQueue<ChunkDTO> inBlockingQueue;
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList) {
this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue/*SynchronousQueue LinkedBlockingQueue*/<>(), codesList);
}
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue, CodesList codesList) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.futureList = futureList;
this.executorService = executorService;
this.counter = counter;
this.inBlockingQueue = inBlockingQueue;
this.intCounter = new Counter();
this.codesList = codesList;
this.ownLifeCycle.setCreated(true);
this.futureList.add(this.executorService.submit(() -> {
while (!this.outLifeCycle.isFinished() || intCounter.getValue() < counter.getValue() * codesList.size()) {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
inBlockingQueue.add(new ChunkStopper(null, -1, null));
}));
this.futureList.add(this.executorService.submit(this));
}
@Override
public void run() {
if (inBlockingQueue != null) {
try {
long quantity = 0;
ownLifeCycle.setRunning(true);
while (true) {
ChunkDTO chunkDTO = inBlockingQueue.take();
if (chunkDTO instanceof ChunkStopper) {
ownLifeCycle.setRunning(false);
break;
}
System.out.println(name + ".Consumed " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t code:" + chunkDTO.getPitch() + ", \tquantity:" + quantity);
quantity++;
intCounter.setValue(quantity);
}
ownLifeCycle.setRunning(false);
System.out.println(name + "\tReceived:" + quantity);
ownLifeCycle.setFinished(true);
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
public BlockingQueue<ChunkDTO> getInBlockingQueue() {
return inBlockingQueue;
}
}
public static class RunnableWorker {
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final List<Future<?>> futureList;
private final ExecutorService executorService;
private final Counter intCounter;
private final CodesList codesList;
private final RunnableWorker parent;
private final BlockingQueue<ChunkDTO> inputBlockingQueue;
private final BlockingQueue<ChunkDTO> outputBlockingQueue;
private final List<BlockingQueue<ChunkDTO>> downList;
private final List<BlockingQueue<ChunkDTO>> upList;
private final Set<Integer> codes;
public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
this.ownLifeCycle = new LifeCycle();
this.outLifeCycle = outLifeCycle;
this.futureList = futureList;
this.executorService = executorService;
this.intCounter = new Counter();
this.codesList = codesList;
this.parent = parent;
this.inputBlockingQueue = inputBlockingQueue;
this.outputBlockingQueue = outputBlockingQueue;
this.downList = new ArrayList<>();
this.upList = new ArrayList<>(Arrays.asList(new LinkedBlockingQueue<>()));
this.codes = new HashSet<>();
if (code != null) {
this.codesList.addCode(code);
}
this.futureList.add(this.executorService.submit(() -> {
while (!outLifeCycle.isFinished()) {
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
//System.out.println(name + " -> Consumer Finished!");
while (true) {
try {
BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
outBlockingQueue.add(new ChunkStopper(null, -1, null));
break;
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
while (true) {
try {
inputBlockingQueue.add(new ChunkStopper(null, -1, null));
break;
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}));
//RUNNABLE DISTRIBUTOR
this.futureList.add(this.executorService.submit(() -> {
long quantity = 0;
if (inputBlockingQueue != null) {
try {
ownLifeCycle.setRunning(true);
while (true) {
ChunkDTO chunkDTO = inputBlockingQueue.take();
if (chunkDTO instanceof ChunkStopper) {
break;
}
if (code == null) {
new ProcessorDown(futureList, executorService, chunkDTO, downList);
} else {
ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
//System.out.println("\t\t" + name + ".Returned " + returned.toString());
if (parent != null) {
new Processor(this.futureList, executorService, returned, parent.getUpList());
parent.addCodeSon(code);
}
}
quantity++;
intCounter.setValue(quantity);
}
ownLifeCycle.setRunning(false);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
//System.out.println(name + ". DISTRIBUTOR Finished");
}));
//RUNNABLE COLLECTOR
if (code == null) {
this.futureList.add(this.executorService.submit(() -> {
int quantity = 0;
while (quantity == 0) {
BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
if (outBlockingQueue != null) {
try {
while (true) {
ChunkDTO chunkDTO = outBlockingQueue.take();
if (chunkDTO instanceof ChunkStopper) {
break;
}
if (parent == null) {
outputBlockingQueue.put(chunkDTO);
//System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
} else {
new ProcessorUp(futureList, executorService, chunkDTO, parent.getUpList());
}
quantity++;
}
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
//System.out.println(name + ". COLLECTOR Finished");
}));
}
}
public LifeCycle getOwnLifeCycle() {
return ownLifeCycle;
}
public void addCodeSon(Integer code) {
if (parent != null) {
parent.addCodeSon(code);
}
codes.add(code);
}
public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent) {
this(name, outLifeCycle, futureList, executorService, counter, codesList, code, parent, new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>());
}
public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
return inputBlockingQueue;
}
public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.add(blockingQueue);
}
public List<BlockingQueue<ChunkDTO>> getUpList() {
return upList;
}
}
public static class Processor implements Runnable {
private final ExecutorService executorService;
private final List<BlockingQueue<ChunkDTO>> listOutput;
private final ChunkDTO inChunkDTO;
public Processor(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
this.executorService = executorService;
this.listOutput = listOutput;
this.inChunkDTO = inChunkDTO;
futureList.add(this.executorService.submit(this));
}
@Override
public void run() {
if (inChunkDTO != null) {
try {
byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
if (listOutput != null) {
listOutput.forEach(output -> {
try {
output.put(outChunkDTO);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
});
}
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
public static class ProcessorDown extends Processor {
public ProcessorDown(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(futureList, executorService, inChunkDTO, listOutput);
}
}
public static class ProcessorUp extends Processor {
public ProcessorUp(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(futureList, executorService, inChunkDTO, listOutput);
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return out;
}
public static class ChunkStopper extends ChunkDTO {
public ChunkStopper(byte[] chunk, long index, Integer pitch) {
super(chunk, index, pitch);
}
}
public static class ChunkDTO {
private final byte[] chunk;
private final long index;
private final Integer pitch;
public ChunkDTO(byte[] chunk, long index, Integer pitch) {
this.chunk = chunk;
this.index = index;
this.pitch = pitch;
}
public byte[] getChunk() {
return chunk;
}
public long getIndex() {
return index;
}
public Integer getPitch() {
return pitch;
}
@Override
public String toString() {
return "ChunkDTO{" + "chunk=" + new String(chunk) + ", index=" + index + ", pitch=" + pitch + '}';
}
}
public static class Counter {
private final ReadWriteLock rwLock;
private Long value;
public Counter() {
this.rwLock = new ReentrantReadWriteLock();
this.value = 0L;
}
public Long getValue() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void setValue(Long value) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.value = value;
} finally {
writeLock.unlock();
}
}
}
public static class CodesList {
private final List<Integer> codes;
private final ReadWriteLock rwLock;
public CodesList() {
this.codes = new ArrayList<>();
this.rwLock = new ReentrantReadWriteLock();
}
public void addCode(Integer code) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
codes.add(code);
} finally {
writeLock.unlock();
}
}
public int size() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return codes.size();
} finally {
readLock.unlock();
}
}
}
public static class LifeCycle {
private final ReadWriteLock rwLock;
private boolean created;
private boolean running;
private boolean finished;
public LifeCycle() {
this.rwLock = new ReentrantReadWriteLock();
}
public boolean isCreated() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return created;
} finally {
readLock.unlock();
}
}
public void setCreated(boolean created) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.created = created;
} finally {
writeLock.unlock();
}
}
public boolean isRunning() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return running;
} finally {
readLock.unlock();
}
}
public void setRunning(boolean running) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.running = running;
} finally {
writeLock.unlock();
}
}
public boolean isFinished() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return finished;
} finally {
readLock.unlock();
}
}
public void setFinished(boolean finished) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.finished = finished;
} finally {
writeLock.unlock();
}
}
}
}
我的测试
rp.Produced egxjthjr index:0
rp.Produced pdiutqkt index:1
rc.Consumed egxjthjr index:0 code:2, quantity:0
rc.Consumed egxjthjr index:0 code:1, quantity:1
rc.Consumed egxjthjr index:0 code:5, quantity:2
rc.Consumed egxjthjr index:0 code:3, quantity:3
rc.Consumed egxjthjr index:0 code:4, quantity:4
rc.Consumed pdiutqkt index:1 code:4, quantity:5
rp.Produced dwqtvoun index:2
rc.Consumed pdiutqkt index:1 code:2, quantity:6
rc.Consumed pdiutqkt index:1 code:5, quantity:7
rc.Consumed pdiutqkt index:1 code:1, quantity:8
rc.Consumed pdiutqkt index:1 code:3, quantity:9
rp.Produced ydwqheks index:3
rc.Consumed dwqtvoun index:2 code:4, quantity:10
rc.Consumed dwqtvoun index:2 code:5, quantity:11
rc.Consumed dwqtvoun index:2 code:1, quantity:12
rc.Consumed dwqtvoun index:2 code:2, quantity:13
rc.Consumed dwqtvoun index:2 code:3, quantity:14
rc.Consumed ydwqheks index:3 code:1, quantity:15
rc.Consumed ydwqheks index:3 code:3, quantity:16
rc.Consumed ydwqheks index:3 code:2, quantity:17
rc.Consumed ydwqheks index:3 code:5, quantity:18
rc.Consumed ydwqheks index:3 code:4, quantity:19
rp.Produced tamvejvq index:4
rp.Produced tpqjkgqd index:5
rc.Consumed tamvejvq index:4 code:4, quantity:20
rc.Consumed tamvejvq index:4 code:5, quantity:21
rc.Consumed tamvejvq index:4 code:2, quantity:22
rc.Consumed tamvejvq index:4 code:3, quantity:23
rc.Consumed tamvejvq index:4 code:1, quantity:24
rp.Produced quchekol index:6
rc.Consumed tpqjkgqd index:5 code:4, quantity:25
rc.Consumed tpqjkgqd index:5 code:2, quantity:26
rc.Consumed tpqjkgqd index:5 code:5, quantity:27
rc.Consumed tpqjkgqd index:5 code:3, quantity:28
rc.Consumed tpqjkgqd index:5 code:1, quantity:29
rc.Consumed quchekol index:6 code:4, quantity:30
rc.Consumed quchekol index:6 code:1, quantity:31
rc.Consumed quchekol index:6 code:5, quantity:32
rc.Consumed quchekol index:6 code:2, quantity:33
rc.Consumed quchekol index:6 code:3, quantity:34
rp Sent:7
rc Received:35
另一个测试
rp.Produced iufalvxu index:0
rp.Produced ammjynnm index:1
rc.Consumed iufalvxu index:0 code:4, quantity:0
rc.Consumed iufalvxu index:0 code:2, quantity:1
rc.Consumed iufalvxu index:0 code:1, quantity:2
rc.Consumed iufalvxu index:0 code:5, quantity:3
rc.Consumed iufalvxu index:0 code:3, quantity:4
rc.Consumed ammjynnm index:1 code:1, quantity:5
rc.Consumed ammjynnm index:1 code:3, quantity:6
rc.Consumed ammjynnm index:1 code:4, quantity:7
rc.Consumed ammjynnm index:1 code:5, quantity:8
rc.Consumed ammjynnm index:1 code:2, quantity:9
rp.Produced clbecbge index:2
rc.Consumed clbecbge index:2 code:1, quantity:10
rc.Consumed clbecbge index:2 code:4, quantity:11
rc.Consumed clbecbge index:2 code:3, quantity:12
rc.Consumed clbecbge index:2 code:5, quantity:13
rc.Consumed clbecbge index:2 code:2, quantity:14
rp.Produced sletiovo index:3
rc.Consumed sletiovo index:3 code:5, quantity:15
rc.Consumed sletiovo index:3 code:1, quantity:16
rc.Consumed sletiovo index:3 code:2, quantity:17
rc.Consumed sletiovo index:3 code:4, quantity:18
rc.Consumed sletiovo index:3 code:3, quantity:19
rp Sent:4
rc Received:20
注意:我确实尝试过简化这段代码
我有多个进程(不同类型)由多个 Runnable 执行。
我试图用图表简化这种情况。
我有一个 RunnableProducer
随着时间的推移生产,生产的产品被转移到一个 RunnableWorker
执行一些操作 ProcessorDown
(蓝色箭头) 执行一个进程,并将其分发给它的接收者(class 的同类)。
如果RunnableWorker
被标记(code
不是null),它必须执行一个特殊类型的过程Processor
和return它到"parent"RunnableWorker
,谁转的。
也就是说,您的接收器收集了许多执行另一个额外的 ProcessorUp
(绿色箭头)注意绿色箭头的数量。
初始的RunnableWorker
将所有的数据(在同class的中介的帮助下)传输到RunnableConsumer
而不混合它们,谁将执行另一个任务(对于这个问题,print
).
RunnableProducer
只有在 RunnableConsumer
最终可以接收/收集所有生产的东西(由 RunnableWorker's
转移)时才应该生产。
RunnableProducer
可以单独关闭。
但是,RunnableConsumer
必须 运行 而 RunnableProducer
正在生产,直到他消耗完所有东西(及其变体)。
注意:您可以复制、粘贴、编译和 运行
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestCollectorRunnable1 {
public static void main(String... args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Counter counter = new Counter();
LifeCycle rcLifeCycle = new LifeCycle();
LifeCycle rpLifeCycle = new LifeCycle();
RunnableConsumer rc = new RunnableConsumer("rc", rcLifeCycle, rpLifeCycle, executorService, counter);
RunnableProducer rp = new RunnableProducer("rp", rpLifeCycle, rcLifeCycle, executorService, counter);
RunnableWorker rw0 = new RunnableWorker("rw0", executorService, counter, null, null, rp.getOutBlockingQueue(), rc.getInBlockingQueue());
RunnableWorker rw11 = new RunnableWorker("rw11", executorService, counter, null, rw0);
RunnableWorker rw12 = new RunnableWorker("rw12", executorService, counter, null, rw0);
rw0.addBlockingQueue(rw11.getInputBlockingQueue());
rw0.addBlockingQueue(rw12.getInputBlockingQueue());
RunnableWorker rw211 = new RunnableWorker("rw211", executorService, counter, 1, rw11);
RunnableWorker rw212 = new RunnableWorker("rw212", executorService, counter, 2, rw11);
RunnableWorker rw213 = new RunnableWorker("rw213", executorService, counter, 3, rw11);
rw11.addBlockingQueue(rw211.getInputBlockingQueue());
rw11.addBlockingQueue(rw212.getInputBlockingQueue());
rw11.addBlockingQueue(rw213.getInputBlockingQueue());
RunnableWorker rw221 = new RunnableWorker("rw221", executorService, counter, 4, rw12);
RunnableWorker rw222 = new RunnableWorker("rw222", executorService, counter, 5, rw12);
rw12.addBlockingQueue(rw221.getInputBlockingQueue());
rw12.addBlockingQueue(rw222.getInputBlockingQueue());
//Simulate Turn off
new Timer().schedule(new TimerTask() {
@Override
public void run() {
rp.stop();
}
}, ThreadLocalRandom.current().nextLong(100L, 1000L));
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class RunnableProducer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final ExecutorService executorService;
private final Counter counter;
private final int bufferSize;
private final BlockingQueue<ChunkDTO> outBlockingQueue;
private volatile boolean isRunning = false;
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.executorService = executorService;
this.counter = counter;
this.bufferSize = 8;
this.outBlockingQueue = outBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.executorService.execute(this);
}
@Override
public void run() {
long quantity = 0;
isRunning = true;
//Blocking Wait (not very elegant)
/*
block until the consumer can consume without losing what is produced and processed
*/
while (!outLifeCycle.isRunning()) {
try {
Thread.sleep(10);
} catch (Exception e) {
}
}
while (/*isRunning*/quantity < 5) {
ownLifeCycle.setRunning(true);
try {
byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
outBlockingQueue.put(chunkDTO);
System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
Thread.sleep(timeSleeping);
} catch (Exception e) {
}
quantity++;
counter.setValue(quantity);
}
System.out.println(name + "\tSent:" + quantity);
}
public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
return outBlockingQueue;
}
public void stop() {
isRunning = false;
}
}
public static class RunnableConsumer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final ExecutorService executorService;
private final Counter counter;
private final BlockingQueue<ChunkDTO> inBlockingQueue;
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, executorService, counter, new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.executorService = executorService;
this.counter = counter;
this.inBlockingQueue = inBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.executorService.execute(this);
}
@Override
public void run() {
if (inBlockingQueue != null) {
try {
int quantity = 0;
while (!outLifeCycle.isCreated() || outLifeCycle.isRunning()/*haya recolectado lo que tiene que recolectar*/) {
ownLifeCycle.setRunning(true);
ChunkDTO chunkDTO = inBlockingQueue.take();
System.out.println(name + ".Collected " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t pitch:" + chunkDTO.getPitch());
quantity++;
}
System.out.println(name + "\tReceived:" + quantity);
} catch (InterruptedException e) {
}
}
}
public BlockingQueue<ChunkDTO> getInBlockingQueue() {
return inBlockingQueue;
}
}
public static class RunnableWorker {
private final ExecutorService executorService;
private final RunnableWorker parent;
private final BlockingQueue<ChunkDTO> inputBlockingQueue;
private final BlockingQueue<ChunkDTO> outputBlockingQueue;
private final List<BlockingQueue<ChunkDTO>> downList;
private final List<BlockingQueue<ChunkDTO>> upList;
private final Set<Integer> codes;
public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
this.executorService = executorService;
this.parent = parent;
this.inputBlockingQueue = inputBlockingQueue;
this.outputBlockingQueue = outputBlockingQueue;
this.downList = new ArrayList<>();
this.upList = new ArrayList<>(Arrays.asList(new SynchronousQueue/*LinkedBlockingQueue*/<>()));
this.codes = new HashSet<>();
//RUNNABLE DISTRIBUTOR
this.executorService.execute(() -> {
if (inputBlockingQueue != null) {
try {
while (true) {
ChunkDTO chunkDTO = inputBlockingQueue.take();
/*
if (codes.size() > 0) {
System.out.println(name + " codes.length:" + codes.size());
}
if (parent == null) {
System.out.println(name + ".Worked " + new String(chunkDTO.getChunk()) + "\tindex:" + chunkDTO.getIndex());
}
// */
if (code == null) {
new ProcessorDown(executorService, chunkDTO, downList);
} else {
ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
System.out.println("\t\t" + name + ".Returned " + returned.toString());
if (parent != null) {
new Processor(executorService, returned, parent.getUpList());
parent.addCodeSon(code);
}
}
}
} catch (Exception e) {
}
}
});
//RUNNABLE COLLECTOR
if (code == null) {
this.executorService.execute(() -> {
int quantity = 0;
while (quantity == 0) {
BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
if (outBlockingQueue != null) {
try {
while (quantity == 0 || (quantity > 0 && quantity < codes.size() * (counter.getValue()))) {
ChunkDTO chunkDTO = outBlockingQueue.take();
/*
System.out.println("\t" + name + ".quantity: " + quantity + ", codes.size():" + codes.size() + ", counter.getValue():" + counter.getValue() + ", total:" + (codes.size() * counter.getValue())
+ "\r\t\tcchunk:" + chunkDTO
+ "\r\t\tcodes:" + codes.stream().map(i -> i.toString()).collect(Collectors.joining(",")));
// */
if (chunkDTO != null) {
if (parent == null) {
outputBlockingQueue.put(chunkDTO);
System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
} else {
new ProcessorUp(executorService, chunkDTO, parent.getUpList());
}
quantity++;
}
}
/*
if (quantity != 0) {
String codesString = codes.stream().map(i -> i.toString()).collect(Collectors.joining(","));
System.out.println("\t" + name + "\tWorked:" + quantity + ", \tcodes:" + codesString);
}
// */
} catch (InterruptedException e) {
}
}
}
});
}
}
public void addCodeSon(Integer code) {
if (parent != null) {
parent.addCodeSon(code);
}
codes.add(code);
}
public RunnableWorker(String name, ExecutorService executorService, Counter counter, Integer code, RunnableWorker parent) {
this(name, executorService, counter, code, parent, new SynchronousQueue/*LinkedBlockingQueue*/<>(), new SynchronousQueue/*LinkedBlockingQueue*/<>());
}
public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
return inputBlockingQueue;
}
public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.add(blockingQueue);
}
public void delBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.remove(blockingQueue);
}
public List<BlockingQueue<ChunkDTO>> getUpList() {
return upList;
}
}
public static class Processor implements Runnable {
private final ExecutorService executorService;
private final List<BlockingQueue<ChunkDTO>> listOutput;
private final ChunkDTO inChunkDTO;
public Processor(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
this.executorService = executorService;
this.listOutput = listOutput;
this.inChunkDTO = inChunkDTO;
this.executorService.execute(this);
}
@Override
public void run() {
if (inChunkDTO != null) {
try {
byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
if (listOutput != null) {
listOutput.forEach(output -> {
try {
output.put(outChunkDTO);
} catch (Exception e) {
}
});
}
} catch (Exception e) {
}
}
}
}
public static class ProcessorDown extends Processor {
public ProcessorDown(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(executorService, inChunkDTO, listOutput);
}
}
public static class ProcessorUp extends Processor {
public ProcessorUp(ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(executorService, inChunkDTO, listOutput);
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return out;
}
public static class ChunkDTO {
private final byte[] chunk;
private final long index;
private final Integer pitch;
public ChunkDTO(byte[] chunk, long index, Integer pitch) {
this.chunk = chunk;
this.index = index;
this.pitch = pitch;
}
public byte[] getChunk() {
return chunk;
}
public long getIndex() {
return index;
}
public Integer getPitch() {
return pitch;
}
@Override
public String toString() {
return "ChunkDTO{" + "chunk=" + new String(chunk) + ", index=" + index + ", pitch=" + pitch + '}';
}
}
public static class Counter {
private final ReadWriteLock rwLock;
private Long value;
public Counter() {
this.rwLock = new ReentrantReadWriteLock();
this.value = 0L;
}
public Long getValue() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void setValue(Long value) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.value = value;
} finally {
writeLock.unlock();
}
}
}
public static class LifeCycle {
private final ReadWriteLock rwLock;
private boolean created;
private boolean running;
private boolean finished;
public LifeCycle() {
this.rwLock = new ReentrantReadWriteLock();
}
public boolean isCreated() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return created;
} finally {
readLock.unlock();
}
}
public void setCreated(boolean created) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.created = created;
} finally {
writeLock.unlock();
}
}
public boolean isRunning() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return running;
} finally {
readLock.unlock();
}
}
public void setRunning(boolean running) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.running = running;
} finally {
writeLock.unlock();
}
}
public boolean isFinished() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return finished;
} finally {
readLock.unlock();
}
}
public void setFinished(boolean finished) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.finished = finished;
} finally {
writeLock.unlock();
}
}
}
}
class ChunkDTO
包含数据、索引(位置)和代码(
以促进其被 RunnableConsumer
)class化。
Counter
class 以便控制 RunnableConsumer/RunnableWorker
应该期待什么。
如果RunnableProducer
产生7个,有5个代码,RunnableConsumer
最终应该收集到35个。 rw11 RunnableWorker
应该收集 3*7=21
而 rw12 RunnableWorker
应该收集 2*7=14
。
LifeCycle
class 是为了控制 LifeCycle Producer 和 Consumer 而创建的,我仍然没有 RunnableWorker
的逻辑。
RunnableWorker
有两个Runnable,用来处理Transfer their children(//RUNNABLE DISTRIBUTOR
) and parents(//RUNNABLE COLLECTOR
).
输出
rp.Produced tpwqomrt index:0
rw0.Worked tpwqomrt index:0
rw221.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
rw222.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rw212.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rw213.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
rw211.Returned ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rp.Produced xwnlpkju index:1
rw0 codes.length:5
rw0.Worked xwnlpkju index:1
rw11 codes.length:3
rw12 codes.length:2
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=2}
rc.Collected tpwqomrt index:0 pitch:2
rc.Collected tpwqomrt index:0 pitch:4
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=4}
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=1}
rc.Collected tpwqomrt index:0 pitch:1
rc.Collected tpwqomrt index:0 pitch:3
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=3}
rw0.Collected ChunkDTO{chunk=tpwqomrt, index=0, pitch=5}
rc.Collected tpwqomrt index:0 pitch:5
rw212.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=2}
rw221.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=4}
rw222.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=5}
rw213.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=3}
rw211.Returned ChunkDTO{chunk=xwnlpkju, index=1, pitch=1}
rp.Produced xmdfcmmo index:2
rw0 codes.length:5
rw0.Worked xmdfcmmo index:2
rw12 codes.length:2
rw11 codes.length:3
rw221.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=4}
rw212.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=2}
rw222.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=5}
rw213.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=3}
rw211.Returned ChunkDTO{chunk=xmdfcmmo, index=2, pitch=1}
rw0 codes.length:5
rp.Produced syqpyxuk index:3
rw0.Worked syqpyxuk index:3
rw11 codes.length:3
rw0 codes.length:5
rw0.Worked linlkasp index:4
rp.Produced linlkasp index:4
rw12 codes.length:2
rw211.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=1}
rw213.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=3}
rw212.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=2}
rw11 codes.length:3
rw222.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=5}
rw221.Returned ChunkDTO{chunk=syqpyxuk, index=3, pitch=4}
rw12 codes.length:2
rw211.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=1}
rw213.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=3}
rw222.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=5}
rw212.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=2}
rw221.Returned ChunkDTO{chunk=linlkasp, index=4, pitch=4}
rp Sent:5
如您所见,Producer 发送了 5 个块,但我失去了大部分,我需要接收 25 个(对于此示例)。消费者只收集了很少的物品。 我的逻辑有什么问题?
我不知道创建两个 运行nable 是否是 RunnableWorker class 的一个很好的解决方案。 有没有更好的实现方式?
我知道我有一个可怕的方法来阻止生产者等待消费者。 您推荐什么解决方案?
我自己的回答
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TestCollectorRunnable7a {
public static void main(String... args) {
List<Future<?>> futureList = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
Counter counter = new Counter();
CodesList codesList = new CodesList();
LifeCycle rcLifeCycle = new LifeCycle();
LifeCycle rpLifeCycle = new LifeCycle();
RunnableConsumer rc = new RunnableConsumer("rc", rcLifeCycle, rpLifeCycle, futureList, executorService, counter, codesList);
RunnableProducer rp = new RunnableProducer("rp", rpLifeCycle, rcLifeCycle, futureList, executorService, counter);
RunnableWorker rw0 = new RunnableWorker("rw0", rcLifeCycle, futureList, executorService, counter, codesList, null, null, rp.getOutBlockingQueue(), rc.getInBlockingQueue());
RunnableWorker rw11 = new RunnableWorker("rw11", rcLifeCycle, futureList, executorService, counter, codesList, null, rw0);
RunnableWorker rw12 = new RunnableWorker("rw12", rcLifeCycle, futureList, executorService, counter, codesList, null, rw0);
rw0.addBlockingQueue(rw11.getInputBlockingQueue());
rw0.addBlockingQueue(rw12.getInputBlockingQueue());
RunnableWorker rw211 = new RunnableWorker("rw211", rcLifeCycle, futureList, executorService, counter, codesList, 1, rw11);
RunnableWorker rw212 = new RunnableWorker("rw212", rcLifeCycle, futureList, executorService, counter, codesList, 2, rw11);
RunnableWorker rw213 = new RunnableWorker("rw213", rcLifeCycle, futureList, executorService, counter, codesList, 3, rw11);
rw11.addBlockingQueue(rw211.getInputBlockingQueue());
rw11.addBlockingQueue(rw212.getInputBlockingQueue());
rw11.addBlockingQueue(rw213.getInputBlockingQueue());
RunnableWorker rw221 = new RunnableWorker("rw221", rcLifeCycle, futureList, executorService, counter, codesList, 4, rw12);
RunnableWorker rw222 = new RunnableWorker("rw222", rcLifeCycle, futureList, executorService, counter, codesList, 5, rw12);
rw12.addBlockingQueue(rw221.getInputBlockingQueue());
rw12.addBlockingQueue(rw222.getInputBlockingQueue());
//Simulate Turn off
new Timer(false).schedule(new TimerTask() {
@Override
public void run() {
rp.stop();
}
}, ThreadLocalRandom.current().nextLong(100L, 1000L));
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class RunnableProducer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final List<Future<?>> futureList;
private final ExecutorService executorService;
private final Counter counter;
private final int bufferSize;
private final BlockingQueue<ChunkDTO> outBlockingQueue;
private volatile boolean isRunning = false;
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter) {
this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue<>());
}
public RunnableProducer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> outBlockingQueue) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.futureList = futureList;
this.executorService = executorService;
this.counter = counter;
this.bufferSize = 8;
this.outBlockingQueue = outBlockingQueue;
this.ownLifeCycle.setCreated(true);
this.futureList.add(this.executorService.submit(this));
}
@Override
public void run() {
long quantity = 0;
while (!outLifeCycle.isRunning()) {
try {
Thread.sleep(100);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
isRunning = true;
ownLifeCycle.setRunning(true);
while (isRunning) {
try {
byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
ChunkDTO chunkDTO = new ChunkDTO(outBytesSamples, quantity, null);
outBlockingQueue.put(chunkDTO);
System.out.println(name + ".Produced " + new String(outBytesSamples) + "\t index:" + quantity);
int timeSleeping = ThreadLocalRandom.current().nextInt(10, 100);
Thread.sleep(timeSleeping);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
quantity++;
counter.setValue(quantity);
}
ownLifeCycle.setRunning(false);
System.out.println(name + "\tSent:" + quantity);
ownLifeCycle.setFinished(true);
}
public BlockingQueue<ChunkDTO> getOutBlockingQueue() {
return outBlockingQueue;
}
public void stop() {
isRunning = false;
}
}
public static class RunnableConsumer implements Runnable {
private final String name;
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final List<Future<?>> futureList;
private final ExecutorService executorService;
private final Counter counter;
private final Counter intCounter;
private final CodesList codesList;
private final BlockingQueue<ChunkDTO> inBlockingQueue;
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList) {
this(name, ownLifeCycle, outLifeCycle, futureList, executorService, counter, new LinkedBlockingQueue/*SynchronousQueue LinkedBlockingQueue*/<>(), codesList);
}
public RunnableConsumer(String name, LifeCycle ownLifeCycle, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, BlockingQueue<ChunkDTO> inBlockingQueue, CodesList codesList) {
this.name = name;
this.ownLifeCycle = ownLifeCycle;
this.outLifeCycle = outLifeCycle;
this.futureList = futureList;
this.executorService = executorService;
this.counter = counter;
this.inBlockingQueue = inBlockingQueue;
this.intCounter = new Counter();
this.codesList = codesList;
this.ownLifeCycle.setCreated(true);
this.futureList.add(this.executorService.submit(() -> {
while (!this.outLifeCycle.isFinished() || intCounter.getValue() < counter.getValue() * codesList.size()) {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
inBlockingQueue.add(new ChunkStopper(null, -1, null));
}));
this.futureList.add(this.executorService.submit(this));
}
@Override
public void run() {
if (inBlockingQueue != null) {
try {
long quantity = 0;
ownLifeCycle.setRunning(true);
while (true) {
ChunkDTO chunkDTO = inBlockingQueue.take();
if (chunkDTO instanceof ChunkStopper) {
ownLifeCycle.setRunning(false);
break;
}
System.out.println(name + ".Consumed " + new String(chunkDTO.getChunk()) + "\t index:" + chunkDTO.getIndex() + "\t code:" + chunkDTO.getPitch() + ", \tquantity:" + quantity);
quantity++;
intCounter.setValue(quantity);
}
ownLifeCycle.setRunning(false);
System.out.println(name + "\tReceived:" + quantity);
ownLifeCycle.setFinished(true);
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
public BlockingQueue<ChunkDTO> getInBlockingQueue() {
return inBlockingQueue;
}
}
public static class RunnableWorker {
private final LifeCycle ownLifeCycle;
private final LifeCycle outLifeCycle;
private final List<Future<?>> futureList;
private final ExecutorService executorService;
private final Counter intCounter;
private final CodesList codesList;
private final RunnableWorker parent;
private final BlockingQueue<ChunkDTO> inputBlockingQueue;
private final BlockingQueue<ChunkDTO> outputBlockingQueue;
private final List<BlockingQueue<ChunkDTO>> downList;
private final List<BlockingQueue<ChunkDTO>> upList;
private final Set<Integer> codes;
public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent, BlockingQueue<ChunkDTO> inputBlockingQueue, BlockingQueue<ChunkDTO> outputBlockingQueue) {
this.ownLifeCycle = new LifeCycle();
this.outLifeCycle = outLifeCycle;
this.futureList = futureList;
this.executorService = executorService;
this.intCounter = new Counter();
this.codesList = codesList;
this.parent = parent;
this.inputBlockingQueue = inputBlockingQueue;
this.outputBlockingQueue = outputBlockingQueue;
this.downList = new ArrayList<>();
this.upList = new ArrayList<>(Arrays.asList(new LinkedBlockingQueue<>()));
this.codes = new HashSet<>();
if (code != null) {
this.codesList.addCode(code);
}
this.futureList.add(this.executorService.submit(() -> {
while (!outLifeCycle.isFinished()) {
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
//System.out.println(name + " -> Consumer Finished!");
while (true) {
try {
BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
outBlockingQueue.add(new ChunkStopper(null, -1, null));
break;
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
while (true) {
try {
inputBlockingQueue.add(new ChunkStopper(null, -1, null));
break;
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}));
//RUNNABLE DISTRIBUTOR
this.futureList.add(this.executorService.submit(() -> {
long quantity = 0;
if (inputBlockingQueue != null) {
try {
ownLifeCycle.setRunning(true);
while (true) {
ChunkDTO chunkDTO = inputBlockingQueue.take();
if (chunkDTO instanceof ChunkStopper) {
break;
}
if (code == null) {
new ProcessorDown(futureList, executorService, chunkDTO, downList);
} else {
ChunkDTO returned = new ChunkDTO(chunkDTO.getChunk(), chunkDTO.getIndex(), code);
//System.out.println("\t\t" + name + ".Returned " + returned.toString());
if (parent != null) {
new Processor(this.futureList, executorService, returned, parent.getUpList());
parent.addCodeSon(code);
}
}
quantity++;
intCounter.setValue(quantity);
}
ownLifeCycle.setRunning(false);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
//System.out.println(name + ". DISTRIBUTOR Finished");
}));
//RUNNABLE COLLECTOR
if (code == null) {
this.futureList.add(this.executorService.submit(() -> {
int quantity = 0;
while (quantity == 0) {
BlockingQueue<ChunkDTO> outBlockingQueue = upList.get(0);
if (outBlockingQueue != null) {
try {
while (true) {
ChunkDTO chunkDTO = outBlockingQueue.take();
if (chunkDTO instanceof ChunkStopper) {
break;
}
if (parent == null) {
outputBlockingQueue.put(chunkDTO);
//System.out.println("\t\t" + name + ".Collected " + chunkDTO.toString());
} else {
new ProcessorUp(futureList, executorService, chunkDTO, parent.getUpList());
}
quantity++;
}
} catch (InterruptedException ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
//System.out.println(name + ". COLLECTOR Finished");
}));
}
}
public LifeCycle getOwnLifeCycle() {
return ownLifeCycle;
}
public void addCodeSon(Integer code) {
if (parent != null) {
parent.addCodeSon(code);
}
codes.add(code);
}
public RunnableWorker(String name, LifeCycle outLifeCycle, List<Future<?>> futureList, ExecutorService executorService, Counter counter, CodesList codesList, Integer code, RunnableWorker parent) {
this(name, outLifeCycle, futureList, executorService, counter, codesList, code, parent, new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>());
}
public BlockingQueue<ChunkDTO> getInputBlockingQueue() {
return inputBlockingQueue;
}
public void addBlockingQueue(BlockingQueue<ChunkDTO> blockingQueue) {
downList.add(blockingQueue);
}
public List<BlockingQueue<ChunkDTO>> getUpList() {
return upList;
}
}
public static class Processor implements Runnable {
private final ExecutorService executorService;
private final List<BlockingQueue<ChunkDTO>> listOutput;
private final ChunkDTO inChunkDTO;
public Processor(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
this.executorService = executorService;
this.listOutput = listOutput;
this.inChunkDTO = inChunkDTO;
futureList.add(this.executorService.submit(this));
}
@Override
public void run() {
if (inChunkDTO != null) {
try {
byte[] outBytes = internalProcessing(inChunkDTO.getChunk());
ChunkDTO outChunkDTO = new ChunkDTO(outBytes, inChunkDTO.getIndex(), inChunkDTO.getPitch());
if (listOutput != null) {
listOutput.forEach(output -> {
try {
output.put(outChunkDTO);
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
});
}
} catch (Exception ex) {
Logger.getLogger(TestCollectorRunnable7a.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
public static class ProcessorDown extends Processor {
public ProcessorDown(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(futureList, executorService, inChunkDTO, listOutput);
}
}
public static class ProcessorUp extends Processor {
public ProcessorUp(List<Future<?>> futureList, ExecutorService executorService, ChunkDTO inChunkDTO, List<BlockingQueue<ChunkDTO>> listOutput) {
super(futureList, executorService, inChunkDTO, listOutput);
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return out;
}
public static class ChunkStopper extends ChunkDTO {
public ChunkStopper(byte[] chunk, long index, Integer pitch) {
super(chunk, index, pitch);
}
}
public static class ChunkDTO {
private final byte[] chunk;
private final long index;
private final Integer pitch;
public ChunkDTO(byte[] chunk, long index, Integer pitch) {
this.chunk = chunk;
this.index = index;
this.pitch = pitch;
}
public byte[] getChunk() {
return chunk;
}
public long getIndex() {
return index;
}
public Integer getPitch() {
return pitch;
}
@Override
public String toString() {
return "ChunkDTO{" + "chunk=" + new String(chunk) + ", index=" + index + ", pitch=" + pitch + '}';
}
}
public static class Counter {
private final ReadWriteLock rwLock;
private Long value;
public Counter() {
this.rwLock = new ReentrantReadWriteLock();
this.value = 0L;
}
public Long getValue() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void setValue(Long value) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.value = value;
} finally {
writeLock.unlock();
}
}
}
public static class CodesList {
private final List<Integer> codes;
private final ReadWriteLock rwLock;
public CodesList() {
this.codes = new ArrayList<>();
this.rwLock = new ReentrantReadWriteLock();
}
public void addCode(Integer code) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
codes.add(code);
} finally {
writeLock.unlock();
}
}
public int size() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return codes.size();
} finally {
readLock.unlock();
}
}
}
public static class LifeCycle {
private final ReadWriteLock rwLock;
private boolean created;
private boolean running;
private boolean finished;
public LifeCycle() {
this.rwLock = new ReentrantReadWriteLock();
}
public boolean isCreated() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return created;
} finally {
readLock.unlock();
}
}
public void setCreated(boolean created) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.created = created;
} finally {
writeLock.unlock();
}
}
public boolean isRunning() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return running;
} finally {
readLock.unlock();
}
}
public void setRunning(boolean running) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.running = running;
} finally {
writeLock.unlock();
}
}
public boolean isFinished() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return finished;
} finally {
readLock.unlock();
}
}
public void setFinished(boolean finished) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.finished = finished;
} finally {
writeLock.unlock();
}
}
}
}
我的测试
rp.Produced egxjthjr index:0
rp.Produced pdiutqkt index:1
rc.Consumed egxjthjr index:0 code:2, quantity:0
rc.Consumed egxjthjr index:0 code:1, quantity:1
rc.Consumed egxjthjr index:0 code:5, quantity:2
rc.Consumed egxjthjr index:0 code:3, quantity:3
rc.Consumed egxjthjr index:0 code:4, quantity:4
rc.Consumed pdiutqkt index:1 code:4, quantity:5
rp.Produced dwqtvoun index:2
rc.Consumed pdiutqkt index:1 code:2, quantity:6
rc.Consumed pdiutqkt index:1 code:5, quantity:7
rc.Consumed pdiutqkt index:1 code:1, quantity:8
rc.Consumed pdiutqkt index:1 code:3, quantity:9
rp.Produced ydwqheks index:3
rc.Consumed dwqtvoun index:2 code:4, quantity:10
rc.Consumed dwqtvoun index:2 code:5, quantity:11
rc.Consumed dwqtvoun index:2 code:1, quantity:12
rc.Consumed dwqtvoun index:2 code:2, quantity:13
rc.Consumed dwqtvoun index:2 code:3, quantity:14
rc.Consumed ydwqheks index:3 code:1, quantity:15
rc.Consumed ydwqheks index:3 code:3, quantity:16
rc.Consumed ydwqheks index:3 code:2, quantity:17
rc.Consumed ydwqheks index:3 code:5, quantity:18
rc.Consumed ydwqheks index:3 code:4, quantity:19
rp.Produced tamvejvq index:4
rp.Produced tpqjkgqd index:5
rc.Consumed tamvejvq index:4 code:4, quantity:20
rc.Consumed tamvejvq index:4 code:5, quantity:21
rc.Consumed tamvejvq index:4 code:2, quantity:22
rc.Consumed tamvejvq index:4 code:3, quantity:23
rc.Consumed tamvejvq index:4 code:1, quantity:24
rp.Produced quchekol index:6
rc.Consumed tpqjkgqd index:5 code:4, quantity:25
rc.Consumed tpqjkgqd index:5 code:2, quantity:26
rc.Consumed tpqjkgqd index:5 code:5, quantity:27
rc.Consumed tpqjkgqd index:5 code:3, quantity:28
rc.Consumed tpqjkgqd index:5 code:1, quantity:29
rc.Consumed quchekol index:6 code:4, quantity:30
rc.Consumed quchekol index:6 code:1, quantity:31
rc.Consumed quchekol index:6 code:5, quantity:32
rc.Consumed quchekol index:6 code:2, quantity:33
rc.Consumed quchekol index:6 code:3, quantity:34
rp Sent:7
rc Received:35
另一个测试
rp.Produced iufalvxu index:0
rp.Produced ammjynnm index:1
rc.Consumed iufalvxu index:0 code:4, quantity:0
rc.Consumed iufalvxu index:0 code:2, quantity:1
rc.Consumed iufalvxu index:0 code:1, quantity:2
rc.Consumed iufalvxu index:0 code:5, quantity:3
rc.Consumed iufalvxu index:0 code:3, quantity:4
rc.Consumed ammjynnm index:1 code:1, quantity:5
rc.Consumed ammjynnm index:1 code:3, quantity:6
rc.Consumed ammjynnm index:1 code:4, quantity:7
rc.Consumed ammjynnm index:1 code:5, quantity:8
rc.Consumed ammjynnm index:1 code:2, quantity:9
rp.Produced clbecbge index:2
rc.Consumed clbecbge index:2 code:1, quantity:10
rc.Consumed clbecbge index:2 code:4, quantity:11
rc.Consumed clbecbge index:2 code:3, quantity:12
rc.Consumed clbecbge index:2 code:5, quantity:13
rc.Consumed clbecbge index:2 code:2, quantity:14
rp.Produced sletiovo index:3
rc.Consumed sletiovo index:3 code:5, quantity:15
rc.Consumed sletiovo index:3 code:1, quantity:16
rc.Consumed sletiovo index:3 code:2, quantity:17
rc.Consumed sletiovo index:3 code:4, quantity:18
rc.Consumed sletiovo index:3 code:3, quantity:19
rp Sent:4
rc Received:20