You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

23 KiB

线程同步锁、Condvar和信号量

在多线程编程中,同步性极其的重要,当你需要同时访问一个资源、控制不同线程的执行次序时,都需要使用到同步性。

在Rust中有多种方式可以实现同步性。在上一节中讲到的消息传递就是同步性的一种实现方式例如我们可以通过消息传递来控制不同线程间的执行次序。还可以使用共享内存来实现同步性例如通过锁和原子操作等并发原语来实现多个线程同时且安全地去访问一个资源。

该如何选择

共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:

  • 共享内存相对消息传递能节省多次内存拷贝的成本
  • 共享内存的实现简洁的多
  • 共享内存的锁竞争更多

消息传递适用的场景很多,我们下面列出了几个主要的使用场景:

  • 需要可靠和简单的(简单不等于简洁)实现时
  • 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
  • 需要一个任务处理流水线(管道)时,等等

而使用共享内存(并发原语)的场景往往就比较简单粗暴:需要简洁的实现以及更高的性能时。

总之,消息传递类似一个单所有权的系统:一个值同时只能有一个所有者,如果另一个线程需要该值的所有权,需要将所有权通过消息传递进行转移。而共享内存类似于一个多所有权的系统:多个线程可以同时访问同一个值。

互斥锁Mutex

既然是共享内存,那并发原语自然是重中之重,先来一起看看皇冠上的明珠: 互斥锁Mutex(mutual exclusion的缩写)。

Mutex让多个线程并发的访问同一个值变成了排队访问:同一时间,只允许一个线程A访问该值,其它线程需要等待A访问完成后才能继续。

单线程中使用Mutex

先来看看单线程中Mutex该如何使用:

use std::sync::Mutex;

fn main() {
    // 使用`Mutex`结构体的关联函数创建新的互斥锁实例
    let m = Mutex::new(5);

    {
        // 获取锁然后deref为`m`的引用
        // lock返回的是Result
        let mut num = m.lock().unwrap();
        *num = 6;
        // 锁自动被drop
    }

    println!("m = {:?}", m);
}

在注释中,已经大致描述了代码的功能,不过有一点需要注意:和Box类似,数据被Mutex所拥有,要访问内部的数据,需要使用方法m.lock()m申请一个锁, 该方法会阻塞当前线程,直到获取到锁,因此当多个线程同时访问该数据时,只有一个线程能获取到锁,其它线程只能阻塞着等待,这样就保证了数据能被安全的修改!

m.lock()方法也有可能报错,例如当前正在持有锁的线程panic了。在这种情况下,其它线程不可能再获得锁,因此lock方法会返回一个错误。

这里你可能奇怪,m.lock明明返回一个锁,怎么就变成我们的num数值了?聪明的读者可能会想到智能指针,没错,因为Mutex<T>是一个智能指针,准确的说是m.lock()返回一个智能指针MutexGuard<T>:

  • 它实现了Deref特征,会被自动解引用后获得一个引用类型,该引用指向Mutex内部的数据
  • 它还实现了Drop特征,在超出作用域后,自动释放锁,以便其它线程能继续获取锁

正因为智能指针的使用,使得我们无需任何操作就能获取其中的数据。 如果释放锁,你需要做的仅仅是做好锁的作用域管理,例如上述代码的内部花括号使用,建议读者尝试下去掉内部的花括号,然后再次尝试获取第二个锁num1,看看会发生什么,友情提示:不会报错,但是主线程会永远阻塞,因为不幸发生了死锁。

多线程中使用Mutex

单线程中使用锁,说实话纯粹是为了演示功能,毕竟多线程才是锁的舞台。 现在,我们再来看看,如何在多线程下使用Mutex来访问同一个资源.

无法运行的Rc<T>
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    // 通过`Rc`实现`Mutex`的多所有权
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        // 创建子线程,并将`Mutex`的所有权拷贝传入到子线程中
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    // 等待所有子线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 输出最终的计数结果
    println!("Result: {}", *counter.lock().unwrap());
}

由于子线程需要通过move拿走锁的所有权,因此我们需要使用多所有权来保证每个线程都拿到数据的独立所有权,恰好智能指针Rc<T>可以做到(上面代码会报错!具体往下看,别跳过-, -)。

以上代码实现了在多线程中计数的功能由于多个线程都需要去修改该计数器因此我们需要使用锁来保证同一时间只有一个线程可以修改计数器否则会导致脏数据想象一下A线程和B线程同时拿到计数器获取了当前值1, 并且同时对其进行了修改,最后值变成2,你会不会在风中凌乱?毕竟正确的值是3因为两个线程各自加1。

可能有人会说,有那么巧的事情吗?事实上,对于人类来说,因为干啥啥慢,并没有那么多巧合,所以人总会存在巧合心理。但是对于计算机而言,每秒可以轻松运行上亿次,在这种频次下,一切巧合几乎都将必然发生,因此千万不要有任何侥幸心理。

如果事情有变坏的可能,不管这种可能性有多小,它都会发生! - 在计算机领域歪打正着的墨菲定律

事实上,上面的代码会报错:

error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely 
                // `Rc`无法在线程中安全的传输
   --> src/main.rs:11:22
    |
11  |           let handle = thread::spawn(move || {
    |  ______________________^^^^^^^^^^^^^_-
    | |                      |
    | |                      `Rc<Mutex<i32>>` cannot be sent between threads safely
12  | |             let mut num = counter.lock().unwrap();
13  | |
14  | |             *num += 1;
15  | |         });
    | |_________- within this `[closure@src/main.rs:11:36: 15:10]`
    |
    = help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
     // `Rc`没有实现`Send`特征
    = note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]`

错误中提到了一个关键点:Rc<T>无法在线程中传输,因为它没有实现Send特征(在下一节将详细介绍),而该特征可以确保数据在线程中安全的传输。

多线程安全的Arc

好在,我们有Arc<T>,得益于它的内部计数器是多线程安全的,因此可以在多线程环境中使用:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

以上代码可以顺利运行:

Result: 10

内部可变性

在之前章节,我们提到过内部可变性,其中Rc<T>RefCell<T>的结合,可以实现单线程的内部可变性。

现在我们又有了新的武器,由于Mutex<T>可以支持修改内部数据,当结合Arc<T>一起使用时,可以实现多线程的内部可变性。

简单总结下:Rc<T>/RefCell<T>用于单线程内部可变性, Arc<T>/Mutext<T>用于多线程内部可变性。

需要小心使用的Mutex

如果有其它语言的编程经验,就知道互斥锁这家伙不好对付,想要正确使用,你得牢记在心:

  • 在使用数据前必须先获取锁
  • 在数据使用完成后,必须及时的释放锁,比如文章开头的例子,使用内部语句块的目的就是为了及时的释放锁

这两点看起来不起眼但要正确的使用其实是相当不简单的对于其它语言忘记释放锁是经常发生的虽然Rust通过智能指针的drop机制帮助我们避免了这一点,但是由于不及时释放锁导致的性能问题也是常见的。

正因为这种困难性导致很多用户都热衷于使用消息传递的方式来实现同步例如Go语言直接把channel内置在语言特性中,甚至还有无锁的语言,例如erlang,完全使用Actor模型依赖消息传递来完成共享和同步。幸好Rust的类型系统、所有权机制、智能指针等可以很好的帮助我们减轻使用锁时的负担。

另一个值的注意的是在使用Mutex<T>Rust无法帮我们避免所有的逻辑错误例如在之前章节我们提到过使用Rc<T>可能会导致循环引用的问题。类似的,Mutex<T>也存在使用上的风险,例如创建死锁(deadlock):当一个操作试图锁住两个资源,然后两个线程各自获取其中一个锁,并试图获取另一个锁时,就会造成死锁。

死锁

在Rust中有多种方式可以创建死锁了解这些方式有助于你提前规避可能的风险一起来看看。

单线程死锁

这种死锁比较容易规避,但是当代码复杂后还是有可能遇到:

use std::sync::Mutex;

fn main() {
    let data = Mutex::new(0);
    let d1 = data.lock();
    let d2 = data.lock();
} // d1锁在此处释放

非常简单,只要你在另一个锁还未被释放时去申请新的锁,就会触发,当代码复杂后,这种情况可能就没有那么显眼。

多线程死锁

当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁时,就可能发生死锁:

use std::{sync::{Mutex, MutexGuard}, thread};
use std::thread::sleep;
use std::time::Duration;

use lazy_static::lazy_static;
lazy_static! {
    static ref MUTEX1: Mutex<i64> = Mutex::new(0);
    static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}

fn main() {
    // 存放子线程的句柄
    let mut children = vec![];
    for i_thread in 0..2 {
        children.push(thread::spawn(move || {
            for _ in 0..1 {
                // 线程1
                if i_thread % 2 == 0 {
                    // 锁住mutex1
                    let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();

                    println!("线程 {} 锁住了mutex1接着准备去锁mutex2 !", i_thread);

                    // 当前线程睡眠一小会儿等待线程2锁住mutex2
                    sleep(Duration::from_millis(10));

                    // 去锁mutex2
                    let guard = MUTEX2.lock().unwrap();
                // 线程2
                } else {
                    // 锁住mutex2
                    let _guard = MUTEX2.lock().unwrap();

                    println!("线程 {} 锁住了mutex2, 准备去锁mutex1", i_thread);

                    let _guard = MUTEX1.lock().unwrap();
                }
            }
        }));
    }

    // 等子线程完成
    for child in children {
        let _ = child.join();
    }

    println!("死锁没有发生");
}

在上面的描述中,我们用了"可能"二字,原因在于死锁在这段代码中不是必然发生的,总有一次运行你能看到最后一行打印输出。这是由于子线程的初始化顺序和执行速度并不确定,我们无法确定哪个线程中的锁先被执行,因此也无法确定两个线程对锁的具体使用顺序。

但是可以简单的说明下死锁发生的必然条件线程1锁住了mutex1并且线程2锁住了mutex2然后线程1试图去访问mutex2,同时线程2试图去访问mutex1,就会锁住。 因为线程2需要等待线程1释放mutex1后,才会释放mutex2而与此同时线程1需要等待线程2释放mutex2后才能释放mutex1,这种情况造成了两个线程都无法释放对方需要的锁,最终锁死。

那么为何某些时候死锁不会发生原因很简单线程2在线程1锁mutex1之前就已经全部执行完了随之线程2的mutex2mutex1被全部释放线程1对锁的获取将不再有竞争者。 同理线程1若全部被执行完那线程2也不会被锁因此我们在线程1中间加一个睡眠增加死锁发生的概率。如果你在线程2中同样的位置也增加一个睡眠那死锁将必然发生!

try_lock

lock方法不同,try_lock尝试去获取一次锁,如果无法获取会返回一个错误,因此不会发生阻塞:

use std::{sync::{Mutex, MutexGuard}, thread};
use std::thread::sleep;
use std::time::Duration;

use lazy_static::lazy_static;
lazy_static! {
    static ref MUTEX1: Mutex<i64> = Mutex::new(0);
    static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}

fn main() {
    // 存放子线程的句柄
    let mut children = vec![];
    for i_thread in 0..2 {
        children.push(thread::spawn(move || {
            for _ in 0..1 {
                // 线程1
                if i_thread % 2 == 0 {
                    // 锁住mutex1
                    let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();

                    println!("线程 {} 锁住了mutex1接着准备去锁mutex2 !", i_thread);

                    // 当前线程睡眠一小会儿等待线程2锁住mutex2
                    sleep(Duration::from_millis(10));

                    // 去锁mutex2
                    let guard = MUTEX2.try_lock();
                    println!("线程1获取mutex2锁的结果: {:?}",guard);
                // 线程2
                } else {
                    // 锁住mutex2
                    let _guard = MUTEX2.lock().unwrap();

                    println!("线程 {} 锁住了mutex2, 准备去锁mutex1", i_thread);
                    sleep(Duration::from_millis(10));
                    let guard = MUTEX1.try_lock();
                    println!("线程2获取mutex1锁的结果: {:?}",guard);
                }
            }
        }));
    }

    // 等子线程完成
    for child in children {
        let _ = child.join();
    }

    println!("死锁没有发生");
}

为了演示try_lock的作用,我们特定使用了之前必定会死锁的代码,并且将lock替换程try_lock,与之前的结果不同,这段代码将不会再有死锁发生:

线程 0 锁住了mutex1接着准备去锁mutex2 !
线程 1 锁住了mutex2, 准备去锁mutex1
线程2获取mutex1锁的结果: Err("WouldBlock")
线程1获取mutex2锁的结果: Ok(0)
死锁没有发生

如上所示,当try_lock失败时,会报出一个错误:Err("WouldBlock"),接着线程中的剩余代码会继续执行,不会被阻塞。

一个有趣的命名规则在Rust标准库中使用try_xxx都会尝试进行一次操作,如果无法完成,就立即返回,不会发生阻塞。例如消息传递章节中的try_recv以及本章节中的try_lock

读写锁RwLock

Mutex会对每次读写都进行加锁,但某些时候,我们需要大量的并发读,Mutex就无法满足需求了,此时就可以使用RwLock:

use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(5);

    // 同一时间允许多个读
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        assert_eq!(*r1, 5);
        assert_eq!(*r2, 5);
    } // 读锁在此处被drop

    // 同一时间只允许一个写
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        assert_eq!(*w, 6);
        
        // 以下代码会panic因为读和写不允许同时存在
        // 写锁w直到该语句块结束才被释放因此下面的读锁依然处于`w`的作用域中
        // let r1 = lock.read();
        // println!("{:?}",r1);
    }// 写锁在此处被drop
}

RwLock在使用上和Mutex区别不大,需要注意的是,当读写同时发生时,程序会直接panic(本例是单线程,实际上多个线程中也是如此),因为会发生死锁:

thread 'main' panicked at 'rwlock read lock would result in deadlock', /rustc/efec545293b9263be9edfb283a7aa66350b3acbf/library/std/src/sys/unix/rwlock.rs:49:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

好在我们可以使用try_writetry_read来尝试进行一次写/读,若失败则返回错误:

Err("WouldBlock")

简单总结下RwLock:

  1. 同时允许多个读,但最多只能有一个写
  2. 读和写不能同时存在
  3. 读可以使用readtry_read,写writetry_write, 在实际项目中,try_xxx会安全的多

Mutex还是RwLock

首先简单性上Mutex完胜,因为使用RwLock你得操心几个问题:

  • 读和写不能同时发生,如果使用try_xxx解决,就必须做大量的错误处理和失败重试机制
  • 当读多写少时,写操作可能会因为一直无法获得锁导致连续多次失败(writer starvation)
  • RwLock其实是操作系统提供的实现原理要比Mutex复杂的多,因此单就锁的性能而言,比不上原生实现的Mutex

再来简单总结下两者的使用场景:

  • 追求高并发读取时,使用RwLock,因为Mutex一次只允许一个线程去读取
  • 如果要保证写操作的成功性,使用Mutex
  • 不知道哪个合适,统一使用Mutex

需要注意的是,RwLock虽然看上去貌似提供了高并发读取的能力,但这个不能说明它的性能比Mutex高,事实上Mutex性能要好不少,后者唯一的问题也仅仅在于不能并发读取

一个常见的、错误的使用RwLock的场景就是使用HashMap进行简单读写,因为HashMap的读和写都非常快,RwLock的复杂实现和相对低的性能反而会导致整体性能的降低,因此一般来说更适合使用Mutex

总之,如果你要使用RwLock要确保满足以下两个条件:并发读,且需要对读到的资源进行"长时间"的操作HashMap也许满足了并发读的需求,但是往往并不能满足后者:"长时间"的操作。

benchmark永远是你在迷茫时最好的朋友

三方库提供的锁实现

标准库在设计时总会存在取舍,因为往往性能并不是最好的,如果你追求性能,可以使用三方库提供的并发原语:

  • parking_lot, 功能更完善、稳定社区较为活跃star较多更新较为活跃
  • spin, 在多数场景中性能比parking_lot高一点,最近没怎么更新

如果不是追求特别极致的性能,建议选择前者。

用条件(Condvar)控制线程的同步

Mutex用于解决资源安全访问的问题但是我们还需要一个手段来解决资源访问顺序的问题。而Rust考虑到了这一点为我们提供了条件变量(Condition Variables),它经常和Mutex一起使用,可以让线程挂起,直到某个条件发生后再继续执行,其实Condvar我们在之前的多线程章节就已经见到过,现在再来看一个不同的例子:

use std::sync::{Arc,Mutex,Condvar};
use std::thread::{spawn,sleep};
use std::time::Duration;

fn main() {
    let flag = Arc::new(Mutex::new(false));
    let cond = Arc::new(Condvar::new());
    let cflag = flag.clone();
    let ccond = cond.clone();

    let hdl = spawn(move || {
        let mut m = { *cflag.lock().unwrap() };
        let mut counter = 0;

        while counter < 3 {
            while !m {
                m = *ccond.wait(cflag.lock().unwrap()).unwrap();
            }

            {
                m = false;
                *cflag.lock().unwrap() = false;
            }

            counter += 1;
            println!("inner counter: {}", counter);
        }
    });

    let mut counter = 0;
    loop {
        sleep(Duration::from_millis(1000));
        *flag.lock().unwrap() = true;
        counter += 1;
        if counter > 3 {
            break;
        }
        println!("outside counter: {}", counter);
        cond.notify_one();
    }
    hdl.join().unwrap();
    println!("{:?}", flag);
}

例子中通过主线程来触发子线程实现交替打印输出:

outside counter: 1
inner counter: 1
outside counter: 2
inner counter: 2
outside counter: 3
inner counter: 3
Mutex { data: true, poisoned: false, .. }

信号量Semaphore

在多线程中,另一个重要的概念就是信号量,使用它可以让我们精准的控制当前正在运行的任务最大数量。想象一下,当一个新游戏刚开服时(有些较火的老游戏也会,比如wow),往往会控制游戏内玩家的同时在线数,一旦超过某个临界值,就开始进行排队进服。而在实际使用中,也有很多时候,我们需要通过信号量来控制最大并发数,防止服务器资源被撑爆。

本来Rust在标准库中有提供一个信号量实现, 但是由于各种原因这个库现在已经不再推荐使用了,因此我们推荐使用tokio中提供的Semaphone实现: tokio::sync::Semaphore

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_handles.push(tokio::spawn(async move {
            //
            // 在这里执行任务...
            //
            drop(permit);
        }));
    }

    for handle in join_handles {
        handle.await.unwrap();
    }
}

上面代码创建了一个容量为3的信号量当正在执行的任务超过3时剩下的任务需要等待正在执行任务完成并减少信号量后到3以内时才能继续执行。

这里的关键其实说白了就在于:信号量的申请和归还,使用前需要申请信号量,如果容量满了,就需要等待;使用后需要释放信号量,以便其它等待者可以继续。

总结

在很多时候,消息传递都是非常好用的手段,它可以让我们的数据在任务流水线上不断流转,实现起来非常优雅。

但是它并不能优雅的解决所有问题,因为我们面临的真实世界是非常复杂的,无法用某一种银弹统一解决。当面临消息传递不太适用的场景时,或者需要更好的性能和简洁性时,我们往往需要用锁来解决这些问题,因为锁允许多个线程同时访问同一个资源,简单粗暴。

除了锁之外其实还有一种并发原语可以帮助我们解决并发访问数据的问题那就是原子类型Atomic在下一章节中我们会对其进行深入讲解。