Rust 异步编程基础

Rust 异步编程基础

本文介绍 Rust 异步编程模型的核心概念,包括 Future、Poll 机制、async/await 语法以及 Tokio 运行时架构。


1. 同步 vs 异步

1.1 同步 I/O 的问题

传统的同步 I/O 模型中,当程序执行 I/O 操作(如网络请求、文件读写)时,线程会被阻塞,直到操作完成:

sequenceDiagram participant T1 as Thread 1 participant IO as I/O 操作 T1->>IO: 发起请求 Note over T1: 阻塞等待... IO-->>T1: 完成 T1->>T1: 继续执行

问题

  • 每个并发任务需要一个线程
  • 线程切换开销大(~1-10μs)
  • 内存占用高(每个线程 ~1MB 栈空间)

1.2 异步 I/O 的优势

异步 I/O 模型允许在等待 I/O 时切换到其他任务:

graph LR subgraph thread["单线程执行"] A["任务 A
发起 I/O"] --> B["任务 B
发起 I/O"] B --> C["任务 A
I/O 完成"] C --> D["任务 C
发起 I/O"] D --> E["任务 B
I/O 完成"] end

优势

  • 单线程可处理大量并发
  • 无线程切换开销
  • 内存占用低

2. Future 与 Poll 机制

2.1 Future Trait

Rust 的异步模型基于 Future trait,这是一种 惰性求值 的设计:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),    // 计算完成,返回结果
    Pending,     // 计算未完成,需要稍后再试
}

2.2 Poll 状态机

stateDiagram-v2 [*] --> Created: async fn 创建 Created --> Polling: executor 调用 poll() Polling --> Ready: 返回 Poll::Ready(result) Polling --> Pending: 返回 Poll::Pending Pending --> Polling: Waker 通知后再次 poll Ready --> [*]: 获取结果

2.3 关键概念

概念说明
惰性(Lazy)Future 创建后不会立即执行,必须被 poll 才会推进
协作式(Cooperative)Future 主动让出控制权,而非被抢占
零成本抽象编译时转换为状态机,无运行时开销

3. async/await 语法糖

3.1 基本用法

async/await 是对 Future 的语法糖,使异步代码看起来像同步代码:

// 使用 async/await
async fn fetch_and_process() -> Result<Data> {
    let response = client.get("http://api.example.com").await?;
    let data = response.json::<Data>().await?;
    Ok(data)
}

3.2 状态机转换

编译器将 async 函数转换为状态机:

stateDiagram-v2 Start --> WaitingForResponse: client.get() 发起请求 WaitingForResponse --> WaitingForJson: 响应到达 WaitingForJson --> Done: JSON 解析完成 note right of WaitingForResponse: 网络 I/O 进行中 note right of WaitingForJson: JSON 解析中

编译器生成的状态机

enum FetchAndProcessState {
    Start,
    WaitingForResponse { client: Client },
    WaitingForJson { response: Response },
    Done,
}

4. Tokio 异步运行时

4.1 核心组件

Tokio 是 Rust 生态中最流行的异步运行时,Dynamo 选择 Tokio 作为底层运行时。

graph TB subgraph tokio["Tokio Runtime"] subgraph scheduler["Scheduler 调度器"] MT["Multi-Thread
多线程调度器"] CT["Current-Thread
单线程调度器"] end subgraph driver["I/O Driver"] EP["Epoll/Kqueue/IOCP
系统事件循环"] TIM["Timer
定时器"] end subgraph task["Task System"] Q1["Work Queue 1"] Q2["Work Queue 2"] GQ["Global Queue"] end BP["Blocking Pool
阻塞线程池"] end MT --> Q1 MT --> Q2 MT --> GQ EP --> MT TIM --> MT

4.2 Work-Stealing 调度

Tokio 的多线程调度器使用 Work-Stealing 算法来平衡负载:

graph TB subgraph workers["Worker 线程"] W1["Worker 1
任务 A, B, C"] W2["Worker 2
任务 D, E"] W3["Worker 3
空闲"] end W3 --> |steal| W1 GQ["Global Inject Queue
新任务首先进入"] GQ --> W1 GQ --> W2 GQ --> W3

Work-Stealing 的优势

优势说明
负载均衡空闲 Worker 主动获取任务
缓存友好任务优先在创建它的 Worker 上执行
低竞争每个 Worker 有自己的队列

4.3 Blocking 线程池

某些操作无法异步化(如文件系统操作、CPU 密集计算),Tokio 提供 Blocking 线程池:

// 在 blocking 线程池中执行同步操作
let result = tokio::task::spawn_blocking(|| {
    // 这里的代码在独立线程中运行
    std::fs::read_to_string("large_file.txt")
}).await?;
graph LR subgraph async_rt["Async Runtime"] AT1["Async Task 1"] AT2["Async Task 2"] end subgraph blocking["Blocking Pool"] BT1["Blocking Task 1"] BT2["Blocking Task 2"] end AT1 --> |spawn_blocking| BT1 BT1 --> |JoinHandle| AT1

5. Dynamo 中的应用

5.1 Runtime 配置

Dynamo 基于 Tokio 构建运行时:

// Dynamo Runtime 的核心结构
pub struct Runtime {
    id: Arc<String>,
    primary: RuntimeType,      // 主任务运行时
    secondary: RuntimeType,    // 后台任务运行时
    cancellation_token: CancellationToken,
}

5.2 双运行时设计

graph TB subgraph primary["Primary Runtime - 多线程"] P1["推理请求处理"] P2["HTTP 请求处理"] P3["数据预处理"] end subgraph secondary["Secondary Runtime - 单线程"] S1["etcd Keep-Alive"] S2["NATS 订阅"] S3["健康检查"] end Client["客户端请求"] --> P1 ETCD[("etcd")] <--> S1 NATS[("NATS")] <--> S2

分离的原因

  1. 防止阻塞:后台任务不会抢占业务任务的 CPU
  2. 资源控制:可以独立配置两个运行时的线程数
  3. 故障隔离:后台任务的问题不会直接影响主业务

小结

本文介绍了 Rust 异步编程的核心概念:

  1. Future 与 Poll:惰性求值、协作式调度
  2. async/await:编译器将异步代码转换为状态机
  3. Tokio 运行时:Work-Stealing 调度、Blocking 线程池
  4. Dynamo 应用:双运行时设计,隔离业务和后台任务

下一篇

继续阅读 02-分布式系统基础,了解分布式系统的核心概念。