多线程一直是编程中的重要的工具,本文介绍了线程同步相关的知识,和一些比较有趣的应用。互斥量、条件变量、原子操作等基本概念,以及读写锁、双Buffer等应用,尽在本文~

多线程一直是编程中的重要的工具,它可以分充分的利用硬件资源,是我们用更少的时间去完成更多的事情。在之前的博客中,我有介绍了OpenMP的基本使用,OpenMP可以理解为多线程的一个合理和高效的一套抽象工具。这次,打算仔细的介绍多线程编程中的常见的概念和典型的案例。

典型的案例

说到多线程,最核心的问题就是保证数据的读写安全。为了达到此目的,我们需要多很多常见的数据结构做一些改造,从而适应多线程的场景。以下是我工作中比较常见到的一些使用场景:

  1. 线程池
  2. 读写锁
  3. 消息队列
  4. ConcurrentCache
  5. PingPang Buffer

在具体介绍这些使用场景之前,我们还是需要了解需要使用到的一些基本的工具:互斥量、条件变量、原子操作等。

互斥量

互斥量,顾名思义,就是互斥的数据,一个线程持有的时候,其他线程就必须等待。

在C++11中,使用<mutex>头文件引入。以下是一个简单的计数器的实现。

emit函数通过mutex_进行加锁,使得同一时间仅有一个线程可以执行++ x_的操作,从而保证了计数的正确性。

std::lock_guard是个工具类,lck在构造时,调用了lock函数,在析构时调用了unlock,从而避免我们自行调用的时候忘记unlock。

#include <mutex>
#include <thread>
#include <iostream>

class Counter {
public:
    Counter(): x_(0) {}
    void emit() {
        mutex_.lock();
        ++ x_;
        mutex_.unlock();
        // or
        // std::lock_guard<std::mutex> lck(mutex_);
        // ++ x_;
    }
    int count() {
        return x_;
    }
private:
    int x_;
    std::mutex mutex_;
};

int main() {
    Counter c;
    std::thread t1([&c]{
        for (int i = 0; i < 10000000; ++ i) {
            c.emit();
        }
    });
    std::thread t2([&c]{
        for (int i = 0; i < 10000000; ++ i) {
            c.emit();
        }
    });
    t1.join();
    t2.join();
    std::cout << c.count() << std::endl; // 20000000
}

基于Mutex,我们可以方便的实现读写锁。读写锁的作用是,保证数据可以供多个线程并发读,仅一个线程允许写。在存在线程读的情况下,写的线程会阻塞,直到没有任何线程有读操作。

读写锁

首先读写锁会存在一个write_mutex,读线程和写线程都需要抢占这个mutex,从而保证读和写不会同时进行。但是只需要第一个读线程抢占write_mutex即可,其他的读线程不需要再抢占(抢占的话,就不支持并发读了)。当不存在读线程的时候,需要释放write_mutex,这才运行写线程抢占。

因此我们还需要一个计数器,记录当前读线程的个数,并使用另一个read_mutex保证计数器的准确。

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>

class ReadWriteLock {
public:
    ReadWriteLock():reader_count_(0) {}
    void lock_read() {
        read_mutex_.lock();
        if (reader_count_ == 0) {
            write_mutex_.lock();
        }
        ++ reader_count_;
        read_mutex_.unlock();
    }
    void unlock_read() {
        read_mutex_.lock();
        -- reader_count_;
        if (reader_count_ == 0) {
            write_mutex_.unlock();
        }
        read_mutex_.unlock();
    }
    void lock_write() {
        write_mutex_.lock();
    }
    void unlock_write() {
        write_mutex_.unlock();
    }
private:
    std::mutex read_mutex_;
    std::mutex write_mutex_;
    int64_t reader_count_;
};

ReadWriteLock rw_lock;

void read_fn(int idx, int start, int end) {
    std::this_thread::sleep_for(std::chrono::seconds(start));
    rw_lock.lock_read();
    std::cout << "read thread #" << idx << ": read data" << std::endl;
    std::this_thread::sleep_for (std::chrono::seconds(end - start));
    std::cout << "read thread #" << idx << ": read over" << std::endl;
    rw_lock.unlock_read();
}

void write_fn(int idx, int start, int end) {
    std::this_thread::sleep_for(std::chrono::seconds(start));
    rw_lock.lock_write();
    std::cout << "write thread #" << idx << ": write data" << std::endl;
    std::this_thread::sleep_for (std::chrono::seconds(end - start));
    std::cout << "write thread #" << idx << ": write over" << std::endl;
    rw_lock.unlock_write
版权声明:本文为idiotgroup原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/idiotgroup/p/15032036.html