从 Demo 到产品:AI 落地的几个工程坑 从 Demo 到产品AI 落地的几个工程坑一、Demo 跑通很容易做成产品很难很多团队都有一个共同经历Demo 阶段一切顺利输入规范模型响应正常用户也按预期操作。但一旦要变成正式产品问题就来了——输入异常、模型幻觉、用户乱点、网络超时、并发冲突这些“阴暗路径”在 Demo 里根本遇不到。从 0 到 Demo 的距离往往远不如从 Demo 到产品的距离。最头疼的是延迟的不确定性。本地跑模型响应时间稳定在 2 秒左右一旦部署到云端受网络波动、模型排队、冷启动影响P99 延迟可能直接飙到 15 秒以上。更麻烦的是长尾延迟——99% 的请求 3 秒内完成但剩下的 1% 超过 10 秒。这种分布对用户体验的破坏远比平均延迟上升更严重。成本也是个容易被低估的问题。Demo 阶段免费 API 额度够用产品化后日活一上来Token 消耗线性增长月度费用可能从几十美元跳到几万。还有那些“隐性成本”——向量数据库的存储费、GPU 推理的闲置浪费、Prompt 调试的人力投入在 Demo 阶段根本看不出来。二、三个必须解决的工程问题要把 AI 做成产品可靠性、可观测性和成本控制是绕不开的三件事。flowchart TB subgraph 可靠性工程 R1[输入校验与清洗] R2[模型降级与熔断] R3[输出审核与过滤] R4[重试与超时策略] end subgraph 可观测性工程 O1[请求级追踪TraceID 串联全链路] O2[指标监控延迟/错误率/Token 消耗] O3[日志聚合Prompt 与 Response 快照] O4[告警规则P99 延迟/错误率阈值] end subgraph 成本治理工程 C1[Token 预算与限流] C2[模型路由按任务复杂度分流] C3[缓存策略相似请求复用结果] C4[用量计费按 Token 消耗定价] end R1 R2 R3 R4 -- STABLE[服务稳定] O1 O2 O3 O4 -- OBSERVABLE[问题可定位] C1 C2 C3 C4 -- SUSTAINABLE[商业可持续] STABLE OBSERVABLE SUSTAINABLE -- PRODUCTION[生产级 AI 产品] style STABLE fill:#e8f5e9 style OBSERVABLE fill:#e3f2fd style SUSTAINABLE fill:#fff3e0 style PRODUCTION fill:#ffe082这三者其实是环环相扣的没有可观测性问题发现不了可靠性就无从谈起没有可观测性的数据支撑成本控制就是瞎猜而可靠性如果不受成本约束无限投入虽然能堆出高可用但商业模式跑不通。三、可靠性工程把模型调用变成稳定服务# production/reliability.py — AI 产品的可靠性工程组件 import time import hashlib import json from dataclasses import dataclass, field from typing import Optional, Callable from enum import Enum class ModelTier(Enum): PREMIUM premium # 高质量模型GPT-4, Claude STANDARD standard # 标准模型GPT-4o-mini FALLBACK fallback # 兜底模型本地部署 dataclass class AIRequest: AI 请求的标准化封装 request_id: str prompt: str max_tokens: int 2048 temperature: float 0.7 tier: ModelTier ModelTier.STANDARD metadata: dict field(default_factorydict) dataclass class AIResponse: AI 响应的标准化封装 request_id: str content: str model: str tier: ModelTier input_tokens: int output_tokens: int latency_ms: float filtered: bool False # 是否经过内容过滤 fallback_used: bool False # 是否使用了降级模型 cached: bool False # 是否命中缓存 class AIReliabilityLayer: AI 可靠性层输入校验、降级熔断、输出审核、重试超时 def __init__( self, call_model_fn: Callable, content_filter_fn: Optional[Callable] None, cache_ttl_seconds: int 3600, ): self._call_model call_model_fn self._content_filter content_filter_fn self._cache_ttl cache_ttl_seconds self._cache: dict[str, tuple[float, AIResponse]] {} # 熔断器状态 self._failure_counts: dict[str, int] {} self._circuit_open: dict[str, bool] {} self._last_failure_time: dict[str, float] {} # 降级链配置 self._fallback_chain { ModelTier.PREMIUM: ModelTier.STANDARD, ModelTier.STANDARD: ModelTier.FALLBACK, } async def invoke(self, request: AIRequest) - AIResponse: 执行 AI 调用包含完整的可靠性保障流程 start_time time.perf_counter() # 1. 输入校验与清洗 validated_prompt self._validate_and_sanitize(request.prompt) request.prompt validated_prompt # 2. 缓存检查 cache_key self._compute_cache_key(request) cached self._check_cache(cache_key) if cached: cached.cached True return cached # 3. 熔断检查 if self._is_circuit_open(request.tier): # 当前层级熔断尝试降级 fallback_tier self._fallback_chain.get(request.tier) if fallback_tier and not self._is_circuit_open(fallback_tier): request.tier fallback_tier elif request.tier ! ModelTier.FALLBACK: # 所有层级都熔断使用兜底响应 return self._fallback_response(request, start_time) # 4. 执行模型调用含重试 response await self._call_with_retry(request, max_retries2) # 5. 输出审核 if self._content_filter: filter_result self._content_filter(response.content) if not filter_result.safe: response.content filter_result.replacement response.filtered True # 6. 写入缓存 latency_ms (time.perf_counter() - start_time) * 1000 response.latency_ms latency_ms self._write_cache(cache_key, response) return response def _validate_and_sanitize(self, prompt: str) - str: 输入校验与清洗 # 长度限制 if len(prompt) 50000: prompt prompt[:50000] # 去除控制字符 prompt .join( c for c in prompt if c.isprintable() or c in \n\r\t ) # 检测注入攻击模式 injection_patterns [ ignore previous instructions, 忽略以上指令, system:, ] prompt_lower prompt.lower() for pattern in injection_patterns: if pattern in prompt_lower: # 标记但不拒绝由输出审核兜底 break return prompt.strip() def _compute_cache_key(self, request: AIRequest) - str: 计算缓存键基于 Prompt 和参数的哈希 key_data f{request.prompt}:{request.max_tokens}:{request.temperature}:{request.tier.value} return hashlib.md5(key_data.encode()).hexdigest() def _check_cache(self, key: str) - Optional[AIResponse]: 检查缓存 if key in self._cache: timestamp, response self._cache[key] if time.time() - timestamp self._cache_ttl: return response del self._cache[key] return None def _write_cache(self, key: str, response: AIResponse) - None: 写入缓存 self._cache[key] (time.time(), response) def _is_circuit_open(self, tier: ModelTier) - bool: 检查熔断状态 tier_name tier.value if not self._circuit_open.get(tier_name, False): return False # 熔断 30 秒后半开 last_failure self._last_failure_time.get(tier_name, 0) if time.time() - last_failure 30: self._circuit_open[tier_name] False self._failure_counts[tier_name] 0 return False return True def _record_failure(self, tier: ModelTier) - None: 记录失败更新熔断状态 tier_name tier.value self._failure_counts[tier_name] self._failure_counts.get(tier_name, 0) 1 self._last_failure_time[tier_name] time.time() # 连续 3 次失败触发熔断 if self._failure_counts[tier_name] 3: self._circuit_open[tier_name] True def _record_success(self, tier: ModelTier) - None: 记录成功重置熔断计数 self._failure_counts[tier.value] 0 self._circuit_open[tier.value] False async def _call_with_retry( self, request: AIRequest, max_retries: int, ) - AIResponse: 带重试的模型调用 last_error None for attempt in range(max_retries 1): try: response await self._call_model(request) self._record_success(request.tier) return response except Exception as e: last_error e self._record_failure(request.tier) # 最后一次尝试失败尝试降级 if attempt max_retries: fallback_tier self._fallback_chain.get(request.tier) if fallback_tier and not self._is_circuit_open(fallback_tier): request.tier fallback_tier try: response await self._call_model(request) response.fallback_used True self._record_success(fallback_tier) return response except Exception: pass # 指数退避 if attempt max_retries: wait_time min(2 ** attempt, 8) # 1s, 2s, 4s, 8s time.sleep(wait_time) # 所有尝试失败返回兜底响应 return self._fallback_response(request, time.perf_counter()) def _fallback_response(self, request: AIRequest, start_time: float) - AIResponse: 兜底响应当所有模型不可用时返回 return AIResponse( request_idrequest.request_id, content抱歉服务暂时不可用请稍后重试。, modelfallback, tierModelTier.FALLBACK, input_tokens0, output_tokens0, latency_ms(time.perf_counter() - start_time) * 1000, fallback_usedTrue, )这套可靠性层把输入校验、缓存、熔断、重试、降级和输出审核封装成一个统一的调用接口。业务代码只需要调invoke方法不用管底层的容错逻辑。熔断器采用“连续 3 次失败触发、30 秒后半开”的策略降级链按 Premium→Standard→Fallback 的顺序逐级降级。四、怎么不让钱烧得太快Token 预算与限流给每个用户设个日 Token 预算超了就降级到轻量模型或者提示用户。企业客户可以买更高的预算。限流策略要区分模型层级——Premium 模型的并发限制更严Standard 模型可以宽松些。缓存策略语义缓存Semantic Cache通过向量相似度匹配把相似请求的响应复用。当新请求和缓存里的请求语义相似度超过 0.95 时直接返回缓存结果。这对 FAQ 类请求效果不错缓存命中率能到 40% 以上但开放式对话基本没用。用量计费按 Token 消耗定价是主流但得让用户清楚自己用了多少。建议在响应里带上input_tokens和output_tokens字段在用户面板里提供日/月用量统计和费用预估。五、总结AI 产品化其实就是解决三个问题怎么让服务稳定、怎么让问题可发现、怎么让成本可控。可靠性层把 Demo 级的模型调用升级成生产级服务可观测性确保问题能发现能定位成本治理通过 Token 预算、语义缓存和用量计费让商业模式能跑通。这三件事缺一不可。