vLLM Deep Dive Part 3: The Scheduler - Brain of vLLM

Introduction

The Scheduler is the heart of vLLM's request scheduling. On every scheduling iteration it makes the critical decisions: which requests to process, how many tokens to compute for each, when to preempt a request, and how to keep GPU utilization as high as possible. This article digs into the Scheduler's algorithms and implementation.

The Scheduling Challenge

The Problem Space

At any moment, the Scheduler has to juggle:

  • Waiting requests: new prompts waiting to be processed
  • Running requests: in-flight generations in the decode phase
  • Resource limits: finite GPU memory and a bounded compute budget
  • Conflicting goals: minimize latency vs. maximize throughput
  • Dynamic load: requests arrive and finish asynchronously

No Easy Answers

Unlike traditional batch processing, LLM inference serving faces some unique challenges:

  1. Variable request lengths: completion times are unpredictable
  2. Memory grows with sequence length: the system is not only compute-bound (see the sizing sketch below)
  3. Prefill/decode asymmetry: a prefill step over a long prompt costs orders of magnitude more than a single decode step
  4. Shared KV cache: memory decisions affect every request
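
To make challenge 2 concrete, here is a back-of-the-envelope KV cache sizing sketch. The model dimensions are illustrative assumptions (roughly a 7B-class model in fp16), not figures taken from vLLM itself:

# Per-token KV cache size: 2 (K and V) * layers * kv_heads * head_dim * bytes per element
num_layers, num_kv_heads, head_dim, dtype_bytes = 32, 32, 128, 2  # assumed fp16 model

kv_bytes_per_token = 2 * num_layers * num_kv_heads * head_dim * dtype_bytes  # 512 KiB

for seq_len in (1_000, 10_000, 100_000):
    gib = seq_len * kv_bytes_per_token / 2**30
    print(f"{seq_len:>7} tokens -> {gib:.1f} GiB of KV cache")

# Output: ~0.5 GiB, ~4.9 GiB, ~48.8 GiB. Memory grows linearly with sequence
# length, so long requests exhaust GPU memory well before compute runs out.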

Continuous Batching

vLLM's core innovation: continuous batching (also known as iteration-level batching).

Traditional Static Batching

# Static batching - wait for batch to fill
batch = []
while len(batch) < batch_size:
    batch.append(wait_for_request())

# Process entire batch
outputs = model.forward(batch)

# Wait for ALL requests to finish
while any_request_incomplete(batch):
    outputs = model.forward(batch)

# All done - start next batch

Problems

  • Head-of-line blocking: fast requests wait for slow ones
  • Poor GPU utilization: the batch shrinks as requests finish
  • High latency: requests wait for the batch to fill before anything starts

Continuous Batching

# Continuous batching - add/remove every iteration
running = []

while True:
    # Remove finished requests
    running = [r for r in running if not r.finished]

    # Add new requests if there's capacity
    while can_fit_new_request() and waiting_requests:
        running.append(waiting_requests.pop())

    # Process current batch
    outputs = model.forward(running)

    # Immediately continue - no waiting!

Benefits

  • No head-of-line blocking: requests leave the batch as soon as they finish
  • Sustained GPU utilization: the batch stays as full as possible
  • Lower latency: new requests start processing immediately

Scheduler Architecture

File location: vllm/v1/core/sched/scheduler.py

Core Data Structures

class Scheduler:
    def __init__(self, vllm_config, ...):
        # Request storage
        self.requests: dict[str, Request] = {}

        # Queues
        self.waiting: RequestQueue = create_request_queue(policy)
        self.running: list[Request] = []
        self.skipped_waiting: RequestQueue = create_request_queue(policy)

        # Resource managers
        self.kv_cache_manager = KVCacheManager(...)
        self.encoder_cache_manager = EncoderCacheManager(...)

        # Constraints
        self.max_num_running_reqs = scheduler_config.max_num_seqs
        self.max_num_scheduled_tokens = scheduler_config.max_num_batched_tokens
        self.max_model_len = model_config.max_model_len

Request States

A request moves through the following states:

WAITING ──► RUNNING ──► FINISHED

The possible state transitions are (a small enum sketch follows this list):

  • WAITING → RUNNING: scheduled for the first time
  • RUNNING → RUNNING: stays in the batch for another iteration
  • RUNNING → PREEMPTED: temporarily evicted when memory is tight
  • PREEMPTED → WAITING: returned to the waiting queue
  • RUNNING → FINISHED: generation complete
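
A minimal sketch of these states as a Python enum (illustrative only; vLLM's actual RequestStatus is more fine-grained, e.g. it distinguishes why a request finished):

from enum import Enum, auto

class RequestStatus(Enum):
    """Simplified request lifecycle states."""
    WAITING = auto()    # in the waiting queue, not yet scheduled
    RUNNING = auto()    # part of the current running batch
    PREEMPTED = auto()  # evicted under memory pressure, will be rescheduled
    FINISHED = auto()   # generation complete (stopped, length-capped, or aborted)

# Typical preemption round trip: RUNNING -> PREEMPTED -> WAITING -> RUNNING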

Request Queues

Waiting queue: new requests waiting to start

from collections import deque
import heapq

class RequestQueue:
    def __init__(self, policy: SchedulingPolicy):
        self.policy = policy
        if policy == SchedulingPolicy.FCFS:
            # First-Come-First-Serve
            self.queue = deque()
        elif policy == SchedulingPolicy.PRIORITY:
            # Priority-based (higher priority first)
            self.queue = []  # heapq

    def push(self, request: Request):
        if self.policy == SchedulingPolicy.FCFS:
            self.queue.append(request)
        else:
            heapq.heappush(self.queue, (-request.priority, request))
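
The scheduling loop shown later also relies on pop(), peek(), and push_front(). Continuing the simplified class above, they might look like this (an illustrative sketch, not vLLM's actual implementation):

    def pop(self) -> Request:
        if self.policy == SchedulingPolicy.FCFS:
            return self.queue.popleft()
        # Priority queue entries are (-priority, request) tuples
        _, request = heapq.heappop(self.queue)
        return request

    def peek(self) -> Request:
        # Look at the next request without removing it
        if self.policy == SchedulingPolicy.FCFS:
            return self.queue[0]
        return self.queue[0][1]

    def push_front(self, request: Request):
        # Return a request to the head of the queue (FCFS case in this sketch)
        self.queue.appendleft(request)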

Running list: the requests currently being processed

# Simply a list - all are processed each iteration
self.running: list[Request] = [req1, req2, ..., req_n]

The Scheduling Algorithm

Overall Flow

def schedule(self) -> SchedulerOutput:
    """Main scheduling function called every iteration."""

    # 1. Allocate resources to requests
    scheduled_new_reqs = []
    scheduled_running_reqs = []
    preempted_reqs = []

    # Token budget for this iteration
    token_budget = self.max_num_scheduled_tokens

    # 2. Schedule running requests (priority!)
    for req in self.running:
        num_tokens_to_compute = min(
            req.num_tokens - req.num_computed_tokens,
            token_budget
        )

        if num_tokens_to_compute > 0:
            # Allocate KV cache blocks
            blocks = self.kv_cache_manager.allocate_slots(
                req, num_tokens_to_compute
            )

            if blocks is not None:
                scheduled_running_reqs.append(req)
                token_budget -= num_tokens_to_compute
            else:
                # Out of memory - preempt
                preempted_reqs.append(req)

    # 3. Schedule waiting requests (if budget remains)
    while token_budget > 0 and self.waiting:
        req = self.waiting.pop()

        # Can fit in batch?
        if self.can_fit_request(req, token_budget):
            scheduled_new_reqs.append(req)
            token_budget -= req.num_prompt_tokens
        else:
            # Doesn't fit - put it back and stop admitting
            self.waiting.push_front(req)
            break

    # 4. Build output
    return SchedulerOutput(
        scheduled_new_reqs=scheduled_new_reqs,
        scheduled_running_reqs=scheduled_running_reqs,
        preempted_reqs=preempted_reqs,
        num_scheduled_tokens=self.max_num_scheduled_tokens - token_budget
    )

Step 1: The Token Budget

Each iteration works within a token budget (typically 2,048-8,192):

token_budget = self.max_num_scheduled_tokens  # e.g., 4096

# Why limit tokens?
# 1. GPU memory for activations
# 2. Computation time (TTFT)
# 3. CUDA kernel efficiency (not too large)

The trade-off (see the configuration sketch after this list):

  • Larger budget: higher throughput, but higher latency
  • Smaller budget: lower latency, but lower throughput
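
In practice this budget maps to vLLM's engine arguments. A minimal tuning sketch (the values are illustrative, and flag names can vary slightly between vLLM versions):

from vllm import LLM

llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",  # example model
    max_num_batched_tokens=4096,   # token budget per scheduler iteration
    max_num_seqs=256,              # cap on concurrently running requests
    enable_chunked_prefill=True,   # let long prefills share iterations with decodes
)

The same knobs are exposed on the server as --max-num-batched-tokens, --max-num-seqs, and --enable-chunked-prefill.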

Step 2: Scheduling Running Requests

Running requests get first claim on the budget:

for request in self.running:
    # How many tokens does this request need?
    num_remaining = request.num_tokens - request.num_computed_tokens

    if num_remaining == 0:
        # Request is finished
        self.finish_request(request)
        continue

    # In decode phase: compute 1 token per iteration
    # In prefill phase: may compute multiple tokens
    num_to_schedule = min(num_remaining, token_budget)

    # Try to allocate KV cache blocks
    blocks, num_cached = self.kv_cache_manager.allocate_slots(
        request,
        num_tokens=request.num_computed_tokens + num_to_schedule,
        num_computed_tokens=request.num_computed_tokens
    )

    if blocks is None:
        # Out of KV cache memory!
        # Preemption decision
        if self.should_preempt(request):
            preempted_reqs.append(request)
            self.preempt_request(request)
        continue

    # Successfully scheduled
    scheduled_running_reqs.append(request)
    request.num_scheduled_tokens = num_to_schedule
    token_budget -= num_to_schedule

Key insight: running requests are usually in decode mode (one new token per iteration), so they consume only a tiny slice of the token budget.
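
To put rough numbers on that (illustrative values, not measurements):

token_budget = 4096          # per-iteration budget (assumed)
num_decoding_requests = 256  # running requests, all in decode mode

decode_tokens = num_decoding_requests * 1            # 1 new token each
remaining_for_prefill = token_budget - decode_tokens

print(decode_tokens, remaining_for_prefill)  # 256 3840
# Even a large decode batch uses a small slice of the budget, leaving most of
# it available for admitting and prefilling new requests.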

Step 3: Scheduling Waiting Requests

If token budget remains, admit new requests:

while token_budget > 0 and len(self.waiting) > 0:
    request = self.waiting.peek()  # Don't pop yet

    # Check if request can fit
    if not self.can_admit_request(request, token_budget):
        break  # Can't fit any more

    # Pop and schedule
    request = self.waiting.pop()

    # Find prefix cache hits
    cached_blocks, num_cached = \
        self.kv_cache_manager.get_computed_blocks(request)

    num_to_compute = request.num_prompt_tokens - num_cached

    # Allocate blocks for new tokens
    blocks = self.kv_cache_manager.allocate_slots(
        request,
        num_tokens=num_to_compute,
        cached_blocks=cached_blocks
    )

    if blocks is None:
        # Out of memory - back to waiting queue
        self.waiting.push_front(request)
        break

    # Scheduled!
    scheduled_new_reqs.append(request)
    token_budget -= num_to_compute

    # Move to running queue
    self.running.append(request)

Step 4: Handling Preemption

When a request is preempted:

def preempt_request(self, request: Request):
    """Preempt a running request to free memory."""

    # Free all KV cache blocks
    for block in request.blocks:
        self.kv_cache_manager.free(block)

    # Clear blocks
    request.blocks = []
    request.num_computed_tokens = 0  # Must recompute!

    # Move back to waiting queue
    self.running.remove(request)
    self.waiting.push(request)

    # Increment preemption counter (for logging)
    request.num_preemptions += 1

Preemption is expensive: all of the work already computed for that request is thrown away!

Policy: prefer to preempt the request with the least progress, which minimizes the wasted work.

Advanced Scheduling Strategies

Chunked Prefill

A long prompt is split into chunks and processed over several iterations:

# Prompt with 10,000 tokens, chunk_size=2048

# Iteration 1: Compute tokens 0-2047
schedule(request, num_tokens=2048)

# Iteration 2: Compute tokens 2048-4095
schedule(request, num_tokens=2048)

# ... continue until all prefill done

# Then switch to decode mode (1 token/iter)

Benefits

  • Lower TTFT: the first chunk is processed quickly
  • Fair scheduling: a long prompt no longer blocks every other request
  • Memory efficient: the work is spread across iterations

Implementation

if request.num_computed_tokens < request.num_prompt_tokens:
    # Still in prefill phase
    max_prefill_tokens = min(
        self.max_num_scheduled_tokens,
        self.chunk_size  # e.g., 2048
    )
    num_to_schedule = min(
        request.num_prompt_tokens - request.num_computed_tokens,
        max_prefill_tokens
    )
else:
    # Decode phase - 1 token per iteration
    num_to_schedule = 1

Priority Scheduling

Requests can carry a priority:

request_high_priority = Request(
    request_id="urgent",
    priority=100  # Higher = more important
)

request_low_priority = Request(
    request_id="background",
    priority=1
)

# Scheduler processes high priority first

Use cases

  • Paid tiers: paying users get priority
  • Interactive vs. batch: interactive requests are served first
  • Deadlines: time-sensitive requests get boosted priority

Supporting Speculative Decoding

With speculative decoding, both the draft model and the target model have to be scheduled:

# Draft model proposes k=5 tokens
draft_tokens = draft_model.generate(k=5)

# Main model verifies all k tokens in ONE iteration
verified = main_model.verify(draft_tokens)  # Returns 0-5

if verified == 5:
    # All accepted! 5x speedup
    request.num_tokens += 5
else:
    # Partial accept - rollback rest
    request.num_tokens += verified

The Scheduler must (a rough bookkeeping sketch follows this list):

  • allocate KV cache for the draft tokens
  • handle rollback when verification rejects some of them
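
A rough sketch of that bookkeeping, reusing the simplified names from this article (num_draft_tokens and num_accepted are hypothetical; this is not vLLM's actual implementation):

num_draft_tokens = 5  # k proposed tokens per step (assumed)

# Reserve KV cache slots so the target model can verify all k drafts in one pass
blocks = self.kv_cache_manager.allocate_slots(request, num_draft_tokens)
if blocks is None:
    num_draft_tokens = 1  # memory is tight: fall back to plain decode

# ... target model runs and accepts `num_accepted` of the drafts (0..k) ...

# Only accepted tokens count as progress; the rejected tail of the window is
# discarded and its KV cache slots can be reclaimed
request.num_computed_tokens += num_accepted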

Request Reordering

The waiting queue can be reordered dynamically to improve prefix cache hit rates:

def reorder_for_cache_hits(waiting: list[Request]) -> list[Request]:
    """Reorder waiting requests to maximize prefix cache hits."""

    # Score each request by expected cache hit rate
    scored = [
        (estimate_cache_hit_rate(req), req)
        for req in waiting
    ]

    # Sort by score (descending); sort on the score only so ties
    # don't try to compare Request objects
    scored.sort(key=lambda pair: pair[0], reverse=True)

    return [req for _, req in scored]

Resource Management

KV Cache Allocation

The Scheduler has to make sure enough KV cache capacity is available:

def can_admit_request(self, request: Request, token_budget: int) -> bool:
    """Check if request can be admitted."""

    # 1. Check token budget
    num_tokens_needed = request.num_prompt_tokens
    if num_tokens_needed > token_budget:
        return False

    # 2. Check if KV cache has enough free blocks
    num_blocks_needed = (num_tokens_needed + self.block_size - 1) // self.block_size
    num_free_blocks = self.kv_cache_manager.get_num_free_blocks()

    if num_blocks_needed > num_free_blocks:
        return False

    # 3. Check max running requests
    if len(self.running) >= self.max_num_running_reqs:
        return False

    return True
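
As a quick worked example of the block arithmetic above (a block size of 16 is an illustrative choice; the configured value may differ):

block_size = 16           # illustrative block size
num_tokens_needed = 100   # prompt length

num_blocks_needed = (num_tokens_needed + block_size - 1) // block_size
print(num_blocks_needed)  # 7 blocks, i.e. ceil(100 / 16)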

Handling Memory Pressure

When memory gets tight:

if self.kv_cache_manager.get_num_free_blocks() < MIN_FREE_BLOCKS:
    # High memory pressure!

    # Option 1: Preempt the running requests with the least progress
    preempt_candidates = sorted(
        self.running,
        key=lambda r: r.num_computed_tokens  # Preempt least progress first
    )

    for req in preempt_candidates[:NUM_TO_PREEMPT]:
        self.preempt_request(req)

    # Option 2: Evict cached blocks
    self.kv_cache_manager.evict_cached_blocks(NUM_BLOCKS_TO_EVICT)

Encoder Cache Management

For multimodal models (images, audio):

def can_fit_encoder_inputs(self, request: Request) -> bool:
    """Check if encoder cache has capacity."""

    num_encoder_tokens = len(request.encoder_input_ids)
    available = self.encoder_cache_manager.get_available_capacity()

    return num_encoder_tokens <= available

Performance Metrics

Scheduler Statistics

The Scheduler tracks metrics such as:

from dataclasses import dataclass

@dataclass
class SchedulerStats:
    num_running: int
    num_waiting: int
    num_preempted: int

    num_scheduled_tokens: int
    token_budget: int
    utilization: float  # scheduled / budget

    kv_cache_usage: float  # 0.0 - 1.0
    num_cache_hits: int
    num_cache_misses: int

Optimization Objectives

The Scheduler balances several competing objectives:

  1. Maximize throughput: schedule as many tokens as possible
  2. Minimize latency: prioritize running requests and use chunked prefill
  3. Maximize cache hits: reorder requests that share prefixes
  4. Fairness: don't starve low-priority requests
  5. Memory efficiency: keep preemptions to a minimum

Example: A Scheduling Timeline

Let's trace a complete scheduling scenario.

Initial State

Waiting: [A (100 tokens), B (50 tokens)]
Running: []
Token budget: 200
Free KV blocks: 1000

Iteration 1

# Schedule A (new)
A.num_scheduled_tokens = 100
token_budget = 200 - 100 = 100

# Schedule B (new)
B.num_scheduled_tokens = 50
token_budget = 100 - 50 = 50

# Running: [A, B]
# Waiting: []

GPU computes: prefill of A (100 tokens) + prefill of B (50 tokens)

Iteration 2

# Both in decode mode now
A.num_scheduled_tokens = 1
B.num_scheduled_tokens = 1
token_budget = 200 - 2 = 198

# New request C arrives (2000 tokens)
# Can fit with chunked prefill
C.num_scheduled_tokens = 198

# Running: [A, B, C]

GPU computes: decode of A (1 token) + decode of B (1 token) + prefill of C (198 tokens)

Iteration 3

# A finishes (generated 50 tokens)
# Free A's KV blocks

# B continues
B.num_scheduled_tokens = 1

# C continues prefill (budget left after B's decode token)
C.num_scheduled_tokens = 199

# Running: [B, C]

Iteration 10

# C finishes prefill, now in decode
# New request D (500 tokens) with high priority

# Memory pressure! Preempt B
self.preempt_request(B)

# Schedule D (chunked prefill: only the budget left after C's decode token)
D.num_scheduled_tokens = 199

# Running: [C, D]
# Waiting: [B]

Scheduling Policies

vLLM supports several scheduling policies:

FCFS (First-Come-First-Serve)

# Simple queue
waiting.append(new_request)
next_request = waiting.pop(0)

Pros: simple and fair
Cons: head-of-line blocking

Priority-Based

# Priority heap
heapq.heappush(waiting, (-priority, request))
neg_priority, next_request = heapq.heappop(waiting)

Pros: important requests are served first
Cons: low-priority requests can starve

Shortest-Job-First

# Sort by estimated completion time
waiting.sort(key=lambda r: r.max_tokens)
next_request = waiting.pop(0)

Pros: minimizes average latency
Cons: long requests can starve

Troubleshooting Scheduler Issues

Common problems and how to address them:

High Preemption Rate

# Symptom: Many requests restarting
if stats.num_preempted > THRESHOLD:
    # Solutions:
    # 1. Increase KV cache size
    # 2. Decrease max_num_seqs
    # 3. Enable chunked prefill

Low GPU Utilization

# Symptom: token_budget not fully used
if stats.utilization < 0.8:
    # Solutions:
    # 1. Increase max_num_seqs
    # 2. Reduce chunk_size (more parallelism)

High Latency

# Symptom: TTFT too high
if avg_ttft > TARGET_TTFT:
    # Solutions:
    # 1. Enable chunked prefill
    # 2. Prioritize interactive requests
    # 3. Reduce max_num_scheduled_tokens

Key Takeaways

  1. Continuous batching eliminates head-of-line blocking by adding and removing requests every iteration

  2. The token budget caps the batch size and mediates the latency/throughput trade-off

  3. Running requests get priority so that decode latency stays low

  4. Preemption is a last resort under memory pressure and discards all progress made so far

  5. Chunked prefill balances long prompts against interactive requests

  6. Prefix-cache integration means the Scheduler must be cache-aware to admit requests efficiently

What's Next

In Part 4 we'll explore Request Processing: how a request flows through the system from tokenization to streaming output, and how state is managed across iterations.

The Scheduler is the brain of vLLM, making thousands of split-second decisions every second to maximize GPU utilization while meeting latency SLAs. Next, we'll follow a request end to end through the system.