第三部分:Scheduler——vLLM 的大脑 简介 Scheduler 是 vLLM 的核心调度器。在每个微秒级的时间窗口内,它都要做出关键决策:处理哪些 request、计算多少个 token、何时 preempt request,以及如何最大化 GPU 利用率。本文将深入探讨 Scheduler 的算法与实现。
调度挑战 问题空间 在任意时刻,Scheduler 必须处理:
等待中的 request :等待处理的新 prompt
运行中的 request :正处于 decode 阶段的持续生成任务
资源限制 :有限的 GPU 内存和计算预算
目标冲突 :最小化延迟 vs. 最大化吞吐量
动态负载 :request 异步到达和完成
没有简单的解决方案 与传统批处理不同,LLM 推理服务面临独特挑战:
请求长度不固定 :无法预测完成时间
内存随序列长度增长 :不仅仅受计算限制
prefill 与 decode 的不对称性 :prefill 每个 token 耗时约为 decode 的 100 倍
共享 KV cache :内存决策影响所有 request
Continuous Batching vLLM 的核心创新:continuous batching (又称迭代级批处理)。
传统静态批处理 1 2 3 4 5 6 7 8 9 10 11 12 13 batch = [] while len (batch) < batch_size: batch.append(wait_for_request()) outputs = model.forward(batch) while any_request_incomplete(batch): outputs = model.forward(batch)
问题 :
队头阻塞 :快速 request 需等待慢速 request
低 GPU 利用率 :随着 request 完成,batch 规模不断缩小
高延迟 :必须等待 batch 填满
Continuous Batching 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 running = [] while True : running = [r for r in running if not r.finished] while can_fit_new_request() and waiting_requests: running.append(waiting_requests.pop()) outputs = model.forward(running)
优点 :
无队头阻塞 :request 完成后立即离队
持续高 GPU 利用率 :始终维持最大 batch 大小
更低延迟 :新 request 可立即开始处理
Scheduler 架构 文件位置 :vllm/v1/core/sched/scheduler.py
核心数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Scheduler : def __init__ (self, vllm_config, ... ): self.requests: dict [str , Request] = {} self.waiting: RequestQueue = create_request_queue(policy) self.running: list [Request] = [] self.skipped_waiting: RequestQueue = create_request_queue(policy) self.kv_cache_manager = KVCacheManager(...) self.encoder_cache_manager = EncoderCacheManager(...) self.max_num_running_reqs = scheduler_config.max_num_seqs self.max_num_scheduled_tokens = scheduler_config.max_num_batched_tokens self.max_model_len = model_config.max_model_len
Request 状态 request 按以下状态流转:
1 2 3 4 5 WAITING ↓ RUNNING ↓ FINISHED
可能的状态转换包括:
WAITING → RUNNING:首次被调度
RUNNING → RUNNING:继续留在 batch 中
RUNNING → PREEMPTED:暂时移出(内存紧张时)
PREEMPTED → WAITING:重新返回等待队列
RUNNING → FINISHED:生成完毕
Request 队列 等待队列 :等待启动的新 request
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class RequestQueue : def __init__ (self, policy: SchedulingPolicy ): if policy == SchedulingPolicy.FCFS: self.queue = deque() elif policy == SchedulingPolicy.PRIORITY: self.queue = [] def push (self, request: Request ): if self.policy == SchedulingPolicy.FCFS: self.queue.append(request) else : heapq.heappush(self.queue, (-request.priority, request))
运行列表 :当前正在处理的 request
1 2 self.running: list [Request] = [req1, req2, ..., req_n]
调度算法 整体流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 def schedule (self ) -> SchedulerOutput: """Main scheduling function called every iteration.""" scheduled_new_reqs = [] scheduled_running_reqs = [] preempted_reqs = [] token_budget = self.max_num_scheduled_tokens for req in self.running: num_tokens_to_compute = min ( req.num_tokens - req.num_computed_tokens, token_budget ) if num_tokens_to_compute > 0 : blocks = self.kv_cache_manager.allocate_slots( req, num_tokens_to_compute ) if blocks is not None : scheduled_running_reqs.append(req) token_budget -= num_tokens_to_compute else : preempted_reqs.append(req) while token_budget > 0 and self.waiting: req = self.waiting.pop() if self.can_fit_request(req, token_budget): scheduled_new_reqs.append(req) token_budget -= req.num_prompt_tokens else : break return SchedulerOutput( scheduled_new_reqs=scheduled_new_reqs, scheduled_running_reqs=scheduled_running_reqs, preempted_reqs=preempted_reqs, num_scheduled_tokens=self.max_num_scheduled_tokens - token_budget )
第一步:Token Budget 每次迭代都有一个 token budget(通常为 2048-8192):
1 2 3 4 5 6 token_budget = self.max_num_scheduled_tokens
权衡 :
更大的 budget :更高吞吐量,但延迟更高
更小的 budget :更低延迟,但吞吐量降低
第二步:调度运行中的 Request 运行中的 request 拥有调度优先权:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 for request in self.running: num_remaining = request.num_tokens - request.num_computed_tokens if num_remaining == 0 : self.finish_request(request) continue num_to_schedule = min (num_remaining, token_budget) blocks, num_cached = self.kv_cache_manager.allocate_slots( request, num_tokens=request.num_computed_tokens + num_to_schedule, num_computed_tokens=request.num_computed_tokens ) if blocks is None : if self.should_preempt(request): preempted_reqs.append(request) self.preempt_request(request) continue scheduled_running_reqs.append(request) request.num_scheduled_tokens = num_to_schedule token_budget -= num_to_schedule
关键洞察 :运行中的 request 通常处于 decode 模式(每次迭代产出 1 个 token),因此消耗的 token budget 极少。
第三步:调度等待中的 Request 若 token budget 仍有剩余,则接纳新的 request:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 while token_budget > 0 and len (self.waiting) > 0 : request = self.waiting.peek() if not self.can_admit_request(request, token_budget): break request = self.waiting.pop() cached_blocks, num_cached = \ self.kv_cache_manager.get_computed_blocks(request) num_to_compute = request.num_prompt_tokens - num_cached blocks = self.kv_cache_manager.allocate_slots( request, num_tokens=num_to_compute, cached_blocks=cached_blocks ) if blocks is None : self.waiting.push_front(request) break scheduled_new_reqs.append(request) token_budget -= num_to_compute self.running.append(request)
第四步:处理 Preemption 当一个 request 被 preempt 时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def preempt_request (self, request: Request ): """Preempt a running request to free memory.""" for block in request.blocks: self.kv_cache_manager.free(block) request.blocks = [] request.num_computed_tokens = 0 self.running.remove(request) self.waiting.push(request) request.num_preemptions += 1
Preemption 代价高昂 :所有已完成的计算都会丢失!
策略 :优先 preempt 进度最少的 request,以最小化浪费。
高级调度策略 Chunked Prefill 长 prompt 被分割成多个 chunk 分批处理:
1 2 3 4 5 6 7 8 9 10 11 schedule(request, num_tokens=2048 ) schedule(request, num_tokens=2048 )
优点 :
更低 TTFT :第一个 chunk 处理更快
公平调度 :不会阻塞所有其他 request
内存高效 :将计算分散到多次迭代中
实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 if request.num_computed_tokens < request.num_prompt_tokens: max_prefill_tokens = min ( self.max_num_scheduled_tokens, self.chunk_size ) num_to_schedule = min ( request.num_prompt_tokens - request.num_computed_tokens, max_prefill_tokens ) else : num_to_schedule = 1
优先级调度 request 可以设置优先级:
1 2 3 4 5 6 7 8 9 10 11 request_high_priority = Request( request_id="urgent" , priority=100 ) request_low_priority = Request( request_id="background" , priority=1 )
使用场景 :
付费等级 :付费用户获得优先权
交互式 vs 批处理 :交互式请求优先处理
截止时间 :有时效性的 request 会被提升优先级
支持 Speculative Decoding 对于 speculative decoding,需要调度 draft 模型和 target 模型:
1 2 3 4 5 6 7 8 9 10 11 12 draft_tokens = draft_model.generate(k=5 ) verified = main_model.verify(draft_tokens) if verified == 5 : request.num_tokens += 5 else : request.num_tokens += verified
Scheduler 必须 :
为 draft token 分配 KV cache
在验证失败时处理回滚
Request 重排序 动态重排序以优化缓存命中率:
1 2 3 4 5 6 7 8 9 10 11 12 13 def reorder_for_cache_hits (waiting: list [Request] ) -> list [Request]: """Reorder waiting requests to maximize prefix cache hits.""" scored = [ (estimate_cache_hit_rate(req), req) for req in waiting ] scored.sort(reverse=True ) return [req for _, req in scored]
资源管理 KV Cache 分配 Scheduler 必须确保 KV cache 容量充足:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def can_admit_request (self, request: Request, token_budget: int ) -> bool : """Check if request can be admitted.""" num_tokens_needed = request.num_prompt_tokens if num_tokens_needed > token_budget: return False num_blocks_needed = (num_tokens_needed + block_size - 1 ) // block_size num_free_blocks = self.kv_cache_manager.get_num_free_blocks() if num_blocks_needed > num_free_blocks: return False if len (self.running) >= self.max_num_running_reqs: return False return True
内存压力处理 当内存紧张时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if self.kv_cache_manager.get_num_free_blocks() < MIN_FREE_BLOCKS: preempt_candidates = sorted ( self.running, key=lambda r: r.num_computed_tokens ) for req in preempt_candidates[:NUM_TO_PREEMPT]: self.preempt_request(req) self.kv_cache_manager.evict_cached_blocks(NUM_BLOCKS_TO_EVICT)
Encoder Cache 管理 对于多模态模型(图像、音频):
1 2 3 4 5 6 7 def can_fit_encoder_inputs (self, request: Request ) -> bool : """Check if encoder cache has capacity.""" num_encoder_tokens = len (request.encoder_input_ids) available = self.encoder_cache_manager.get_available_capacity() return num_encoder_tokens <= available
性能指标 Scheduler 统计信息 Scheduler 会跟踪以下性能指标:
1 2 3 4 5 6 7 8 9 10 11 12 13 @dataclass class SchedulerStats : num_running: int num_waiting: int num_preempted: int num_scheduled_tokens: int token_budget: int utilization: float kv_cache_usage: float num_cache_hits: int num_cache_misses: int
优化目标 Scheduler 需要在多个目标之间取得平衡:
最大化吞吐量 :尽可能多地调度 token
最小化延迟 :优先处理运行中的 request,使用 chunked prefill
最大化缓存命中率 :对具有相同前缀的 request 进行重排序
公平性 :不让低优先级的 request 饿死
内存效率 :最小化 preemption 次数
示例:调度时间线 让我们追踪一个完整的调度场景:
初始状态 1 2 3 4 Waiting: [A (100 tokens), B (50 tokens)] Running: [] Token budget: 200 Free KV blocks: 1000
迭代 1 1 2 3 4 5 6 7 8 9 10 A.num_scheduled_tokens = 100 token_budget = 200 - 100 = 100 B.num_scheduled_tokens = 50 token_budget = 100 - 50 = 50
GPU 计算 :A prefill(100 个 token)+ B prefill(50 个 token)
迭代 2 1 2 3 4 5 6 7 8 9 10 A.num_scheduled_tokens = 1 B.num_scheduled_tokens = 1 token_budget = 200 - 2 = 198 C.num_scheduled_tokens = 198
GPU 计算 :A decode(1 个 token)+ B decode(1 个 token)+ C prefill(198 个 token)
迭代 3 1 2 3 4 5 6 7 8 9 10 B.num_scheduled_tokens = 1 C.num_scheduled_tokens = 200
迭代 10 1 2 3 4 5 6 7 8 9 10 11 self.preempt_request(B) D.num_scheduled_tokens = 500
调度策略 vLLM 支持多种调度策略:
FCFS(先到先服务) 1 2 3 waiting.append(new_request) next_request = waiting.pop(0 )
优点 :简单、公平缺点 :存在队头阻塞
基于优先级 1 2 3 heapq.heappush(waiting, (-priority, request)) priority, next_request = heapq.heappop(waiting)
优点 :重要的 request 优先处理缺点 :低优先级 request 可能饿死
最短任务优先 1 2 3 waiting.sort(key=lambda r: r.max_tokens) next_request = waiting.pop(0 )
优点 :最小化平均延迟缺点 :长 request 可能饿死
排查 Scheduler 问题 常见问题及解决方案:
高 Preemption 率 1 2 3 4 5 6 if stats.num_preempted > THRESHOLD:
低 GPU 利用率 1 2 3 4 5 if stats.utilization < 0.8 :
高延迟 1 2 3 4 5 6 if avg_ttft > TARGET_TTFT:
关键要点
Continuous batching 通过每次迭代动态增减 request,消除了队头阻塞
Token budget 控制 batch 大小,平衡延迟与吞吐量的权衡
运行中的 request 拥有优先权 ,以最小化 decode 延迟
Preemption 是内存紧张时的最后手段,会丢弃所有已完成的进度
Chunked prefill 在处理长 prompt 与交互式 request 之间取得平衡
前缀缓存集成 需要 Scheduler 感知缓存状态,以实现高效的请求接纳控制
下一步 在第四部分中,我们将探索 Request Processing ——request 如何从 tokenization 到流式输出在系统中流转,以及跨迭代的状态管理机制。
参考资料
Scheduler 是 vLLM 的大脑,每秒钟要做出成千上万个瞬间决策,在满足延迟 SLA 的同时最大化 GPU 利用率。接下来,我们将看到 request 如何在系统中端到端地流转。