C++11 多线程相关

Linux操作系统:进程与线程中介绍了Linux系pthread创建和管理线程基本方法,在C++11中,又引入了五个头文件支持多线程编程,它们分别是<thread><future><atomic><mutex><condition_variable>,带来不少实用的新特性,值得学习。其函数基本使用标准命名空间std,不赘述。

thread

thread用于创建一个线程,其定义了相关构造函数,具体如下:

1
2
3
4
5
6
7
8
thread() noexcept; //无参构造函数,一个空线程

template <class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args); //有参构造,线程+参数

thread(const thread&) = delete; //禁用拷贝构造

thread(thread&& x) noexcept; //移动构造函数

thread的普通构造

thread支持广泛函数类型的线程创建,包括普通函数、匿名函数、成员函数、仿函数等,

thread管理普通函数

因为时间片、CPU调度影响,输出应该是完全乱序的,之所以得到比较有序的输出因为此例是比较简单的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <thread>
using namespace std;

void func(){
cout<<"Hello_World!"<<endl;
}

void func1(int age){
cout<<"I am "<<age<<" years old"<<endl;
}

int main(){
thread t(func); //子线程1
thread t1(func1,18); //子线程2,带参数
cout<<"OH My God"<<endl; //主线程

t.join();
t1.join();
return 0;
}

thread管理匿名函数

匿名函数:注意,这里thread_vp[i]= thread(...)不会触发拷贝构造,而遍历时必须使用引用auto &i,否则会触发拷贝构造无法编译。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <thread>
#include <vector>
using namespace std;

int main(){
int num = 18;
thread t([&num](){ //单个匿名函数
cout<<num<<endl;
});

vector<thread> thread_vp(4); //多个匿名函数
for(int i=0; i<4; i++){
thread_vp[i] = thread([i](string name){ //匿名函数捕获变量、参数
cout<<"I am "<<name<<"\t";
cout<<"I am "<<i<<" years old"<<endl;
},"Eden"); //thread传递参数
}

t.join();

for(auto &i:thread_vp){ //注意必须使用引用
i.join();
}

return 0;
}

thread管理成员函数、仿函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <thread>
#include <vector>
using namespace std;

class Person{
private:
string name;
int age;

public:
void operator()(string name,int age){ //仿函数
cout << "name: "<<name<<" age: "<<age<<endl;
}

void get_name(string name){ //非静态成员
this->name = name;
cout<<"call get_name"<<endl;
}

static void get_age(int age){ //静态成员
cout<<"call get_age"<<endl;
}
};

int main(){
Person person;
thread t1(person,"Eden",18); //仿函数
thread t2(Person::get_name,&person,"Eden"); //非静态成员额外参数指定类对象
thread t3(Person::get_age,18); //静态成员

t1.join();
t2.join();
t3.join();

return 0;
}

thread的移动构造

通过移动构造转移thread的控制权。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
using namespace std;

void func(){
cout<<"Hello_World!"<<endl;
}

int main(){
thread t(func); //子线程1
thread t1(move(t)); //移动语义

cout<<(t.joinable()==false)<<(t1.joinable()==true)<<endl; //t已经join掉了,返回false

t1.join();
return 0;
}

detach()、join()、joinable()

特性和Linux下接口一致:当thread被join或者detach,thread对应的资源会在线程结束时立刻被回收,因此调用joinable均会返回false。

future

future&promise

在linux多线程中,通常使用pthread_exitpthread_join方式接受线程返回值,为了在线程之中更灵活地进行值传递,C++11引入了promisefuture,能够进行值的填充和获取。

如下创建主线程、计算线程、打印线程,计算线程填充计算结果,主线程绑定promisefuture对象,打印进程接受future对象,打印结果;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <thread>
#include <future>
using namespace std;

void Calculate(int a,int b,promise<xint>&& value){
value.set_value(a+b); //生产者线程:set_value()
}

void Print(future<int>&& fvalue){
int result = fvalue.get(); //消费者线程:get()
cout<<"value="<<result<<endl;
}

int main(){
promise<int>value;
future<int>fvalue = value.get_future(); //绑定唯一对象

thread t(Calculate,1,2,move(value)); //子线程1
thread t1(Print,move(fvalue));

t.join();
t1.join();
return 0;
}

futurepromise的使用要注意以下细节:

    1. futurepromise是天然同步的,promise通过set_value填充值,future才会get并且返回,否则future会阻塞等待。
    1. futurepromise都不支持复制,确保每个promise只有唯一的future对应,因此使用时应该使用move
    1. 对象的set_valueget只使用一次,调用后就不应该再使用了。

future&async

使用promise时,需要设计值填充的策略和时机,具有比较大的灵活性,而有时候我们只关心执行异步线程,并且获取返回结果,这时候可以使用异步程度更高的async模板

1
std::future<R> std::async(std::launch policy, Function&& f, Args&&... args);
async接受策略policy、函数、参数,返回一个future对象,通过get等函数可以获取返回结果。policy有两种,分别是std::launch::asyncstd::launch::deferred前者是一种完全异步对象,调用后会立刻在新线程执行函数任务;后者则是一种延迟执行策略,调用后在使用get或者wait获取对象时,会在本线程下运行该函数并且获取结果

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream> 
#include <thread>
#include <future>
using namespace std;

int Calculate(int a,int b){
return a+b;
}

int main(){
future<int> fvalue = async(launch::async,Calculate,1,1); //新线程异步执行
cout<<"result:"<<fvalue.get()<<endl; //执行并且接收结果

future<void>fvalue1 = async(launch::deferred,[](string slogan){ //延迟执行
cout<<slogan<<endl;
},"We Are The Champion~马飞");
fvalue1.wait(); //只执行,不获取返回结果

return 0;
}

atomic

实现共享资源的同步、安全访问有多种方式,例如互斥锁mutex、PV操作、条件变量等,但是其开销一般很大,而且需要谨慎地设计资源数、粒度、时机等,避免死锁局面,往往适用于对代码段的保护,C++11 引入了原子类型的概念,允许对变量实现原子操作,从一些锁的底层原理中知道,例如自旋锁的底层实际上也是原子的硬件指令,这表明了一些场合可以通过原子类型实现简洁高效的互斥机制。

atomic_flag

atomic_flag是最简单的原子类型,为原子布尔类型。其特点

  1. 天然原子的,无需互斥量保证其读写,这种特性称为免锁/无锁(lock-free)

  2. 禁用拷贝构造、移动构造、赋值函数,只有默认构造函数,但没有定义初始化为set还是clear,默认初始化使用宏ATOMIC_FLAG_INIT

  3. 操作接口只有test_and_setclear,它们也是免锁的,只有一个线程能够成功不支持值的读取或者写入。

调用test_and_set:无论atomic_flag是true还是false,都会尝试将其置为true,但返回时返回的是置位前的标志。换句话说,如果返回原来是true,说明已经被其他线程占用,本线程无法抢占;而如果返回原来是false,则执行本线程逻辑,如下轮询+原子标志位实现自旋锁逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <thread>
#include <atomic>

using namespace std;

atomic_flag aflag = ATOMIC_FLAG_INIT;

void func(string name){
while(aflag.test_and_set()); //返回true说明其他线程占用,自旋等待

cout<<this_thread::get_id<<" :I am "<<name<<endl; //返回false继续执行逻辑
aflag.clear(); //清除true让其他线程获得
}

int main(){
thread t1(func,"Eden");
thread t2(func,"Mike");

t1.join();
t2.join();

return 0;
}

拓展

更高阶的原子操作可能需要考虑内存同步问题,test_and_set()clear()均支持三个参数:

1
2
3
std::memory_order_relaxed:松散内存序,提供了最弱的保证,允许最大程度的优化,但不提供同步;
std::memory_order_release:与std::memory_order_acquire配对使用,用于同步的写操作,保证释放操作之前的写入对其他线程可见;
std::memory_order_seq_cst(默认):顺序一致性,提供了最强的保证,确保所有线程观察到的内存操作顺序一致,但可能会限制优化;

atomic<T>

atomic接受的类条件

C++ 11定义了几种类类型,用于描述其在内存中的不同布局特性,分别是平凡类型(Trivial Type)、平凡可复制类型(TrivialCopyable)、标准布局类型(Standard-layout Type)

  • 平凡类型构造函数、析构函数、拷贝构造、赋值运算符等都没有自定义实现无虚函数、虚基类、指针类型成员等堆空间分配策略,完全按照默认编译器行为;

  • 平凡可复制类型无虚函数、虚基类可自定义部分函数,需要保证拷贝对象时内存按bit拷贝(memmove、memcpy等底层都是按bit)(一说C++17起可允许按byte拷贝),可以通过函数is_trivially_copyable判断;

  • 标准布局类型:不同编译平台布局是一致的,对跨平台友好,表现在无虚函数、虚基类动态分配对象,非静态成员都是public数据成员访问权限一致等。

并不是所有的类模板T都能够作为atomic,例如string就不可以,必须满足平凡可复制条件,需要遵循五个条件检验:

  • std::is_trivially_copyable<T>::value:平凡可复制

  • std::is_copy_constructible<T>::value:拷贝构造可访问;

  • std::is_move_constructible<T>::value:移动构造可访问;

  • std::is_copy_assignable<T>::value:拷贝赋值可访问;

  • std::is_move_assignable<T>::value:移动赋值可访问;

atomic接口函数

原子操作要么通过原子硬件指令,要么通过加锁,前者性能更高、开销更低,与atomic_flag不同,atomic<T>的原子性不一定是免锁的,可能通过互斥量的方法,因为编译器、硬件支持等原因,因此C++ 11提供了一种运行时判断的方法is_lock_free()判断原子类型是否免锁,C++ 17引入了is_always_lock_free用于编译时判断原子类型是否免锁。

atomic常用操作接口如下,记录了如何对值进行原子存取操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
atomic() noexcept = default; //默认构造,atomic<T>();

//constexpr代表可用常量初始化、在编译时执行
constexpr atomic (T val) noexcept; //有参构造,atomic<int>(5)

//赋值操作符
T operator= (T val) noexcept; //atomic<int>a = 5;
T operator= (T val) volatile noexcept;

//重载括号,相当于调用了load获取值
operator T() const volatile noexcept; //atomic<int>a = 5;cout<<a;
operator T() const noexcept;

//被禁用的
atomic (const atomic&) = delete; //禁用拷贝构造
atomic& operator= (const atomic&) = delete; //禁用赋值构造

//函数接口,默认内存序是一致序
void store(T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
void store(T val, memory_order sync = memory_order_seq_cst) noexcept; //a.store(5),存入5

T load (memory_order sync = memory_order_seq_cst) const volatile noexcept;
T load (memory_order sync = memory_order_seq_cst) const noexcept; //int ret = a.load(),返回值

T exchange (T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
T exchange (T val, memory_order sync = memory_order_seq_cst) noexcept; //int ret = a.exchange(6),返回旧值5,存入6;

此外,还有两种接口实现compare_exchange_strongcompare_exchange_weak判断、修改,见例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <atomic>
using namespace std;

int main(){
atomic<int>a(5);
int expect = 5;
//判断a和expect是否相等(本质是memcmp,空间位位相等),如果相等a改为7
bool is_change = a.compare_exchange_strong(expect,7); //expect不能传入常量
cout << is_change<<'\t'<<a.load()<<endl;

//a是否等于expect,相等时a改为8;
while(!a.compare_exchange_weak(expect,8));
cout <<a.load()<<endl;

return 0;
}
注意几个细节

  1. compare_exchange_strongcompare_exchange_weak的区别是:weak判断相等后,也可能返回false,称伪返回,因此一般放在循环中,确保相等时发生赋值;strong则没有这个问题,返回false时一定是因为值不相等,一般而言weak失败概率是可接受的,因此使用weak效率更高;

  2. atomic<int>可以对应替代成其他类类型,第一个参数不能常量直接写入。

mutex、condition_variable

基本特性还是同pthread系的一致,wait仍然是wait,signal变成了notify、lock和unlock仍然不变,此外还多了一些更高阶对象,权当了解。看一个老例子,交换线程和打印线程同步执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>

using namespace std;

vector<int>shareNum={1,2,3,4,5,6,7,8};

void Print(){
do{
for(int i=0; i<shareNum.size();i++){
cout<<shareNum[i]<<"\t";
}
cout<<endl;
}while(1);
}

int main(){
thread t(Print);

//主线程执行交换
do{
for(int i=0; i<4; i++){
swap(shareNum[i],shareNum[7-i]);
}
}while(1);

t.join();
return 0;
}

为了实现交换一次、打印一次的逻辑,加上互斥锁和条件变量,互斥锁由于保证交换、打印都是原子的操作,条件变量保证了线程能够切换执行,此时仍然不能保证二者均执行一次,因为主线程获取锁的优先级没有限制,能够反复交换多次再在某次执行打印,这也是老问题了,C++11没有信号量机制,因此这里可以通过标志位解决,再复杂一点只能引入另一个条件变量了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>

using namespace std;

vector<int>shareNum={1,2,3,4,5,6,7,8};
mutex mlock;
condition_variable cond;
atomic<bool> aflag = 0; //确保子线程先拿到锁

void Print(){
do{
unique_lock<mutex>umlock(mlock); //unique_lock构造即加锁
aflag.store(1); //允许父进程持锁
cond.wait(umlock); //交出锁等待notify
for(int i=0; i<shareNum.size();i++){ //打印
cout<<shareNum[i]<<"\t";
}
cout<<endl; //作用域结束自动解锁
}while(1);
}

int main(){
thread t(Print);

//主线程执行交换
do{
if(aflag==1){
aflag.store(0);
unique_lock<mutex>umlock(mlock); //
for(int i=0; i<4; i++){ //交换
swap(shareNum[i],shareNum[7-i]);
} //作用域结束自动解锁
cond.notify_one(); //唤醒一个线程
}
}while(1);

t.join();
return 0;
}
可见程序能够交替输出1——8和8——1;

因为这里的wait函数只接受unique_lock类型,因此互斥同步只能结合这样写了,而实际上C++ 11的mutex有另外几种类型,也存在传统的lock和unlock写法:

锁类型:

  • mutex:普通互斥锁,可lock()unlock()try_lock()try_locklock的区别是try_lock获取锁失败会返回false,而lock一直阻塞等待直到拿到锁;

  • recursive_mutex递归锁同一线程内可以多次上锁,常常lock_guard一起使用,自动管理上锁和析构,当嵌套加锁、解锁次数相当时锁才会被真正释放,否则其他线程仍然无法获得锁。

    1
    2
    3
    4
    5
    6
    7
    void recur_lock(int depth){  //嵌套depth深度的锁
    recursive_mutex rmtx;
    lock_guard<recursive_mutex> rlock(rmtx); //上锁
    if(depth>0){
    recur_lock(depth-1);
    }
    }

  • time_mutex定时互斥锁,只会在某段时间、某个时间点前尝试获取互斥锁避免无限期的互斥等待或者直接返回,配合成员函数try_lock_fortry_lock_util使用:

    1
    2
    3
    4
    5
    6
    7
    time_mutex tmtx;
    if(tmtx.try_lock_for(std::chrono::seconds(1))){ //1s内获得锁返回true,否则false
    ...
    }
    else{
    ...
    }

  • recursive_timed_mutex:既可递归、也可定时的锁。

除了lock、unlock、try_lock这种传统手动上锁管理,以上提到了两种RAII资源锁管理unique_locklock_guard,都能接受四种锁类型,它们区别是:

  • unique_lock<lock_T> : 构造自动上锁、作用域结束自动解锁和析构,且支持手动加锁和解锁(构造了手写lock、unlock),不可拷贝但支持移动语义,配合条件变量、wait等使用,灵活性比较高;

  • lock_guard<lock_T>:构造自动上锁、作用域结束自动解锁和析构,不支持手动管理不可拷贝也不支持移动拷贝;简单实现RAII;

参考链接:

  1. C++中的atomic:原子

  2. C++ 多线程:原子类型(std::atomic)

  3. C++之std::is_trivially_copyable(平凡可复制类型检测)

  4. C++11 并发指南三(std::mutex 详解)

  5. C++11 并发指南五(std::condition_variable 详解)