引言:协程范式的演进

C++20协程是现代C++最重要的特性之一,它为异步编程提供了语言级别的支持。相比于回调函数和future/promise模式,协程让异步代码看起来像同步代码一样自然。

本文将深入探讨协程的内部机制、设计模式以及在实际系统中的应用。

一、协程基础概念

1.1 协程的核心组件

C++协程由三个核心组件构成:

  • coroutine_handle:控制协程的生命周期
  • promise_type:定义协程的返回值和生命周期钩子
  • awaitable:可等待的对象
struct LazyGenerator {
    struct promise_type {
        int current_value;
        
        auto get_return_object() {
            return LazyGenerator{
                std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        
        auto initial_suspend() { return std::suspend_always{}; }
        auto final_suspend() noexcept { return std::suspend_always{}; }
        
        void unhandled_exception() {
            std::terminate();
        }
        
        void return_void() {}
        
        auto yield_value(int value) {
            current_value = value;
            return std::suspend_always{};
        }
    };
    
    std::coroutine_handle<promise_type> handle;
    
    explicit LazyGenerator(std::coroutine_handle<promise_type> h) 
        : handle(h) {}
    
    ~LazyGenerator() { 
        if (handle) handle.destroy(); 
    }
    
    // 禁用拷贝
    LazyGenerator(const LazyGenerator&) = delete;
    LazyGenerator& operator=(const LazyGenerator&) = delete;
    
    // 支持移动
    LazyGenerator(LazyGenerator&&& other) noexcept 
        : handle(other.handle) {
        other.handle = nullptr;
    }
    
    int next() {
        if (handle) {
            handle.resume();
            if (handle.done()) return -1;
            return handle.promise().current_value;
        }
        return -1;
    }
};
重要区别:协程分为suspend_always和suspend_never两种。使用suspend_always时,协程在首次创建后立即暂停,需手动调用resume();使用suspend_never时,协程会立即执行。

二、async/await实现

2.1 基础异步任务

让我们实现一��类似async/await的任务系统:

struct AsyncTask {
    struct promise_type {
        std::variant<std::monostate, int, std::exception_ptr> result;
        
        auto get_return_object() {
            return AsyncTask{
                std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        
        auto initial_suspend() { return std::suspend_never{}; }
        auto final_suspend() noexcept { return std::suspend_always{}; }
        
        void unhandled_exception() {
            result = std::current_exception();
        }
        
        void return_value(int value) {
            result = value;
        }
    };
    
    std::coroutine_handle<promise_type> handle;
    
    int get_result() {
        if (!handle || handle.done()) return -1;
        
        auto& result = handle.promise().result;
        if (std::holds_alternative<int>(result)) {
            return std::get<int>(result);
        }
        if (std::holds_alternative<std::exception_ptr>(result)) {
            std::rethrow_exception(std::get<std::exception_ptr>(result));
        }
        return -1;
    }
    
    bool is_ready() const { 
        return handle && handle.done(); 
    }
    
    void await_coroutine(std::coroutine_handle<promise_type> h) {
        h();
    }
};

2.2 自定义Awaitable

实现可等待对象的关键是定义await_suspend方法:

class SleepAwaitable {
    std::chrono::milliseconds duration;
    
public:
    explicit SleepAwaitable(std::chrono::milliseconds d) 
        : duration(d) {}
    
    bool await_ready() const {
        return duration.count() <= 0;
    }
    
    void await_suspend(std::coroutine_handle<> handle) {
        std::thread([handle, this]() {
            std::this_thread::sleep_for(duration);
            handle.resume();
        }).detach();
    }
    
    void await_resume() const {}
};

SleepAwaitable sleep(std::chrono::milliseconds ms) {
    return SleepAwaitable(ms);
}

// 使用示例
AsyncTask my_task() {
    std::cout << "Start" << std::endl;
    co_await sleep(1s);
    std::cout << "Wake up after 1 second" << std::endl;
    co_return 42;
}

三、异步IO实现

3.1 异步文件操作

协程与IO多路复用结合是实现高性能服务器的基础:

class AsyncFileReader {
    int fd;
    std::vector<char> buffer;
    
public:
    explicit AsyncFileReader(const char* path) {
        fd = open(path, O_RDONLY);
        if (fd < 0) throw std::runtime_error("Failed to open file");
        buffer.resize(4096);
    }
    
    ~AsyncFileReader() { 
        if (fd >= 0) close(fd); 
    }
    
    struct ReadOperation {
        AsyncFileReader& reader;
        std::coroutine_handle<> continuation;
        
        bool await_ready() const { return false; }
        
        void await_suspend(std::coroutine_handle<> handle) {
            continuation = handle;
            // 在实际系统中,这里应注册到IO多路复用器
        }
        
        size_t await_resume() {
            return read(reader.fd, reader.buffer.data(), reader.buffer.size());
        }
    };
    
    ReadOperation read_async() {
        return ReadOperation{*this, std::coroutine_handle<>{}};
    }
};

AsyncTask read_file_content(const char* path) {
    AsyncFileReader reader(path);
    
    auto data = co_await reader.read_async();
    std::string content(data);
    
    co_return content.size();
}

3.2 异步TCP服务器

基于协程的echo服务器实现:

class AsyncSocket {
    int fd;
    
public:
    explicit AsyncSocket(int sockfd) : fd(sockfd) {}
    
    struct AwaitableSend {
        int fd;
        std::span<const uint8_t> data;
        
        bool await_ready() const { return false; }
        
        void await_suspend(std::coroutine_handle<>) {
            // 实际实现中注册到epoll
        }
        
        size_t await_resume() {
            return send(fd, data.data(), data.size(), 0);
        }
    };
    
    AwaitableSend send_async(std::span<const uint8_t> data) {
        return AwaitableSend{fd, data};
    }
};

AsyncTask handle_client(int client_fd) {
    AsyncSocket socket(client_fd);
    std::vector<uint8_t> buffer(4096);
    
    while (true) {
        auto bytes_read = co_await socket.receive_async(buffer);
        if (bytes_read <= 0) break;
        
        auto bytes_sent = co_await socket.send_async(
            std::span<uint8_t>(buffer.data(), bytes_read)
        );
    }
}
性能注意:协程本身是轻量的,但真正的性能提升来自于避免阻塞IO。在实际部署中,需要结合IO多路复用(epoll/kqueue/IOCP)才能发挥协程的全部潜力。

四、生成器与流式处理

4.1 惰性生成器

协程生成器是实现惰性求值的完美工具:

template<typename T>
class LazySequence {
public:
    struct promise_type {
        T current_value;
        
        auto get_return_object() {
            return LazySequence{
                std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        
        auto initial_suspend() { return std::suspend_always{}; }
        auto final_suspend() noexcept { return std::suspend_always{}; }
        void unhandled_exception() { std::terminate(); }
        void return_void() {}
        
        auto yield_value(T value) {
            current_value = std::move(value);
            return std::suspend_always{};
        }
    };
    
private:
    std::coroutine_handle<promise_type> handle;
    
public:
    explicit LazySequence(std::coroutine_handle<promise_type> h) 
        : handle(h) {}
    
    ~LazySequence() { if (handle) handle.destroy(); }
    
    T next() {
        if (!handle) return T{};
        handle.resume();
        if (handle.done()) return T{};
        return std::move(handle.promise().current_value);
    }
    
    bool has_next() const {
        return handle && !handle.done();
    }
};

// 生成斐波那契数列
LazySequence<long long> fibonacci() {
    long long a = 0, b = 1;
    while (true) {
        co_yield a;
        long long next = a + b;
        a = b;
        b = next;
    }
}

// 生成质数
LazySequence<int> primes(int limit) {
    std::vector<bool> is_prime(limit + 1, true);
    for (int i = 2; i * i <= limit; ++i) {
        if (is_prime[i]) {
            for (int j = i * i; j <= limit; j += i) {
                is_prime[j] = false;
            }
        }
    }
    for (int i = 2; i <= limit; ++i) {
        if (is_prime[i]) co_yield i;
    }
}

4.2 流式数据处理管道

结合生成器实现函数式数据处理:

// 管道操作组合
template<typename T>
class Pipeline {
public:
    // map���作
    template<typename F>
    auto map(F&& f) {
        return [this, &f]() -> T {
            auto value = next();
            return f(value);
        };
    }
    
    // filter操作
    auto filter(auto&& predicate) {
        T value;
        do {
            value = next();
        } while (!predicate(value) && has_next());
        return value;
    }
    
    // take操作
    auto take(size_t n) {
        size_t count = 0;
        return [this, &count, n]() -> T {
            if (count++ >= n) return T{};
            return next();
        };
    }
};

int main() {
    auto gen = fibonacci();
    
    while (gen.has_next()) {
        long long v = gen.next();
        if (v > 1000) break;
        std::cout << v << " ";
    }
    
    return 0;
}

五、错误处理与资源管理

5.1 协程中的异常传播

AsyncTask unsafe_operation() {
    try {
        auto result = co_await risky_task();
        co_return result;
    } catch (const std::exception& e) {
        // 记录日志
        log_error(e.what());
        co_return -1;  // 返回默认值或传播异常
    }
}

struct SafeTask {
    struct promise_type {
        std::exception_ptr error;
        
        auto get_return_object() {
            return SafeTask{
                std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        
        auto initial_suspend() { return std::suspend_never{}; }
        auto final_suspend() noexcept { return std::suspend_always{}; }
        
        void unhandled_exception() {
            error = std::current_exception();
        }
        
        void return_void() {}
    };
    
    std::coroutine_handle<promise_type> handle;
    
    void throw_if_error() {
        if (handle && handle.promise().error) {
            std::rethrow_exception(handle.promise().error);
        }
    }
};

5.2 RAII与协程结合

class ScopedCoroutine {
    std::coroutine_handle<> handle;
    
public:
    explicit ScopedCoroutine(std::coroutine_handle<> h) 
        : handle(h) {}
    
    ~ScopedCoroutine() {
        if (handle) handle.destroy();
    }
    
    // 支持移动
    ScopedCoroutine(ScopedCoroutine&& other) noexcept
        : handle(other.handle) {
        other.handle = nullptr;
    }
    
    void resume() {
        if (handle) handle.resume();
    }
};

class DatabaseConnection {
public:
    class Awaitable {
        DatabaseConnection& conn;
        std::coroutine_handle<> continuation;
        
    public:
        Awaitable(DatabaseConnection& c) : conn(c) {}
        
        bool await_ready() const { return conn.is_connected(); }
        
        void await_suspend(std::coroutine_handle<> h) {
            continuation = h;
            conn.connect_async([this] { 
                continuation.resume(); 
            });
        }
        
        void await_resume() {
            if (!conn.is_connected()) {
                throw std::runtime_error("Connection failed");
            }
        }
    };
    
    Awaitable connect_async() { return Awaitable(*this); }
};

六、性能对比与最佳实践

场景 传统方案 协程方案 性能提升
异步IO callback/promise async/await 代码可读性↑
流式处理 迭代器 生成器 内存↓
并发模型 线程池 协程+IO多路复用 1:1000
状态机 switch/函数指针 协程状态保存 代码量↓
最佳实践:
  • 协程handle的生命周期必须严格管理,使用RAII包装
  • 避免在协程中创建过大的局部变量
  • 合理选择suspend_always和suspend_never
  • 协程不等于线程,不要用协程替代线程

结语

C++20协程为异步编程带来了革命性的变化。它不仅让异步代码更容易编写和维护,更重要的是,它使得构建高性能异步系统成为可能。

在实际应用中,协程通常需要与IO多路复用(epoll/kqueue/IOCP)结合使用,才能发挥最大性能。理解协程的挂起-恢复机制对于设计高效的异步系统至关重要。