生产者更快,消费者延迟,Java

Producer Faster, Consumer Delayed, Java

我读过这个问题,但在我的情况下,生产者总是比消费者快。

我有一个 Processor Class 实现了 Runnable 接口。

Processor 类从 IndependentProducer 类消费(读取)数据,执行耗时的操作,然后把结果生产(写入)给 AnotherConsumer 类。

IndependentProducer -> Processor -> AnotherConsumer.

IndependentProducer class

/**
 * First stage of the pipeline: repeatedly generates a random {@code double}, encodes it
 * into 8 bytes and puts it on the outgoing queue, pausing 100 ms between values.
 *
 * <p>The loop runs until the executing thread is interrupted.
 */
public static class IndependentProducer implements Runnable {

  /** Pause between two produced values, in milliseconds. */
  private static final long PRODUCE_INTERVAL_MILLIS = 100L;

  /** Queue that receives every produced value. */
  private final BlockingQueue<byte[]> out;

  /** Creates a producer that writes to its own {@link LinkedBlockingQueue}. */
  public IndependentProducer() {
    this.out = new LinkedBlockingQueue<>();
  }

  /**
   * Creates a producer that writes to the given queue.
   *
   * @param out queue that receives the produced values
   */
  public IndependentProducer(BlockingQueue<byte[]> out) {
    this.out = out;
  }

  /**
   * Produces values until the thread is interrupted.
   *
   * <p>An interrupt ends the loop and leaves the thread's interrupt flag set, so the
   * owner of the thread can observe it. Other runtime failures are printed and the loop
   * carries on.
   */
  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        byte[] bytes = new byte[8];
        double value = Math.random();
        System.out.println("IndependentProducer -> " + value);
        ByteBuffer.wrap(bytes).putDouble(value);
        // Placeholder payload: real bytes would be obtained from some other method.
        out.put(bytes);
        Thread.sleep(PRODUCE_INTERVAL_MILLIS);
      } catch (InterruptedException e) {
        // Interruption is the only way to stop this loop: restore the flag and leave
        // instead of swallowing it and spinning on.
        Thread.currentThread().interrupt();
        return;
      } catch (RuntimeException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Returns the queue this producer writes to.
   *
   * @return the outgoing queue
   */
  public BlockingQueue<byte[]> getOut() {
    return out;
  }

}

Processor class

/**
 * Middle stage of the pipeline: takes a value from one queue, runs the (slow) processing
 * step on it and puts the result on the other queue.
 *
 * <p>Note the field naming: values are <em>taken from</em> {@code out} and <em>put
 * into</em> {@code in}. The names describe the queues from the neighbours' side
 * ({@code out} is what feeds this stage, {@code in} is what the next stage reads).
 */
public static class Processor implements Runnable {

  /** Simulated cost of processing one value, in milliseconds. */
  private static final long PROCESSING_MILLIS = 500L;

  /** Queue that receives the processed values. */
  private final BlockingQueue<byte[]> in;

  /** Queue the unprocessed values are taken from. */
  private final BlockingQueue<byte[]> out;

  /**
   * Creates a processor working on the given queues.
   *
   * @param in queue that receives the processed values
   * @param out queue the unprocessed values are taken from
   */
  public Processor(BlockingQueue<byte[]> in, BlockingQueue<byte[]> out) {
    this.in = in;
    this.out = out;
  }

  /** Creates a processor with two fresh {@link LinkedBlockingQueue}s. */
  public Processor() {
    this.in = new LinkedBlockingQueue<>();
    this.out = new LinkedBlockingQueue<>();
  }

  /**
   * Processes values one at a time until the thread is interrupted or an unexpected
   * failure occurs.
   *
   * <p>If either queue is {@code null} nothing is processed and {@code "Processor End"}
   * is printed. On interruption the interrupt flag is restored before returning.
   */
  @Override
  public void run() {
    if (in == null || out == null) {
      System.out.println("Processor End");
      return;
    }
    try {
      while (true) {
        byte[] inBytes = out.take();
        // Concatenating the array prints its identity string (e.g. "[B@41873c1").
        System.out.println("Processor -> " + inBytes);
        byte[] outBytes = internalProcessing(inBytes);
        in.put(outBytes);
      }
    } catch (InterruptedException e) {
      // Keep the interrupt visible to whoever owns this thread.
      Thread.currentThread().interrupt();
    } catch (RuntimeException e) {
      e.printStackTrace();
    }
  }

  /**
   * Returns the queue the processed values are put into.
   *
   * @return the queue read by the next stage
   */
  public BlockingQueue<byte[]> getIn() {
    return in;
  }

  /**
   * Returns the queue the unprocessed values are taken from.
   *
   * @return the queue written by the previous stage
   */
  public BlockingQueue<byte[]> getOut() {
    return out;
  }

  /**
   * Simulates an expensive processing step by sleeping, then returns the input unchanged.
   *
   * @param in bytes to process
   * @return the same array that was passed in
   * @throws InterruptedException if the thread is interrupted while processing; it is
   *     propagated so that the value is not forwarded as if it had been processed
   */
  private static byte[] internalProcessing(byte[] in) throws InterruptedException {
    Thread.sleep(PROCESSING_MILLIS);
    return in;
  }

}

AnotherConsumer class

/**
 * Last stage of the pipeline: takes 8-byte values from its queue, decodes each one as a
 * {@code double}, prints it and then pauses 50 ms.
 *
 * <p>The loop runs until the executing thread is interrupted.
 */
public static class AnotherConsumer implements Runnable {

  /** Pause after each consumed value, in milliseconds. */
  private static final long CONSUME_MILLIS = 50L;

  /** Queue the values are read from. */
  private final BlockingQueue<byte[]> in;

  /** Creates a consumer that reads from its own {@link LinkedBlockingQueue}. */
  public AnotherConsumer() {
    this.in = new LinkedBlockingQueue<>();
  }

  /**
   * Creates a consumer that reads from the given queue.
   *
   * @param in queue the values are read from
   */
  public AnotherConsumer(BlockingQueue<byte[]> in) {
    this.in = in;
  }

  /**
   * Consumes values until the thread is interrupted.
   *
   * <p>An interrupt ends the loop and leaves the thread's interrupt flag set. Other
   * runtime failures (for example a value shorter than 8 bytes) are printed and the
   * loop carries on.
   */
  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        byte[] bytes = in.take();
        double value = ByteBuffer.wrap(bytes).getDouble();
        System.out.println("AnotherConsumer -> " + value);
        Thread.sleep(CONSUME_MILLIS);
      } catch (InterruptedException e) {
        // Interruption is the only way to stop this loop: restore the flag and leave
        // instead of swallowing it and blocking on take() again.
        Thread.currentThread().interrupt();
        return;
      } catch (RuntimeException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Returns the queue this consumer reads from.
   *
   * @return the incoming queue
   */
  public BlockingQueue<byte[]> getIn() {
    return in;
  }

}

main方法中。

/**
 * Wires the three stages together (IndependentProducer -> Processor -> AnotherConsumer)
 * and starts each of them on its own pool thread.
 *
 * @param args ignored
 */
public static void main(String... args) {
  Processor processor = new Processor();
  // The processor owns both queues; its neighbours attach to them.
  IndependentProducer producer = new IndependentProducer(processor.getOut());
  AnotherConsumer consumer = new AnotherConsumer(processor.getIn());

  Runnable[] stages = {producer, consumer, processor};
  // Every stage loops indefinitely and therefore occupies one thread for good. Sizing the
  // pool by the number of CPU cores would leave a stage waiting forever on a machine
  // with fewer cores than stages, so size it by the number of stages instead.
  ExecutorService executor = Executors.newFixedThreadPool(stages.length);
  for (Runnable stage : stages) {
    executor.execute(stage);
  }
  // shutdown() only stops the pool from accepting new tasks; the submitted stages keep running.
  executor.shutdown();
}

当我测试时我有这个输出:

IndependentProducer -> 0.4130406465737616
Processor -> [B@41873c1
IndependentProducer -> 0.437038149157167
IndependentProducer -> 0.2725539847087094
IndependentProducer -> 0.6904194423406251
IndependentProducer -> 0.3995194490439792
Processor -> [B@adf9d32
AnotherConsumer -> 0.4130406465737616
IndependentProducer -> 0.7282271398850959
IndependentProducer -> 0.5323473994454264
IndependentProducer -> 0.25294453920266635
IndependentProducer -> 0.024447086310892985
IndependentProducer -> 0.4543848001132673
Processor -> [B@ee018b1
AnotherConsumer -> 0.437038149157167
IndependentProducer -> 0.778599966068157
IndependentProducer -> 0.39413401137724347
IndependentProducer -> 0.11395726966828834
IndependentProducer -> 0.8021737270773336
IndependentProducer -> 0.8099562159472291
Processor -> [B@4be29709

代码中的 Thread.sleep(xxx) 用来模拟耗时的处理过程。

如何确定Processor Class是否繁忙并创建另一个实例以加速AnotherConsumer中的输出或消耗?

如何根据延迟级别增加实例数?

我的回复

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Demo pipeline for a producer that is faster than the processing step.
 *
 * <p>Instead of one long-running processor, the producer submits a separate short-lived
 * {@link Processor} task for every value it generates. The tasks run on a cached thread
 * pool, so several values are processed at the same time, and each task puts its result
 * on the queue read by {@link AnotherConsumer}.
 */
public class ProducerFasterConsumerDelayed {

  /** Pause between two produced values, in milliseconds. */
  private static final long PRODUCE_INTERVAL_MILLIS = 100L;

  /** Simulated cost of processing one value, in milliseconds. */
  private static final long PROCESSING_MILLIS = 500L;

  /** Pause after each consumed value, in milliseconds. */
  private static final long CONSUME_MILLIS = 50L;

  /** Delay after which the demo stops the producer, in milliseconds. */
  private static final long PRODUCER_STOP_DELAY_MILLIS = 600L;

  /** Delay after which the demo stops the consumer, in milliseconds. */
  private static final long CONSUMER_STOP_DELAY_MILLIS = 1000L;

  /** Number of letters {@link #getRandomString(int)} chooses from ('a' to 'z'). */
  private static final int ALPHABET_SIZE = 26;

  /** Shared generator, so that no new {@link Random} is created per character. */
  private static final Random RANDOM = new Random();

  /**
   * Starts the producer and the consumer, then stops them independently after fixed delays.
   *
   * @param args ignored
   */
  public static void main(String... args) {
    // A cached pool grows on demand, so every produced value gets a Processor thread
    // right away even though processing is slower than production.
    final ExecutorService executor = Executors.newCachedThreadPool();

    final IndependentProducer producer = new IndependentProducer(executor);
    final AnotherConsumer consumer = new AnotherConsumer(producer.getOut());

    executor.execute(producer);
    executor.execute(consumer);

    // One timer drives both stop events; it is cancelled by its last task so that its
    // thread does not outlive the demo.
    final Timer timer = new Timer();
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        producer.stop(); // Simulate independent turn off
      }
    }, PRODUCER_STOP_DELAY_MILLIS);
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        consumer.stop(); // Simulate independent turn off
        // The producer was told to stop well before this point, so no further Processor
        // tasks are expected. shutdown() lets the tasks already submitted finish and
        // then releases the pool threads.
        executor.shutdown();
        timer.cancel();
      }
    }, CONSUMER_STOP_DELAY_MILLIS);
  }

  /**
   * Builds a string of random lower-case letters.
   *
   * @param size number of characters; zero or a negative value yields an empty string
   * @return a string of {@code size} characters, each drawn from 'a' to 'z' inclusive
   */
  public static String getRandomString(int size) {
    StringBuilder sb = new StringBuilder(Math.max(size, 0));
    for (int i = 0; i < size; i++) {
      sb.append((char) ('a' + RANDOM.nextInt(ALPHABET_SIZE)));
    }
    return sb.toString();
  }

  /**
   * Generates a random {@code double} every 100 ms and submits a {@link Processor} task
   * for it, until {@link #stop()} is called or the thread is interrupted.
   */
  public static class IndependentProducer implements Runnable {

    /** Queue the submitted Processor tasks put their results on. */
    private final BlockingQueue<byte[]> out;

    /** Pool that runs one Processor task per produced value. */
    private final ExecutorService executor;

    /** Loop flag; written by {@link #stop()} from another thread. */
    private volatile boolean isRunning = false;

    /**
     * Creates a producer whose processors write to a fresh {@link LinkedBlockingQueue}.
     *
     * @param executor pool that runs the Processor tasks
     */
    public IndependentProducer(ExecutorService executor) {
      this.executor = executor;
      this.out = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a producer whose processors write to the given queue.
     *
     * @param executor pool that runs the Processor tasks
     * @param out queue that receives the processed values
     */
    public IndependentProducer(ExecutorService executor, BlockingQueue<byte[]> out) {
      this.executor = executor;
      this.out = out;
    }

    /**
     * Produces values until stopped, then prints how many were submitted successfully.
     */
    @Override
    public void run() {
      int quantity = 0;
      isRunning = true;
      while (isRunning) {
        try {
          byte[] bytes = new byte[8];
          double value = Math.random();
          System.out.println("\t\tIndependentProducer -> " + value);
          ByteBuffer.wrap(bytes).putDouble(value);
          // Placeholder payload: real bytes would be obtained from some other method.
          executor.execute(new Processor(out, bytes));
          // Count only values that were actually handed to the pool.
          quantity++;
          Thread.sleep(PRODUCE_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
          // Treat an interrupt as a stop request and keep the flag for the thread owner.
          Thread.currentThread().interrupt();
          break;
        } catch (RuntimeException e) {
          e.printStackTrace();
        }
      }
      System.out.println("\tSent:" + quantity);
    }

    /**
     * Returns the queue the processed values end up on.
     *
     * @return the outgoing queue
     */
    public BlockingQueue<byte[]> getOut() {
      return out;
    }

    /**
     * Asks the loop to end after the current iteration. Has no effect if called before
     * {@link #run()} has started, because {@code run()} sets the flag again.
     */
    public void stop() {
      isRunning = false;
    }

  }

  /**
   * One-shot task that processes a single value and puts the result on a queue.
   */
  public static class Processor implements Runnable {

    /** Queue the processed value is put on (the consumer's input). */
    private final BlockingQueue<byte[]> in;

    /** The value to process; the task does nothing if it is {@code null}. */
    private final byte[] inBytes;

    /**
     * @param in queue that receives the processed value
     * @param inBytes value to process
     */
    public Processor(BlockingQueue<byte[]> in, byte[] inBytes) {
      this.in = in;
      this.inBytes = inBytes;
    }

    /**
     * Processes the value and forwards the result. If the thread is interrupted while
     * processing, nothing is forwarded and the interrupt flag is restored.
     */
    @Override
    public void run() {
      if (inBytes == null) {
        return;
      }
      try {
        // Concatenating the array prints its identity string (e.g. "[B@30aa5984").
        System.out.println("\t\t\tProcessor -> " + inBytes);
        in.put(internalProcessing(inBytes));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (RuntimeException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Simulates an expensive processing step by sleeping, then returns the input unchanged.
   *
   * @param in bytes to process
   * @return the same array that was passed in
   * @throws InterruptedException if the thread is interrupted while processing
   */
  private static byte[] internalProcessing(byte[] in) throws InterruptedException {
    Thread.sleep(PROCESSING_MILLIS);
    return in;
  }

  /**
   * Takes processed values from a queue, decodes each as a {@code double} and prints it,
   * until {@link #stop()} is called or the thread is interrupted.
   */
  public static class AnotherConsumer implements Runnable {

    /** Queue the processed values are read from. */
    private final BlockingQueue<byte[]> in;

    /** Loop flag; written by {@link #stop()} from another thread. */
    private volatile boolean isRunning = false;

    /** Creates a consumer that reads from its own {@link LinkedBlockingQueue}. */
    public AnotherConsumer() {
      this.in = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a consumer that reads from the given queue.
     *
     * @param in queue the processed values are read from
     */
    public AnotherConsumer(BlockingQueue<byte[]> in) {
      this.in = in;
    }

    /**
     * Consumes values until stopped, then prints how many were taken from the queue.
     *
     * <p>The stop flag is only checked between values: while the queue is empty the
     * loop is blocked in {@code take()} and ends after the next value arrives.
     */
    @Override
    public void run() {
      int quantity = 0;
      isRunning = true;
      while (isRunning) {
        try {
          byte[] bytes = in.take();
          // Count the value as soon as it has left the queue, and only then.
          quantity++;
          double value = ByteBuffer.wrap(bytes).getDouble();
          System.out.println("\t\tAnotherConsumer -> " + value);
          Thread.sleep(CONSUME_MILLIS);
        } catch (InterruptedException e) {
          // Treat an interrupt as a stop request and keep the flag for the thread owner.
          Thread.currentThread().interrupt();
          break;
        } catch (RuntimeException e) {
          e.printStackTrace();
        }
      }
      System.out.println("\tRead:" + quantity);
    }

    /**
     * Returns the queue this consumer reads from.
     *
     * @return the incoming queue
     */
    public BlockingQueue<byte[]> getIn() {
      return in;
    }

    /**
     * Asks the loop to end after the current value. Has no effect if called before
     * {@link #run()} has started, because {@code run()} sets the flag again.
     */
    public void stop() {
      isRunning = false;
    }

  }

}

输出

    IndependentProducer -> 0.2727536875191199
        Processor -> [B@30aa5984
    IndependentProducer -> 0.3907197939463575
        Processor -> [B@343c0758
    IndependentProducer -> 0.17914054098557186
        Processor -> [B@44029c5d
    IndependentProducer -> 0.9639063829785499
        Processor -> [B@6695b54d
    IndependentProducer -> 0.7645697072469784
        Processor -> [B@8730a97
    AnotherConsumer -> 0.2727536875191199
    IndependentProducer -> 0.5127428481615691
        Processor -> [B@3a5232f4
    AnotherConsumer -> 0.3907197939463575
Sent:6
    AnotherConsumer -> 0.17914054098557186
    AnotherConsumer -> 0.9639063829785499
    AnotherConsumer -> 0.7645697072469784
    AnotherConsumer -> 0.5127428481615691
Read:6

另一种解决方案:消费者恰好消费生产者生成的全部数量。

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Variant of the demo pipeline in which the consumer reads exactly as many values as the
 * producer submitted.
 *
 * <p>The producer publishes its running total through a shared {@link Counter} and marks
 * the counter complete when it stops. The consumer keeps reading until the counter is
 * complete and it has read that many values.
 */
public class ProducerFasterConsumerSlower {

  /** Pause between two produced values, in milliseconds. */
  private static final long PRODUCE_INTERVAL_MILLIS = 100L;

  /** Simulated cost of processing one value, in milliseconds. */
  private static final long PROCESSING_MILLIS = 500L;

  /** Pause after each consumed value, in milliseconds. */
  private static final long CONSUME_MILLIS = 50L;

  /** How long the consumer waits for a value before re-checking whether it is done. */
  private static final long POLL_TIMEOUT_MILLIS = 100L;

  /** Delay after which the demo stops the producer, in milliseconds. */
  private static final long PRODUCER_STOP_DELAY_MILLIS = 1200L;

  /** Number of letters {@link #getRandomString(int)} chooses from ('a' to 'z'). */
  private static final int ALPHABET_SIZE = 26;

  /** Shared generator, so that no new {@link Random} is created per character. */
  private static final Random RANDOM = new Random();

  /**
   * Starts the producer and the consumer and stops the producer after a fixed delay; the
   * consumer ends by itself once it has read everything that was produced.
   *
   * @param args ignored
   */
  public static void main(String... args) {
    // A cached pool grows on demand, so every produced value gets a Processor thread
    // right away even though processing is slower than production.
    final ExecutorService executor = Executors.newCachedThreadPool();

    Counter counter = new Counter();
    final IndependentProducer producer = new IndependentProducer(executor, counter);
    final AnotherConsumer consumer = new AnotherConsumer(producer.getOut(), counter);

    executor.execute(producer);
    executor.execute(() -> {
      try {
        consumer.run();
      } finally {
        // The consumer returns only after the producer has finished, so nothing more
        // will be submitted and the pool can be released.
        executor.shutdown();
      }
    });

    final Timer timer = new Timer();
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        producer.stop();
        // Last scheduled task: cancel the timer so its thread does not outlive the demo.
        timer.cancel();
      }
    }, PRODUCER_STOP_DELAY_MILLIS);
  }

  /**
   * Builds a string of random lower-case letters.
   *
   * @param size number of characters; zero or a negative value yields an empty string
   * @return a string of {@code size} characters, each drawn from 'a' to 'z' inclusive
   */
  public static String getRandomString(int size) {
    StringBuilder sb = new StringBuilder(Math.max(size, 0));
    for (int i = 0; i < size; i++) {
      sb.append((char) ('a' + RANDOM.nextInt(ALPHABET_SIZE)));
    }
    return sb.toString();
  }

  /**
   * Generates a random {@code double} every 100 ms and submits a {@link Processor} task
   * for it, until {@link #stop()} is called or the thread is interrupted. The number of
   * submitted values is published through the shared {@link Counter}.
   */
  public static class IndependentProducer implements Runnable {

    /** Queue the submitted Processor tasks put their results on. */
    private final BlockingQueue<byte[]> out;

    /** Pool that runs one Processor task per produced value. */
    private final ExecutorService executor;

    /** Shared total of submitted values, read by the consumer. */
    private final Counter counter;

    /** Loop flag; written by {@link #stop()} from another thread. */
    private volatile boolean isRunning = false;

    /**
     * Creates a producer whose processors write to a fresh {@link LinkedBlockingQueue}.
     *
     * @param executor pool that runs the Processor tasks
     * @param counter shared total of submitted values
     */
    public IndependentProducer(ExecutorService executor, Counter counter) {
      this.executor = executor;
      this.counter = counter;
      this.out = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a producer whose processors write to the given queue.
     *
     * @param executor pool that runs the Processor tasks
     * @param out queue that receives the processed values
     * @param counter shared total of submitted values
     */
    public IndependentProducer(ExecutorService executor, BlockingQueue<byte[]> out, Counter counter) {
      this.executor = executor;
      this.counter = counter;
      this.out = out;
    }

    /**
     * Produces values until stopped, marks the counter complete and prints the total.
     */
    @Override
    public void run() {
      int quantity = 0;
      isRunning = true;
      try {
        while (isRunning) {
          try {
            byte[] bytes = new byte[8];
            double value = Math.random();
            System.out.println("\t\tIndependentProducer -> " + value);
            ByteBuffer.wrap(bytes).putDouble(value);
            // Placeholder payload: real bytes would be obtained from some other method.
            executor.execute(new Processor(out, bytes));
            // Publish the new total right after a successful submit, so that the counter
            // never includes a value that was not handed to the pool.
            quantity++;
            counter.setValue(quantity);
            Thread.sleep(PRODUCE_INTERVAL_MILLIS);
          } catch (InterruptedException e) {
            // Treat an interrupt as a stop request and keep the flag for the thread owner.
            Thread.currentThread().interrupt();
            break;
          } catch (RuntimeException e) {
            e.printStackTrace();
          }
        }
      } finally {
        // Tell the consumer that the total is final, whichever way the loop ended.
        counter.markComplete();
      }
      System.out.println("\tSent:" + quantity);
    }

    /**
     * Returns the queue the processed values end up on.
     *
     * @return the outgoing queue
     */
    public BlockingQueue<byte[]> getOut() {
      return out;
    }

    /**
     * Asks the loop to end after the current iteration. Has no effect if called before
     * {@link #run()} has started, because {@code run()} sets the flag again.
     */
    public void stop() {
      isRunning = false;
    }

  }

  /**
   * One-shot task that processes a single value and puts the result on a queue.
   */
  public static class Processor implements Runnable {

    /** Queue the processed value is put on (the consumer's input). */
    private final BlockingQueue<byte[]> in;

    /** The value to process; the task does nothing if it is {@code null}. */
    private final byte[] inBytes;

    /**
     * @param in queue that receives the processed value
     * @param inBytes value to process
     */
    public Processor(BlockingQueue<byte[]> in, byte[] inBytes) {
      this.in = in;
      this.inBytes = inBytes;
    }

    /**
     * Processes the value and forwards the result. If the thread is interrupted while
     * processing, nothing is forwarded and the interrupt flag is restored.
     */
    @Override
    public void run() {
      if (inBytes == null) {
        return;
      }
      try {
        // Concatenating the array prints its identity string (e.g. "[B@692e1ca8").
        System.out.println("\t\t\tProcessor -> " + inBytes);
        in.put(internalProcessing(inBytes));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (RuntimeException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Simulates an expensive processing step by sleeping, then returns the input unchanged.
   *
   * @param in bytes to process
   * @return the same array that was passed in
   * @throws InterruptedException if the thread is interrupted while processing
   */
  private static byte[] internalProcessing(byte[] in) throws InterruptedException {
    Thread.sleep(PROCESSING_MILLIS);
    return in;
  }

  /**
   * Takes processed values from a queue, decodes each as a {@code double} and prints it,
   * until it has read every value counted by the producer, {@link #stop()} is called, or
   * the thread is interrupted.
   */
  public static class AnotherConsumer implements Runnable {

    /** Queue the processed values are read from. */
    private final BlockingQueue<byte[]> in;

    /** Shared total of submitted values, written by the producer. */
    private final Counter counter;

    /** Loop flag; written by {@link #stop()} from another thread. */
    private volatile boolean isRunning = false;

    /**
     * Creates a consumer that reads from its own {@link LinkedBlockingQueue}.
     *
     * @param counter shared total of submitted values
     */
    public AnotherConsumer(Counter counter) {
      this.in = new LinkedBlockingQueue<>();
      this.counter = counter;
    }

    /**
     * Creates a consumer that reads from the given queue.
     *
     * @param in queue the processed values are read from
     * @param counter shared total of submitted values
     */
    public AnotherConsumer(BlockingQueue<byte[]> in, Counter counter) {
      this.in = in;
      this.counter = counter;
    }

    /**
     * Consumes values until everything produced has been read, then prints the total.
     *
     * <p>The queue is polled with a timeout rather than with a blocking {@code take()},
     * so that the loop can notice that the producer has finished (or that
     * {@link #stop()} was called) while the queue is empty.
     */
    @Override
    public void run() {
      int quantity = 0;
      isRunning = true;
      while (isRunning && !allConsumed(quantity)) {
        try {
          byte[] bytes = in.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
          if (bytes == null) {
            continue; // Nothing arrived yet; re-check the exit conditions.
          }
          // Count the value as soon as it has left the queue, and only then.
          quantity++;
          double value = ByteBuffer.wrap(bytes).getDouble();
          System.out.println("\t\tAnotherConsumer -> " + value);
          Thread.sleep(CONSUME_MILLIS);
        } catch (InterruptedException e) {
          // Treat an interrupt as a stop request and keep the flag for the thread owner.
          Thread.currentThread().interrupt();
          break;
        } catch (RuntimeException e) {
          e.printStackTrace();
        }
      }
      System.out.println("\tRead:" + quantity);
    }

    /**
     * Tells whether the producer has finished and all of its values have been read.
     * Completion is checked first: once it is set the total no longer changes.
     */
    private boolean allConsumed(int consumed) {
      return counter.isComplete() && consumed >= counter.getValue();
    }

    /**
     * Returns the queue this consumer reads from.
     *
     * @return the incoming queue
     */
    public BlockingQueue<byte[]> getIn() {
      return in;
    }

    /**
     * Asks the loop to end; it is noticed after the current value or poll timeout. Has
     * no effect if called before {@link #run()} has started, because {@code run()} sets
     * the flag again.
     */
    public void stop() {
      isRunning = false;
    }

  }

  /**
   * Lock-protected total shared between producer and consumer, plus a flag telling that
   * the total is final.
   */
  public static class Counter {

    /** Guards {@link #value} and {@link #complete}. */
    private final ReadWriteLock rwLock;
    private Integer value;
    private boolean complete;

    /** Creates a counter with value 0 that is not yet complete. */
    public Counter() {
      this.value = 0;
      this.complete = false;
      this.rwLock = new ReentrantReadWriteLock();
    }

    /**
     * Returns the current total.
     *
     * @return the value last passed to {@link #setValue(Integer)}, or 0 initially
     */
    public Integer getValue() {
      Lock readLock = rwLock.readLock();
      readLock.lock();
      try {
        return value;
      } finally {
        readLock.unlock();
      }
    }

    /**
     * Replaces the current total.
     *
     * @param value the new total
     */
    public void setValue(Integer value) {
      Lock writeLock = rwLock.writeLock();
      writeLock.lock();
      try {
        this.value = value;
      } finally {
        writeLock.unlock();
      }
    }

    /** Declares that the total is final, i.e. the producer has finished. */
    public void markComplete() {
      Lock writeLock = rwLock.writeLock();
      writeLock.lock();
      try {
        this.complete = true;
      } finally {
        writeLock.unlock();
      }
    }

    /**
     * Tells whether {@link #markComplete()} has been called.
     *
     * @return {@code true} once the total is final
     */
    public boolean isComplete() {
      Lock readLock = rwLock.readLock();
      readLock.lock();
      try {
        return complete;
      } finally {
        readLock.unlock();
      }
    }

  }

}

输出

--- exec-maven-plugin:1.5.0:exec (default-cli) @ MPMC ---
    IndependentProducer -> 0.9331107666615924
        Processor -> [B@692e1ca8
    IndependentProducer -> 0.5493090855970735
        Processor -> [B@42d792c2
    IndependentProducer -> 0.20757600749100447
        Processor -> [B@c1e719b
    IndependentProducer -> 0.36075202290656716
        Processor -> [B@189e7143
    IndependentProducer -> 0.8367933009439126
        Processor -> [B@202ad687
    AnotherConsumer -> 0.9331107666615924
    IndependentProducer -> 0.769980227177382
        Processor -> [B@63d677d8
    AnotherConsumer -> 0.5493090855970735
    IndependentProducer -> 0.8042202343547955
        Processor -> [B@4c1074e8
    AnotherConsumer -> 0.20757600749100447
    IndependentProducer -> 0.41744652947246386
        Processor -> [B@2d99e68
    AnotherConsumer -> 0.36075202290656716
    IndependentProducer -> 0.6401181943036195
        Processor -> [B@16598c20
    AnotherConsumer -> 0.8367933009439126
    IndependentProducer -> 0.5042132162738169
        Processor -> [B@2eed3cf2
    AnotherConsumer -> 0.769980227177382
    IndependentProducer -> 0.32807737872103193
        Processor -> [B@26d4e5c6
    AnotherConsumer -> 0.8042202343547955
    IndependentProducer -> 0.843270828872334
        Processor -> [B@482ff9b2
    AnotherConsumer -> 0.41744652947246386
Sent:12
    AnotherConsumer -> 0.6401181943036195
    AnotherConsumer -> 0.5042132162738169
    AnotherConsumer -> 0.32807737872103193
    AnotherConsumer -> 0.843270828872334
Read:12