使用信号量进行线程同步

Thread synchronisation using semaphores

这是一道面试题,如有帮助将不胜感激

如何同步两个线程,其中一个线程递增一个值,另一个线程显示它(P.S。显示该值的线程必须仅在其为新值时显示一个值)

例如:int x = 5;

T1 :增加到 6

T2 : 必须显示6(只能显示一次),当变成7时必须再次显示

我回答说我会使用类似这样的信号量:

int c=0; // variable that I used to synchronize

// In T1
if( c = 0 )
{
   c++;
   x++; // value that is incremented
}

// in T2
if( c == 1 )
{
   cout<<x;
   c--;
}

然后他问如果在将 c 设置为 1 之后但在递增 x 之前有从线程 T1T2 的上下文切换,你会怎么做(在那种情况下它会在递增之前进入​​ P2 x )

这部分我无法回答。任何帮助将不胜感激。

很好的运动。

您没有在问题中指定 c++ 标记,但问题本身包含 cout<<x,因此您可能正在面试 C++ 职位。尽管如此,我还是会在 Java 中回答,因为这是一个面试问题,只要我避免使用对 Java.

来说过于具体的任何内容,语言就不会太重要。

正如您的面试官所指出的,同步必须在两个方向上发生:

  • 打印线程必须等待递增线程完成它的工作
  • 递增线程必须等待打印线程完成它的工作

所以我们需要一些东西让我们知道打印机已经完成(所以增量器可以 运行),另一个让我们知道增量器已经完成。我为此使用了两个信号量:

Working version on Ideone

import java.util.concurrent.Semaphore;

class IncrementDemo {
    static int x = 0;

    public static void main(String[] args) {
        Semaphore incrementLock = new Semaphore(0);
        Semaphore printLock = new Semaphore(0);

        Thread incrementer = new Thread(() -> {
            for(;;) {
                incrementLock.acquire(); //Wait to be allowed to increment
                x++;
                printLock.release(); //Allow the printer to print
            }
        });

        Thread printer = new Thread(() -> {
            for (;;) {
                incrementLock.release(); //Let the incrementer to its job
                printLock.acquire(); //Wait to be allowed to print
                System.out.println(x);
            }
        });

        incrementer.setDaemon(false); //Keep the program alive after main() exits
        printer.setDaemon(false);

        incrementer.start(); //Start both threads
        printer.start();
    }

}

(为了便于阅读,我删除了 acquire 周围的 try/catch 块)。

输出:

1
2
3
4
5
6
7
...

这是条件变量的经典用例,有一个小问题,即在线程 2 运行以处理它之前,该值可以在线程 1 中轻松更新多次:

// In some scope common to both threads
int c_ = 0; // variable
std::mutex mutex_();
std::condition_variable cond_();

// Thread 1
{ 
    std::lock_guard<std::mutex> lock(mutex_);
    ++c_;
}
cond_.notify_one();

// Thread 2
{ 
    std::lock_guard<std::mutex> lock( mutex_ );
    int cLocal = c_;
    while ( !done ) { 
        cond_.wait( lock, [] { return c_ != cLocal; } );
        while ( cLocal++ < c_ ) 
            ... // Display new *local* value
    }
}

问题:

并行代码一般有 2 个主要问题。

1.原子性

代码中最小的粒度实际上不是像 i++ 这样的单个操作,而是底层的汇编指令。因此,每个涉及写入的操作都不能从多个线程调用。 (这在您的目标架构上有很大差异,但 x86 与 arm64 相比非常严格)

但幸运的是,c++ 提供了 std::atomic 操作,这为您提供了一种很好的独立于平台的方式来修改来自多个线程的变量。

2。一致性

只要保持 local 线程的一致性,编译器和处理器都可以对任何指令重新排序。那么这是什么意思呢?

看看你的第一个帖子

if( c = 0 )
{
   c++;
   x++; // value that is incremented
}

您有 3 个操作 c == 0c++x++。两个增量不相互依赖,因此允许编译器交换它们。在运行时,核心也可能会重新排序它们,让您处于非常模糊的境地。在顺序世界中,这非常好并且可以提高整体性能(除非它会导致像崩溃这样的安全漏洞)。不幸的是,编译器或 cpu 都不识别并行代码,因此任何优化都可能破坏您的并行程序。

但再一次,c++ 为这个问题提供了一个名为 std::memory_order 的内置解决方案,它强制执行特定的一致性模型。

解决方案:

简单互斥: 互斥量是一种简单但功能强大的工具。它通过提供所谓的关键部分来防止并行执行,从而解决了原子性和一致性问题。这意味着,在给定的示例中,两个线程中的 if 子句是顺序的,永远不会并行执行。 实施有效,但有一个缺陷。如果其中一个线程非常慢,另一个线程将通过不断检查 newValue 标志来浪费大量 cpu 时间。

#include <mutex>

std::mutex mutex;
int value = true;
bool newValue = false;

void producer_thread() {
   while(true) {
       std::lock_guard<std::mutex> lg(mutex);
        if (newValue == false) {
            value++;
            newValue = true;
        }
   }
}

void consumer_thread() {
   while(true) {
       std::lock_guard<std::mutex> lg(mutex);
        if (newValue == true) {
            std::cout << value;
            newValue = false;
        }
   }
}

条件变量:

条件变量基本上只是一个"wait-for-notify"-构造。您可以通过调用 wait 来阻止当前执行,直到另一个线程调用 notify。此实现将是首选方案。

#include <mutex>
#include <condition_variable>

std::mutex mutex;
std::condition_variable cond;
int value = true;
bool newValue = false;

void producer() {
   while(true) {
       std::unique_lock<std::mutex> ul(mutex);

        while (newValue == true) {
            cond.wait(ul);
        }

        value++;
        newValue = true;
        cond.notify_all();
   }
}

void consumer() {
   while(true) {
       std::unique_lock<std::mutex> ul(mutex);

        while (newValue == false) {
            cond.wait(ul);
        }

        std::cout << value;
        newValue = false;
        cond.notify_all();
   }
}