H2O 生产者 java 线程锁,可重入锁

H2O producer java threads lock , reentrant lock

我想通过三个线程连续生成 h2o,第一个线程将生成 h,第二个线程将生成 h,第三个线程将生成 o。我怎样才能用 lock , consumer producer

        package com.threads.reentrantlock.consumerproducer;

        import java.util.concurrent.locks.Condition;
        import java.util.concurrent.locks.Lock;
        import java.util.concurrent.locks.ReentrantLock;

        public class H2OProducer {
            static Lock lock = new ReentrantLock(true);
            static Condition condition = lock.newCondition();

            public static void main(String[] args) {
                try {
                    Thread h1 = new Thread(() -> {
                        try {
                            hydrogenProducer();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                    Thread h2 = new Thread(() -> {
                        try {
                            hydrogenProducer();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                    Thread o = new Thread(() -> {
                        try {
                            hydrogenProducer();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                    h1.start();
                    h2.start();
                    o.start();

                    try {
                        h1.join();
                        h2.join();
                        o.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (Exception e) {
                }

            }

            public static void hydrogenProducer() throws InterruptedException {
                try {
                    lock.lock();
                        System.out.println("h");
                condition.signalAll();
            } finally {
                lock.unlock();
            }

        }

        public static void oxygenProducer() throws InterruptedException {
            try {
                lock.lock();
                System.out.println("o");
                    condition.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }

我做错了什么

线程异常 "Thread-2"h java.lang.IllegalMonitorStateException 在 java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151) 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261) 在 java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457) 在 com.threads.reentrantlock.consumerproducer.H2OProducer.hydrogenProducer(H2OProducer.java:56) 在 com.threads.reentrantlock.consumerproducer.H2OProducer.lambda$2(H2OProducer.java:29) 在 java.lang.Thread.run(Thread.java:745)

您正在发出信号,但没有相应的等待。此外,还有一个错字——从两个线程调用 hydrogenProducer()(线程 o 和线程 h

我假设您想在生成 O 之前生成两个 H。两个 Hs 是由同一个线程还是两个不同的线程生成并不重要。我已经使用 randomSleep() 来演示这种情况。

import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class H2OProducer {
    static final int H2O_COUNT = 1_000;
    static final Random rand = new Random();

    static final Lock lock = new ReentrantLock(true);
    static final Condition oxzWait = lock.newCondition();
    static final Condition hydWait = lock.newCondition();

    static volatile int hydCount = 0;

    public static void main(String[] args) {
        try {
            Thread h1 = new Thread(() -> {
                try {
                    hydrogenProducer();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread h2 = new Thread(() -> {
                try {
                    hydrogenProducer();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread o = new Thread(() -> {
                try {
                    oxygenProducer();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            h1.setName("H1-Producer");
            h2.setName("H2-Producer");
            o.setName("Ox-Producer");

            h1.start();
            h2.start();
            o.start();

            try {
                h1.join();
                h2.join();
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
        }

    }

    public static void hydrogenProducer() throws InterruptedException {
        for (int i = 0; i < H2O_COUNT; i++) {
            lock.lock();
            try {
                while (hydCount == 2) {
                    hydWait.await();
                }

                hydCount++;
                System.out.println(Thread.currentThread().getName()+ ": H produced - " + i);

                if (hydCount == 2) {
                    oxzWait.signalAll();
                }
            } finally {
                lock.unlock();
            }

            randomSleep();
        }
    }

    public static void oxygenProducer() throws InterruptedException {
        for (int i = 0; i < H2O_COUNT; i++) {
            lock.lock();
            try {
                while (hydCount < 2) {
                    oxzWait.await();
                }

            hydCount = 0;
            System.out.println(Thread.currentThread().getName()+ ": O produced - " + i);
            System.out.println("");
            hydWait.signalAll();
            } finally {
                lock.unlock();
            }

            randomSleep();
        }
    } 

    public static void randomSleep() {
        int ms = rand.nextInt(500);
        try { 
            Thread.sleep(ms);
        } catch (InterruptedException ex) {
        }
    }
}

但是,如果您希望每个 H 制作人为每个 H2O 作品制作一个 H,那么您可以查看 CyclicBarrier。如果需要维护顺序,也可以进行线程链接,例如 T1 -> T2 -> T3 -> T1 -> T2 -> T3 ->.

上面 Java 示例中采用的方法强制 hydrogenProducer 和 OxygenProducer 在生产开始之前知道要生产多少项目。另一种设计方法通过计算水分子的最终输出来集中了解何时停止生产,这对应于 "water factory" 中最终生产项目的计数。在真正的制造控制系统中,关于何时停止生产的决定应该是集中的,而不是留给系统中的每个单独的组件。

以下用 Ada 编写的示例展示了这种集中控制。该解决方案不是使用信号来指示氢气或氧气的产生,而是实际上将氢气和氧气的符号元素从生产者传递给消费者,消费者控制执行并对最终元素生产进行计数。

Ada 解决方案采用 Rendezvous 机制,允许生产者以严格控制的方式直接与消费者通信。

生产者任务类型在名为 Elements 的包中定义。与 Java 不同,Ada 强制分离接口和实现。 Elements 包的接口定义为:

   package Elements is
   type Element_Type is (Hydrogen, Oxygen);

   task type Hydrogen_Producer is
      Entry Stop;
      Entry Get_Element(Atom : out Element_Type);
   end Hydrogen_Producer;

   task type Oxygen_Producer is
      Entry Stop;
      Entry Get_Element(Atom : out Element_Type);
   end Oxygen_Producer;

   end Elements;

Elements 接口规范顶部的类型定义定义了一个名为 Element_Type 的数据类型,它具有两个值,Hydrogen 和 Oxygen。定义了两种任务类型,一种用于生产氢气,一种用于生产氧气。每种任务类型都有两个条目。条目是允许一个任务(或线程)直接与另一个任务通信的机制。条目 Stop 告诉任务何时停止执行。条目 Get_Element 获取任务生成的元素的实例。

Rendezvous机制自动将调用条目的任务与被调用任务同步。任务类型的实现展示了任务间通信是如何进行的。

with Ada.Numerics.Float_Random; use Ada.Numerics.Float_Random;

package body Elements is
   Seed : Generator;
   -----------------------
   -- Hydrogen_Producer --
   -----------------------

   task body Hydrogen_Producer is
      Element : constant Element_Type := Hydrogen;
   begin
      loop
         select
            accept Stop;
            exit;
         or
            accept Get_Element(Atom : out Element_Type) do
               Atom := Element;
            end Get_Element;
         end select;
         delay Duration(Random(Seed) * 0.1);
      end loop;
   end Hydrogen_Producer;

   ---------------------
   -- Oxygen_Producer --
   ---------------------

   task body Oxygen_Producer is
      Element : constant Element_Type := Oxygen;
   begin
      loop
         select
            accept Stop;
            exit;
         or
            accept Get_Element(Atom : out Element_Type) do
               Atom := Element;
            end Get_Element;
         end select;
         delay Duration(Random(Seed) * 0.1);
      end loop;
   end Oxygen_Producer;
begin
   reset(Seed);
end Elements;

在实现任务类型的任务主体中,声明了一个名为 seed 的变量。变量 seed 是包 Ada.Numerics.Float_Random 中定义的类型 Generator 的一个实例。该变量将保存用于为生产者任务生成随机延迟的随机数种子。在任何生产者任务开始执行之前,种子在任务文件的底部被初始化。

除了Hydrogen_Producer只产生氢气和Oxygen_Producer只产生氧气外,这两个任务完全相同。 这两个任务都包含一个无限循环,只有在调用 Stop 条目时才会中断。调用 Stop 后,循环退出由 exit 命令命令。我们还希望能够从每个生产者那里获取数据,以便通过接受 Get_Element 条目并传递生产的元素来处理该角色。 显然,我们要么会收到 Stop entry call,要么会收到 Get_Element entry call,要么不会收到 entry call。 Select 命令允许我们的程序处理 Stop 或 Get_Element 中的任何一个,而不偏爱其中一个。当两个条目都没有被调用时会发生什么?生产者在 select 块中等待要调用的条目之一,从而与调用者同步执行。

我们现在需要等效于 "main" 的方法来创建可执行程序。 Ada 允许程序员任意命名程序入口点。不需要命名为 "main".

-----------------------------------------------------------------------
-- H2O production using 2 Hydrogen tasks and 1 Oxygen task
-----------------------------------------------------------------------

with Ada.Text_IO; use Ada.Text_IO;
with Elements; use Elements;

procedure Three_Task_H2O is
   H1 : Hydrogen_Producer;
   H2 : Hydrogen_Producer;
   Oxy : Oxygen_Producer;
   New_Atom    : Element_Type;
   Water_Count : natural := 0;

begin
   while Water_Count < 1000 loop
      H1.Get_Element(New_Atom);
      H2.Get_element(New_Atom);
      Oxy.Get_Element(New_Atom);
      Water_Count := Water_Count + 1;
      if Water_Count mod 20 = 0 then
         Put_Line("Water Produced:" & Water_Count'Image);
      end if;
   end loop;
   H1.Stop;
   H2.Stop;
   Oxy.Stop;
end Three_Task_H2o;

过程 Three_Task_H2O 创建了两个 Hydrogen_Producer 的实例,名为 H1 和 H2。它还创建了一个名为 Oxy 的 Oxygen_Producer 实例。任务立即开始执行。在 Java 中找不到 thread.start 语法的等价物。 当水分子数小于 1000 时,Tree_Task_H2O 循环。循环的每次迭代都会为每个生产者调用 Get_Element 条目。如果生产者没有准备好怎么办?毕竟,每个生产者在生产其元素时都会经历随机延迟。结果是调用(Consuming)任务(Three_Task_H2O)被挂起,直到处理完每个入口调用。 每产生 20 个水分子,就会输出有关产水进度的信息。当产生 1000 个水分子时,循环终止并调用所有三个任务的停止条目,按顺序终止每个任务。