Linux:同步与锁

本文最后更新于 2024年5月13日 晚上

一、互斥量

  • 在Linux中要实现互斥锁mutex最简单的办法是通过操作系统提供的信号量来实现,但是目前在C++中调用pthread_mutex_lock并不是直接通过操作系统的信号量原语来实现的,而是使用了优化过的futex来实现。

1.1 futex基本概念

  • Futex 是Fast Userspace muTexes的缩写,在传统的Unix系统中当进程间要同步的时候必须要通过系统调用(如semop())在内核中完成。而大部分时候其实并没有另外的程序在竞争这个信号量,但是在这种情况下,这个进程也要陷入内核去看看有没有进程和它竞争,退出的时侯还要陷入内核去看看有没有进程等待在同一同步变量上。这些不必要的系统调用(或者说内核陷入)造成了大量的性能开销。
  • Futex是一种用户态和内核态混合的同步机制。首先,同步的进程间通过mmap共享一段内存,futex变量就位于这段共享的内存中且操作是原子的,当进程尝试进入互斥区或者退出互斥区的时候,先去查看共享内存中的futex变量,如果没有竞争发生,则只修改futex,而不用再执行系统调用了。
  • 当通过访问futex变量告诉进程有竞争发生,则还是得执行系统调用去完成相应的处理(wait 或者 wake up)。简单的说,futex就是通过在用户态的检查,(motivation)如果了解到没有竞争就不用陷入内核了,大大提高了low-contention时候的效率。

1.2 futex实现原理

  • futex包含了用户态和内核态的两部分行为,用户态由glibc-nptl库的pthread_mutex实现,内核态由futex系统调用支持。

1.2.1 用户态

  • C++中的std::mutex只是对glibc-nptl库的pthread_mutex_t的一层封装,pthread_mutex_t其实是个union结构,它的主干部分是其中的__pthread_mutex_s,大概成员为:
1
2
3
4
5
6
7
8
9
10
11
typedef union {
struct __pthread_mutex_s {
int __lock; // !< 表示是否被加锁,是否存在竞争
unsigned int __count; //!< __kind代表可重入锁时,重复上锁会对__count进行递增。
int __owner; //!< 持有线程的线程ID。
unsigned int __nusers;
int __kind; //!< 上锁类型。
int __spins;
__pthread_list_t __list;
} __data;
} pthread_mutex_t;
  • __lock是CAS操作对象,其值有三种状态:

    • 0, 表示还没有进程/线程获得这把锁

    • 1, 表示有进程/线程已经获得了这个锁

    • 2, 当lock值已经为1,且又有进程/线程要获取这个锁时,将lock置为2,表示发生了竞争

  • __kind的是一个32位的整数,库的设计者将这个32字节分成几个部分来提通不同的分类方式

    • 其中的第1到第7位,代表一个锁的类别。
      • PTHREAD_MUTEX_TIMED_NP是mutex的默认类型,它是非递归的,这最常用的锁类别。
      • PTHREAD_MUTEX_RECURSIVE_NP则表示可重入锁。
    • 它的第8位表示该锁是用在进程间同步还是线程间同步,通常情况下我们在线程同步中使用mutex,此时只需要将mutex声明在全局数据段即可;但如果是进程间的同步,则需要先开辟一个共享内存,将mutex放入共享内存中,然后不同进程才能操作同一个mutex。
  • __count是给可重入锁使用的,如果同一个线程重复对同一个mutex加锁,则对该属性进行递增即可。

pthread_mutex_lock

  • 加锁lock()的逻辑很简单,首先在用户态利用CAS判断是否发生冲突,如果CAS返回0,那么直接返 回;如果CAS返回1或者2,进入futex系统调用,交由操作系统仲裁

pthread_mutex_unlock

  • 解锁unlock()的逻辑类似,同样需要通过CAS,如果lock的旧值是1,那么只有当前线程占用该锁,直接返回;如果lock的旧值是2,说明超过一个线程发生了竞争,调用futex系统调用唤醒其它线程。

1.2.2 内核态

  • 当一个线程需要等待一个锁而陷入睡眠时,内核会为其分配一个futex_q结构,一切的唤醒、睡眠操作都需要先找到这个futex_q结构。
  • 但如果争抢情况比较激烈,那么内核存在的futex_q结构会有很多,如何快速查找对应的futex_q?内核的解决办法是使用哈希表,因为哈希表的查询、插入操作都是O(1)的时间复杂度。如上图所示,内核使用开链法处理哈希碰撞。
futex.png

hash表的key问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
union futex_key {
struct {
u64 i_seq;
unsigned long pgoff;
unsigned int offset;
} shared; //不同进程间通过文件共享futex变量,表明该变量在文件中的位置

struct {
struct mm_struct *mm;
unsigned long address;
unsigned int offset;
} private; //同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置

struct {
u64 ptr;
unsigned long word;
unsigned int offset;
} both;
};
  • futex_key.private对于非共享的使用虚拟地址作为key

    • 如果被用来进行线程间同步,那么futex的虚拟地址在不同的线程间是相同的,因此只需要使用虚拟地址就可以唯一地标识同一线程组(进程)中的某个futex。

    • 但是内核中的hash表是系统中的所有线程共用的,不只对一个线程组服务,因此需要额外的值来标识不同线程组,linux内核选择使用 mm_struct的虚拟地址(若两个线程属于同一进程,则它们的mm_struct的地址相同,若两个线程分属不同进程,则它们的mm_struct的地址不同)。

    • 最终linux使用(mm_struct的虚拟地址, 锁的虚拟地址)来表示一个private futex的key。

  • futex_key.shared对于进程间的使用物理地址作为key

    • 如果futex被用来进行进程间同行,那么futex的虚拟地址在不同的进程间不一定相同,因此linux内核需要计算它的物理地址,才可能唯一地表示futex。

    • linux使用( inode->i_sequence, page->index, offset_within_page)标识一个shared futex地key。其中inode是用来作为共享内存载体的文件inode, page则是futex虚拟地址所对应的struct page结构(page结构在内核中唯一地对应了一个物理页),offest就是futex结构在实际物理页中偏移量。

futex_wait

  1. 调用set_current_state将进程状态设置为TASK_INTERRUPTIBLE

  2. 调用queue_me将futex_q挂载到hash链表上

  3. 调用freezable_schedule进行冲调度,此后除非有其他线程唤醒本线程,本线程不会被调度器选择 执行。这就是陷入了阻塞状态的含义。(当然这里还有一些timeout机制和提前返回不陷入阻塞的额外处理)

futex_wake

  • 找到对应的futex_q,将其记录的task_struct的进程状态设置为TASK_RUNNING。

二、条件变量

参考文献:
浅析条件变量

  • 条件变量是与互斥量相关联的一种用于多线程之间关于共享数据状态改变的通信机制,当一个动作需要另外一个动作完成时才能进行,即:当一个线程的行为依赖于另外一个线程对共享数据状态的改变时,这时候就可以使用条件变量。其实就是同步的概念。
  • 较为经典的例子是生产者消费者问题,每个线程都必须关注“物品”的数量信息,如果现在还有物品消费者才能继续运行,负责应该等待;如果承载物品的结构还有空余空间,生产者才能继续添加新的物品,否则等待。
  • 这里的等待其实就是通过阻塞的方式来实现,只是我们需要在特定的时候唤醒等待的线程。当然这个生产者消费者例子的核心条件是物品数量,我们可以通过C++20引入的信号量std::counting_semaphore来实现,但是如果我们是要针对一些条件判断来进行同步操作,则使用std::condition_variable

2.1 condition_variable

  • std::condition_variable是需要结合一个std::mutex使用的,并且传入的参数应该是std::unique_lock<std::mutex>,这是类似std::lock_guard是管理锁的辅助类工具,都是RAII风格;它们是在定义时获得锁,在析构时释放锁。它们的主要区别在于unique_lock锁机制更加灵活,可以再需要的时候进行lock或者unlock调用,不非得是析构或者构造时。

等待

  • wait:阻塞当前线程,直到条件变量被唤醒
  • wait_for:阻塞当前线程,直到条件变量被唤醒,或到指定时限之后
  • wait_until:阻塞当前线程,直到条件变量被唤醒,或直到抵达指定时间点

唤醒

  • notify_one:通知一个等待线程
  • notify_all:通知所有的等待线程

使用说明

  • wait操作必须传入一个锁,条件变量在阻塞时会主动放弃锁的持有权,因此我们是需要传入std::unique_lock<std::mutex>这种更加灵活的包装。
1
2
3
4
5
void wait(std::unique_lock<std::mutex>& lock);

//Predicate 谓词函数,可以普通函数或者lambda表达式
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
  • 普通调用方式传入这个锁就可以了,但是C++还提供了一种可以再传入一个lambda表达式的接口,这个和条件变量存在的虚假唤醒有关系,需要我们通过while循环检测条件是否满足。
  • 而第二个接口其实就是封装了这个循环检测过程,通过lambda表达式的方式传入这个条件判断语句。
  • notify操作则不需要任何如何传入参数,只是唤醒别的在这个条件变量上阻塞的线程。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
bool work_;
std::mutex cvMutex;
std::condition_variable cv;

//缓存区
std::deque<int> data_deque;
//缓存区最大数目
size_t max_num;
//数据
int next_index;

void producer_thread() {
while (work_) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));

//加锁
std::unique_lock<std::mutex> lk(cvMutex);
//当队列未满时,继续添加数据
cv.wait(lk, [this]() { return this->data_deque.size() <= this->max_num; });

next_index++;
data_deque.push_back(next_index);
//唤醒其他线程
cv.notify_all();
//自动释放锁
}
}

void consumer_thread() {
while (work_) {
//加锁
std::unique_lock<std::mutex> lk(cvMutex);
//检测条件是否达成
cv.wait(lk, [this] { return !this->data_deque.empty(); });

//互斥操作,消息数据
int data = data_deque.front();
data_deque.pop_front();
//唤醒其他线程
cv.notify_all();
//自动释放锁
}
}
  • 这里是一个生产者消费者的例子,公共区域是一个双向队列,那么我们访问公共区域必然是需要加锁来保证互斥访问的,也就用这里声明的cvMutex来保证互斥访问了,无论是生产者还是消费者在每次循环中都是先通过std::unique_lock<std::mutex>加锁来保证互斥访问。这里引出了CondVar需要绑定Mutex的第一个原因,我们在操作公共资源或者是在判断条件的时候都是需要互斥访问的,而Mutex完成了这个工作。
  • 但如果是对atomic这样的共享变量,是否可以不需要Mutex呢?依然不能,这是CondVar设计的核心要点,后面介绍
  • 注意wait操作在阻塞时会释放掉锁,并且重新被唤醒并退出wait函数时是会重新持有锁的

2.2 CondVar为什么需要绑定Mutex

  • 假设我们现在只有阻塞自己和唤醒别人的两个接口,不绑定Mutex,那能否实现相同的功能?我们依然是先加锁访问共享区域,如果需要阻塞等待就释放锁并阻塞自己。也就是先释放资源锁,再阻塞自己,且不是一个原子操作。
  • 这里存在一个问题,以消费者生产者为例,消费者条件不满足,于是释放锁并准备阻塞自己,但刚完成解锁的操作,就被生产者获取到了并开始执行,这时,因为消费者还未挂起自己,来不及将自己的标识符保存在某个位置,所以生产者在执行唤醒操作的时候并没有唤醒这个即将阻塞的消费者。这时,切换到消费者后,消费者将永远的等待下去,虽然队列中有产品。而生产者因为队列中有产品可能也一直的等待下去,形成了死锁。
  • 解决方法是必须让解锁、保存线程标识符、挂起这一系列操作成为原子操作。(当然我们不可能先挂起自己再释放锁)
  • glibc怎么做的呢?每一个condvar内部有一个额外的mutex,在pthread_cond_wait的逻辑中,首先是condvar的内部mutex lock,再是外部mutex unlock,再是内部futex wait。这样,就保证了,线程B signal一定在线程A wait之后(毕竟线程B signal也需要获取内部mutex)。

2.3 虚假唤醒

  • 在正常情况下,wait类型函数返回时要么是因为被唤醒,要么是因为超时才返回,但是在实际中发现,因此操作系统的原因,wait类型在不满足条件时,它也会返回,这就导致了虚假唤醒。
  • 既然没有满足条件为何会被唤醒呢?原因有很多,较为经典的一个是我们有一个消费者在运行状态不断消耗生产者生产的“物品”。但是在生产者的循环中每次都会去做notify操作,此外还有可能会做notify_all操作。虽然在唤醒的时候确实还有资源可以使用,但是当被唤醒的消费者真正开始执行的时候却发现资源已经被消耗完了。
  • 于是我们需要在wait结束的时候再判断一下是否符合条件,那么设计成一个while结构就可以了。

三、死锁

  • 死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。
  • 此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

3.1 产生条件

  • 产生死锁的四个必要条件:

    • 互斥条件:一个资源每次只能被一个进程使用。

    • 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。

    • 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。

    • 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

3.2 解决方案

3.2.1 预防死锁

  • 预防是通过破坏产生死锁的必要条件之一,使系统不会产生死锁。
dead_lock.png
  • 在数据库中对B+树访问进行加锁操作时,利用了破坏循环等待条件的方法,通过指定顺序来进行加锁操作,从而保证不产生死锁。

3.2.2 避免死锁

  • 有一个安全状态的概念,我们将资源分配给不同的用户,如果我们提前知道每个程序最多需要申请多少资源,那当某个程序申请资源的时候,我们就可以判定如果响应这个请求之后,我们剩余的资源是否还能在未来满足某个进程的最大需要,然后等他返回资源之后,还能满足别的程序,最终保证满足所有的请求。
  • 如果我们按照某个顺序响应资源请求,可以保证在任意时刻都可以有足够的剩余资源保障请求程序的需要,那么用于不会有产生死锁的可能,这是安全状态。
  • 如果没有按照这种安全顺序响应请求,会有出现死锁的风险,这时是不安全状态。不安全状态不一定会出现死锁。

银行家算法

  • 银行家算法的实质就是要设法保证系统动态分配资源后不进入不安全状态,以避免可能产生的死锁。即没当进程提出资源请求且系统的资源能够满足该请求时,系统将判断满足此次资源请求后系统状态是否安全,如果判断结果为安全,则给该进程分配资源,否则不分配资源,申请资源的进程将阻塞。

3.2.3 检测死锁和解除

  • 保存有关资源的请求和分配信息,提供一种算法通过这些信息来检测系统是否已经进入死锁状态
  • 检测到死锁之后应该通过某种机制让部分进程被终止,从而打破死锁

具体的例子看RocksDB中在事务管理上进行加锁操作时,采用PessimisticTransaction方式时如何对锁进行跟踪和死锁检测。本质上来说是检测有向图中是否存在环,采用BFS算法来实现。

四、可重入锁

  • std::recursive_mutex:允许在同一个线程中同一个互斥量多次被 lock() ,(但是递归加锁的次数是有限制的,太多可能会报异常),效率要比mutex低。
  • 实现方式其实就是记录持有锁的线程ID,也是调用pthread_mutex_t,只不过相对于std::mutex调用pthread_mutex_t的传入参数不一样。

分析的c++标准库的std::shared_mutex的实现

  • c++在c++14以上的版本引入了shared_mutex,实现了读写锁的语义。实际上,它的读写锁有两种实现:第一种就是封装操作系统的,其实就是封装了glibc的读写锁实现(只不过这种实现是依托于操作系统本身是posix还是windows的,所以多加了一层shared_mutex来实现统一);第二种是c++自己的实现。

  • 它们的区别就是是否定义了_GLIBCXX_USE_PTHREAD_RWLOCK_T这个宏。如果定义了,就直接用glibc中的pthread_rwlock_t;如果没定义,就用libstdc++中实现的__shared_mutex_cv

五、读写锁

5.1 优先倾向

读者优先

  • 如果当前读者持有锁,那么读者想进就优先进,不用管写者是不是有想进的;如果当前是写者持有锁,那么写者释放锁之后优先唤醒读者。

写者优先

  • 如果当前是写持有锁,那么写者直接排在队列最前面;如果当前是读者持有锁,那么写者设置一个意向。读者上锁时,如果有这个意向存在,那么也不能进入。

公平

  • 如果当前读者持有锁,那么读者和作者在同一个队列上排布。当出现可以进入时,操作系统来决定让读者或者作者进入。

  • 如果单纯判断当前的read_count个数,比如read_count > 1,那么一定进入,此时就是读者优先;如果此时还要判断作者的意向,比如write_entered = true时不进入,此时就不是读者优先,但也不一定是写者优先

  • 如果作者解锁的时候优先唤醒读者,那显然是读者优先;如果写者解锁的时候优先唤醒写者,那显然是写者优先;如果写者解锁的时候随机唤醒,那么就是公平策略。

5.2 libstdc++的实现

5.2.1 类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class shared_mutex
{
mutex _M_mut; // 保护对_M_state、gate1_、gate2_的访问
cond_var gate1_; // 用于在writer存在时(或reader达到最大值时,只不过这种情况几乎不会发生) 阻塞reader/writer
cond_var gate2_; // 用于在reader非0时 阻塞writer
unsigned _M_state; // 最高bit表示writer已经获取锁或正在排队等待获取锁(即write-entered被设置) 其余bit表示reader锁定的个数

static constexpr unsigned _S_write_entered
= 1U << (sizeof(unsigned)*__CHAR_BIT__ - 1); // 二进制表现为1后面31个0 相当于一个筛选器,用于按位与
static constexpr unsigned _S_max_readers = ~_S_write_entered; // 二进制表现为0后面31个1 用于按位与

public:

shared_mutex() : state_(0) {}

// Exclusive ownership

void lock();
bool try_lock();
bool timed_lock(nanoseconds rel_time);
void unlock();

// Shared ownership

void lock_shared();
bool try_lock_shared();
bool timed_lock_shared(nanoseconds rel_time);
void unlock_shared();
};
  • 成员变量的核心在两个条件变量和状态标识符上

    • gate1_用于有写线程时阻塞其他的读/写线程,gate2_用于有读线程时阻塞写线程。

    • 状态标识符号表示目前是否有写线程在等待或者占用,此外还表示当前读线程的数量。

5.2.2 读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void shared_mutex::lock_shared()
{
// 1.首先锁住_M_mut,保护对_M_state的独占访问(这里上锁的原因可以参考为什么condvar需要mutex的参与)
unique_lock<mutex> __lk(_M_mut);
// 2. 判断reader是否可以进入
// 这里的_M_state < _S_max_readers其实有两层含义
// 2.1 如果_M_state最高位为1,此时等式不成立,会在这里锁住;这里代表的语义就是writer正在写,reader不要进入
// 2.2 如果_M_state最高位不为1,但最低31位均为1,此时等式不成立,会在这里锁住;这里代表的语义就是reader满了,新的reader不要进入
_M_gate1.wait(__lk, [=]{ return _M_state < _S_max_readers; });
// 3. reader成功进入,增加readers数量
++_M_state;
}

void shared_mutex::unlock_shared()
{
// 1. 首先锁住_M_mut,保护对_M_state的独占修改
lock_guard<mutex> __lk(_M_mut);
__glibcxx_assert( _M_readers() > 0 );
// 2. 修改reader数量,减一
auto __prev = _M_state--;
// 3. 判断当前是否有writer想要进入
if (_M_write_entered())
{
// Wake the queued writer if there are no more readers.
if (_M_readers() == 0)
_M_gate2.notify_one();
// No need to notify gate1 because we give priority to the queued
// writer, and that writer will eventually notify gate1 after it
// clears the write-entered flag.
}
else
{
// Wake any thread that was blocked on reader overflow.
if (__prev == _S_max_readers)
_M_gate1.notify_one();
}
}
  • 加读锁的逻辑是,如果目前没有写锁占用或者写锁等待,就直接加锁。所以我们只需要在gate1_上面进行条件等待

  • 解锁的时候,需要唤醒在等待的写锁请求,因此需要去唤醒gate2_;这里还有一个else的情况是针对加读锁超过最大数量的情况下唤醒gate2_,一般不可能存在这种情况

5.2.3 写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void shared_mutex::lock()
{
// 1. 首先锁住_M_mut,保护对_M_state的独占访问(这里上锁的原因可以参考为什么condvar需要mutex的参与)
unique_lock<mutex> __lk(_M_mut);
// 2. 如果有其他writer正在占用,则阻塞在_M_gate1上
_M_gate1.wait(__lk, [=]{ return !_M_write_entered(); });
// 3. 设置_M_state最高位为1,让writer保持"想要访问"的状态
_M_state |= _S_write_entered;
// 4. 如果由readers正在占用,则阻塞在_M_gate2上
_M_gate2.wait(__lk, [=]{ return _M_readers() == 0; });
}

void shared_mutex::unlock()
{
// 1. 首先锁住_M_mut,保护对_M_state的独占访问(这里上锁的原因可以参考为什么condvar需要mutex的参与)
lock_guard<mutex> __lk(_M_mut);
__glibcxx_assert( _M_write_entered() );
// 2. 设置_M_state最高位为0
_M_state = 0;
// 3. 唤醒沉睡在_M_gate1上的线程
_M_gate1.notify_all();
}
  • 写锁请求存在两个阻塞点,首先会被写锁阻塞,这里是被阻塞在gate1_上的;第二种情况是被读锁阻塞的,被阻塞在gate2_上。需要注意,我们在gate1_这一步上因为需要判断_S_write_entered是否为0,所以永远只可能存在1个写请求等待在gate2_上。

  • 并且当持有写锁时,gate2_上不会有写锁请求存在,后面到达的写锁请求只会阻塞到gate1_上。因此解锁阶段,只需要唤醒gate1_即可。

5.2.4 公平保证

  • 只要有写线程进入gate2_开始等待,_S_write_entered就会被设置,此后有读请求到来时因为存在_M_state < _S_max_readers;条件,就会被一直阻塞在gate1_上。达成这种状态的情况是当前读锁占用,然后到达的第一个写锁请求。

  • 但是如果当前被写锁占用,所有后到达的读/写锁请求都被阻塞在gate1_,那么就是公平竞争的。

5.3 glibc的实现

 

六、乐观锁与悲观锁

  • 这两种概念并不是指一种具体的锁实现,而是指一种控制并发访问的理念,是程序设计上的概念。

悲观锁

  • 其实互斥锁、自旋锁、读写锁,都是属于悲观锁。
  • 悲观锁做事比较悲观,它认为多线程同时修改共享资源的概率比较高,于是很容易出现冲突,所以访问共享资源前,先要上锁

乐观锁

  • 那相反的,如果多线程同时修改共享资源的概率比较低,就可以采用乐观锁。
  • 乐观锁做事比较乐观,它假定冲突的概率很低,它的工作方式是:先修改完共享资源,再验证这段时间内有没有发生冲突,如果没有其他线程在修改资源,那么操作完成,如果发现有其他线程已经修改过这个资源,就放弃本次操作
  • 放弃后如何重试,这跟业务场景息息相关,虽然重试的成本很高,但是冲突的概率足够低的话,还是可以接受的。乐观锁全程并没有加锁,所以它也叫无锁编程

Linux:同步与锁
https://lluvialuo.github.io/2024/05/12/Linux-同步与锁/
作者
Lluvia Luo
发布于
2024年5月12日
许可协议