生产者更快,消费者延迟,Java
Producer Faster, Consumer Delayed, Java
我正在阅读这个问题
但是,在我的例子中,生产者总是比消费者快。
我有一个 Processor
Class 实现了 Runnable
接口。
Processor
Class 从另一个 IndependentProducer
class 消耗 (reading) 并执行大型操作并产生 ( 写) 给另一个AnotherConsumer
class
IndependentProducer -> Processor -> AnotherConsumer
.
IndependentProducer
class
public static class IndependentProducer implements Runnable {
private final BlockingQueue<byte[]> out;
public IndependentProducer() {
this.out = new LinkedBlockingQueue<>();
}
public IndependentProducer(BlockingQueue<byte[]> out) {
this.out = out;
}
@Override
public void run() {
while (true) {
try {
byte[] bytes = new byte[8];
double value = Math.random();
System.out.println("IndependentProducer -> " + value);
ByteBuffer.wrap(bytes).putDouble(value);
//bytes to be Write taken or Produced from some method!
out.put(bytes);
Thread.sleep(100);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
public BlockingQueue<byte[]> getOut() {
return out;
}
}
Processor
class
public static class Processor implements Runnable {
private final BlockingQueue<byte[]> in;
private final BlockingQueue<byte[]> out;
public Processor(BlockingQueue<byte[]> in, BlockingQueue<byte[]> out) {
this.in = in;
this.out = out;
}
public Processor() {
this.in = new LinkedBlockingQueue<>();
this.out = new LinkedBlockingQueue<>();
}
@Override
public void run() {
if (in != null && out != null) {
try {
while (true) {
byte[] inBytes = out.take();
System.out.println("Processor -> " + inBytes);
byte[] outBytes = internalProcessing(inBytes);
in.put(outBytes);
}
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
} else {
System.out.println("Processor End");
}
}
public BlockingQueue<byte[]> getIn() {
return in;
}
public BlockingQueue<byte[]> getOut() {
return out;
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
//Some task processing Input
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return out;
}
}
AnotherConsumer
class
public static class AnotherConsumer implements Runnable {
private final BlockingQueue<byte[]> in;
public AnotherConsumer() {
this.in = new LinkedBlockingQueue<>();
}
public AnotherConsumer(BlockingQueue<byte[]> in) {
this.in = in;
}
@Override
public void run() {
while (true) {
try {
byte[] bytes = in.take();
double value = ByteBuffer.wrap(bytes).getDouble();
System.out.println("AnotherConsumer -> " + value);
Thread.sleep(50);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
public BlockingQueue<byte[]> getIn() {
return in;
}
}
在main
方法中。
public static void main(String... args) {
Processor processor = new Processor();
IndependentProducer producer = new IndependentProducer(processor.getOut());
AnotherConsumer consumer = new AnotherConsumer(processor.getIn());
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(procs);
executor.execute(producer);
executor.execute(consumer);
executor.execute(processor);
executor.shutdown();
}
当我测试时我有这个输出:
IndependentProducer -> 0.4130406465737616
Processor -> [B@41873c1
IndependentProducer -> 0.437038149157167
IndependentProducer -> 0.2725539847087094
IndependentProducer -> 0.6904194423406251
IndependentProducer -> 0.3995194490439792
Processor -> [B@adf9d32
AnotherConsumer -> 0.4130406465737616
IndependentProducer -> 0.7282271398850959
IndependentProducer -> 0.5323473994454264
IndependentProducer -> 0.25294453920266635
IndependentProducer -> 0.024447086310892985
IndependentProducer -> 0.4543848001132673
Processor -> [B@ee018b1
AnotherConsumer -> 0.437038149157167
IndependentProducer -> 0.778599966068157
IndependentProducer -> 0.39413401137724347
IndependentProducer -> 0.11395726966828834
IndependentProducer -> 0.8021737270773336
IndependentProducer -> 0.8099562159472291
Processor -> [B@4be29709
Thread.sleep(xxx);
模拟慢进程...
如何确定Processor
Class是否繁忙并创建另一个实例以加速AnotherConsumer
中的输出或消耗?
如何根据延迟级别增加实例数?
我的回复
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerFasterConsumerDelayed {
public static void main(String... args) {
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newCachedThreadPool();//Executors.newFixedThreadPool(procs);
IndependentProducer producer = new IndependentProducer(executor);
AnotherConsumer consumer = new AnotherConsumer(producer.getOut());
executor.execute(producer);
executor.execute(consumer);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
producer.stop();
}
}, 600L); //Simulate independent turn off
new Timer().schedule(new TimerTask() {
@Override
public void run() {
consumer.stop();
}
}, 1000L); //Simulate independent turn off
//executor.shutdown(); // if shutdown no new tasks will be accepted (But we need to add more tasks).
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class IndependentProducer implements Runnable {
private final BlockingQueue<byte[]> out;
private ExecutorService executor;
private volatile boolean isRunning = false;
public IndependentProducer(ExecutorService executor) {
this.executor = executor;
this.out = new LinkedBlockingQueue<>();
}
public IndependentProducer(ExecutorService executor, BlockingQueue<byte[]> out) {
this.executor = executor;
this.out = out;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (isRunning) {
try {
byte[] bytes = new byte[8];
double value = Math.random();
System.out.println("\t\tIndependentProducer -> " + value);
ByteBuffer.wrap(bytes).putDouble(value);
//bytes to be Write taken or Produced from some method!
//out.put(bytes);
Processor processor = new Processor(out, bytes);
executor.execute(processor);
Thread.sleep(100);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
}
System.out.println("\tSent:" + quantity);
}
public BlockingQueue<byte[]> getOut() {
return out;
}
public void stop() {
isRunning = false;
}
}
public static class Processor implements Runnable {
private final BlockingQueue<byte[]> in;
private final byte[] inBytes;
public Processor(BlockingQueue<byte[]> in, byte[] inBytes) {
this.in = in;
this.inBytes = inBytes;
}
@Override
public void run() {
if (inBytes != null) {
try {
System.out.println("\t\t\tProcessor -> " + inBytes);
byte[] outBytes = internalProcessing(inBytes);
in.put(outBytes);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
//Some task processing Input
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return out;
}
public static class AnotherConsumer implements Runnable {
private final BlockingQueue<byte[]> in;
private volatile boolean isRunning = false;
public AnotherConsumer() {
this.in = new LinkedBlockingQueue<>();
}
public AnotherConsumer(BlockingQueue<byte[]> in) {
this.in = in;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (isRunning) {
try {
byte[] bytes = in.take();
double value = ByteBuffer.wrap(bytes).getDouble();
System.out.println("\t\tAnotherConsumer -> " + value);
Thread.sleep(50);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
}
System.out.println("\tRead:" + quantity);
}
public BlockingQueue<byte[]> getIn() {
return in;
}
public void stop() {
isRunning = false;
}
}
}
输出
IndependentProducer -> 0.2727536875191199
Processor -> [B@30aa5984
IndependentProducer -> 0.3907197939463575
Processor -> [B@343c0758
IndependentProducer -> 0.17914054098557186
Processor -> [B@44029c5d
IndependentProducer -> 0.9639063829785499
Processor -> [B@6695b54d
IndependentProducer -> 0.7645697072469784
Processor -> [B@8730a97
AnotherConsumer -> 0.2727536875191199
IndependentProducer -> 0.5127428481615691
Processor -> [B@3a5232f4
AnotherConsumer -> 0.3907197939463575
Sent:6
AnotherConsumer -> 0.17914054098557186
AnotherConsumer -> 0.9639063829785499
AnotherConsumer -> 0.7645697072469784
AnotherConsumer -> 0.5127428481615691
Read:6
另一种解决方案,消耗了生成的确切数量。
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ProducerFasterConsumerSlower {
public static void main(String... args) {
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newCachedThreadPool();//Executors.newFixedThreadPool(procs);
Counter counter = new Counter();
IndependentProducer producer = new IndependentProducer(executor, counter);
AnotherConsumer consumer = new AnotherConsumer(producer.getOut(), counter);
executor.execute(producer);
executor.execute(consumer);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
producer.stop();
}
}, 1200L);
//executor.shutdown(); // if shutdown no new tasks will be accepted (But we need to add more tasks).
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class IndependentProducer implements Runnable {
private final BlockingQueue<byte[]> out;
private ExecutorService executor;
private Counter counter;
private volatile boolean isRunning = false;
public IndependentProducer(ExecutorService executor, Counter counter) {
this.executor = executor;
this.counter = counter;
this.out = new LinkedBlockingQueue<>();
}
public IndependentProducer(ExecutorService executor, BlockingQueue<byte[]> out, Counter counter) {
this.executor = executor;
this.counter = counter;
this.out = out;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (isRunning) {
try {
byte[] bytes = new byte[8];
double value = Math.random();
System.out.println("\t\tIndependentProducer -> " + value);
ByteBuffer.wrap(bytes).putDouble(value);
//bytes to be Write taken or Produced from some method!
//out.put(bytes);
Processor processor = new Processor(out, bytes);
executor.execute(processor);
Thread.sleep(100);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
counter.setValue(quantity);
}
System.out.println("\tSent:" + quantity);
}
public BlockingQueue<byte[]> getOut() {
return out;
}
public void stop() {
isRunning = false;
}
}
public static class Processor implements Runnable {
private final BlockingQueue<byte[]> in;
private final byte[] inBytes;
public Processor(BlockingQueue<byte[]> in, byte[] inBytes) {
this.in = in;
this.inBytes = inBytes;
}
@Override
public void run() {
if (inBytes != null) {
try {
System.out.println("\t\t\tProcessor -> " + inBytes);
byte[] outBytes = internalProcessing(inBytes);
in.put(outBytes);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
//Some task processing Input
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return out;
}
public static class AnotherConsumer implements Runnable {
private final BlockingQueue<byte[]> in;
private Counter counter;
private volatile boolean isRunning = false;
public AnotherConsumer(Counter counter) {
this.in = new LinkedBlockingQueue<>();
this.counter = counter;
}
public AnotherConsumer(BlockingQueue<byte[]> in, Counter counter) {
this.in = in;
this.counter = counter;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (quantity == 0 || (quantity > 0 && quantity < counter.getValue())) {
try {
byte[] bytes = in.take();
double value = ByteBuffer.wrap(bytes).getDouble();
System.out.println("\t\tAnotherConsumer -> " + value);
Thread.sleep(50);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
}
System.out.println("\tRead:" + quantity);
}
public BlockingQueue<byte[]> getIn() {
return in;
}
public void stop() {
isRunning = false;
}
}
public static class Counter {
private final ReadWriteLock rwLock;
private Integer value;
public Counter() {
this.value = 0;
this.rwLock = new ReentrantReadWriteLock();
}
public Integer getValue() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void setValue(Integer value) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.value = value;
} finally {
writeLock.unlock();
}
}
}
}
输出
--- exec-maven-plugin:1.5.0:exec (default-cli) @ MPMC ---
IndependentProducer -> 0.9331107666615924
Processor -> [B@692e1ca8
IndependentProducer -> 0.5493090855970735
Processor -> [B@42d792c2
IndependentProducer -> 0.20757600749100447
Processor -> [B@c1e719b
IndependentProducer -> 0.36075202290656716
Processor -> [B@189e7143
IndependentProducer -> 0.8367933009439126
Processor -> [B@202ad687
AnotherConsumer -> 0.9331107666615924
IndependentProducer -> 0.769980227177382
Processor -> [B@63d677d8
AnotherConsumer -> 0.5493090855970735
IndependentProducer -> 0.8042202343547955
Processor -> [B@4c1074e8
AnotherConsumer -> 0.20757600749100447
IndependentProducer -> 0.41744652947246386
Processor -> [B@2d99e68
AnotherConsumer -> 0.36075202290656716
IndependentProducer -> 0.6401181943036195
Processor -> [B@16598c20
AnotherConsumer -> 0.8367933009439126
IndependentProducer -> 0.5042132162738169
Processor -> [B@2eed3cf2
AnotherConsumer -> 0.769980227177382
IndependentProducer -> 0.32807737872103193
Processor -> [B@26d4e5c6
AnotherConsumer -> 0.8042202343547955
IndependentProducer -> 0.843270828872334
Processor -> [B@482ff9b2
AnotherConsumer -> 0.41744652947246386
Sent:12
AnotherConsumer -> 0.6401181943036195
AnotherConsumer -> 0.5042132162738169
AnotherConsumer -> 0.32807737872103193
AnotherConsumer -> 0.843270828872334
Read:12
我正在阅读这个问题
我有一个 Processor
Class 实现了 Runnable
接口。
Processor
Class 从另一个 IndependentProducer
class 消耗 (reading) 并执行大型操作并产生 ( 写) 给另一个AnotherConsumer
class
IndependentProducer -> Processor -> AnotherConsumer
.
IndependentProducer
class
public static class IndependentProducer implements Runnable {
private final BlockingQueue<byte[]> out;
public IndependentProducer() {
this.out = new LinkedBlockingQueue<>();
}
public IndependentProducer(BlockingQueue<byte[]> out) {
this.out = out;
}
@Override
public void run() {
while (true) {
try {
byte[] bytes = new byte[8];
double value = Math.random();
System.out.println("IndependentProducer -> " + value);
ByteBuffer.wrap(bytes).putDouble(value);
//bytes to be Write taken or Produced from some method!
out.put(bytes);
Thread.sleep(100);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
public BlockingQueue<byte[]> getOut() {
return out;
}
}
Processor
class
public static class Processor implements Runnable {
private final BlockingQueue<byte[]> in;
private final BlockingQueue<byte[]> out;
public Processor(BlockingQueue<byte[]> in, BlockingQueue<byte[]> out) {
this.in = in;
this.out = out;
}
public Processor() {
this.in = new LinkedBlockingQueue<>();
this.out = new LinkedBlockingQueue<>();
}
@Override
public void run() {
if (in != null && out != null) {
try {
while (true) {
byte[] inBytes = out.take();
System.out.println("Processor -> " + inBytes);
byte[] outBytes = internalProcessing(inBytes);
in.put(outBytes);
}
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
} else {
System.out.println("Processor End");
}
}
public BlockingQueue<byte[]> getIn() {
return in;
}
public BlockingQueue<byte[]> getOut() {
return out;
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
//Some task processing Input
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return out;
}
}
AnotherConsumer
class
public static class AnotherConsumer implements Runnable {
private final BlockingQueue<byte[]> in;
public AnotherConsumer() {
this.in = new LinkedBlockingQueue<>();
}
public AnotherConsumer(BlockingQueue<byte[]> in) {
this.in = in;
}
@Override
public void run() {
while (true) {
try {
byte[] bytes = in.take();
double value = ByteBuffer.wrap(bytes).getDouble();
System.out.println("AnotherConsumer -> " + value);
Thread.sleep(50);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
public BlockingQueue<byte[]> getIn() {
return in;
}
}
在main
方法中。
public static void main(String... args) {
Processor processor = new Processor();
IndependentProducer producer = new IndependentProducer(processor.getOut());
AnotherConsumer consumer = new AnotherConsumer(processor.getIn());
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(procs);
executor.execute(producer);
executor.execute(consumer);
executor.execute(processor);
executor.shutdown();
}
当我测试时我有这个输出:
IndependentProducer -> 0.4130406465737616
Processor -> [B@41873c1
IndependentProducer -> 0.437038149157167
IndependentProducer -> 0.2725539847087094
IndependentProducer -> 0.6904194423406251
IndependentProducer -> 0.3995194490439792
Processor -> [B@adf9d32
AnotherConsumer -> 0.4130406465737616
IndependentProducer -> 0.7282271398850959
IndependentProducer -> 0.5323473994454264
IndependentProducer -> 0.25294453920266635
IndependentProducer -> 0.024447086310892985
IndependentProducer -> 0.4543848001132673
Processor -> [B@ee018b1
AnotherConsumer -> 0.437038149157167
IndependentProducer -> 0.778599966068157
IndependentProducer -> 0.39413401137724347
IndependentProducer -> 0.11395726966828834
IndependentProducer -> 0.8021737270773336
IndependentProducer -> 0.8099562159472291
Processor -> [B@4be29709
Thread.sleep(xxx);
模拟慢进程...
如何确定Processor
Class是否繁忙并创建另一个实例以加速AnotherConsumer
中的输出或消耗?
如何根据延迟级别增加实例数?
我的回复
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerFasterConsumerDelayed {
public static void main(String... args) {
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newCachedThreadPool();//Executors.newFixedThreadPool(procs);
IndependentProducer producer = new IndependentProducer(executor);
AnotherConsumer consumer = new AnotherConsumer(producer.getOut());
executor.execute(producer);
executor.execute(consumer);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
producer.stop();
}
}, 600L); //Simulate independent turn off
new Timer().schedule(new TimerTask() {
@Override
public void run() {
consumer.stop();
}
}, 1000L); //Simulate independent turn off
//executor.shutdown(); // if shutdown no new tasks will be accepted (But we need to add more tasks).
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class IndependentProducer implements Runnable {
private final BlockingQueue<byte[]> out;
private ExecutorService executor;
private volatile boolean isRunning = false;
public IndependentProducer(ExecutorService executor) {
this.executor = executor;
this.out = new LinkedBlockingQueue<>();
}
public IndependentProducer(ExecutorService executor, BlockingQueue<byte[]> out) {
this.executor = executor;
this.out = out;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (isRunning) {
try {
byte[] bytes = new byte[8];
double value = Math.random();
System.out.println("\t\tIndependentProducer -> " + value);
ByteBuffer.wrap(bytes).putDouble(value);
//bytes to be Write taken or Produced from some method!
//out.put(bytes);
Processor processor = new Processor(out, bytes);
executor.execute(processor);
Thread.sleep(100);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
}
System.out.println("\tSent:" + quantity);
}
public BlockingQueue<byte[]> getOut() {
return out;
}
public void stop() {
isRunning = false;
}
}
public static class Processor implements Runnable {
private final BlockingQueue<byte[]> in;
private final byte[] inBytes;
public Processor(BlockingQueue<byte[]> in, byte[] inBytes) {
this.in = in;
this.inBytes = inBytes;
}
@Override
public void run() {
if (inBytes != null) {
try {
System.out.println("\t\t\tProcessor -> " + inBytes);
byte[] outBytes = internalProcessing(inBytes);
in.put(outBytes);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
//Some task processing Input
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return out;
}
public static class AnotherConsumer implements Runnable {
private final BlockingQueue<byte[]> in;
private volatile boolean isRunning = false;
public AnotherConsumer() {
this.in = new LinkedBlockingQueue<>();
}
public AnotherConsumer(BlockingQueue<byte[]> in) {
this.in = in;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (isRunning) {
try {
byte[] bytes = in.take();
double value = ByteBuffer.wrap(bytes).getDouble();
System.out.println("\t\tAnotherConsumer -> " + value);
Thread.sleep(50);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
}
System.out.println("\tRead:" + quantity);
}
public BlockingQueue<byte[]> getIn() {
return in;
}
public void stop() {
isRunning = false;
}
}
}
输出
IndependentProducer -> 0.2727536875191199
Processor -> [B@30aa5984
IndependentProducer -> 0.3907197939463575
Processor -> [B@343c0758
IndependentProducer -> 0.17914054098557186
Processor -> [B@44029c5d
IndependentProducer -> 0.9639063829785499
Processor -> [B@6695b54d
IndependentProducer -> 0.7645697072469784
Processor -> [B@8730a97
AnotherConsumer -> 0.2727536875191199
IndependentProducer -> 0.5127428481615691
Processor -> [B@3a5232f4
AnotherConsumer -> 0.3907197939463575
Sent:6
AnotherConsumer -> 0.17914054098557186
AnotherConsumer -> 0.9639063829785499
AnotherConsumer -> 0.7645697072469784
AnotherConsumer -> 0.5127428481615691
Read:6
另一种解决方案,消耗了生成的确切数量。
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ProducerFasterConsumerSlower {
public static void main(String... args) {
int procs = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newCachedThreadPool();//Executors.newFixedThreadPool(procs);
Counter counter = new Counter();
IndependentProducer producer = new IndependentProducer(executor, counter);
AnotherConsumer consumer = new AnotherConsumer(producer.getOut(), counter);
executor.execute(producer);
executor.execute(consumer);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
producer.stop();
}
}, 1200L);
//executor.shutdown(); // if shutdown no new tasks will be accepted (But we need to add more tasks).
}
public static String getRandomString(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; i++) {
char c = (char) (new Random().nextInt(25) + 'a');
sb.append(c);
}
return sb.toString();
}
public static class IndependentProducer implements Runnable {
private final BlockingQueue<byte[]> out;
private ExecutorService executor;
private Counter counter;
private volatile boolean isRunning = false;
public IndependentProducer(ExecutorService executor, Counter counter) {
this.executor = executor;
this.counter = counter;
this.out = new LinkedBlockingQueue<>();
}
public IndependentProducer(ExecutorService executor, BlockingQueue<byte[]> out, Counter counter) {
this.executor = executor;
this.counter = counter;
this.out = out;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (isRunning) {
try {
byte[] bytes = new byte[8];
double value = Math.random();
System.out.println("\t\tIndependentProducer -> " + value);
ByteBuffer.wrap(bytes).putDouble(value);
//bytes to be Write taken or Produced from some method!
//out.put(bytes);
Processor processor = new Processor(out, bytes);
executor.execute(processor);
Thread.sleep(100);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
counter.setValue(quantity);
}
System.out.println("\tSent:" + quantity);
}
public BlockingQueue<byte[]> getOut() {
return out;
}
public void stop() {
isRunning = false;
}
}
public static class Processor implements Runnable {
private final BlockingQueue<byte[]> in;
private final byte[] inBytes;
public Processor(BlockingQueue<byte[]> in, byte[] inBytes) {
this.in = in;
this.inBytes = inBytes;
}
@Override
public void run() {
if (inBytes != null) {
try {
System.out.println("\t\t\tProcessor -> " + inBytes);
byte[] outBytes = internalProcessing(inBytes);
in.put(outBytes);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
}
}
}
private static byte[] internalProcessing(byte[] in) {
byte[] out = in;
//Some task processing Input
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return out;
}
public static class AnotherConsumer implements Runnable {
private final BlockingQueue<byte[]> in;
private Counter counter;
private volatile boolean isRunning = false;
public AnotherConsumer(Counter counter) {
this.in = new LinkedBlockingQueue<>();
this.counter = counter;
}
public AnotherConsumer(BlockingQueue<byte[]> in, Counter counter) {
this.in = in;
this.counter = counter;
}
@Override
public void run() {
int quantity = 0;
isRunning = true;
while (quantity == 0 || (quantity > 0 && quantity < counter.getValue())) {
try {
byte[] bytes = in.take();
double value = ByteBuffer.wrap(bytes).getDouble();
System.out.println("\t\tAnotherConsumer -> " + value);
Thread.sleep(50);
} catch (Exception e) {
//Handle Exceptions
e.printStackTrace();
}
quantity++;
}
System.out.println("\tRead:" + quantity);
}
public BlockingQueue<byte[]> getIn() {
return in;
}
public void stop() {
isRunning = false;
}
}
public static class Counter {
private final ReadWriteLock rwLock;
private Integer value;
public Counter() {
this.value = 0;
this.rwLock = new ReentrantReadWriteLock();
}
public Integer getValue() {
Lock readLock = rwLock.readLock();
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
public void setValue(Integer value) {
Lock writeLock = rwLock.writeLock();
writeLock.lock();
try {
this.value = value;
} finally {
writeLock.unlock();
}
}
}
}
输出
--- exec-maven-plugin:1.5.0:exec (default-cli) @ MPMC ---
IndependentProducer -> 0.9331107666615924
Processor -> [B@692e1ca8
IndependentProducer -> 0.5493090855970735
Processor -> [B@42d792c2
IndependentProducer -> 0.20757600749100447
Processor -> [B@c1e719b
IndependentProducer -> 0.36075202290656716
Processor -> [B@189e7143
IndependentProducer -> 0.8367933009439126
Processor -> [B@202ad687
AnotherConsumer -> 0.9331107666615924
IndependentProducer -> 0.769980227177382
Processor -> [B@63d677d8
AnotherConsumer -> 0.5493090855970735
IndependentProducer -> 0.8042202343547955
Processor -> [B@4c1074e8
AnotherConsumer -> 0.20757600749100447
IndependentProducer -> 0.41744652947246386
Processor -> [B@2d99e68
AnotherConsumer -> 0.36075202290656716
IndependentProducer -> 0.6401181943036195
Processor -> [B@16598c20
AnotherConsumer -> 0.8367933009439126
IndependentProducer -> 0.5042132162738169
Processor -> [B@2eed3cf2
AnotherConsumer -> 0.769980227177382
IndependentProducer -> 0.32807737872103193
Processor -> [B@26d4e5c6
AnotherConsumer -> 0.8042202343547955
IndependentProducer -> 0.843270828872334
Processor -> [B@482ff9b2
AnotherConsumer -> 0.41744652947246386
Sent:12
AnotherConsumer -> 0.6401181943036195
AnotherConsumer -> 0.5042132162738169
AnotherConsumer -> 0.32807737872103193
AnotherConsumer -> 0.843270828872334
Read:12