LLM 推理性能调优从显存瓶颈到吞吐优化大模型服务的工程化加速一、LLM 推理的性能瓶颈显存墙与计算墙的双重制约大模型推理的性能受两个物理约束制约。显存墙模型权重必须加载到 GPU 显存中才能推理7B 模型需要约 14GB 显存70B 模型需要约 140GB 显存单张 A10080GB无法承载。计算墙自回归生成每个 token 都需要读取全部模型权重计算密度低GPU 的计算单元利用率不足。更具体地推理过程分为两个阶段。预填充Prefill阶段处理输入 prompt 的所有 token计算 KV Cache这一步是计算密集型。解码Decode阶段逐个生成输出 token每步读取 KV Cache 和模型权重这一步是显存带宽密集型。两个阶段的瓶颈不同优化策略也不同。实际生产中推理服务的性能指标是吞吐量tokens/s和首 token 延迟TTFT。优化目标是在满足延迟 SLA 的前提下最大化吞吐量。二、推理性能优化的技术栈LLM 推理性能优化需要在模型层、引擎层和系统层三个层面协同进行。flowchart TD A[LLM 推理性能优化] -- B[模型层优化] A -- C[引擎层优化] A -- D[系统层优化] B -- B1[量化: INT8/INT4 降低显存占用] B -- B2[蒸馏: 小模型替代大模型] B -- B3[剪枝: 移除冗余参数] C -- C1[KV Cache 优化: PagedAttention] C -- C2[连续批处理: Continuous Batching] C -- C3[前缀缓存: 共享 Prompt 的 KV Cache] C -- C4[投机解码: 小模型预测大模型验证] D -- D1[张量并行: 模型切分到多 GPU] D -- D2[流水线并行: 层级切分] D -- D3[显存卸载: GPU↔CPU 数据搬运] style B fill:#e8f5e9 style C fill:#e1f5fe style D fill:#fff3e02.1 KV Cache 与 PagedAttention# paged_attention.py — PagedAttention 的 KV Cache 管理 # 设计意图将 KV Cache 按固定大小的 Block 管理类似操作系统的虚拟内存 # 解决传统 KV Cache 预分配导致的显存浪费和碎片问题 from dataclasses import dataclass, field from typing import Optional import math dataclass class KVBlock: KV Cache 的一个 Block固定大小 block_id: int block_size: int 16 # 每个 Block 存储 16 个 token 的 KV ref_count: int 0 # 引用计数支持共享前缀缓存 is_free: bool True dataclass class KVBlockTable: 单个序列的 KV Block 映射表 sequence_id: int blocks: list[int] field(default_factorylist) # Block ID 列表 num_tokens: int 0 # 当前已使用的 token 数 class PagedAttentionManager: def __init__(self, num_blocks: int, block_size: int 16): self.block_size block_size self.blocks: dict[int, KVBlock] {} self.block_tables: dict[int, KVBlockTable] {} self.free_blocks: list[int] [] # 初始化所有 Block for i in range(num_blocks): self.blocks[i] KVBlock(block_idi, block_sizeblock_size) self.free_blocks.append(i) def allocate(self, sequence_id: int, num_tokens: int) - list[int]: 为序列分配 KV Cache Block num_blocks_needed math.ceil(num_tokens / self.block_size) if len(self.free_blocks) num_blocks_needed: # 显存不足尝试驱逐低优先级序列 self._evict_sequences(num_blocks_needed - len(self.free_blocks)) allocated [] for _ in range(num_blocks_needed): if not self.free_blocks: raise RuntimeError(KV Cache 显存不足无法分配新 Block) block_id self.free_blocks.pop() block self.blocks[block_id] block.is_free False block.ref_count 1 allocated.append(block_id) self.block_tables[sequence_id] KVBlockTable( sequence_idsequence_id, blocksallocated, num_tokensnum_tokens, ) return allocated def append_tokens(self, sequence_id: int, num_new_tokens: int): 为已有序列追加 token可能需要分配新 Block table self.block_tables.get(sequence_id) if not table: return table.num_tokens num_new_tokens needed_blocks math.ceil(table.num_tokens / self.block_size) current_blocks len(table.blocks) if needed_blocks current_blocks: extra_needed needed_blocks - current_blocks for _ in range(extra_needed): if not self.free_blocks: self._evict_sequences(1) block_id self.free_blocks.pop() self.blocks[block_id].is_free False self.blocks[block_id].ref_count 1 table.blocks.append(block_id) def free(self, sequence_id: int): 释放序列的 KV Cache table self.block_tables.pop(sequence_id, None) if not table: return for block_id in table.blocks: block self.blocks[block_id] block.ref_count - 1 if block.ref_count 0: block.is_free True self.free_blocks.append(block_id) def _evict_sequences(self, num_blocks_needed: int): 驱逐低优先级序列释放 Block # 简化策略按序列已生成 token 数排序优先驱逐最长的 sorted_tables sorted( self.block_tables.values(), keylambda t: t.num_tokens, reverseTrue, ) freed 0 for table in sorted_tables: if freed num_blocks_needed: break freed len(table.blocks) self.free(table.sequence_id)2.2 连续批处理# continuous_batching.py — 连续批处理调度器 # 设计意图不同于静态批处理等待所有序列完成才释放资源 # 连续批处理在序列完成后立即插入新请求显著提升 GPU 利用率 import time from dataclasses import dataclass from typing import Optional dataclass class InferenceRequest: request_id: str prompt_tokens: list[int] max_output_tokens: int generated_tokens: list[int] None is_completed: bool False arrival_time: float 0.0 def __post_init__(self): if self.generated_tokens is None: self.generated_tokens [] if self.arrival_time 0.0: self.arrival_time time.time() class ContinuousBatchScheduler: def __init__(self, max_batch_size: int 32): self.max_batch_size max_batch_size self.waiting_queue: list[InferenceRequest] [] self.running_batch: list[InferenceRequest] [] def add_request(self, request: InferenceRequest): 添加推理请求到等待队列 self.waiting_queue.append(request) def schedule(self) - list[InferenceRequest]: 调度下一批推理请求 # 移除已完成的请求释放批次槽位 self.running_batch [ req for req in self.running_batch if not req.is_completed ] # 计算可用槽位 available_slots self.max_batch_size - len(self.running_batch) # 从等待队列中取请求填充槽位 new_requests [] while available_slots 0 and self.waiting_queue: request self.waiting_queue.pop(0) new_requests.append(request) available_slots - 1 self.running_batch.extend(new_requests) return self.running_batch def mark_completed(self, request_id: str): 标记请求完成 for req in self.running_batch: if req.request_id request_id: req.is_completed True break def get_stats(self) - dict: return { waiting: len(self.waiting_queue), running: len(self.running_batch), completed_in_batch: sum(1 for r in self.running_batch if r.is_completed), }三、量化与投机解码3.1 量化策略选择# quantization_config.py — 量化策略配置 # 设计意图根据延迟要求和精度容忍度选择量化方案 # INT4 最大化吞吐但精度损失较大INT8 是平衡选择 from dataclasses import dataclass from enum import Enum class QuantizationMethod(Enum): FP16 fp16 # 无量化基线 INT8_WEIGHT int8_w # 仅权重量化为 INT8 INT8_FULL int8_full # 权重和激活都 INT8 INT4_GPTQ int4_gptq # GPTQ 4-bit 量化 INT4_AWQ int4_awq # AWQ 4-bit 量化 dataclass class QuantizationConfig: method: QuantizationMethod group_size: int 128 # 量化分组大小 desc_act: bool False # GPTQ 的激活排序 vmapped_only: bool False # 仅量化 V 投影 staticmethod def recommend(model_size_b: float, latency_sla_ms: float) - QuantizationConfig: 根据模型大小和延迟 SLA 推荐量化方案 if model_size_b 7: # 小模型INT8 足够精度损失小 return QuantizationConfig(methodQuantizationMethod.INT8_WEIGHT) elif model_size_b 30: # 中等模型INT8 或 INT4-AWQ if latency_sla_ms 200: return QuantizationConfig(methodQuantizationMethod.INT4_AWQ) return QuantizationConfig(methodQuantizationMethod.INT8_WEIGHT) else: # 大模型必须 INT4 才能在有限 GPU 上运行 return QuantizationConfig( methodQuantizationMethod.INT4_AWQ, group_size128, )3.2 投机解码# speculative_decoding.py — 投机解码实现 # 设计意图用小模型快速生成候选 token大模型并行验证 # 接受正确的 token拒绝错误的 token加速生成过程 from typing import Optional class SpeculativeDecoder: def __init__(self, draft_model, target_model, max_spec_tokens: int 5): self.draft_model draft_model # 小模型草稿模型 self.target_model target_model # 大模型目标模型 self.max_spec_tokens max_spec_tokens def generate(self, prompt_tokens: list[int], max_tokens: int) - list[int]: generated [] while len(generated) max_tokens: # 步骤 1草稿模型快速生成 K 个候选 token draft_tokens self.draft_model.generate( prompt_tokens generated, max_tokensself.max_spec_tokens, ) # 步骤 2目标模型并行验证 K 个 token # 一次前向传播同时计算 K1 个位置的概率 target_probs self.target_model.forward( prompt_tokens generated draft_tokens, ) # 步骤 3逐个验证候选 token accepted 0 for i, draft_token in enumerate(draft_tokens): target_prob target_probs[len(generated) i] draft_prob self.draft_model.get_prob( prompt_tokens generated draft_tokens[:i], draft_token, ) # 接受条件目标模型的概率 草稿模型的概率 # 或按概率比例随机接受 acceptance_ratio target_prob / max(draft_prob, 1e-10) if acceptance_ratio 1.0: # 确定接受 generated.append(draft_token) accepted 1 else: # 按概率接受 import random if random.random() acceptance_ratio: generated.append(draft_token) accepted 1 else: # 拒绝从目标模型的分布中采样一个 token corrected_token self._sample_from_target(target_prob) generated.append(corrected_token) break # 如果所有候选都被接受额外生成一个 token if accepted len(draft_tokens): bonus_token self._sample_from_target( target_probs[len(generated)] ) generated.append(bonus_token) return generated[:max_tokens] def _sample_from_target(self, probs) - int: 从目标模型的概率分布中采样 # 简化实现 return 0四、边界分析与架构权衡量化精度损失INT4 量化可能导致模型输出质量下降尤其在数学推理和代码生成等精确性要求高的场景。AWQ 通过保护重要权重减少精度损失但仍需在目标数据集上评测。建议对核心业务场景进行量化前后的对比评测。PagedAttention 的实现复杂度PagedAttention 需要修改注意力计算内核使用 Block 索引替代连续内存访问。这需要编写自定义 CUDA 内核开发和维护成本高。生产环境建议直接使用 vLLM 等已实现 PagedAttention 的推理引擎。投机解码的加速比投机解码的加速比取决于草稿模型与目标模型的一致性。如果草稿模型的候选 token 经常被拒绝投机解码反而会增加延迟因为验证步骤需要额外计算。草稿模型的选择需要在速度和一致性之间权衡。张量并行的通信开销多 GPU 张量并行需要在每层计算后进行 AllReduce 同步通信延迟随 GPU 数量增加而增加。超过 8 卡时通信开销可能成为瓶颈。需要使用 NVLink 等高带宽互联技术降低通信延迟。五、总结LLM 推理性能优化需要在模型层、引擎层和系统层协同进行。模型量化降低显存占用和带宽需求PagedAttention 消除 KV Cache 碎片连续批处理提升 GPU 利用率投机解码加速自回归生成。落地建议优先使用 vLLM 等成熟推理引擎已集成 PagedAttention 和连续批处理7B 以下模型使用 INT8 量化30B 以上模型使用 INT4-AWQ 量化投机解码适用于草稿模型与目标模型一致性高的场景多 GPU 部署优先使用张量并行配合 NVLink 降低通信开销。
LLM 推理性能调优:从显存瓶颈到吞吐优化,大模型服务的工程化加速
发布时间:2026/6/15 21:57:59
LLM 推理性能调优从显存瓶颈到吞吐优化大模型服务的工程化加速一、LLM 推理的性能瓶颈显存墙与计算墙的双重制约大模型推理的性能受两个物理约束制约。显存墙模型权重必须加载到 GPU 显存中才能推理7B 模型需要约 14GB 显存70B 模型需要约 140GB 显存单张 A10080GB无法承载。计算墙自回归生成每个 token 都需要读取全部模型权重计算密度低GPU 的计算单元利用率不足。更具体地推理过程分为两个阶段。预填充Prefill阶段处理输入 prompt 的所有 token计算 KV Cache这一步是计算密集型。解码Decode阶段逐个生成输出 token每步读取 KV Cache 和模型权重这一步是显存带宽密集型。两个阶段的瓶颈不同优化策略也不同。实际生产中推理服务的性能指标是吞吐量tokens/s和首 token 延迟TTFT。优化目标是在满足延迟 SLA 的前提下最大化吞吐量。二、推理性能优化的技术栈LLM 推理性能优化需要在模型层、引擎层和系统层三个层面协同进行。flowchart TD A[LLM 推理性能优化] -- B[模型层优化] A -- C[引擎层优化] A -- D[系统层优化] B -- B1[量化: INT8/INT4 降低显存占用] B -- B2[蒸馏: 小模型替代大模型] B -- B3[剪枝: 移除冗余参数] C -- C1[KV Cache 优化: PagedAttention] C -- C2[连续批处理: Continuous Batching] C -- C3[前缀缓存: 共享 Prompt 的 KV Cache] C -- C4[投机解码: 小模型预测大模型验证] D -- D1[张量并行: 模型切分到多 GPU] D -- D2[流水线并行: 层级切分] D -- D3[显存卸载: GPU↔CPU 数据搬运] style B fill:#e8f5e9 style C fill:#e1f5fe style D fill:#fff3e02.1 KV Cache 与 PagedAttention# paged_attention.py — PagedAttention 的 KV Cache 管理 # 设计意图将 KV Cache 按固定大小的 Block 管理类似操作系统的虚拟内存 # 解决传统 KV Cache 预分配导致的显存浪费和碎片问题 from dataclasses import dataclass, field from typing import Optional import math dataclass class KVBlock: KV Cache 的一个 Block固定大小 block_id: int block_size: int 16 # 每个 Block 存储 16 个 token 的 KV ref_count: int 0 # 引用计数支持共享前缀缓存 is_free: bool True dataclass class KVBlockTable: 单个序列的 KV Block 映射表 sequence_id: int blocks: list[int] field(default_factorylist) # Block ID 列表 num_tokens: int 0 # 当前已使用的 token 数 class PagedAttentionManager: def __init__(self, num_blocks: int, block_size: int 16): self.block_size block_size self.blocks: dict[int, KVBlock] {} self.block_tables: dict[int, KVBlockTable] {} self.free_blocks: list[int] [] # 初始化所有 Block for i in range(num_blocks): self.blocks[i] KVBlock(block_idi, block_sizeblock_size) self.free_blocks.append(i) def allocate(self, sequence_id: int, num_tokens: int) - list[int]: 为序列分配 KV Cache Block num_blocks_needed math.ceil(num_tokens / self.block_size) if len(self.free_blocks) num_blocks_needed: # 显存不足尝试驱逐低优先级序列 self._evict_sequences(num_blocks_needed - len(self.free_blocks)) allocated [] for _ in range(num_blocks_needed): if not self.free_blocks: raise RuntimeError(KV Cache 显存不足无法分配新 Block) block_id self.free_blocks.pop() block self.blocks[block_id] block.is_free False block.ref_count 1 allocated.append(block_id) self.block_tables[sequence_id] KVBlockTable( sequence_idsequence_id, blocksallocated, num_tokensnum_tokens, ) return allocated def append_tokens(self, sequence_id: int, num_new_tokens: int): 为已有序列追加 token可能需要分配新 Block table self.block_tables.get(sequence_id) if not table: return table.num_tokens num_new_tokens needed_blocks math.ceil(table.num_tokens / self.block_size) current_blocks len(table.blocks) if needed_blocks current_blocks: extra_needed needed_blocks - current_blocks for _ in range(extra_needed): if not self.free_blocks: self._evict_sequences(1) block_id self.free_blocks.pop() self.blocks[block_id].is_free False self.blocks[block_id].ref_count 1 table.blocks.append(block_id) def free(self, sequence_id: int): 释放序列的 KV Cache table self.block_tables.pop(sequence_id, None) if not table: return for block_id in table.blocks: block self.blocks[block_id] block.ref_count - 1 if block.ref_count 0: block.is_free True self.free_blocks.append(block_id) def _evict_sequences(self, num_blocks_needed: int): 驱逐低优先级序列释放 Block # 简化策略按序列已生成 token 数排序优先驱逐最长的 sorted_tables sorted( self.block_tables.values(), keylambda t: t.num_tokens, reverseTrue, ) freed 0 for table in sorted_tables: if freed num_blocks_needed: break freed len(table.blocks) self.free(table.sequence_id)2.2 连续批处理# continuous_batching.py — 连续批处理调度器 # 设计意图不同于静态批处理等待所有序列完成才释放资源 # 连续批处理在序列完成后立即插入新请求显著提升 GPU 利用率 import time from dataclasses import dataclass from typing import Optional dataclass class InferenceRequest: request_id: str prompt_tokens: list[int] max_output_tokens: int generated_tokens: list[int] None is_completed: bool False arrival_time: float 0.0 def __post_init__(self): if self.generated_tokens is None: self.generated_tokens [] if self.arrival_time 0.0: self.arrival_time time.time() class ContinuousBatchScheduler: def __init__(self, max_batch_size: int 32): self.max_batch_size max_batch_size self.waiting_queue: list[InferenceRequest] [] self.running_batch: list[InferenceRequest] [] def add_request(self, request: InferenceRequest): 添加推理请求到等待队列 self.waiting_queue.append(request) def schedule(self) - list[InferenceRequest]: 调度下一批推理请求 # 移除已完成的请求释放批次槽位 self.running_batch [ req for req in self.running_batch if not req.is_completed ] # 计算可用槽位 available_slots self.max_batch_size - len(self.running_batch) # 从等待队列中取请求填充槽位 new_requests [] while available_slots 0 and self.waiting_queue: request self.waiting_queue.pop(0) new_requests.append(request) available_slots - 1 self.running_batch.extend(new_requests) return self.running_batch def mark_completed(self, request_id: str): 标记请求完成 for req in self.running_batch: if req.request_id request_id: req.is_completed True break def get_stats(self) - dict: return { waiting: len(self.waiting_queue), running: len(self.running_batch), completed_in_batch: sum(1 for r in self.running_batch if r.is_completed), }三、量化与投机解码3.1 量化策略选择# quantization_config.py — 量化策略配置 # 设计意图根据延迟要求和精度容忍度选择量化方案 # INT4 最大化吞吐但精度损失较大INT8 是平衡选择 from dataclasses import dataclass from enum import Enum class QuantizationMethod(Enum): FP16 fp16 # 无量化基线 INT8_WEIGHT int8_w # 仅权重量化为 INT8 INT8_FULL int8_full # 权重和激活都 INT8 INT4_GPTQ int4_gptq # GPTQ 4-bit 量化 INT4_AWQ int4_awq # AWQ 4-bit 量化 dataclass class QuantizationConfig: method: QuantizationMethod group_size: int 128 # 量化分组大小 desc_act: bool False # GPTQ 的激活排序 vmapped_only: bool False # 仅量化 V 投影 staticmethod def recommend(model_size_b: float, latency_sla_ms: float) - QuantizationConfig: 根据模型大小和延迟 SLA 推荐量化方案 if model_size_b 7: # 小模型INT8 足够精度损失小 return QuantizationConfig(methodQuantizationMethod.INT8_WEIGHT) elif model_size_b 30: # 中等模型INT8 或 INT4-AWQ if latency_sla_ms 200: return QuantizationConfig(methodQuantizationMethod.INT4_AWQ) return QuantizationConfig(methodQuantizationMethod.INT8_WEIGHT) else: # 大模型必须 INT4 才能在有限 GPU 上运行 return QuantizationConfig( methodQuantizationMethod.INT4_AWQ, group_size128, )3.2 投机解码# speculative_decoding.py — 投机解码实现 # 设计意图用小模型快速生成候选 token大模型并行验证 # 接受正确的 token拒绝错误的 token加速生成过程 from typing import Optional class SpeculativeDecoder: def __init__(self, draft_model, target_model, max_spec_tokens: int 5): self.draft_model draft_model # 小模型草稿模型 self.target_model target_model # 大模型目标模型 self.max_spec_tokens max_spec_tokens def generate(self, prompt_tokens: list[int], max_tokens: int) - list[int]: generated [] while len(generated) max_tokens: # 步骤 1草稿模型快速生成 K 个候选 token draft_tokens self.draft_model.generate( prompt_tokens generated, max_tokensself.max_spec_tokens, ) # 步骤 2目标模型并行验证 K 个 token # 一次前向传播同时计算 K1 个位置的概率 target_probs self.target_model.forward( prompt_tokens generated draft_tokens, ) # 步骤 3逐个验证候选 token accepted 0 for i, draft_token in enumerate(draft_tokens): target_prob target_probs[len(generated) i] draft_prob self.draft_model.get_prob( prompt_tokens generated draft_tokens[:i], draft_token, ) # 接受条件目标模型的概率 草稿模型的概率 # 或按概率比例随机接受 acceptance_ratio target_prob / max(draft_prob, 1e-10) if acceptance_ratio 1.0: # 确定接受 generated.append(draft_token) accepted 1 else: # 按概率接受 import random if random.random() acceptance_ratio: generated.append(draft_token) accepted 1 else: # 拒绝从目标模型的分布中采样一个 token corrected_token self._sample_from_target(target_prob) generated.append(corrected_token) break # 如果所有候选都被接受额外生成一个 token if accepted len(draft_tokens): bonus_token self._sample_from_target( target_probs[len(generated)] ) generated.append(bonus_token) return generated[:max_tokens] def _sample_from_target(self, probs) - int: 从目标模型的概率分布中采样 # 简化实现 return 0四、边界分析与架构权衡量化精度损失INT4 量化可能导致模型输出质量下降尤其在数学推理和代码生成等精确性要求高的场景。AWQ 通过保护重要权重减少精度损失但仍需在目标数据集上评测。建议对核心业务场景进行量化前后的对比评测。PagedAttention 的实现复杂度PagedAttention 需要修改注意力计算内核使用 Block 索引替代连续内存访问。这需要编写自定义 CUDA 内核开发和维护成本高。生产环境建议直接使用 vLLM 等已实现 PagedAttention 的推理引擎。投机解码的加速比投机解码的加速比取决于草稿模型与目标模型的一致性。如果草稿模型的候选 token 经常被拒绝投机解码反而会增加延迟因为验证步骤需要额外计算。草稿模型的选择需要在速度和一致性之间权衡。张量并行的通信开销多 GPU 张量并行需要在每层计算后进行 AllReduce 同步通信延迟随 GPU 数量增加而增加。超过 8 卡时通信开销可能成为瓶颈。需要使用 NVLink 等高带宽互联技术降低通信延迟。五、总结LLM 推理性能优化需要在模型层、引擎层和系统层协同进行。模型量化降低显存占用和带宽需求PagedAttention 消除 KV Cache 碎片连续批处理提升 GPU 利用率投机解码加速自回归生成。落地建议优先使用 vLLM 等成熟推理引擎已集成 PagedAttention 和连续批处理7B 以下模型使用 INT8 量化30B 以上模型使用 INT4-AWQ 量化投机解码适用于草稿模型与目标模型一致性高的场景多 GPU 部署优先使用张量并行配合 NVLink 降低通信开销。