# Tracing a Request Through vLLM: From Submission to Response
This chapter traces a single request end to end, from the moment the user submits it to the moment the result is returned, connecting the material from the earlier chapters into one complete picture.
```python
# User code
from vllm import LLM, SamplingParams

llm = LLM(model="meta-llama/Llama-2-7b-hf")
prompts = ["The capital of France is"]
sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=50)
outputs = llm.generate(prompts, sampling_params)
```
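Running this script yields one `RequestOutput` per prompt. As a quick orientation before we dive into the internals, the sketch below shows how the results are typically consumed; the fields used here (`prompt`, `outputs[0].text`, `finish_reason`) correspond to the `RequestOutput` construction shown at the end of this chapter.

```python
# Each RequestOutput pairs the original prompt with its completion(s).
for output in outputs:
    print(f"Prompt:    {output.prompt!r}")
    print(f"Generated: {output.outputs[0].text!r}")
    print(f"Finished:  {output.outputs[0].finish_reason}")
```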
```python
# vllm/entrypoints/llm.py
def generate(self, prompts, sampling_params, ...):
    # 1. Process the inputs
    for prompt in prompts:
        # Tokenize the prompt
        prompt_token_ids = self.tokenizer.encode(prompt)
        # Create a request
        request_id = str(next(self.request_counter))
        self._add_request(
            request_id=request_id,
            prompt=prompt,
            prompt_token_ids=prompt_token_ids,
            params=sampling_params,
        )
```
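The `self.tokenizer.encode(prompt)` call is plain tokenization. As a standalone sketch, the same step can be reproduced with the HuggingFace `transformers` tokenizer for this model (the exact token IDs depend on the tokenizer):

```python
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
prompt_token_ids = tokenizer.encode("The capital of France is")
print(prompt_token_ids)
# A short list of ints; for Llama-2 it begins with the BOS token id 1.
```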
```python
# vllm/v1/engine/llm_engine.py
def add_request(self, request_id, prompt, params, ...):
    # Build the EngineCoreRequest
    engine_request = EngineCoreRequest(
        request_id=request_id,
        prompt_token_ids=prompt_token_ids,
        sampling_params=params,
        arrival_time=time.time(),
        eos_token_id=self.tokenizer.eos_token_id,
    )
    # Hand it off to the EngineCore
    self.engine_core.add_request(engine_request)
```
```python
# vllm/v1/core/sched/scheduler.py
def add_request(self, request: EngineCoreRequest) -> None:
    # 1. Create the internal Request object
    internal_request = Request(
        request_id=request.request_id,
        prompt_token_ids=request.prompt_token_ids,
        sampling_params=request.sampling_params,
    )
    # 2. Compute block hashes (used for prefix caching)
    if self.enable_caching:
        internal_request.block_hashes = compute_block_hashes(
            internal_request.prompt_token_ids,
            self.block_size,
        )
    # 3. Append it to the waiting queue
    internal_request.status = RequestStatus.WAITING
    self.waiting.append_request(internal_request)
    # 4. Record it in the request dictionary
    self.requests[request.request_id] = internal_request
```
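The block hashes are what make prefix caching work: each block's hash covers its own tokens chained with the hash of the block before it, so two prompts that share a prefix produce identical leading hashes and can share the corresponding KV cache blocks. Below is a minimal sketch of the chaining, assuming a plain Python `hash()` (the real implementation uses a stabler hash and may fold in extra keys beyond the token IDs):

```python
def compute_block_hashes_sketch(token_ids: list[int], block_size: int) -> list[int]:
    """Chained hashes over full blocks only; a trailing partial block is skipped."""
    hashes: list[int] = []
    prev_hash = None
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = tuple(token_ids[start:start + block_size])
        prev_hash = hash((prev_hash, block))  # chain in the previous block's hash
        hashes.append(prev_hash)
    return hashes

# With block_size=16, two prompts sharing their first 64 tokens share their
# first 4 block hashes, so those 4 blocks can be reused from the prefix cache.
```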
```python
# vllm/v1/core/sched/scheduler.py :: schedule()
request = self.waiting.peek_request()
new_computed_blocks, num_cached_tokens = (
    self.kv_cache_manager.get_computed_blocks(request)
)
# e.g. the prompt has 100 tokens and the first 64 are already cached

# Number of tokens that still need to be processed
num_new_tokens = request.num_tokens - num_cached_tokens
new_blocks = self.kv_cache_manager.allocate_slots(
    request,
    num_new_tokens,
    num_new_computed_tokens=num_cached_tokens,
    new_computed_blocks=new_computed_blocks,
)
if new_blocks is None:
    # Not enough free memory; the request keeps waiting
    return

# Remove the request from waiting and move it to running
request = self.waiting.pop_request()
self.running.append(request)
request.status = RequestStatus.RUNNING
request.num_computed_tokens = num_cached_tokens
```
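Plugging in the numbers from the comment above (a 100-token prompt whose first 64 tokens hit the prefix cache) and an assumed `block_size` of 16, the allocation arithmetic works out as follows:

```python
block_size = 16
num_prompt_tokens = 100
num_cached_tokens = 64                                    # 4 full blocks reused from the cache

num_new_tokens = num_prompt_tokens - num_cached_tokens    # 36 tokens still need a prefill pass
total_blocks = -(-num_prompt_tokens // block_size)        # ceil(100 / 16) = 7 blocks in total
cached_blocks = num_cached_tokens // block_size           # 4 blocks come from the cache
new_blocks_needed = total_blocks - cached_blocks          # 3 fresh blocks must be allocated

print(num_new_tokens, new_blocks_needed)                  # 36 3
```

If the KV cache manager cannot find those 3 free blocks, `allocate_slots` returns `None` and the request stays in the waiting queue, exactly as in the `if new_blocks is None` branch above.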
```python
# vllm/v1/worker/gpu_model_runner.py
def execute_model(self, scheduler_output: SchedulerOutput):
    # 1. Prepare input_ids
    input_ids = self._prepare_input_ids(scheduler_output)
    # 2. Prepare positions
    positions = self._prepare_positions(scheduler_output)
    # 3. Prepare attention metadata
    attn_metadata = self._prepare_attention_metadata(scheduler_output)
    # 4. Update the block table
    self._update_block_table(scheduler_output)

    # 5. Forward pass
    with torch.inference_mode():
        hidden_states = self.model(
            input_ids=input_ids,
            positions=positions,
            kv_caches=self.kv_caches,
            attn_metadata=attn_metadata,
        )
    # 6. Compute logits
    logits = self.model.compute_logits(hidden_states)
    return ModelRunnerOutput(logits=logits, ...)
```
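To make the prepared inputs concrete, here is a hypothetical mini-step that batches one prefill request together with one decode request. vLLM v1 flattens all tokens scheduled in a step into a single 1-D `input_ids` tensor, with the attention metadata recording per-request boundaries and positions; the variable names below are illustrative, not the exact internal field names.

```python
import torch

# Request A: a prefill chunk of 4 prompt tokens.
# Request B: a decode step contributing its 1 most recent token.
input_ids = torch.tensor([450, 7483, 310, 3444,   # request A
                          3681])                  # request B
positions = torch.tensor([0, 1, 2, 3,             # A starts at position 0
                          63])                    # B already has 63 computed tokens
query_boundaries = torch.tensor([0, 4, 5])        # A owns [0, 4), B owns [4, 5)

print(input_ids.shape)  # torch.Size([5]) -- one forward pass serves both requests
```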
```python
# vllm/v1/executor/abstract.py
def sample_tokens(self, model_output: ModelRunnerOutput) -> SamplerOutput:
    # Build the sampling metadata
    sampling_metadata = self._prepare_sampling_metadata()
    # Sample the next tokens from the logits
    sampler_output = self.sampler(
        model_output.logits,
        sampling_metadata,
    )
    return sampler_output
```
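The sampler applies each request's `SamplingParams`. Below is a minimal single-sequence sketch of temperature scaling plus top-p (nucleus) filtering, which is conceptually the path our request (temperature=0.8, top_p=0.95) takes; the real sampler operates on batched logits and supports many more options.

```python
import torch

def sample_token(logits: torch.Tensor, temperature: float = 0.8, top_p: float = 0.95) -> int:
    """logits: [vocab_size] scores for the next token of one sequence."""
    # Temperature < 1 sharpens the distribution, > 1 flattens it.
    probs = torch.softmax(logits / temperature, dim=-1)

    # Top-p: keep the smallest set of tokens whose cumulative probability
    # reaches top_p, zero out the rest, then renormalize.
    sorted_probs, sorted_ids = torch.sort(probs, descending=True)
    cumulative = torch.cumsum(sorted_probs, dim=-1)
    outside_nucleus = cumulative - sorted_probs > top_p
    sorted_probs[outside_nucleus] = 0.0
    sorted_probs /= sorted_probs.sum()

    # Draw one token and map it back to its vocabulary index.
    choice = torch.multinomial(sorted_probs, num_samples=1)
    return int(sorted_ids[choice])

next_token_id = sample_token(torch.randn(32000))  # 32000 = Llama-2 vocab size
```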
```python
# vllm/v1/core/sched/scheduler.py
def update_from_output(self, model_output, sampler_output, scheduler_output):
    for req_id, output in sampler_output.items():
        request = self.requests[req_id]
        # Get the newly generated token(s)
        new_token_ids = output.sampled_token_ids.tolist()
        # Append them to the request
        request.append_output_token_ids(new_token_ids)
        # Update the computed-token count
        request.num_computed_tokens += 1

        # Check whether the request is finished
        finish_reason, stop_str = check_stop(request, self.max_model_len)
        if finish_reason is not None:
            # The request is done
            self._finish_request(request, finish_reason)
            finished_outputs.append(...)
        else:
            # Keep generating
            outputs.append(...)
```
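The `check_stop` call decides whether generation is finished. Below is a simplified sketch of the usual conditions, based only on the pieces we have already seen (the `eos_token_id` carried by the request, the `max_tokens` in `SamplingParams`, and the model's `max_model_len`); the real check also handles user-supplied stop strings and stop tokens.

```python
def check_stop_sketch(request, max_model_len: int):
    """Return a finish reason, or None to keep generating."""
    if request.output_token_ids[-1] == request.eos_token_id:
        return "stop"    # the model emitted its end-of-sequence token
    if len(request.output_token_ids) >= request.sampling_params.max_tokens:
        return "length"  # hit the per-request max_tokens budget (50 in our example)
    if request.num_tokens >= max_model_len:
        return "length"  # hit the model's context-length limit
    return None
```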
```python
def _finish_request(self, request: Request, reason: FinishReason):
    # 1. Free the request's KV cache blocks
    self.kv_cache_manager.free(request)
    # 2. Remove it from the running list
    self.running.remove(request)
    # 3. Update its status
    request.status = RequestStatus.FINISHED
    # 4. Record the completion
    self.finished_req_ids.add(request.request_id)
```
```python
# vllm/v1/engine/llm_engine.py
def _process_outputs(self, engine_outputs: EngineCoreOutputs):
    results = []
    for output in engine_outputs.outputs:
        request = self.requests[output.request_id]
        # Incremental detokenization
        new_text = self.detokenizer.decode(
            request,
            output.new_token_ids,
        )
        # Append to the request's accumulated output text
        request.output_text += new_text
        results.append(...)
    return results
```
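Detokenization is incremental because BPE tokens do not map cleanly onto characters: a newly sampled token can complete a multi-byte character or merge with the previous piece, so the detokenizer only emits text once it is stable. A minimal sketch of the idea, assuming a HuggingFace tokenizer (the real detokenizer tracks prefix offsets so each step only does work proportional to the new tokens):

```python
def decode_incremental(tokenizer, output_token_ids: list[int], prev_text: str) -> str:
    """Return only the text newly completed since the previous call."""
    full_text = tokenizer.decode(output_token_ids, skip_special_tokens=True)
    if full_text.endswith("\ufffd"):
        # The last token stops mid-way through a multi-byte character;
        # hold back output until more tokens arrive.
        return ""
    return full_text[len(prev_text):]

# Per step: request.output_text += decode_incremental(
#     tokenizer, request.output_token_ids, request.output_text)
```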
```python
def _make_request_output(self, request: Request, finished: bool):
    return RequestOutput(
        request_id=request.request_id,
        prompt=request.prompt,
        prompt_token_ids=request.prompt_token_ids,
        outputs=[
            CompletionOutput(
                index=0,
                text=request.output_text,
                token_ids=request.output_token_ids,
                finish_reason=request.finish_reason,
                logprobs=request.logprobs,
            )
        ],
        finished=finished,
    )
```
```python
# vllm/entrypoints/llm.py
def _run_engine(self, use_tqdm: bool):
    outputs = []
    while self.llm_engine.has_unfinished_requests():
        step_outputs = self.llm_engine.step()
        for output in step_outputs:
            if output.finished:
                outputs.append(output)
    return sorted(outputs, key=lambda x: int(x.request_id))
```
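Each `step()` call inside this loop runs exactly the pipeline traced above: schedule, execute, sample, update. Here is a hypothetical, heavily simplified view that just stitches those stages together (the attribute wiring is illustrative, not the real engine structure):

```python
def engine_step(scheduler, model_runner, executor):
    # 1. Scheduling: pick requests and allocate KV cache blocks.
    scheduler_output = scheduler.schedule()
    # 2. Execution: build the batch, run the forward pass, compute logits.
    model_output = model_runner.execute_model(scheduler_output)
    # 3. Sampling: turn logits into next-token ids.
    sampler_output = executor.sample_tokens(model_output)
    # 4. Update: append tokens, check stop conditions, free finished requests.
    return scheduler.update_from_output(model_output, sampler_output, scheduler_output)
```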
Putting the stages together, a request's data flows through the following representations:

```
User input
  ↓
prompt: str
  ↓  Tokenize
prompt_token_ids: list[int]
  ↓  Create request
EngineCoreRequest
  ↓  Inside the scheduler
Request (internal)
  ↓  Schedule
SchedulerOutput
  ↓  Execute
ModelRunnerOutput (logits)
  ↓  Sample
SamplerOutput (token_ids)
  ↓  Update
EngineCoreOutput
  ↓  Detokenize
RequestOutput
  ↓
User output
```

In this chapter we traced the complete lifecycle of a request:
- Submission phase: the prompt is tokenized, wrapped in an `EngineCoreRequest`, and appended to the scheduler's waiting queue.
- Scheduling phase: the scheduler consults the prefix cache, allocates KV cache blocks, and moves the request from waiting to running.
- Execution phase: the model runner assembles the batched inputs, runs the forward pass, and computes logits, from which the sampler draws the next tokens.
- Update phase: the scheduler appends the sampled tokens, checks the stop conditions, and frees the KV cache once the request finishes.
- Return phase: the new tokens are incrementally detokenized and packaged into a `RequestOutput` that is handed back to the user.
Walking through this complete flow shows how vLLM's components cooperate with one another, and why the system achieves efficient LLM inference.