You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

11 KiB

线程同步Atomic原子类型与内存顺序

Mutex用起来简单,但是无法并发读,RwLock可以并发读,但是使用场景较为受限且性能不够,那么有没有一种全能性选手呢? 欢迎我们的Atomic闪亮登场。

从Rust1.34版本后就正式支持原子类型。原子指的是一系列不可被CPU上下文交换的机器指令这些指令组合在一起就形成了原子操作。在多核CPU下当某个CPU核心开始运行原子操作时会先暂停其它CPU内核对内存的操作以保证原子操作不会被其它CPU内核所干扰。

由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。

可以看出原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了CAS循环,当大量的冲突发生时,该等待还是得等待!但是总归比锁要好。

CAS全称是Compare and swap, 它通过一条指令读取指定的内存地址,然后判断其中的值是否等于给定的前置值,如果相等,则将其修改为新的值

使用Atomic作为全局变量

原子类型的一个常用场景,就是作为全局变量来使用:

use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;

const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;

static R: AtomicU64 = AtomicU64::new(0);

fn add_n_times(n: u64) -> JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..n {
            R.fetch_add(1, Ordering::Relaxed);
        }
    })
}

fn main() {
    let s = Instant::now();
    let mut threads = Vec::with_capacity(N_THREADS);

    for _ in 0..N_THREADS {
        threads.push(add_n_times(N_TIMES));
    }

    for thread in threads {
        thread.join().unwrap();
    }

    assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));

    println!("{:?}",Instant::now().sub(s));
}

以上代码启动了数个线程每个线程都在疯狂对全局变量进行加1操作, 最后将它与线程数 * 加1次数进行比较如果发生了因为多个线程同时修改导致了脏数据那么这两个必将不相等。好在它没有让我们失望不仅快速的完成了任务而且保证了100%的并发安全性。

当然以上代码的功能其实也可以通过Mutex来实现,但是后者的强大功能是建立在额外的性能损耗基础上的,因此性能会逊色不少:

Atomic实现673ms
Mutex实现: 1136ms

可以看到Atomic实现会比Mutex41%,实际上在复杂场景下还能更快(甚至达到4倍的性能差距)

还有一点值得注意: Mutex一样,Atomic的值具有内部可变性,你无需将其声明为mut

use std::sync::Mutex;
use std::sync::atomic::{Ordering, AtomicU64};
 
struct Counter {
    count: u64
}
 
fn main() {
    let n = Mutex::new(Counter {
        count: 0
    });
 
    n.lock().unwrap().count += 1;
 
    let n = AtomicU64::new(0);
 
    n.fetch_add(0, Ordering::Relaxed);
}

这里有一个奇怪的枚举成员Ordering::Relaxed, 看上去很像是排序作用,但是我们并没有做排序操作啊?实际上它用于控制原子操作使用的内存顺序

内存顺序

内存顺序是指CPU在访问内存时的顺序该顺序可能受以下因素的影响

  • 代码中的先后顺序
  • 编译器优化导致在编译阶段发生改变(内存重排序reordering)
  • 运行阶段因CPU的缓存机制导致顺序被打乱

编译器优化导致内存顺序的改变

对于第二点,我们举个例子:

static mut X: u64 = 0;
static mut Y: u64 = 1;

fn main() {
    ...     // A

    unsafe {
        ... // B
        X = 1;
        ... // C
        Y = 3;
        ... // D
        X = 2;
        ... // E
    }
}

假如在CD代码片段中,根本没有用到X = 1,那么编译器很可能会将X = 1X = 2进行合并:

 ...     // A
 
unsafe {
    ... // B
    X = 2;
    ... // C
    Y = 3;
    ... // D
    ... // E
}

若代码A中创建了一个新的线程用于读取全局静态变量X,则该线程将无法读取到X = 1的结果,因为在编译阶段就已经被优化掉。

CPU缓存导致的内存顺序的改变

假设之前的X = 1没有被优化掉,并且在代码片段A中有一个新的线程:

initial state: X = 0, Y = 1
 
THREAD Main     THREAD A
X = 1;          if X == 1 {
Y = 3;              Y *= 2;
X = 2;          }

我们来讨论下以上线程状态,Y最终的可能值(可能性依次降低):

  • Y = 3: 线程Main运行完后才运行线程A,或者线程A运行完后再运行线程Main
  • Y = 6: 线程MainY = 3运行完,但X = 2还没被运行, 此时线程A开始运行Y *= 2, 最后才运行Main线程的X = 2
  • Y = 2: 线程Main正在运行Y = 3还没结束,此时线程A正在运行Y *= 2, 因此Y取到了值1然后Main的线程将Y设置为3 紧接着就被线程AY = 2所覆盖
  • Y = 2: 上面的还只是一般的数据竞争,这里虽然产生了相同的结果2,但是背后的原理大相径庭: 线程Main运行完Y = 3但是CPU缓存中的Y = 3还没有被同步到其它CPU缓存中此时线程A中的Y *= 2就开始读取Y,结果读到了值1,最终计算出结果2

甚至更改成:

initial state: X = 0, Y = 1
 
THREAD Main     THREAD A
X = 1;          if X == 2 {
Y = 3;              Y *= 2;
X = 2;          }

还是可能出现Y=2,因为Main线程中的XY被同步到其它CPU缓存中的顺序未必一致。

限定内存顺序的5个规则

在理解了内存顺序可能存在的改变后你就可以明白为什么Rust提供了Ordering::Relaxed用于限定内存顺序了事实上该枚举有5个成员:

  • Relaxed 这是最宽松的规则它对编译器和CPU不做任何限制可以乱序
  • Release 释放,设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面
  • Acquire 获取, 设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和Release在不同线程中联合使用
  • AcqRel, AcquireRelease的结合,同时拥有它们俩提供的保证。比如你要对一个 atomic 自增 1同时希望该操作之前和之后的读取或写入操作不会被重新排序
  • SeqCst 顺序一致性 SeqCst就像是AcqRel的加强版,它不管原子操作是属于读取还是写入的操作,只要某个线程有用到SeqCst的原子操作,线程中该SeqCst操作前的数据操作绝对不会被重新排在该SeqCst操作之后,且该SeqCst操作后的数据操作也绝对不会被重新排在SeqCst操作前。

这些规则由于是系统提供的,因此其它语言提供的相应规则也大同小异,大家如果不明白可以看看其它语言的相关解释。

内存屏障的例子

下面我们以ReleaseAcquire为例使用它们构筑出一对内存屏障防止编译器和CPU将屏障前(Release)和屏障后(Acquire)中的数据操作重新排在屏障围成的范围之外:

use std::thread::{self, JoinHandle};
use std::sync::atomic::{Ordering, AtomicBool};
 
static mut DATA: u64 = 0;
static READY: AtomicBool = AtomicBool::new(false);
 
fn reset() {
    unsafe {
        DATA = 0;
    }
    READY.store(false, Ordering::Relaxed);
}
 
fn producer() -> JoinHandle<()> {
    thread::spawn(move || {
        unsafe {
            DATA = 100;                                 // A
        }
        READY.store(true, Ordering::Release);           // B: 内存屏障 ↑
    })
}
 
fn consumer() -> JoinHandle<()> {
    thread::spawn(move || {
        while !READY.load(Ordering::Acquire) {}         // C: 内存屏障 ↓
 
        assert_eq!(100, unsafe { DATA });               // D
    })
}
 
 
fn main() {
    loop {
        reset();
 
        let t_producer = producer();
        let t_consumer = consumer();
 
        t_producer.join().unwrap();
        t_consumer.join().unwrap();
    }
}

原则上,Acquire用于读取,而Release用于写入。但是由于有些原子操作同时拥有读取和写入的功能,此时就需要使用AcqRel来设置内存顺序了。在内存屏障中被写入的数据都可以被其它线程读取到不会有CPU缓存的问题。

内存顺序的选择

  1. 不知道怎么选择时,优先使用SeqCst,虽然会稍微减慢速度,但是慢一点也比出现错误好
  2. 多线程只计数fetch_add而不使用该值触发其他逻辑分支的简单使用场景,可以使用Relaxed
    参考 Which std::sync::atomic::Ordering to use?

多线程中使用Atomic

在多线程环境中要使用Atomic需要配合Arc

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{hint, thread};

fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));

    let spinlock_clone = Arc::clone(&spinlock);
    let thread = thread::spawn(move|| {
        spinlock_clone.store(0, Ordering::SeqCst);
    });

    // 等待其它线程释放锁
    while spinlock.load(Ordering::SeqCst) != 0 {
        hint::spin_loop();
    }

    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}

Atomic能替代锁吗

那么原子类型既然这么全能,它可以替代锁吗?答案是不行:

  • 对于复杂的场景下,锁的使用简单粗暴,不容易有坑
  • std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等,而锁可以应用于各种类型
  • 在有些情况下,必须使用锁来配合,例如上一章节中使用Mutex配合Condvar

Atomic的应用场景

事实上,Atomic虽然对于用户不太常用,但是对于高性能库的开发者、标准库开发者都非常常用,它是并发原语的基石,除此之外,还有一些场景适用:

  • 无锁(lock free)数据结构
  • 全局变量例如全局自增ID, 在后续章节会介绍
  • 跨线程计数器,例如可以用于统计指标

以上列出的只是Atomic适用的部分场景,具体场景需要大家未来根据自己的需求进行权衡选择。