# CANN 异步推理：隐藏推理延迟、提升吞吐量的完整方案

## 一、同步 vs 异步推理

### 1.1 执行模型对比

**同步推理 (Synchronous)：**

```
请求 → 等待推理 → 返回结果
延迟 = 预处理 + 推理 + 后处理
特点: 简单，但 CPU 空闲等待 NPU
```

**异步推理 (Asynchronous)：**

```
请求 → 提交推理 → 立即返回
结果就绪 → 回调通知 / 轮询获取
延迟感知 ≈ 预处理 (推理在后台)
特点: 复杂，但 CPU/NPU 并行工作
```

```
┌─────────────────────────────────┐
│ 同步: [CPU][NPU][CPU][NPU]       │
│ 异步: [CPU][CPU][CPU][CPU]       │
│       [NPU ][NPU ][NPU ]         │
└─────────────────────────────────┘
```

### 1.2 适用场景

同步推理适用：

- 单次推理对延迟敏感
- 简单应用，不需要高吞吐
- 调试阶段

异步推理适用：

- 高并发服务
- 流水线推理
- 多模型串行执行
- CPU/NPU 异构协同

## 二、CANN 异步 API

### 2.1 基础异步推理

```python
import threading

import torch
import torch_npu  # noqa: F401  -- registers the "npu" device and the torch.npu namespace


class AsyncInferenceEngine:
    """Asynchronous inference engine running the model on a dedicated NPU stream."""

    def __init__(self, model):
        self.model = model
        self.stream = torch.npu.Stream()

    def infer_async(self, input_data, callback=None):
        """Enqueue one forward pass on ``self.stream`` and return immediately.

        Args:
            input_data: model input.
            callback: optional ``callable(output)``; invoked on a helper thread
                after the NPU work recorded on ``self.stream`` has completed.

        Returns:
            The output object. Its computation may still be in flight: wait for
            the callback, or call
            ``torch.npu.current_stream().wait_stream(engine.stream)``
            before using it on the caller's stream.
        """
        # The input may still be being written by work queued on the caller's
        # stream; make the side stream wait for that work before reading it.
        self.stream.wait_stream(torch.npu.current_stream())
        with torch.no_grad(), torch.npu.stream(self.stream):
            output = self.model(input_data)
        if isinstance(input_data, torch.Tensor):
            # input_data is consumed on self.stream; stop the caching allocator
            # from reusing its memory before self.stream is done with it.
            input_data.record_stream(self.stream)

        if callback is not None:
            # Register the callback: it runs once the stream work has finished.
            event = torch.npu.Event()
            event.record(self.stream)
            callback_thread = threading.Thread(
                target=self._wait_and_callback,
                args=(event, callback, output),
            )
            callback_thread.start()

        return output

    def _wait_and_callback(self, event, callback, output):
        """Block this helper thread until ``event`` completes, then call ``callback(output)``."""
        event.synchronize()
        callback(output)


# Usage example
engine = AsyncInferenceEngine(model)


def on_complete(output):
    print(f"推理完成: {output.shape}")


# Asynchronous inference
output = engine.infer_async(input_data, callback=on_complete)

# The CPU keeps doing other work meanwhile
process_other_tasks()

# Before using `output` on the current stream, wait for the engine's stream
torch.npu.current_stream().wait_stream(engine.stream)
```

> 注意：`infer_async` 返回时输出可能尚未计算完成，直接在当前流上使用前必须先 `wait_stream`（或在回调中处理结果）。

### 2.2 Future 模式

```python
import concurrent.futures
import threading

import torch
import torch_npu  # noqa: F401


class FutureInferenceEngine:
    """Future-based asynchronous inference backed by a thread pool.

    Each worker thread owns its own NPU stream, so tasks running in different
    workers can overlap on the device. A future only resolves after that
    task's NPU work has finished.
    """

    def __init__(self, model, max_workers=4):
        self.model = model
        self.device = torch.npu.current_device()
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self._local = threading.local()  # holds the per-worker-thread stream

    def infer(self, input_data):
        """Submit one inference task and return a ``concurrent.futures.Future``."""
        # Snapshot the caller's stream so the worker's stream can wait for any
        # pending work that produces input_data.
        ready = torch.npu.Event()
        ready.record(torch.npu.current_stream())
        return self.executor.submit(self._run_inference, input_data, ready)

    def _worker_stream(self):
        """Return this worker thread's stream, creating it on first use."""
        stream = getattr(self._local, "stream", None)
        if stream is None:
            # Bind the new worker thread to the engine's device before
            # creating any NPU objects in it.
            torch.npu.set_device(self.device)
            stream = torch.npu.Stream()
            self._local.stream = stream
        return stream

    def _run_inference(self, input_data, ready=None):
        """Run the model in a worker thread; returns once the NPU work is done."""
        stream = self._worker_stream()
        if ready is not None:
            stream.wait_event(ready)
        with torch.no_grad(), torch.npu.stream(stream):
            output = self.model(input_data)
        # Block the worker (not the caller): without this the future would
        # resolve as soon as the kernels were merely enqueued.
        stream.synchronize()
        return output

    def infer_batch(self, input_list):
        """Submit every input, then wait for all results in submission order."""
        futures = [self.infer(inp) for inp in input_list]
        return [f.result() for f in futures]

    def shutdown(self, wait=True):
        """Release the worker threads."""
        self.executor.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown()


# Usage example
with FutureInferenceEngine(model) as engine:
    # Submit several inference tasks
    future1 = engine.infer(input1)
    future2 = engine.infer(input2)
    future3 = engine.infer(input3)

    # Handle other work
    process_other_tasks()

    # Collect the results
    result1 = future1.result(timeout=5.0)
    result2 = future2.result(timeout=5.0)
    result3 = future3.result(timeout=5.0)
    print(f"结果: {result1.shape}, {result2.shape}, {result3.shape}")
```

## 三、生产者-消费者模型

### 3.1 异步推理队列

```python
import queue
import threading
import time

import torch
import torch_npu  # noqa: F401


class AsyncInferenceQueue:
    """Asynchronous inference queue served by a single worker thread."""

    def __init__(self, model, max_queue_size=100):
        self.model = model
        self.device = torch.npu.current_device()
        self.request_queue = queue.Queue(maxsize=max_queue_size)
        self.result_store = {}
        self.lock = threading.Lock()
        # Notified whenever an entry becomes completed/failed, so get_result
        # can block instead of busy-polling.
        self.result_ready = threading.Condition(self.lock)
        self.running = False
        self.worker_thread = None

    def start(self):
        """Start the inference worker thread."""
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
        print("异步推理队列已启动")

    def stop(self):
        """Stop the worker thread; requests still queued are not processed."""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join(timeout=5.0)
        print("异步推理队列已停止")

    def submit(self, request_id, input_data):
        """Enqueue a request and return its id (blocks while the queue is full)."""
        now = time.time()
        # Register "pending" BEFORE enqueueing: otherwise the worker could
        # finish first and this entry would overwrite the completed result.
        with self.lock:
            self.result_store[request_id] = {"status": "pending", "submitted_at": now}
        self.request_queue.put({"id": request_id, "input": input_data, "submitted_at": now})
        return request_id

    def get_result(self, request_id, timeout=10.0):
        """Wait for and return the output of ``request_id``.

        The entry is removed once delivered, so each result can be fetched once.

        Raises:
            RuntimeError: the inference raised an exception.
            TimeoutError: no result within ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        with self.result_ready:
            while True:
                result = self.result_store.get(request_id)
                if result is not None and result["status"] in ("completed", "failed"):
                    del self.result_store[request_id]  # keep the store bounded
                    if result["status"] == "failed":
                        raise RuntimeError(f"推理失败: {result.get('error')}")
                    return result["output"]
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"推理超时: {request_id}")
                self.result_ready.wait(remaining)

    def _worker(self):
        """Worker loop: take a request, run the model, publish the result."""
        torch.npu.set_device(self.device)
        while self.running:
            try:
                request = self.request_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            request_id = request["id"]
            try:
                with torch.no_grad():
                    output = self.model(request["input"])
                # Asynchronous NPU failures only surface at a sync point; sync
                # here so "completed" really means the output is ready.
                torch.npu.current_stream().synchronize()
                entry = {"status": "completed", "output": output, "completed_at": time.time()}
            except Exception as e:
                entry = {"status": "failed", "error": str(e)}

            with self.result_ready:
                self.result_store[request_id] = entry
                self.result_ready.notify_all()


# Usage example
queue_engine = AsyncInferenceQueue(model)
queue_engine.start()

# Submit a request
req_id = queue_engine.submit("req_001", input_data)

# Handle other work
process_other_tasks()

# Fetch the result
result = queue_engine.get_result(req_id)
print(f"结果: {result.shape}")

# Stop
queue_engine.stop()
```

### 3.2 多消费者模型

```python
class MultiConsumerInference:
    """Multi-consumer asynchronous inference; each consumer owns one NPU stream."""

    def __init__(self, model, num_consumers=4, max_queue_size=1000):
        self.model = model
        self.device = torch.npu.current_device()
        self.request_queue = queue.Queue(maxsize=max_queue_size)
        self.result_store = {}
        self.lock = threading.Lock()
        self.result_ready = threading.Condition(self.lock)
        self.consumers = []
        self.num_consumers = num_consumers
        self.running = False

    def start(self):
        """Start ``num_consumers`` consumer threads."""
        self.running = True
        for i in range(self.num_consumers):
            consumer = threading.Thread(target=self._consumer_worker, args=(i,), daemon=True)
            self.consumers.append(consumer)
            consumer.start()
        print(f"已启动 {self.num_consumers} 个消费者")

    def stop(self, timeout=5.0):
        """Ask every consumer to exit and wait for it (each waits at most ~1 s on the queue)."""
        self.running = False
        for consumer in self.consumers:
            consumer.join(timeout=timeout)
        self.consumers.clear()

    def _consumer_worker(self, consumer_id):
        """Consumer loop running the model on this thread's private stream."""
        torch.npu.set_device(self.device)
        stream = torch.npu.Stream()
        while self.running:
            try:
                request = self.request_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            request_id = request["id"]
            try:
                with torch.no_grad(), torch.npu.stream(stream):
                    output = self.model(request["input"])
                # The output is read by another thread on another stream, so
                # finish this stream before publishing "completed".
                stream.synchronize()
                entry = {"status": "completed", "output": output, "consumer_id": consumer_id}
            except Exception as e:
                entry = {"status": "failed", "error": str(e), "consumer_id": consumer_id}

            with self.result_ready:
                self.result_store[request_id] = entry
                self.result_ready.notify_all()

    def submit(self, request_id, input_data):
        """Enqueue a request and return its id."""
        # Register "pending" before enqueueing (see AsyncInferenceQueue.submit).
        with self.lock:
            self.result_store[request_id] = {"status": "pending"}
        self.request_queue.put({"id": request_id, "input": input_data})
        return request_id

    def get_result(self, request_id, timeout=10.0):
        """Wait for ``request_id`` and return its result dict (completed or failed).

        The entry is removed once delivered. Raises TimeoutError on timeout.
        """
        deadline = time.monotonic() + timeout
        with self.result_ready:
            while True:
                result = self.result_store.get(request_id)
                if result is not None and result["status"] in ("completed", "failed"):
                    del self.result_store[request_id]
                    return result
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"推理超时: {request_id}")
                self.result_ready.wait(remaining)


# Usage example
multi_consumer = MultiConsumerInference(model, num_consumers=4)
multi_consumer.start()

# Submit many requests
for i in range(100):
    multi_consumer.submit(f"req_{i}", input_data)

# Fetch the results
for i in range(100):
    result = multi_consumer.get_result(f"req_{i}")
    print(f"req_{i}: {result['status']}")

multi_consumer.stop()
```

## 四、流水线推理架构

### 4.1 三阶段流水线

```python
import torch
import torch_npu  # noqa: F401


class InferencePipeline:
    """Three-stage inference pipeline (preprocess -> model -> postprocess), one stream per stage."""

    def __init__(self, preprocessor, model, postprocessor):
        self.preprocessor = preprocessor
        self.model = model
        self.postprocessor = postprocessor

        # One stream per stage
        self.preprocess_stream = torch.npu.Stream()
        self.inference_stream = torch.npu.Stream()
        self.postprocess_stream = torch.npu.Stream()

        # Events chaining the stages
        self.preprocess_done = torch.npu.Event()
        self.inference_done = torch.npu.Event()

    @staticmethod
    def _mark_used_on(data, stream):
        """Tell the caching allocator that tensor ``data`` is also used on ``stream``.

        Without this, memory allocated on one stage's stream could be handed
        to a new allocation while a later stage is still reading it.
        """
        if isinstance(data, torch.Tensor):
            data.record_stream(stream)

    def infer(self, raw_data):
        """Enqueue one item through all three stages.

        Returns immediately. The result is produced on ``postprocess_stream``;
        wait for that stream (or call ``torch.npu.synchronize()``) before using it.
        """
        # raw_data may still be produced by work on the caller's stream.
        self.preprocess_stream.wait_stream(torch.npu.current_stream())

        with torch.no_grad():
            # Stage 1: preprocessing
            with torch.npu.stream(self.preprocess_stream):
                preprocessed = self.preprocessor(raw_data)
                self.preprocess_done.record(self.preprocess_stream)
            self._mark_used_on(raw_data, self.preprocess_stream)

            # Stage 2: inference (waits for preprocessing)
            with torch.npu.stream(self.inference_stream):
                self.inference_stream.wait_event(self.preprocess_done)
                output = self.model(preprocessed)
                self.inference_done.record(self.inference_stream)
            self._mark_used_on(preprocessed, self.inference_stream)

            # Stage 3: postprocessing (waits for inference)
            with torch.npu.stream(self.postprocess_stream):
                self.postprocess_stream.wait_event(self.inference_done)
                result = self.postprocessor(output)
            self._mark_used_on(output, self.postprocess_stream)

        return result

    def infer_batch(self, raw_data_list):
        """Pipeline a list of items and wait for all of them to finish."""
        results = [self.infer(raw_data) for raw_data in raw_data_list]
        torch.npu.synchronize()
        return results


# Usage example
pipeline = InferencePipeline(preprocessor, model, postprocessor)
results = pipeline.infer_batch(raw_data_list)
```

## 五、错误处理与超时控制

### 5.1 超时控制

```python
import concurrent.futures

import torch
import torch_npu  # noqa: F401


class TimeoutInferenceEngine:
    """Asynchronous inference with a per-call timeout."""

    def __init__(self, model, default_timeout=5.0):
        self.model = model
        self.default_timeout = default_timeout
        self.device = torch.npu.current_device()
        self.stream = torch.npu.Stream()
        # One long-lived worker instead of a new ThreadPoolExecutor per call
        # (the per-call executor was never shut down and leaked a thread).
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def infer_with_timeout(self, input_data, timeout=None):
        """Run inference; return the output, or None if it exceeds ``timeout`` seconds.

        A timed-out task cannot be cancelled: it keeps running in the
        background, and later calls queue behind it on the single worker.
        """
        if timeout is None:  # `timeout or default` would also replace an explicit 0
            timeout = self.default_timeout
        future = self._submit_inference(input_data)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            print(f"推理超时 ({timeout}s)")
            return None

    def _submit_inference(self, input_data):
        """Submit one inference task to the worker."""
        return self.executor.submit(self._run_inference, input_data)

    def _run_inference(self, input_data):
        """Run the model in the worker thread and wait for the NPU to finish."""
        torch.npu.set_device(self.device)
        with torch.no_grad(), torch.npu.stream(self.stream):
            output = self.model(input_data)
        # Without this the future resolves once the kernels are enqueued, so
        # the timeout would never trigger for slow inference.
        self.stream.synchronize()
        return output

    def shutdown(self, wait=True):
        """Release the worker thread."""
        self.executor.shutdown(wait=wait)


# Usage example
engine = TimeoutInferenceEngine(model, default_timeout=5.0)
result = engine.infer_with_timeout(input_data, timeout=3.0)
engine.shutdown()
```

### 5.2 重试机制

```python
import time

import torch
import torch_npu  # noqa: F401


class RetryInferenceEngine:
    """Inference with retries and optional exponential backoff."""

    def __init__(self, model, max_retries=3, retry_delay=1.0, backoff=1.0):
        """
        Args:
            max_retries: total number of attempts (must be >= 1).
            retry_delay: seconds to wait before the first retry.
            backoff: multiplier applied to the delay after each failure
                (1.0 = fixed delay; e.g. 2.0 for exponential backoff).
        """
        if max_retries < 1:
            raise ValueError("max_retries must be >= 1")
        self.model = model
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.backoff = backoff
        self.stream = torch.npu.Stream()

    def infer_with_retry(self, input_data):
        """Run inference, retrying on failure.

        Raises:
            RuntimeError: every attempt failed (chained to the last error).
        """
        delay = self.retry_delay
        for attempt in range(1, self.max_retries + 1):
            try:
                with torch.no_grad(), torch.npu.stream(self.stream):
                    output = self.model(input_data)
                # NPU kernel errors are reported asynchronously; synchronize
                # inside the try so they are caught and retried here.
                self.stream.synchronize()
                return output
            except Exception as e:
                if attempt == self.max_retries:
                    raise RuntimeError(f"推理失败已重试 {self.max_retries} 次: {e}") from e
                print(f"推理失败 (尝试 {attempt}/{self.max_retries}): {e}")
                time.sleep(delay)
                delay *= self.backoff


# Usage example (exponential backoff: 1s, 2s, ...)
engine = RetryInferenceEngine(model, max_retries=3, backoff=2.0)
result = engine.infer_with_retry(input_data)
```

## 六、常见问题

| 问题 | 原因 | 解决方案 |
| --- | --- | --- |
| 异步结果获取失败 | Stream 未同步 | 使用 Event 同步 |
| 内存泄漏 | 异步任务未清理 | 及时清理过期任务（结果取走后从结果表中删除） |
| 推理顺序错乱 | 未使用请求 ID | 使用请求 ID 跟踪 |
| 超时不生效 | 超时设置太长；或工作线程未同步 Stream，任务一下发 Future 就返回 | 调整超时参数；在工作线程内调用 `stream.synchronize()` |
| 重试风暴 | 重试间隔太短 | 增加退避策略 |

## 相关仓库

- ascend-cl — 异步推理接口: https://gitee.com/ascend/ascend-cl
- torch_npu — Stream 管理: https://gitee.com/ascend/torch_npu
- torch_npu — Event 同步: https://gitee.com/ascend/torch_npu