引言:并发编程的时代
在现代软件开发中,多核处理器已成为标配,充分利用多核性能成为提升应用程序性能的关键手段。C++11标准引入的线程库彻底改变了C++并发编程的面貌,使得开发者无需依赖平台特定的API即可编写可移植的多线程代码。
本文将从基础概念出发,逐步深入到高级并发模式,帮助读者掌握C++现代并发编程的核心技术和最佳实践。
一、C++线程基础
1.1 创建和管理线程
C++11引入的std::thread类是并发编程的基石。创建线程非常简单,只需将可调用对象(函数、lambda、函数对象)传递给thread构造函数:
#include <iostream>
#include <thread>
#include <vector>
void worker(int id) {
std::cout << "Worker " << id << " is running on thread "
<< std::this_thread::get_id() << std::endl;
}
int main() {
std::vector<std::thread> threads;
// 创建多个工作线程
for (int i = 0; i < 4; ++i) {
threads.emplace_back(worker, i);
}
// 等待所有线程完成
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
return 0;
}
std::terminate终止程序。使用RAII模式管理线程生命周期是最佳实践。
1.2 线程参数传递
向线程函数传递参数时需要注意引用和拷贝的语义。默认情况下,参数会被拷贝到线程的内部存储:
void process(std::string& data); std::string data = "important data"; // 使用std::ref传递引用 std::thread t(process, std::ref(data)); // 或者使用指针 std::thread t2(process, &data);
二、互斥与同步
2.1 互斥锁基础
当多个线程访问共享数据时,必须使用同步机制防止数据竞争。std::mutex是最基本的互斥原语:
#include <mutex>
#include <atomic>
class Counter {
private:
std::mutex mtx;
int count = 0;
public:
void increment() {
std::lock_guard<std::mutex> lock(mtx);
++count;
}
int get() const {
std::lock_guard<std::mutex> lock(mtx);
return count;
}
};
std::lock_guard和std::unique_lock提供了异常安全的锁管理方式,确保在任何退出路径上锁都会被释放。
2.2 条件变量
条件变量用于线程间的信号通知,是实现生产者-消费者模式的关键组件:
#include <condition_variable>
#include <queue>
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue;
std::mutex mtx;
std::condition_variable cv;
public:
void push(T value) {
{
std::lock_guard<std::mutex> lock(mtx);
queue.push(std::move(value));
}
cv.notify_one(); // 通知等待的线程
}
T pop() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return !queue.empty(); });
T value = std::move(queue.front());
queue.pop();
return value;
}
bool try_pop(T& value, std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mtx);
if (!cv.wait_for(lock, timeout, [this] { return !queue.empty(); })) {
return false;
}
value = std::move(queue.front());
queue.pop();
return true;
}
};
三、原子操作与无锁编程
3.1 原子类型
对于简单的计数器或标志位,使用原子操作比互斥锁更高效:std::atomic提供了硬件级别的原子操作支持:
#include <atomic>
class AtomicCounter {
private:
std::atomic<int> count{0};
public:
void increment() {
count.fetch_add(1, std::memory_order_relaxed);
}
void increment_seq_cst() {
++count; // 默认使用memory_order_seq_cst
}
int get() const {
return count.load(std::memory_order_acquire);
}
bool compare_and_swap(int expected, int desired) {
return count.compare_exchange_strong(expected, desired);
}
};
memory_order_relaxed性能最好,但错误使用可能导致数据竞争。除非你完全理解内存模型的复杂性,否则建议使用默认的memory_order_seq_cst。
3.2 无锁数据结构
无锁队列是展示原子操作强大能力的经典示例:
template<typename T>
class LockFreeStack {
private:
struct Node {
T data;
std::atomic<Node*> next;
Node(T val) : data(std::move(val)), next(nullptr) {}
};
std::atomic<Node*> head{nullptr};
public:
void push(T value) {
Node* new_node = new Node(std::move(value));
new_node->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(
new_node->next, new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
std::unique_ptr<T> pop() {
Node* old_head = head.load(std::memory_order_acquire);
while (old_head && !head.compare_exchange_weak(
old_head, old_head->next.load(std::memory_order_relaxed),
std::memory_order_release,
std::memory_order_relaxed)) {
// 重试直到成功
}
if (!old_head) return nullptr;
std::unique_ptr<T> result(new T(std::move(old_head->data)));
delete old_head;
return result;
}
};
四、异步编程与Future
4.1 async与future
C++11的std::async和std::future提供了更高级的异步编程抽象:
#include <future>
#include <vector>
int compute(int x) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return x * x;
}
int main() {
// 启动异步任务
std::future<int> result = std::async(std::launch::async, compute, 42);
// 执行其他工作...
std::cout << "Doing other work..." << std::endl;
// 获取结果(阻塞直到完成)
int value = result.get();
std::cout << "Result: " << value << std::endl;
// 并行计算多个值
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; ++i) {
futures.push_back(std::async(std::launch::async, compute, i));
}
for (auto& f : futures) {
std::cout << f.get() << " ";
}
return 0;
}
4.2 Promise与自定义异步操作
std::promise允许你在一个线程中设置值,在另一个线程中通过future获取:
void producer(std::promise<std::string>& promise) {
try {
// 模拟工作
std::this_thread::sleep_for(std::chrono::seconds(2));
promise.set_value("Data processed successfully");
} catch (...) {
promise.set_exception(std::current_exception());
}
}
void consumer(std::future<std::string>& future) {
try {
std::string result = future.get();
std::cout << "Received: " << result << std::endl;
} catch (const std::exception& e) {
std::cout << "Error: " << e.what() << std::endl;
}
}
五、线程池实现
线程池是管理线程资源、避免频繁创建销毁线程开销的标准模式。以下是基于C++17的完整线程池实现:
#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <functional>
#include <condition_variable>
class ThreadPool {
public:
explicit ThreadPool(size_t num_threads = std::thread::hardware_concurrency())
: stop_flag(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop_flag || !tasks.empty();
});
if (stop_flag && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using return_type = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop_flag) {
throw std::runtime_error("Cannot enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return result;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop_flag = true;
}
condition.notify_all();
for (auto& worker : workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop_flag;
};
六、C++20新特性
6.1 协程(Coroutines)
C++20引入的协程为异步编程提供了更优雅的解决方案:
#include <coroutine>
#include <optional>
template<typename T>
struct Generator {
struct promise_type {
T current_value;
auto get_return_object() {
return Generator{std::coroutine_handle<promise_type>::from_promise(*this)};
}
auto initial_suspend() { return std::suspend_always{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
void unhandled_exception() { std::terminate(); }
void return_void() {}
auto yield_value(T value) {
current_value = std::move(value);
return std::suspend_always{};
}
};
using handle_type = std::coroutine_handle<promise_type>;
handle_type handle;
explicit Generator(handle_type h) : handle(h) {}
~Generator() { if (handle) handle.destroy(); }
Generator(Generator&& other) : handle(other.handle) {
other.handle = nullptr;
}
class Iterator {
handle_type handle;
public:
explicit Iterator(handle_type h) : handle(h) {}
Iterator& operator++() {
handle.resume();
if (handle.done()) handle = nullptr;
return *this;
}
T& operator*() const { return handle.promise().current_value; }
bool operator!=(const Iterator& other) const { return handle != other.handle; }
};
Iterator begin() {
if (handle && !handle.done()) handle.resume();
return Iterator(handle.done() ? nullptr : handle);
}
Iterator end() { return Iterator(nullptr); }
};
// 使用协程生成斐波那契数列
Generator<long long> fibonacci(int n) {
long long a = 0, b = 1;
for (int i = 0; i < n; ++i) {
co_yield a;
auto next = a + b;
a = b;
b = next;
}
}
6.2 信号量(Semaphores)
C++20引入了计数信号量和二元信号量:
#include <semaphore>
class BoundedBuffer {
static constexpr size_t capacity = 10;
std::vector<int> buffer;
size_t read_pos = 0, write_pos = 0;
std::counting_semaphore<capacity> empty_slots{capacity};
std::counting_semaphore<capacity> filled_slots{0};
std::binary_semaphore mutex{1};
public:
BoundedBuffer() : buffer(capacity) {}
void produce(int item) {
empty_slots.acquire();
mutex.acquire();
buffer[write_pos] = item;
write_pos = (write_pos + 1) % capacity;
mutex.release();
filled_slots.release();
}
int consume() {
filled_slots.acquire();
mutex.acquire();
int item = buffer[read_pos];
read_pos = (read_pos + 1) % capacity;
mutex.release();
empty_slots.release();
return item;
}
};
七、性能优化与最佳实践
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 简单计数器 | std::atomic | 无锁,性能最优 |
| 复杂数据结构 | std::mutex + std::lock_guard | 简单安全,避免死锁 |
| 读多写少 | std::shared_mutex | 支持并发读 |
| 任务队列 | 条件变量 + 线程池 | 高效的任务分发 |
| 异步计算 | std::async / 协程 | 代码简洁,易于维护 |
- 死锁:确保锁的获取顺序一致,使用std::lock同时获取多个锁
- 虚假唤醒:使用条件变量时总是在循环中检查条件
- 数据竞争:即使是bool或int的读写也需要同步
- 线程局部存储滥用:过度使用thread_local可能导致内存膨胀
结语
C++现代并发编程从C++11到C++20经历了巨大的演进,为开发者提供了从底层原子操作到高层协程抽象的完整工具链。掌握这些技术不仅需要理解API的使用,更需要深入理解内存模型、同步原语的语义以及并发设计的原则。
在实际项目中,建议遵循"简单优先"的原则:只有当性能分析证明需要优化时,才考虑使用无锁数据结构或精细的内存序控制。正确的并发程序首先是正确的,其次才是快的。