引言:并发编程的时代

在现代软件开发中,多核处理器已成为标配,充分利用多核性能成为提升应用程序性能的关键手段。C++11标准引入的线程库彻底改变了C++并发编程的面貌,使得开发者无需依赖平台特定的API即可编写可移植的多线程代码。

本文将从基础概念出发,逐步深入到高级并发模式,帮助读者掌握C++现代并发编程的核心技术和最佳实践。

一、C++线程基础

1.1 创建和管理线程

C++11引入的std::thread类是并发编程的基石。创建线程非常简单,只需将可调用对象(函数、lambda、函数对象)传递给thread构造函数:

#include <iostream>
#include <thread>
#include <vector>

void worker(int id) {
    std::cout << "Worker " << id << " is running on thread " 
              << std::this_thread::get_id() << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    
    // 创建多个工作线程
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(worker, i);
    }
    
    // 等待所有线程完成
    for (auto& t : threads) {
        if (t.joinable()) {
            t.join();
        }
    }
    
    return 0;
}
重要提示:每个可joinable的线程必须被join或detach,否则在thread对象析构时会调用std::terminate终止程序。使用RAII模式管理线程生命周期是最佳实践。

1.2 线程参数传递

向线程函数传递参数时需要注意引用和拷贝的语义。默认情况下,参数会被拷贝到线程的内部存储:

void process(std::string& data);

std::string data = "important data";
// 使用std::ref传递引用
std::thread t(process, std::ref(data));

// 或者使用指针
std::thread t2(process, &data);

二、互斥与同步

2.1 互斥锁基础

当多个线程访问共享数据时,必须使用同步机制防止数据竞争。std::mutex是最基本的互斥原语:

#include <mutex>
#include <atomic>

class Counter {
private:
    std::mutex mtx;
    int count = 0;
    
public:
    void increment() {
        std::lock_guard<std::mutex> lock(mtx);
        ++count;
    }
    
    int get() const {
        std::lock_guard<std::mutex> lock(mtx);
        return count;
    }
};
RAII锁管理:std::lock_guardstd::unique_lock提供了异常安全的锁管理方式,确保在任何退出路径上锁都会被释放。

2.2 条件变量

条件变量用于线程间的信号通知,是实现生产者-消费者模式的关键组件:

#include <condition_variable>
#include <queue>

template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    std::mutex mtx;
    std::condition_variable cv;
    
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            queue.push(std::move(value));
        }
        cv.notify_one();  // 通知等待的线程
    }
    
    T pop() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return !queue.empty(); });
        
        T value = std::move(queue.front());
        queue.pop();
        return value;
    }
    
    bool try_pop(T& value, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mtx);
        if (!cv.wait_for(lock, timeout, [this] { return !queue.empty(); })) {
            return false;
        }
        value = std::move(queue.front());
        queue.pop();
        return true;
    }
};

三、原子操作与无锁编程

3.1 原子类型

对于简单的计数器或标志位,使用原子操作比互斥锁更高效:std::atomic提供了硬件级别的原子操作支持:

#include <atomic>

class AtomicCounter {
private:
    std::atomic<int> count{0};
    
public:
    void increment() {
        count.fetch_add(1, std::memory_order_relaxed);
    }
    
    void increment_seq_cst() {
        ++count;  // 默认使用memory_order_seq_cst
    }
    
    int get() const {
        return count.load(std::memory_order_acquire);
    }
    
    bool compare_and_swap(int expected, int desired) {
        return count.compare_exchange_strong(expected, desired);
    }
};
内存序警告:虽然memory_order_relaxed性能最好,但错误使用可能导致数据竞争。除非你完全理解内存模型的复杂性,否则建议使用默认的memory_order_seq_cst

3.2 无锁数据结构

无锁队列是展示原子操作强大能力的经典示例:

template<typename T>
class LockFreeStack {
private:
    struct Node {
        T data;
        std::atomic<Node*> next;
        Node(T val) : data(std::move(val)), next(nullptr) {}
    };
    
    std::atomic<Node*> head{nullptr};
    
public:
    void push(T value) {
        Node* new_node = new Node(std::move(value));
        new_node->next = head.load(std::memory_order_relaxed);
        
        while (!head.compare_exchange_weak(
            new_node->next, new_node,
            std::memory_order_release,
            std::memory_order_relaxed));
    }
    
    std::unique_ptr<T> pop() {
        Node* old_head = head.load(std::memory_order_acquire);
        
        while (old_head && !head.compare_exchange_weak(
            old_head, old_head->next.load(std::memory_order_relaxed),
            std::memory_order_release,
            std::memory_order_relaxed)) {
            // 重试直到成功
        }
        
        if (!old_head) return nullptr;
        
        std::unique_ptr<T> result(new T(std::move(old_head->data)));
        delete old_head;
        return result;
    }
};

四、异步编程与Future

4.1 async与future

C++11的std::asyncstd::future提供了更高级的异步编程抽象:

#include <future>
#include <vector>

int compute(int x) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return x * x;
}

int main() {
    // 启动异步任务
    std::future<int> result = std::async(std::launch::async, compute, 42);
    
    // 执行其他工作...
    std::cout << "Doing other work..." << std::endl;
    
    // 获取结果(阻塞直到完成)
    int value = result.get();
    std::cout << "Result: " << value << std::endl;
    
    // 并行计算多个值
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 10; ++i) {
        futures.push_back(std::async(std::launch::async, compute, i));
    }
    
    for (auto& f : futures) {
        std::cout << f.get() << " ";
    }
    
    return 0;
}

4.2 Promise与自定义异步操作

std::promise允许你在一个线程中设置值,在另一个线程中通过future获取:

void producer(std::promise<std::string>& promise) {
    try {
        // 模拟工作
        std::this_thread::sleep_for(std::chrono::seconds(2));
        promise.set_value("Data processed successfully");
    } catch (...) {
        promise.set_exception(std::current_exception());
    }
}

void consumer(std::future<std::string>& future) {
    try {
        std::string result = future.get();
        std::cout << "Received: " << result << std::endl;
    } catch (const std::exception& e) {
        std::cout << "Error: " << e.what() << std::endl;
    }
}

五、线程池实现

线程池是管理线程资源、避免频繁创建销毁线程开销的标准模式。以下是基于C++17的完整线程池实现:

#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <functional>
#include <condition_variable>

class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads = std::thread::hardware_concurrency())
        : stop_flag(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] {
                            return stop_flag || !tasks.empty();
                        });
                        
                        if (stop_flag && tasks.empty()) return;
                        
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }
    
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using return_type = decltype(f(args...));
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> result = task->get_future();
        
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop_flag) {
                throw std::runtime_error("Cannot enqueue on stopped ThreadPool");
            }
            tasks.emplace([task]() { (*task)(); });
        }
        
        condition.notify_one();
        return result;
    }
    
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop_flag = true;
        }
        condition.notify_all();
        
        for (auto& worker : workers) {
            worker.join();
        }
    }
    
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop_flag;
};
使用示例:ThreadPool pool(4); auto future = pool.enqueue([](int x) { return x * x; }, 42); int result = future.get();

六、C++20新特性

6.1 协程(Coroutines)

C++20引入的协程为异步编程提供了更优雅的解决方案:

#include <coroutine>
#include <optional>

template<typename T>
struct Generator {
    struct promise_type {
        T current_value;
        
        auto get_return_object() {
            return Generator{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        
        auto initial_suspend() { return std::suspend_always{}; }
        auto final_suspend() noexcept { return std::suspend_always{}; }
        void unhandled_exception() { std::terminate(); }
        void return_void() {}
        
        auto yield_value(T value) {
            current_value = std::move(value);
            return std::suspend_always{};
        }
    };
    
    using handle_type = std::coroutine_handle<promise_type>;
    handle_type handle;
    
    explicit Generator(handle_type h) : handle(h) {}
    ~Generator() { if (handle) handle.destroy(); }
    
    Generator(Generator&& other) : handle(other.handle) {
        other.handle = nullptr;
    }
    
    class Iterator {
        handle_type handle;
    public:
        explicit Iterator(handle_type h) : handle(h) {}
        
        Iterator& operator++() {
            handle.resume();
            if (handle.done()) handle = nullptr;
            return *this;
        }
        
        T& operator*() const { return handle.promise().current_value; }
        bool operator!=(const Iterator& other) const { return handle != other.handle; }
    };
    
    Iterator begin() {
        if (handle && !handle.done()) handle.resume();
        return Iterator(handle.done() ? nullptr : handle);
    }
    
    Iterator end() { return Iterator(nullptr); }
};

// 使用协程生成斐波那契数列
Generator<long long> fibonacci(int n) {
    long long a = 0, b = 1;
    for (int i = 0; i < n; ++i) {
        co_yield a;
        auto next = a + b;
        a = b;
        b = next;
    }
}

6.2 信号量(Semaphores)

C++20引入了计数信号量和二元信号量:

#include <semaphore>

class BoundedBuffer {
    static constexpr size_t capacity = 10;
    std::vector<int> buffer;
    size_t read_pos = 0, write_pos = 0;
    
    std::counting_semaphore<capacity> empty_slots{capacity};
    std::counting_semaphore<capacity> filled_slots{0};
    std::binary_semaphore mutex{1};
    
public:
    BoundedBuffer() : buffer(capacity) {}
    
    void produce(int item) {
        empty_slots.acquire();
        mutex.acquire();
        
        buffer[write_pos] = item;
        write_pos = (write_pos + 1) % capacity;
        
        mutex.release();
        filled_slots.release();
    }
    
    int consume() {
        filled_slots.acquire();
        mutex.acquire();
        
        int item = buffer[read_pos];
        read_pos = (read_pos + 1) % capacity;
        
        mutex.release();
        empty_slots.release();
        return item;
    }
};

七、性能优化与最佳实践

场景 推荐方案 原因
简单计数器 std::atomic 无锁,性能最优
复杂数据结构 std::mutex + std::lock_guard 简单安全,避免死锁
读多写少 std::shared_mutex 支持并发读
任务队列 条件变量 + 线程池 高效的任务分发
异步计算 std::async / 协程 代码简洁,易于维护
常见陷阱:
  • 死锁:确保锁的获取顺序一致,使用std::lock同时获取多个锁
  • 虚假唤醒:使用条件变量时总是在循环中检查条件
  • 数据竞争:即使是bool或int的读写也需要同步
  • 线程局部存储滥用:过度使用thread_local可能导致内存膨胀

结语

C++现代并发编程从C++11到C++20经历了巨大的演进,为开发者提供了从底层原子操作到高层协程抽象的完整工具链。掌握这些技术不仅需要理解API的使用,更需要深入理解内存模型、同步原语的语义以及并发设计的原则。

在实际项目中,建议遵循"简单优先"的原则:只有当性能分析证明需要优化时,才考虑使用无锁数据结构或精细的内存序控制。正确的并发程序首先是正确的,其次才是快的。