文章

多线程

多线程

[TOC]

1. 基本概念

1.2 进程和线程

进程和线程都是操作系统中的基本概念,它们在多任务操作系统中用于程序的执行和系统资源管理。1

进程是资源分配的单位,线程是执行流的最小单位,它们是现代操作系统中实现并行和并发的基础。

  1. 进程(Process):
    • 进程是操作系统进行资源分配和调度的基本单位。
    • 它是一个程序的运行实例,拥有独立的地址空间、全局变量、打开的文件描述符、子进程、定时器、信号和信号处理程序。2
    • 进程可以拥有多个线程,每个进程至少有一个主线程。
    • 进程间通常隔离,确保了一个进程的崩溃不会直接影响到其他进程。
  2. 线程(Thread):
    • 线程是操作系统进行运算调度的基本单位。
    • 它是进程中的实际运行单位,拥有自己独立的线程ID、程序计数器(PC)、寄存器集合和栈控件(用于存储局部变量和调用栈)2
    • 隶属于统一进程的线程共享其所属进程的地址空间和资源,如进程代码段、公共数据(全局变量等)和文件描述符等。2
    • 线程之间的通信和同步相对容易,因为它们可以直接读写进程的共享数据。

一个进程至少包含一个线程,即主线程,也可以同时包含多个线程。这样的进程称为多线程进程。多线程可以并发执行,提高了资源的利用率和程序的响应速度。

1.2 多线程

多线程(MultiThreading)是指从软件或者硬件上实现多个线程并发执行的技术。在一个程序中,每一个独立运行的程序片段叫作“线程”(Thread),利用它编程的概念就叫作多线程处理。具有多线程能力的计算机,因有硬件支持,而能够在同一时间执行不少于1个线程,进而提升了整体处理性能。3

多线程的优点1

  • 共享资源: 线程间可以直接共享内存和资源,这使得线程间的通信更加高效。
  • 上下文切换效率: 线程间的上下文切换比进程间的切换成本要低,因为它们共享相同的环境。
  • 并发性: 多线程可以利用多核处理器的能力,同时处理多个任务。

多线程的缺点

  • 安全性: 因为线程共享内存和资源,所以需要确保线程安全,防止数据冲突和不一致的问题。
  • 复杂性: 多线程程序设计上比单线程程序要复杂,需要处理同步和互斥等问题。

1.3 协程

协程是一种基于线程之上,但又比线程更加轻量级的存在,这种由程序管理的轻量级线程也被称为用户空间线程,对于内核而言是不可见的。正如同进程中存在多条线程一样,线程中也可以存在多个协程4

协程在运行时也有自己的寄存器、上下文和栈,协程的调度完全由用户控制,协程调度切换时,会将寄存器上下文和栈保存到分配的私有内存区域中,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。

如果把线程比作了工厂工位上的固定工人,那么协程更多的就可以理解为:工厂中固定工位上的不固定工人。一个固定工位上允许有多个不同的工人,当轮到某个工人工作时,就把上一个工人的换下来,把这个要工作的工人换上去。或者当前工人在工作时要上厕所,那么就会先把当前工作的工人撤下去,换另一个工人上来,等这个工人上完厕所回来了,会再恢复它的工作。 协程有些类似于线程的多对一模型。

2. std::thread

使用方法和下面的std::jthread基本一样,参考下面对std::jthread的介绍。

常用函数

(1)构造函数

1
2
3
4
5
6
7
8
// 默认构造函数:创建一个空thread对象(非joinable)
thread() noexcept
// 初始化构造函数:创建一个thread对象,该对象会调用Fn函数,Fn函数的参数由args指定。(joinable)
template <class Fn, class... Args>
// 拷贝构造函数:被禁用,意味着thread对象不可拷贝构造
thread (const thread&) = delete
// move构造函数:移动构造,执行之后x失效,即x的执行信息被移给了新产生的thread对象(非joinable)
thread (thread&& x) noexcept

使用实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void f1(int n){
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 1 executing\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    std::cout << "\t";
}
 
void f2(int& n){
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 2 executing\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}
 
int main()
{
    int n = 0;
    std::thread t1;                   // t1 is not a thread
    std::thread t2(f1, n + 1);        // 值传递
    std::thread t3(f2, std::ref(n));  // 引用传递
    std::thread t4(std::move(t3));    // 移动构造函数
    t2.join();
    t4.join();
    std::cout << "Final value of n is " << n << '\n';
}

(2)join()

调用 join() 会阻塞当前线程,直到被管理的线程执行完毕。

(3)detach()

将线程设置为后台线程。调用 detach() 后,线程将不再受 std::jthread 对象的管理,也就是将当前线程对象所代表的执行实例与该线程对象分离,使得线程的执行可以在后台单独运行。一旦线程执行完毕,它所分配的资源将会被释放。

(4)get_id()

获取线程 ID(ID为线程的唯一标识符)。返回一个类型为std::thread::id的对象。

(5)joinable()

查询线程的状态,即当前线程对象是否表示了一个活动的执行线程。如果线程正在运行且尚未被 join()detach(),则返回 true,否则返回 false

缺省构造的thread对象、已经完成jointhread对象、已经detachthread对象都会返回false

(6)swap()

交换两个 std::jthread 对象的内容。

(7)operator=

将线程与当前 thread 对象关联。

(8)native_handle

该函数返回与std::thread具体实现相关的线程句柄。native_handle_type是连接thread类和操作系统SDK API之间的桥梁,如在Linux g++(libstdc++)里,native_handle_type其实就是pthread里面的pthread_t类型,当thread类的功能不能满足我们的要求的时候(比如改变某个线程的优先级),可以通过thread类实例的native_handle()返回值作为参数来调用相关的pthread函数达到目录。3

(9)sleep_for

线程休眠某个指定的时间片(time span),该线程才被重新唤醒,不过由于线程调度等原因,实际休眠时间可能比 sleep_duration 所表示的时间片更长。

1
2
3
4
5
6
7
int main()
{
	std::cout << "Hello waiter" << std::endl;
	std::chrono::milliseconds dura(2000);
	std::this_thread::sleep_for(dura);
	std::cout << "Waited 2000 ms\n";
}

(10)sleep_until

线程休眠至某个指定的时刻(time point),该线程才被重新唤醒。

1
2
template< class Clock, class Duration >
void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );

3. std::jthread

std::jthread 是 C++20 标准库中引入的一种轻量级线程类型,它提供了一种简单的方式来创建和管理线程。std::jthread 在使用上比 std::thread 更方便,它可以自动管理线程的生命周期,并且在销毁时会自动调用线程的 join()detach() 方法。

3.1 基本用法和原理

  1. 创建 std::jthread 对象:可以通过创建 std::jthread 对象来创建一个新的线程。可以直接传递一个可调用对象(函数、函数对象、lambda 表达式等)给 std::jthread 构造函数,并在线程启动时执行该可调用对象。
  2. 线程的自动管理:std::jthread 可以自动管理线程的生命周期。当 std::jthread 对象被销毁时,它会自动调用线程的 join() 方法,等待线程结束并回收资源,或者调用 detach() 方法,使线程成为后台线程而不需要被显式地等待。
  3. 线程的状态查询:通过 std::jthread 对象的成员函数 joinable() 可以查询线程的状态。如果线程正在运行且尚未被 join()detach(),则该函数返回 true,否则返回 false
  4. 线程的等待:可以使用 std::jthread 的成员函数 join() 来等待线程的完成。调用 join() 会阻塞当前线程,直到被管理的线程执行完毕。
  5. 线程的后台化:可以使用 std::jthread 的成员函数 detach() 将线程设置为后台线程。通过调用 detach(),线程将不再受 std::jthread 对象的管理,可以在后台运行而不需要被显式等待。
  6. 安全的线程终止:std::jthread 支持安全的线程终止机制。可以通过调用 request_stop() 方法来请求线程终止,并在线程的入口函数中使用 stop_requested() 方法来检查是否有终止请求。

需要注意的是,std::jthread 在实现上利用了 RAII(资源获取即初始化)原则,它的构造函数会创建一个线程并管理其生命周期,而析构函数会在对象被销毁时等待线程的完成。这种自动管理的特性使得使用 std::jthread 更加方便和安全,不需要手动管理线程的创建和销毁。

3.2 常用函数

std::thread的一样,参考上面即可。

(1)构造函数

1
2
3
// 默认构造函数:创建一个空thread对象,该对象非joinable
thread() noexcept
// 初始化构造函数:创建一个thread对象,该对象会调用Fn函数,Fn函数的参数由args指定,该对象是joinable的

默认构造函数 thread() noexcept 初始化构造函数 template <class Fn, class… Args> 拷贝构造函数 thread (const thread&) = delete move构造函数 thread (thread&& x) noexcept

  • 默认构造函数:创建一个空thread对象,该对象非joinable
  • 初始化构造函数:创建一个thread对象,该对象会调用Fn函数,Fn函数的参数由args指定,该对象是joinable的
  • 拷贝构造函数:被禁用,意味着thread对象不可拷贝构造
  • move构造函数:移动构造,执行成功之后x失效,即x的执行信息被移动到新产生的thread对象,该对象非joinable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void f1(int n){
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 1 executing\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    std::cout << "\t";
}
 
void f2(int& n){
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 2 executing\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}
 
int main()
{
    int n = 0;
    std::thread t1;                   // t1 is not a thread
    std::thread t2(f1, n + 1);        // 值传递
    std::thread t3(f2, std::ref(n));  // 引用传递
    std::thread t4(std::move(t3));    // 移动构造函数
    t2.join();
    t4.join();
    std::cout << "Final value of n is " << n << '\n';
}
  1. 构造函数:std::jthread 类的构造函数可以接受一个可调用对象(函数、函数对象、lambda 表达式等)作为参数,并创建一个新的线程来执行该可调用对象。例如:std::jthread thread(myFunction);
  2. join()join()std::jthread 类的成员函数,用于等待线程的完成。调用 join() 会阻塞当前线程,直到被管理的线程执行完毕。
  3. detach()detach()std::jthread 类的成员函数,用于将线程设置为后台线程。调用 detach() 后,线程将不再受 std::jthread 对象的管理,可以在后台运行而不需要被显式等待。
  4. joinable()joinable()std::jthread 类的成员函数,用于查询线程的状态。如果线程正在运行且尚未被 join()detach(),则返回 true,否则返回 false
  5. get_id()get_id()std::jthread 类的成员函数,用于获取线程的唯一标识符。每个线程都有一个独特的标识符,可以通过这个函数获取。
  6. swap()swap()std::jthread 类的成员函数,用于交换两个 std::jthread 对象的内容。

3.3 例子

(1) 子线程扫描目标路径并在主线程实时更新进度

以下是一个使用 std::jthread 的例子,用于扫描指定路径下的所有文件,并将扫描进度实时同步到一个 DLG 中的进度条上:1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <filesystem>
#include <iostream>
#include <chrono>
#include <atomic>
#include <mutex>
#include <thread>
#include <string>
#include <vector>

// 扫描目标文件夹:
void scanFiles(const std::string& path, std::atomic<int>& progress, std::mutex& mutex) {
    // 用迭代器遍历目录下的所有文件:
    std::filesystem::recursive_directory_iterator iter(path);
    int totalFiles = 0;
    for (auto& file : iter) {
        // 扫描到文件:
        if (std::filesystem::is_regular_file(file)) {
            // 给进度百分比+1
            std::atomic_fetch_add(&progress, 1);
			// 操作输出的时候加一下线程锁(尽管对于本例子是多余的)
            std::lock_guard<std::mutex> lock(mutex);
            std::cout << "Scanning file: " << file.path() << std::endl;
        }
        // 扫描到文件夹:
        else if (std::filesystem::is_directory(file)) {
            /*文件夹跳过*/
        }
        ++totalFiles;
    }

    std::cout << "Total files scanned: " << totalFiles << std::endl;
}

void main() {
    std::string path = "Your/Path/Here";
    std::atomic<int> progress(0);
    std::mutex mutex;
	// 建立一个子线程执行扫描文件夹的事情:
    std::jthread scanThread(scanFiles, path, std::ref(progress), std::ref(mutex));
	
    // 主线程中一直读取线程中修改的progress,更新到进度条中
    while (progress.load() < 100){
        // 输出进度:
        std::cout << "已扫描文件: " << progress.load() << " 个" << std::endl;
        // 主线程每500毫秒检查一下进度条状态:
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
	// 等待子线程执行完毕:
    scanThread.join();
}

(2) 子线程调用成员函数

使用std::jthread来创建一个线程,并且可以用它来调用成员函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
#include <thread>

class MyClass {
public:
    void memberFunction() { /*此处是成员函数的实现*/ }
};

void main() {
    MyClass myObject;
	// 开一个子线程调用成员函数:
    std::jthread myThread(&MyClass::memberFunction, &myObject);

    // 等待线程执行完成
    myThread.join();
}

4. std::mutex

std::mutex 是 C++ 标准库中的互斥量类,用于实现线程之间的互斥访问。互斥量是一种同步原语,用于保护共享资源,确保在任何给定时刻只有一个线程能够访问被保护的资源,从而避免出现竞态条件(race condition)。1

4.1 基本函数

  1. lock():用于尝试锁定互斥量对象。如果互斥量已经被其他线程锁定,则当前线程会被阻塞,直到互斥量可用。如果互斥量当前没有被其他线程占用,则当前线程将锁定互斥量,并继续执行。
  2. try_lock():用于尝试锁定互斥量对象,但不会阻塞当前线程。如果互斥量已经被其他线程锁定,则 try_lock() 返回 false,并且当前线程可以继续执行其他操作。如果互斥量当前没有被其他线程占用,则 try_lock() 返回 true,并且当前线程锁定互斥量。
  3. unlock():用于解锁互斥量对象,释放其占用状态。只有锁定互斥量的线程可以调用 unlock() 来解锁互斥量。一旦解锁,其他线程可以获得对互斥量的访问权。
  4. std::lock_guard :该方法可以代替 lock()unlock() 方法来控制互斥锁,std::lock_guard 会在构造时自动锁定互斥锁,并在其自身被销毁时自动解锁互斥锁。

4.2 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx; // 创建互斥量对象

void printMessage(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx); // 用 lock_guard 锁定互斥量
    for (int i = 0; i < 5; i++) {
        std::cout << message << std::endl;
    }
    // 离开作用域时,lock_guard 会自动解锁互斥量
}

void main(){
    std::thread t1(printMessage, "Hello");
    std::thread t2(printMessage, "World");
    t1.join();
    t2.join();
}

在这个示例中,我们创建了一个互斥量对象 mtx。在 printMessage 函数中,我们使用 std::lock_guard 对象 lock 来锁定互斥量 mtx,以确保每个线程在执行输出操作时互斥。这样,每个线程都能顺序地输出消息,而不会产生竞态条件。

输出结果可能是:

1
2
3
4
5
6
7
8
9
10
Hello
Hello
Hello
Hello
Hello
World
World
World
World
World

需要注意的是,互斥量的使用应该谨慎,确保在使用互斥量时遵循正确的锁定和解锁操作,以避免死锁和其他线程同步问题。

4.3 其他锁3

(1)std::recursive_mutex

std::recursive_mutex 与 std::mutex 一样,也是一种可以被上锁的对象,但是和 std::mutex 不同的是,std::recursive_mutex 允许同一个线程对互斥量多次上锁(即递归上锁),来获得对互斥量对象的多层所有权,std::recursive_mutex 释放互斥量时需要调用与该锁层次深度相同次数的 unlock(),可理解为 lock() 次数和 unlock() 次数相同,除此之外,std::recursive_mutex 的特性和 std::mutex 大致相同。

(2)std::time_mutex

std::time_mutex 比 std::mutex 多了两个成员函数,try_lock_for(),try_lock_until()。

try_lock_for 函数接受一个时间范围,表示在这一段时间范围之内线程如果没有获得锁则被阻塞住(与 std::mutex 的 try_lock() 不同,try_lock 如果被调用时没有获得锁则直接返回 false),如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。

try_lock_until 函数则接受一个时间点作为参数,在指定时间点未到来之前线程如果没有获得锁则被阻塞住,如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。

(3)std::recursive_timed_mutex

和 std:recursive_mutex 与 std::mutex 的关系一样,std::recursive_timed_mutex 的特性也可以从 std::timed_mutex 推导出来。

(4)std::lock_guard

原理:内部构造时相当于执行了lock,析构时相当于执行unlock。,在其析构函数中进行解锁。最终的结果就是:创建即加锁,作用域结束自动解锁。从而使用lock_guard()就可以替代lock()与unlock()。

(5)std::unique_lock

unique_lock类似于lock_guard,只是unique_lock用法更加丰富,同时支持lock_guard()的原有功能。使用lock_guard后不能手动lock()与手动unlock();使用unique_lock后可以手动lock()与手动unlock();unique_lock的第二个参数,除了可以是adopt_lock,还可以是try_to_lock与defer_lock。

try_to_lock:尝试去锁定,得保证锁处于unlock的状态,然后尝试现在能不能获得锁;尝试用mutx的lock()去锁定这个mutex,但如果没有锁定成功,会立即返回,不会阻塞在那里。

defer_lock: 初始化了一个没有加锁的mutex。

(6)condition_variable

std::condition_variable是C++标准库中提供的条件变量,用于线程之间的同步与通信。它可以配合std::mutex或std::unique_lock一起使用,实现多个线程之间的等待和唤醒机制。

condition_variable头文件有两个variable类,一个是condition_variable,另一个是condition_variable_any。condition_variable必须结合unique_lock使用。condition_variable_any可以使用任何的锁。下面以condition_variable为例进行介绍。

condition_variable条件变量可以阻塞(wait、wait_for、wait_until)调用的线程直到使用(notify_one或notify_all)通知恢复为止。condition_variable是一个类,这个类既有构造函数也有析构函数,使用时需要构造对应的condition_variable对象,调用对象相应的函数来实现上面的功能。具体示例见参考文章3

(7)scoped_lock

在C++中,std::scoped_lock 是C++17标准引入的一个同步原语,用于简化多线程编程中的互斥锁管理。std::scoped_lock 是一个类模板,它提供了一种RAII(资源获取即初始化)机制,用于自动管理一种或多种互斥锁的锁定和解锁过程,帮助避免死锁,并简化代码。1

使用 std::scoped_lock 的好处包括:

  1. 自动锁管理std::scoped_lock 对象在构造时自动获取一个或多个互斥锁,并在对象生命周期结束时自动释放这些锁。这意味着不需要显式调用 lock()unlock()
  2. 异常安全:如果在持有锁时发生异常,std::scoped_lock 的析构函数会保证对应的锁被释放,避免死锁。
  3. 便于处理多锁std::scoped_lock 可以一次性锁定多个互斥量,并且在锁定多个互斥量时采用死锁避免算法。
  4. 无需担心复制或移动std::scoped_lock 不能被复制或移动,这避免了潜在的同步问题。

以下是使用 std::scoped_lock 的一个简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <mutex>
#include <thread>
#include <iostream>

std::mutex mtx1;
std::mutex mtx2;

void task() {
    // 同时锁定两个互斥量
    std::scoped_lock lock(mtx1, mtx2);
    
    // 以下代码块在mtx1和mtx2释放之前都是安全的
    std::cout << "Thread " << std::this_thread::get_id() << " has entered task." << std::endl;

    // 当函数返回,或者`scoped_lock`离开其作用域时,mtx1和mtx2将自动解锁
}

int main() {
    std::thread t1(task);
    std::thread t2(task);

    t1.join();
    t2.join();

    return 0;
}

在上面这个示例中,std::scoped_lock 将在 task 函数的开始自动锁定两个互斥量 mtx1mtx2。当该函数结束或者 scoped_lock 离开其作用域时,这两个互斥量将会自动解锁。这样,我们就可以确保在多线程环境下,访问共享资源时保持同步。

4.4 乐观锁和悲观锁

乐观锁和悲观锁是两种常见的并发控制策略,用于处理在数据库系统或多线程环境中的数据一致性问题。

  • 悲观锁:适用于写操作多、冲突多的环境,会造成更多的阻塞和等待。
  • 乐观锁:适用于读操作多、冲突少的环境,会有更好的性能,但在高冲突的情况下可能会导致更新失败。

(1)悲观锁(Pessimistic Locking)

悲观锁的核心思想是在数据被读取和写入的过程中,认为冲突很可能发生,并且会通过锁机制来强制防止冲突。在读取或修改数据之前,悲观锁会锁定数据,确保其他任何操作都无法对这些数据进行修改,直到锁被释放。这类似于在文件编辑中加锁,防止其他用户进行编辑。

悲观锁可以细分为几种类型,如共享锁(shared lock)和排他锁(exclusive lock)。共享锁允许多个事务读取同一数据,但在共享锁定时不能修改数据;排他锁确保当一个事务在读取或写入数据时,没有其他事务可以对这些数据进行读取或写入操作。

使用悲观锁策略时,系统的总体性能可能因为锁定操作等待时间而降低,但在高冲突环境中,悲观锁可以防止数据的不一致性。

(2)乐观锁(Optimistic Locking)

与悲观锁相对,乐观锁采取更加宽松的策略。它假设冲突发生的可能性很低,因此在数据操作过程中不会直接锁定资源。相反,乐观锁通常在数据被读取时记录一个标记(如时间戳或版本号),然后在实际提交更新到数据库时,检查该数据自读取以来是否被其他操作修改过。如果检测到冲突(例如,时间戳已经改变或版本号增加),则更新操作会被拒绝,并且通常会重试操作或报告错误。

乐观锁适用于读多写少的应用场景,因为它减少了锁定的开销,提高了系统的吞吐量。但是,在高并发写入的场景中可能会导致大量的冲突和失败的更新操作。

5. 异步编程

首先来看一下异步模型:在异步模型中,允许同一时间发生(处理)多个事件。程序调用一个耗时较长的功能(方法)时,它并不会阻塞程序的执行流程,程序会继续往下执行。当功能执行完毕时,程序能够获得执行完毕的消息或能够访问到执行的结果(如果有返回值或需要返回值时)。5

异步编程是一种编程范式,它允许程序在等待一个长时间操作完成(例如文件读写、网络请求等)时不会阻塞当前线程,从而可以执行其他任务。异步编程能有效提高资源的利用效率,特别是在涉及到高延迟操作或者高并发处理时。1

在异步编程模型中,相关操作通常会在一个不同的线程或者进程中执行,执行完成后通过某种方式(例如回调、事件、promise、future等)通知主程序继续处理结果。

在C++中,异步编程可以通过多种方式实现,其中一种是使用C++11标准引入的std::async函数和std::future类。此外,C++还有其他支持异步编程的库和模型,如Boost.Asio(用于异步I/O)和Intel Threading Building Blocks(TBB)等。

5.1 std::async 和 std::future

std::asyncstd::future是C++11标准库中提供的用于异步编程的工具。1

std::async是一个函数模板,用于创建一个异步任务,它返回一个std::future对象,它表示异步任务的结果。

std::future是一个模板类,用于保存异步任务返回的结果。通过std::future对象,我们可以获取异步任务的返回值,或者等待异步任务的完成。

要注意的是:std::async可能不会总是创建一个新的线程,这取决于第一个参数。当指定为std::launch::async时,会立刻在一个新线程中执行任务;如果不指定为std::launch::async,它可能会延迟执行直到调用get()wait()

5.2 基本用法

  • 使用std::async创建异步任务:

    1
    
    std::future<T> futureObj = std::async(std::launch::async, functionName, args...);
    
    • std::launch::async:表示要求立即在一个新线程中执行任务。若为std::launch::deferred 则表示非异步行为,即当其他线程将来调用get()来访问共享状态时,才会调用函数。
    • functionName:要执行的函数名。
    • args...:函数的参数。
  • 使用std::future获取异步任务的结果:

    1
    
    T result = futureObj.get();
    

    get()函数用于获取异步任务的结果,并将其赋值给result变量。在调用future对象的get()成员函数时,主线程会被阻塞直到异步线程执行结束,并把返回结果传递回来。

    需要注意的是,std::future对象只能获取一次结果。如果需要多次使用异步任务的结果,可以使用std::shared_future来共享结果。

  • 使用shared_future多次获取一步任务的结果:

    futureshard_future的用途都是为了占位,但是两者有些许差别。futureget()成员函数是转移数据所有权;shared_futureget()成员函数是复制数据。

    future对象的get()只能调用一次;无法实现多个线程等待同一个异步线程,一旦其中一个线程获取了异步线程的返回值,其他线程就无法再次获取。

    shared_future对象的get()可以调用多次;可以实现多个线程等待同一个异步线程,每个线程都可以获取异步线程的返回值。

5.3 简单例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <future>
#include <chrono>

// 假设这是一个耗时操作
int long_computation() {
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作
    return 42; // 假设计算的结果是42
}

int main() {
    // 启动一个异步任务
    std::future<int> result_future = std::async(std::launch::async, long_computation);

    // 这里可以执行其他操作,而long_computation在另一个线程中运行

    // 当需要结果时,等待异步操作完成并获取结果
    int result = result_future.get();

    std::cout << "The result is: " << result << std::endl;

    return 0;
}

5.4 与 std::thread 的联系

std::asyncstd::futurestd::thread都是C++11标准库中用于多线程编程的工具,它们之间有一些区别和适用场景。1

简单概念:

  1. std::asyncstd::future
    • std::async是一个函数模板,用于创建异步任务并返回一个std::future对象,表示异步任务的结果。
    • std::future是一个模板类,用于保存异步任务的结果,并提供获取结果和等待任务完成的函数。
    • std::asyncstd::future组合起来可以方便地创建和管理异步任务,适用于需要获取任务结果或等待任务完成的场景。
  2. std::thread
    • std::thread是一个类,用于创建线程对象,并指定线程要执行的函数或函数对象。
    • std::thread允许我们直接控制线程的创建、启动、结束和加入等操作。
    • std::thread适用于需要对线程进行更细粒度的控制,例如需要手动管理线程的生命周期或需要与其他线程进行协同工作的场景。

区别和建议:

  • std::asyncstd::future封装了线程的创建和管理,更适用于简单的异步任务和获取任务结果的场景。它们提供了更高级别的抽象,简化了多线程编程。
  • std::thread更适用于需要对线程进行更精细控制的场景,例如需要手动管理线程的生命周期、线程与线程之间的同步等。但使用std::thread需要更多的注意和手动处理线程的创建、结束和同步等问题。
  • 对于简单的异步任务,建议使用std::asyncstd::future,因为它们提供了更简单的接口和更高级别的抽象。
  • 如果需要更细粒度的线程控制或与其他线程进行协同工作,建议使用std::thread

5.5 一些问题

(1) 异步操作过多

当异步操作非常多时,使用std::async创建的线程可能会超过处理器能高效处理的线程数,这会导致上下文切换的开销过大,从而可能导致整体性能低于串行执行的性能。此外,如果所有线程都试图同时进行文件I/O,它们可能会遇到资源争夺的问题,因为磁盘I/O是一个共享资源,并且通常是同步执行的。这可能会导致线程在等待磁盘I/O完成时被阻塞,造成线程并没有真正并行运行。

要解决这一问题,可以采取以下几种策略:

  1. 线程池: 使用线程池来限制同时运行的线程数,这可以减少上下文切换的开销,并可能提高缓存利用率。C++11标准库中没有内置的线程池,但可以使用第三方库或者自己实现一个。
  2. 任务批处理: 将要创建和写入的文件分批处理,每个批次处理一定数量的文件。这样可以在保持并发水平和减少资源争夺之间取得平衡。
  3. I/O优化: 如果可能,可以尝试使用更高效的I/O策略,比如内存映射文件(memory-mapped files)或批量写入。
  4. 硬件并发: 如果有多个磁盘驱动器,可以尝试将文件分布到不同的驱动器上进行操作,这样可以提高I/O并发性。
  5. 异步I/O: 在某些平台上,可以使用真正的异步I/O操作,这不依赖于线程来实现并行性,而是由操作系统直接管理。这种方式通常比线程模型更高效,因为它避免了线程创建和上下文切换的开销。

6. 线程池

线程池是一种基于池化技术的多线程管理机制,主要用来减少在应用程序中创建和销毁线程的频繁操作,提高系统资源的重复利用率,并控制发并的线程数量。1

6.1 解决的问题

线程池作为一种高效管理线程的机制,具有如下几点:6

  1. 通过重用已存在的线程,减少对象的创建、销毁的开销,提升性能;
  2. 通过重复利用已创建的线程降低线程创建和销毁造成的消耗,防止消耗过多的内存或系统资源;
  3. 当任务到达时,任务可以不需要等待线程创建就能立即执行,消除了线程创建所带来的延迟,使应用程序响应更快;
  4. 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。线程池可以进行统一的分配、调优和监控,提高线程的可管理性。

6.2 基本思路

线程池的原理多个线程从一个任务队列中取任务,如果取到任务便执行任务,未取到任务则等待新的任务到来,直到将所有任务取完。

也就是说,线程池需要一个任务队列、一个线程队列,同时,为了保证取任务、添加任务的原子性,需要配套的控制变量(互斥锁、条件变量),具体详述如下:6

  1. 线程池初始化:线程池在创建时,会预先创建一组线程并保存在池中。这些线程通常处于休眠状态,等待任务的到来;
  2. 任务队列:当有新任务到达时,它会被放入一个任务队列中。线程池中的线程会等待新任务到来的通知;
  3. 线程复用:一旦线程执行完一个任务,它不会立即被销毁,而是一直在池内等待新任务的到来
  4. 线程管理:线程池还负责管理线程的生命周期。例如,如果所有线程都在忙碌状态,并且队列中还有新的任务等待处理,线程池可能会选择创建新的线程来处理这些任务。

依据使用场景的不同,存在但不限于如下两种场景

  1. 任务有不同的优先级,优先级高的任务希望能够先被执行,优先级低的任务可以延后执行;
  2. 针对需要执行的任务,有的任务需要结果,有的任务不需要结果。

6.3 线程池开源库

在 C++ 中,有多个成熟的线程池开源库可以使用。以下是一些广泛被社区认可的库:1

  1. Boost.Thread: Boost 是一套被广泛使用的 C++ 库,它包含了一个强大的线程库(Boost.Thread)。该库提供了包括线程池在内的多线程编程的支持。它是跨平台的,并且被社区广泛测试和使用。

    官网: Boost.Thread

  2. ThreadPool: ThreadPool 是一个简单的 C++11 线程池实现。它易于使用并且具备基本的线程池功能。

    GitHub: ThreadPool

  3. CTPL: CTPL 是一个基于 C++11 的线程池库,它可以在任务添加时自动调整线程数量。

    GitHub: CTPL

  4. Intel Threading Building Blocks (TBB): TBB 是一个由 Intel 开发的并行编程框架。它不仅包含了线程池,还提供了一整套并发数据结构和并行算法。

    官网: Intel TBB

  5. folly::CPUThreadPoolExecutor: Folly 是 Facebook 开发和使用的一个开源库,它包含了大量的 C++11 组件,其中包括一个高效的线程池实现。

    GitHub: Folly

  6. asio: Asio 是一个跨平台的 C++ 库,用于网络编程和异步 I/O,它也提供了线程池的功能。

    官网: Asio

选择哪个线程池库取决于您的具体需求以及您希望将线程池如何集成到您的项目中。

例如,如果您已经在使用 Boost 或 TBB 作为项目的一部分,那么选择它们自带的线程池会更加方便。如果您需要一个轻量级的线程池,可能会倾向于使用 ThreadPool 或 CTPL。而如果您的项目需要高级的异步I/O操作,可能会选择 Asio。

7. std::atomic

互斥量的加锁一般是针对一个代码段,而原子操作针对的一般都是一个变量。atomic是一个模板类,使用该模板类实例化的对象,提供了一些保证原子性的成员函数来实现共享数据的常用操作。

在以前,定义了一个共享的变量int i=0,多个线程会操作这个变量,那么每次操作这个变量时,都是用lock加锁,操作完毕使用unlock解锁,以保证线程之间不会冲突;现在,可以通过实例化一个对象automic i=0来代替锁,每次操作这个对象时,就不用lockunlock,这个对象自身就具有原子性,以保证线程之间不会冲突。

atomic对象提供了常见的原子操作(通过调用成员函数实现对数据的原子操作):

  • store是原子写操作。
  • load是原子读操作。
  • exchange是于两个数值进行交换的原子操作。
  • 即使使用了atomic,也要注意执行的操作是否支持原子性。一般atomic原子操作,针对++,–,+=,-=,&=, =,^=是支持的。

代码示例如下:3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <atomic>
#include <thread>
#include <iostream>
using namespace std;
 
atomic_int64_t total = 0; //atomic_int64_t相当于int64_t,但是本身就拥有原子性
 
//线程函数,用于累加
void threadFunc(int64_t endNum) {
	for (int64_t i = 1; i <= endNum; ++i)
	{
		total += i;
	}
}
 
int main() {
	int64_t endNum = 100;
	thread t1(threadFunc, endNum);
	thread t2(threadFunc, endNum);
 
	t1.join();
	t2.join();
 
	cout << "total=" << total << endl; //10100
}

参考文章78910111213141516171819

本文由作者按照 CC BY 4.0 进行授权