引言:协程范式的演进
C++20协程是现代C++最重要的特性之一,它为异步编程提供了语言级别的支持。相比于回调函数和future/promise模式,协程让异步代码看起来像同步代码一样自然。
本文将深入探讨协程的内部机制、设计模式以及在实际系统中的应用。
一、协程基础概念
1.1 协程的核心组件
C++协程由三个核心组件构成:
- coroutine_handle:控制协程的生命周期
- promise_type:定义协程的返回值和生命周期钩子
- awaitable:可等待的对象
struct LazyGenerator {
struct promise_type {
int current_value;
auto get_return_object() {
return LazyGenerator{
std::coroutine_handle<promise_type>::from_promise(*this)
};
}
auto initial_suspend() { return std::suspend_always{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
void unhandled_exception() {
std::terminate();
}
void return_void() {}
auto yield_value(int value) {
current_value = value;
return std::suspend_always{};
}
};
std::coroutine_handle<promise_type> handle;
explicit LazyGenerator(std::coroutine_handle<promise_type> h)
: handle(h) {}
~LazyGenerator() {
if (handle) handle.destroy();
}
// 禁用拷贝
LazyGenerator(const LazyGenerator&) = delete;
LazyGenerator& operator=(const LazyGenerator&) = delete;
// 支持移动
LazyGenerator(LazyGenerator&&& other) noexcept
: handle(other.handle) {
other.handle = nullptr;
}
int next() {
if (handle) {
handle.resume();
if (handle.done()) return -1;
return handle.promise().current_value;
}
return -1;
}
};
重要区别:协程分为suspend_always和suspend_never两种。使用suspend_always时,协程在首次创建后立即暂停,需手动调用resume();使用suspend_never时,协程会立即执行。
二、async/await实现
2.1 基础异步任务
让我们实现一��类似async/await的任务系统:
struct AsyncTask {
struct promise_type {
std::variant<std::monostate, int, std::exception_ptr> result;
auto get_return_object() {
return AsyncTask{
std::coroutine_handle<promise_type>::from_promise(*this)
};
}
auto initial_suspend() { return std::suspend_never{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
void unhandled_exception() {
result = std::current_exception();
}
void return_value(int value) {
result = value;
}
};
std::coroutine_handle<promise_type> handle;
int get_result() {
if (!handle || handle.done()) return -1;
auto& result = handle.promise().result;
if (std::holds_alternative<int>(result)) {
return std::get<int>(result);
}
if (std::holds_alternative<std::exception_ptr>(result)) {
std::rethrow_exception(std::get<std::exception_ptr>(result));
}
return -1;
}
bool is_ready() const {
return handle && handle.done();
}
void await_coroutine(std::coroutine_handle<promise_type> h) {
h();
}
};
2.2 自定义Awaitable
实现可等待对象的关键是定义await_suspend方法:
class SleepAwaitable {
std::chrono::milliseconds duration;
public:
explicit SleepAwaitable(std::chrono::milliseconds d)
: duration(d) {}
bool await_ready() const {
return duration.count() <= 0;
}
void await_suspend(std::coroutine_handle<> handle) {
std::thread([handle, this]() {
std::this_thread::sleep_for(duration);
handle.resume();
}).detach();
}
void await_resume() const {}
};
SleepAwaitable sleep(std::chrono::milliseconds ms) {
return SleepAwaitable(ms);
}
// 使用示例
AsyncTask my_task() {
std::cout << "Start" << std::endl;
co_await sleep(1s);
std::cout << "Wake up after 1 second" << std::endl;
co_return 42;
}
三、异步IO实现
3.1 异步文件操作
协程与IO多路复用结合是实现高性能服务器的基础:
class AsyncFileReader {
int fd;
std::vector<char> buffer;
public:
explicit AsyncFileReader(const char* path) {
fd = open(path, O_RDONLY);
if (fd < 0) throw std::runtime_error("Failed to open file");
buffer.resize(4096);
}
~AsyncFileReader() {
if (fd >= 0) close(fd);
}
struct ReadOperation {
AsyncFileReader& reader;
std::coroutine_handle<> continuation;
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle) {
continuation = handle;
// 在实际系统中,这里应注册到IO多路复用器
}
size_t await_resume() {
return read(reader.fd, reader.buffer.data(), reader.buffer.size());
}
};
ReadOperation read_async() {
return ReadOperation{*this, std::coroutine_handle<>{}};
}
};
AsyncTask read_file_content(const char* path) {
AsyncFileReader reader(path);
auto data = co_await reader.read_async();
std::string content(data);
co_return content.size();
}
3.2 异步TCP服务器
基于协程的echo服务器实现:
class AsyncSocket {
int fd;
public:
explicit AsyncSocket(int sockfd) : fd(sockfd) {}
struct AwaitableSend {
int fd;
std::span<const uint8_t> data;
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<>) {
// 实际实现中注册到epoll
}
size_t await_resume() {
return send(fd, data.data(), data.size(), 0);
}
};
AwaitableSend send_async(std::span<const uint8_t> data) {
return AwaitableSend{fd, data};
}
};
AsyncTask handle_client(int client_fd) {
AsyncSocket socket(client_fd);
std::vector<uint8_t> buffer(4096);
while (true) {
auto bytes_read = co_await socket.receive_async(buffer);
if (bytes_read <= 0) break;
auto bytes_sent = co_await socket.send_async(
std::span<uint8_t>(buffer.data(), bytes_read)
);
}
}
性能注意:协程本身是轻量的,但真正的性能提升来自于避免阻塞IO。在实际部署中,需要结合IO多路复用(epoll/kqueue/IOCP)才能发挥协程的全部潜力。
四、生成器与流式处理
4.1 惰性生成器
协程生成器是实现惰性求值的完美工具:
template<typename T>
class LazySequence {
public:
struct promise_type {
T current_value;
auto get_return_object() {
return LazySequence{
std::coroutine_handle<promise_type>::from_promise(*this)
};
}
auto initial_suspend() { return std::suspend_always{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
void unhandled_exception() { std::terminate(); }
void return_void() {}
auto yield_value(T value) {
current_value = std::move(value);
return std::suspend_always{};
}
};
private:
std::coroutine_handle<promise_type> handle;
public:
explicit LazySequence(std::coroutine_handle<promise_type> h)
: handle(h) {}
~LazySequence() { if (handle) handle.destroy(); }
T next() {
if (!handle) return T{};
handle.resume();
if (handle.done()) return T{};
return std::move(handle.promise().current_value);
}
bool has_next() const {
return handle && !handle.done();
}
};
// 生成斐波那契数列
LazySequence<long long> fibonacci() {
long long a = 0, b = 1;
while (true) {
co_yield a;
long long next = a + b;
a = b;
b = next;
}
}
// 生成质数
LazySequence<int> primes(int limit) {
std::vector<bool> is_prime(limit + 1, true);
for (int i = 2; i * i <= limit; ++i) {
if (is_prime[i]) {
for (int j = i * i; j <= limit; j += i) {
is_prime[j] = false;
}
}
}
for (int i = 2; i <= limit; ++i) {
if (is_prime[i]) co_yield i;
}
}
4.2 流式数据处理管道
结合生成器实现函数式数据处理:
// 管道操作组合
template<typename T>
class Pipeline {
public:
// map���作
template<typename F>
auto map(F&& f) {
return [this, &f]() -> T {
auto value = next();
return f(value);
};
}
// filter操作
auto filter(auto&& predicate) {
T value;
do {
value = next();
} while (!predicate(value) && has_next());
return value;
}
// take操作
auto take(size_t n) {
size_t count = 0;
return [this, &count, n]() -> T {
if (count++ >= n) return T{};
return next();
};
}
};
int main() {
auto gen = fibonacci();
while (gen.has_next()) {
long long v = gen.next();
if (v > 1000) break;
std::cout << v << " ";
}
return 0;
}
五、错误处理与资源管理
5.1 协程中的异常传播
AsyncTask unsafe_operation() {
try {
auto result = co_await risky_task();
co_return result;
} catch (const std::exception& e) {
// 记录日志
log_error(e.what());
co_return -1; // 返回默认值或传播异常
}
}
struct SafeTask {
struct promise_type {
std::exception_ptr error;
auto get_return_object() {
return SafeTask{
std::coroutine_handle<promise_type>::from_promise(*this)
};
}
auto initial_suspend() { return std::suspend_never{}; }
auto final_suspend() noexcept { return std::suspend_always{}; }
void unhandled_exception() {
error = std::current_exception();
}
void return_void() {}
};
std::coroutine_handle<promise_type> handle;
void throw_if_error() {
if (handle && handle.promise().error) {
std::rethrow_exception(handle.promise().error);
}
}
};
5.2 RAII与协程结合
class ScopedCoroutine {
std::coroutine_handle<> handle;
public:
explicit ScopedCoroutine(std::coroutine_handle<> h)
: handle(h) {}
~ScopedCoroutine() {
if (handle) handle.destroy();
}
// 支持移动
ScopedCoroutine(ScopedCoroutine&& other) noexcept
: handle(other.handle) {
other.handle = nullptr;
}
void resume() {
if (handle) handle.resume();
}
};
class DatabaseConnection {
public:
class Awaitable {
DatabaseConnection& conn;
std::coroutine_handle<> continuation;
public:
Awaitable(DatabaseConnection& c) : conn(c) {}
bool await_ready() const { return conn.is_connected(); }
void await_suspend(std::coroutine_handle<> h) {
continuation = h;
conn.connect_async([this] {
continuation.resume();
});
}
void await_resume() {
if (!conn.is_connected()) {
throw std::runtime_error("Connection failed");
}
}
};
Awaitable connect_async() { return Awaitable(*this); }
};
六、性能对比与最佳实践
| 场景 | 传统方案 | 协程方案 | 性能提升 |
|---|---|---|---|
| 异步IO | callback/promise | async/await | 代码可读性↑ |
| 流式处理 | 迭代器 | 生成器 | 内存↓ |
| 并发模型 | 线程池 | 协程+IO多路复用 | 1:1000 |
| 状态机 | switch/函数指针 | 协程状态保存 | 代码量↓ |
最佳实践:
- 协程handle的生命周期必须严格管理,使用RAII包装
- 避免在协程中创建过大的局部变量
- 合理选择suspend_always和suspend_never
- 协程不等于线程,不要用协程替代线程
结语
C++20协程为异步编程带来了革命性的变化。它不仅让异步代码更容易编写和维护,更重要的是,它使得构建高性能异步系统成为可能。
在实际应用中,协程通常需要与IO多路复用(epoll/kqueue/IOCP)结合使用,才能发挥最大性能。理解协程的挂起-恢复机制对于设计高效的异步系统至关重要。