Title here
Summary here
调度器(Scheduler)是 vLLM 的"大脑",它决定了哪些请求可以运行以及每个请求能处理多少 token。一个好的调度策略直接影响系统的吞吐量、延迟和资源利用率。
本章我们将深入了解 vLLM 调度器的工作原理、调度算法和实现细节。
Scheduler 的核心职责:
# vllm/v1/core/sched/scheduler.py
class Scheduler:
def __init__(self, ...):
# 等待队列:存放新到达和被抢占的请求
self.waiting = create_request_queue(self.policy)
# 运行队列:存放正在处理的请求
self.running: list[Request] = []
# 请求字典:通过 request_id 快速查找
self.requests: dict[str, Request] = {}两个队列的关系:
class Scheduler:
def __init__(self, ...):
# 最大并发请求数
self.max_num_running_reqs = self.scheduler_config.max_num_seqs
# 每步最大 token 数
self.max_num_scheduled_tokens = self.scheduler_config.max_num_batched_tokens
# 最大序列长度
self.max_model_len = vllm_config.model_config.max_model_len约束说明:
| 约束 | 默认值 | 说明 |
|---|---|---|
max_num_seqs | 256 | 最多同时运行的请求数 |
max_num_batched_tokens | 2048 | 每步最多处理的 token 数 |
max_model_len | 模型配置 | 单个序列的最大长度 |
# vllm/v1/core/sched/output.py
@dataclass
class SchedulerOutput:
"""调度器的输出,传递给 ModelExecutor"""
# 新调度的请求
scheduled_new_reqs: list[NewRequestData]
# 继续运行的请求
scheduled_cached_reqs: CachedRequestData
# 每个请求调度的 token 数
num_scheduled_tokens: dict[str, int]
# 总调度 token 数
total_num_scheduled_tokens: int
# 抢占的请求 ID
preempted_req_ids: set[str]
# 完成的请求 ID
finished_req_ids: set[str]schedule() 是调度器的核心方法,每个 step 调用一次。
vLLM 的调度没有明确的 "prefill 阶段" 或 "decode 阶段"。
每个请求只有两个关键状态:
- num_computed_tokens: 已计算的 token 数
- num_tokens: 总 token 数(prompt + output)
每一步,调度器尝试分配 token,使 num_computed_tokens 追上 num_tokens。
这种设计足够通用,支持分块预填充、前缀缓存、投机解码等各种优化。def schedule(self) -> SchedulerOutput:
# ... 初始化 ...
# 第一步:处理 RUNNING 请求
req_index = 0
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]
# 计算需要调度的 token 数
num_new_tokens = (
request.num_tokens_with_spec # 总 token 数
+ request.num_output_placeholders # 输出占位符
- request.num_computed_tokens # 已计算的 token 数
)
# 应用长 prefill 分块限制
if 0 < threshold < num_new_tokens:
num_new_tokens = threshold
# 不超过 token 预算
num_new_tokens = min(num_new_tokens, token_budget)
# 不超过模型最大长度
num_new_tokens = min(
num_new_tokens,
self.max_model_len - 1 - request.num_computed_tokens
)
if num_new_tokens == 0:
req_index += 1
continue
# 尝试分配 KV Cache
while True:
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
num_lookahead_tokens=self.num_lookahead_tokens,
)
if new_blocks is not None:
# 分配成功
break
# 分配失败,执行抢占
preempted_req = self._select_preempt_target()
self._preempt_request(preempted_req, timestamp)
if preempted_req == request:
# 自己被抢占了,无法继续
break
if new_blocks is None:
break
# 调度成功
scheduled_running_reqs.append(request)
token_budget -= num_new_tokens
req_index += 1 # 第二步:处理 WAITING 请求(只有没有抢占时才处理)
if not preempted_reqs:
while self.waiting and token_budget > 0:
# 检查并发限制
if len(self.running) == self.max_num_running_reqs:
break
request = self.waiting.peek_request()
# 检查各种等待状态
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
# 等待远程 KV Cache 加载
...
if request.status == RequestStatus.WAITING_FOR_FSM:
# 等待语法编译
...
# 查找前缀缓存
new_computed_blocks, num_cached = (
self.kv_cache_manager.get_computed_blocks(request)
)
# 计算需要调度的 token 数
num_new_tokens = request.num_tokens - num_cached
num_new_tokens = min(num_new_tokens, token_budget)
# 分配 KV Cache
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
num_new_computed_tokens=num_cached,
new_computed_blocks=new_computed_blocks,
)
if new_blocks is None:
# 无法分配,停止处理 waiting 队列
break
# 调度成功,移入 running 队列
request = self.waiting.pop_request()
self.running.append(request)
request.status = RequestStatus.RUNNING
request.num_computed_tokens = num_cached
token_budget -= num_new_tokens当 KV Cache 内存不足时,调度器需要抢占一些请求来释放空间。
def _select_preempt_target(self) -> Request:
"""选择要抢占的请求"""
if self.policy == SchedulingPolicy.PRIORITY:
# 优先级调度:抢占优先级最低、到达最晚的请求
return max(
self.running,
key=lambda r: (r.priority, r.arrival_time),
)
else:
# FCFS:抢占最后加入的请求
return self.running[-1]def _preempt_request(self, request: Request, timestamp: float) -> None:
"""抢占请求并放回 waiting 队列"""
assert request.status == RequestStatus.RUNNING
# 释放 KV Cache
self.kv_cache_manager.free(request)
self.encoder_cache_manager.free(request)
# 更新请求状态
request.status = RequestStatus.PREEMPTED
request.num_computed_tokens = 0 # 重置计算进度
request.num_preemptions += 1
# 放回 waiting 队列头部(优先恢复)
self.waiting.prepend_request(request)# 默认策略
class SchedulingPolicy(Enum):
FCFS = "fcfs"
PRIORITY = "priority"FCFS 特点:
# 基于优先级的调度
def _select_preempt_target(self):
if self.policy == SchedulingPolicy.PRIORITY:
# 选择优先级最低的请求
return max(
self.running,
key=lambda r: (r.priority, r.arrival_time),
)Priority 特点:
对于长输入,vLLM 支持分块预填充,将 prefill 分成多个 chunk 处理。
传统 Prefill(一次性处理):
step 1: [token_1 ... token_1000] → 处理 1000 个 token
分块 Prefill:
step 1: [token_1 ... token_256] → 处理 256 个 token
step 2: [token_257 ... token_512] → 处理 256 个 token
step 3: [token_513 ... token_768] → 处理 256 个 token
step 4: [token_769 ... token_1000] → 处理 232 个 token# 长 prefill 分块阈值
threshold = self.scheduler_config.long_prefill_token_threshold
if 0 < threshold < num_new_tokens:
num_new_tokens = threshold # 限制每次处理的 token 数def update_from_output(
self,
model_runner_output: ModelRunnerOutput,
...
) -> EngineCoreOutputs:
"""根据模型输出更新请求状态"""
for req_id, sampler_output in model_output.items():
request = self.requests[req_id]
# 追加输出 token
request.append_output_token_ids(sampler_output.sampled_token_ids)
# 检查停止条件
stopped = check_stop(request, self.max_model_len)
if stopped:
# 请求完成
self._free_request(request)
self.finished_req_ids.add(req_id)
outputs.append(...)| 参数 | 说明 | 建议值 |
|---|---|---|
max_num_seqs | 最大并发请求数 | 根据 GPU 内存调整 |
max_num_batched_tokens | 每步最大 token 数 | 2048-8192 |
enable_chunked_prefill | 启用分块预填充 | 建议开启 |
long_prefill_token_threshold | 长 prefill 阈值 | 256-512 |
policy | 调度策略 | fcfs 或 priority |
1. 提高吞吐量
# 增加最大并发数
llm = LLM(model="...", max_num_seqs=512)
llm = LLM(model="...", max_num_batched_tokens=4096)2. 降低延迟
# 启用分块预填充
llm = LLM(
model="...",
enable_chunked_prefill=True,
long_prefill_token_threshold=256,
)3. 处理高优先级请求
# 使用优先级调度
llm = LLM(model="...", policy="priority")
llm.generate(prompt, priority=0) # 高优先级
llm.generate(prompt, priority=10) # 低优先级| 功能 | 文件 | 关键类/函数 |
|---|---|---|
| Scheduler 主类 | vllm/v1/core/sched/scheduler.py | Scheduler |
| schedule 方法 | vllm/v1/core/sched/scheduler.py:313 | schedule() |
| 抢占逻辑 | vllm/v1/core/sched/scheduler.py:892 | _preempt_request() |
| 调度输出 | vllm/v1/core/sched/output.py | SchedulerOutput |
| 请求队列 | vllm/v1/core/sched/request_queue.py | create_request_queue() |
| 调度策略 | vllm/v1/core/sched/request_queue.py | SchedulingPolicy |
本章我们深入了解了 vLLM 调度器的工作原理:
在下一章中,我们将学习连续批处理(Continuous Batching)机制,了解 vLLM 如何实现高效的动态批处理。