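Ascend CANN cann-recipes-spatial-intelligence Repository: Hands-On Point Cloud Inference for Spatial Intelligence

Preface

Suppose you are building obstacle avoidance for a drone or SLAM for a robot, and you need to process point cloud data. A raw point cloud arrives with several hundred thousand or even more than a million points. Processing one frame on the CPU takes about 200 ms, so 30 FPS is out of reach. cann-recipes-spatial-intelligence is CANN's recipe library for spatial intelligence (3D perception, SLAM, point cloud segmentation). This article walks through the complete point cloud segmentation inference flow, step by step.

Inference requirements of spatial intelligence

First, be clear about what spatial intelligence has to process:

| Task | Input | Output | Latency requirement |
|---|---|---|---|
| 3D object detection | Point cloud | 3D bounding box | 100 ms |
| Point cloud segmentation | Point cloud | Per-point class label | 50 ms |
| SLAM | Consecutive point cloud frames | Pose estimate | 30 ms |
| Depth estimation | RGB/D | Depth map | 50 ms |

Common characteristics:

- Large data volume: point clouds with more than a million points
- Latency-sensitive: real-time obstacle avoidance
- Compute-intensive: 3D convolution, Transformer

What the cann-recipes-spatial-intelligence recipes contain

    # Repository structure
    cann-recipes-spatial-intelligence/
    ├── recipes/                          # core recipes
    │   ├── point_cloud_processing/       # point cloud preprocessing
    │   │   ├── voxelization.py           # voxelization
    │   │   ├── downsampling.py           # downsampling
    │   │   └── radius_outlier_removal.py # outlier removal
    │   ├── segmentation/                 # point cloud segmentation
    │   │   ├── pointnet2_seg.py          # PointNet++ segmentation
    │   │   ├── randla_net.py             # RandLA-Net
    │   │   └── spconv.py                 # Sparse Conv
    │   ├── detection_3d/                 # 3D detection
    │   │   ├── pointpillars.py           # PointPillars
    │   │   ├── centerpoint.py            # CenterPoint
    │   │   └── vot.py                    # VOT
    │   ├── slam/                         # SLAM
    │   │   ├── icp.py                    # ICP
    │   │   ├── ndt.py                    # NDT
    │   │   └── _loam.py                  # LOAM
    │   └── depth_estimation/             # depth estimation
    │       ├── monodepth.py
    │       └── struct2strereo.py
    ├── models/                           # pretrained models
    │   ├── randlanet_nuscene.om
    │   ├── pointpillars_nuscene.om
    │   └── spconv_kitti.om
    ├── scripts/                          # example scripts
    │   ├── run_pointcloud_segmentation.sh
    │   ├── run_3d_detection.sh
    │   └── run_slam.sh
    └── docs/
        ├── installation.md
        └── api_reference.md

Point cloud segmentation inference flow

Step 1: Point cloud data ingestion

Point cloud data usually comes from a LiDAR. The common output formats are:

| Format | Description | Example device |
|---|---|---|
| XYZ | (x, y, z) coordinates | Velodyne HDL-64 |
| XYZI | Adds intensity | Ouster |
| XYZRGB | Adds RGB color | Realsense D455 |
| XYZIRT | Adds reflectivity and time | Production LiDAR |

    # step1_pointcloud_reader.py
    import numpy as np


    def read_velodyne_bin(filepath):
        """Read a Velodyne binary point cloud file (.bin).

        Format: each point is 4 float32 values (x, y, z, intensity).
        """
        points = np.fromfile(filepath, dtype=np.float32)
        points = points.reshape(-1, 4)
        xyz = points[:, :3]        # (N, 3)
        intensity = points[:, 3]   # (N,)
        return xyz, intensity


    def read_pcd_file(filepath):
        """Read an ASCII PCD file. Returns (xyz, None): intensity is not parsed."""
        with open(filepath, "r") as f:
            lines = f.readlines()
        # Parse the header: point rows start after the DATA line
        header_end = 0
        for i, line in enumerate(lines):
            if line.startswith("DATA"):
                header_end = i + 1
                break
        # Parse the point rows, keeping the first three columns
        points = []
        for line in lines[header_end:]:
            values = line.strip().split()
            if len(values) >= 3:
                points.append([float(v) for v in values[:3]])
        return np.array(points, dtype=np.float32), None


    def read_ros_pointcloud(msg):
        """Convert a ROS PointCloud2 message to numpy. Returns (xyz, None).

        Assumes x, y, z are float32 fields and rows carry no extra padding.
        """
        offsets = {f.name: f.offset for f in msg.fields}
        n_points = msg.width * msg.height
        raw = np.frombuffer(msg.data, dtype=np.uint8).reshape(n_points, msg.point_step)
        # Extract each field by its byte offset inside a point record
        points = []
        for field_name in ["x", "y", "z"]:
            start = offsets[field_name]
            points.append(raw[:, start:start + 4].copy().view(np.float32).ravel())
        return np.stack(points, axis=-1), None


    # Main entry point
    def load_pointcloud(source, fmt="auto"):
        """Load point cloud data.

        Args:
            source: file path, or a PointCloud2 message when fmt is "ros"
            fmt: "bin", "pcd", "ros" or "auto" (infer from the file extension)
        """
        is_path = isinstance(source, str)
        if fmt == "bin" or (fmt == "auto" and is_path and source.endswith(".bin")):
            return read_velodyne_bin(source)
        elif fmt == "pcd" or (fmt == "auto" and is_path and source.endswith(".pcd")):
            return read_pcd_file(source)
        elif fmt == "ros":
            return read_ros_pointcloud(source)
        else:
            raise ValueError(f"Unknown format: {fmt}")


    # Usage
    xyz, intensity = load_pointcloud("pointcloud.bin")
    print(f"Loaded {xyz.shape[0]} points")
    # Example output: Loaded 150000 points

Step 2: Point cloud preprocessing (DVPP cannot handle point clouds directly, so conversion is required)

The Ascend NPU cannot process a raw point cloud directly. Preprocess it first:

- Downsampling: reduce the number of points (150k → 10k)
- Voxelization: convert the points into a voxel grid
- Coordinate transform: convert the data into a format the NPU can process

    # step2_preprocessing.py
    import numpy as np
    import torch


    def pointcloud_preprocess(xyz, target_points=10000, use_farthest_point=True):
        """Downsample a point cloud.

        Args:
            xyz: raw point cloud (N, 3)
            target_points: target number of points
            use_farthest_point: use FPS (more uniform) instead of random sampling
        """
        n = xyz.shape[0]
        if n <= target_points:
            # Already at or below the target count: return unchanged
            return xyz, np.arange(n)
        if use_farthest_point:
            # FPS (Farthest Point Sampling)
            indices = farthest_point_sampling(xyz, target_points)
        else:
            # Random sampling
            indices = np.random.choice(n, target_points, replace=False)
        return xyz[indices], indices


    def farthest_point_sampling(xyz, n_samples):
        """Farthest Point Sampling: spreads the sampled points more evenly."""
        n_points = xyz.shape[0]
        sampled_indices = np.empty(n_samples, dtype=np.int64)
        # Pick the first point at random
        sampled_indices[0] = np.random.randint(n_points)
        # min_dist[i] is the distance from point i to its nearest sampled point.
        # Keeping a running minimum avoids rebuilding an (N, k, 3) array each step.
        min_dist = np.full(n_points, np.inf)
        for k in range(1, n_samples):
            last_point = xyz[sampled_indices[k - 1]]
            dist = np.linalg.norm(xyz - last_point, axis=1)
            np.minimum(min_dist, dist, out=min_dist)
            # Each round picks the point farthest from all points chosen so far
            sampled_indices[k] = np.argmax(min_dist)
        return sampled_indices


    def voxelize(xyz, voxel_size=0.1):
        """Voxelize a point cloud (for voxel convolution).

        Args:
            xyz: point cloud (N, 3)
            voxel_size: voxel edge length (meters)

        Returns:
            coords: voxel coordinates (M, 3)
            num_points_per_voxel: number of points in each voxel (M,)
        """
        # Compute the voxel coordinate of every point
        coords = np.floor(xyz / voxel_size).astype(np.int32)
        # Deduplicate to get the unique voxels
        coords_unique, inverse = np.unique(coords, axis=0, return_inverse=True)
        # Count the points in each voxel (ravel guards against a 2-D inverse)
        num_points_per_voxel = np.bincount(inverse.ravel())
        return coords_unique, num_points_per_voxel


    # Main entry point
    def preprocess_pipeline(xyz, target_points=10000):
        """Preprocessing pipeline."""
        # 1. Downsample
        xyz_down, indices = pointcloud_preprocess(xyz, target_points)
        # 2. Normalize toward -1~1
        xyz_normalized = xyz_down / 50.0  # assumes a scene range of 50 m
        # 3. Convert to a tensor
        xyz_tensor = torch.from_numpy(xyz_normalized).float()
        return xyz_tensor, indices


    # Test
    xyz = np.random.randn(150000, 3) * 30
    xyz_tensor, indices = preprocess_pipeline(xyz, target_points=10000)
    print(f"Preprocessed: {xyz_tensor.shape}")
    # Output: Preprocessed: torch.Size([10000, 3])

Step 3: Model inference (calling the OM model)

    # step3_inference.py
    import numpy as np
    import torch
    import torch_npu  # importing it makes the .npu() tensor method available
    import atb


    def create_pointcloud_model(om_path, device="npu:0"):
        """Create the point cloud segmentation model."""
        model = atb.create_inference_model(model_path=om_path, device=device)
        return model


    def infer_pointcloud_segmentation(model, xyz):
        """Run point cloud segmentation inference.

        Args:
            model: ATB model
            xyz: point cloud (N, 3) or (B, N, 3)
        """
        # 1. Batch shape after preprocessing: (B=1, N, 3)
        if xyz.dim() == 2:
            xyz = xyz.unsqueeze(0)
        # 2. Move the tensor to the NPU
        xyz_npu = xyz.npu()
        # 3. Inference
        with torch.no_grad():
            output = model(xyz_npu)
        # 4. Parse the result (semantic labels)
        # output shape: (1, N, num_classes)
        pred_labels = output.argmax(dim=-1)  # (1, N)
        return pred_labels.squeeze(0).cpu().numpy()


    def apply_color_by_label(points, labels):
        """Color points by label (for visualization)."""
        colors = {
            0: [128, 128, 128],  # road - gray
            1: [0, 255, 0],      # vegetation - green
            2: [255, 0, 0],      # car - red
            3: [0, 0, 255],      # pedestrian - blue
            4: [255, 255, 0],    # cyclist - yellow
        }
        rgb = np.zeros((points.shape[0], 3))
        for label_id, color in colors.items():
            mask = labels == label_id
            rgb[mask] = color
        return rgb


    # Usage
    model = create_pointcloud_model("randlanet_nuscene.om")
    # Simulate one frame of point cloud data
    xyz = torch.randn(10000, 3) * 30
    # Inference
    labels = infer_pointcloud_segmentation(model, xyz)
    rgb = apply_color_by_label(xyz.numpy(), labels)
    print(f"Labels distribution: {np.bincount(labels)}")
    # Example output: Labels distribution: [5234 2134 1856 432 244]

Step 4: Real-time optimization

The biggest challenge in spatial intelligence is real-time performance. 30 FPS means about 33 ms per frame.

Strategy 1: Frame skipping

    # Strategy 1: do not run inference on every frame; skip some
    class FrameSkipProcessor:
        def __init__(self, target_fps=30):
            self.target_fps = target_fps
            self.frame_interval = 1.0 / target_fps
            self.last_process = 0

        def should_process(self, current_time):
            if current_time - self.last_process >= self.frame_interval:
                self.last_process = current_time
                return True
            return False

Strategy 2: Asynchronous inference

    # Strategy 2: asynchronous inference (pipeline parallelism)
    import queue
    import threading


    class AsyncInference:
        def __init__(self, model):
            self.model = model
            self.input_queue = queue.Queue(maxsize=2)
            self.output_queue = queue.Queue(maxsize=2)
            self.running = True
            self.worker = threading.Thread(target=self._inference_loop, daemon=True)
            self.worker.start()

        def push(self, xyz):
            try:
                self.input_queue.put_nowait(xyz)
            except queue.Full:
                pass  # queue is full: drop this frame

        def pop(self):
            try:
                return self.output_queue.get_nowait()
            except queue.Empty:
                return None

        def _inference_loop(self):
            while self.running:
                try:
                    # Wake up periodically so stop() can end the loop
                    xyz = self.input_queue.get(timeout=0.1)
                except queue.Empty:
                    continue
                labels = infer_pointcloud_segmentation(self.model, xyz)
                self.output_queue.put(labels)

        def stop(self):
            self.running = False
            self.worker.join(timeout=1.0)

Strategy 3: Batch accumulation

    # Strategy 3: run inference on several frames from a short time window together
    import time
    import torch


    class BatchAccumulator:
        def __init__(self, batch_size=4, timeout_ms=10):
            self.batch_size = batch_size
            self.timeout_s = timeout_ms / 1000  # stored in seconds
            self.buffer = []
            self.last_flush = time.time()

        def accumulate(self, xyz):
            now = time.time()
            # Flush on timeout
            if now - self.last_flush > self.timeout_s:
                if self.buffer:
                    self._flush()
                self.last_flush = now
            # Append first so the incoming frame is never dropped
            self.buffer.append(xyz)
            # Flush when the batch is full
            if len(self.buffer) >= self.batch_size:
                self._flush()
                self.last_flush = now

        def _flush(self):
            # Batch inference
            batch = torch.stack(self.buffer)
            # ...
            self.buffer.clear()

Complete example

    # full_pipeline.py
    # Assumes create_pointcloud_model, preprocess_pipeline, FrameSkipProcessor
    # and AsyncInference from the steps above are in scope.
    import time
    import numpy as np


    class SpatialIntelligenceDemo:
        """Complete spatial intelligence pipeline."""

        def __init__(self):
            # Load the model
            self.model = create_pointcloud_model("randlanet_nuscene.om")
            # Processors
            self.skip_processor = FrameSkipProcessor(target_fps=30)
            self.async_inf = AsyncInference(self.model)
            # Performance statistics
            self.latencies = []

        def process_frame(self, xyz, timestamp):
            # 1. Check whether this frame needs processing
            if not self.skip_processor.should_process(timestamp):
                return None
            # 2. Preprocess
            t0 = time.time()
            xyz_proc, _ = preprocess_pipeline(xyz, target_points=10000)
            preprocess_time = (time.time() - t0) * 1000
            # 3. Inference (asynchronous)
            self.async_inf.push(xyz_proc)
            # 4. Fetch the result of an earlier inference, if one is ready
            labels = self.async_inf.pop()
            total_time = (time.time() - t0) * 1000
            self.latencies.append(total_time)
            return labels

        def get_stats(self):
            avg_latency = np.mean(self.latencies)
            p99_latency = np.percentile(self.latencies, 99)
            return {
                "avg_latency_ms": avg_latency,
                "p99_latency_ms": p99_latency,
                "fps": 1000 / avg_latency,
            }


    # Run
    demo = SpatialIntelligenceDemo()
    # Simulate 30 frames
    for i in range(30):
        xyz = np.random.randn(150000, 3) * 30
        result = demo.process_frame(xyz, time.time())
        time.sleep(0.033)  # 30 FPS
    demo.async_inf.stop()
    stats = demo.get_stats()
    print(f"Average latency: {stats['avg_latency_ms']:.1f} ms, "
          f"P99: {stats['p99_latency_ms']:.1f} ms, FPS: {stats['fps']:.1f}")
    # Example output: Average latency: 25.0 ms, P99: 32.0 ms, FPS: 40.0

Summary

The usage path of cann-recipes-spatial-intelligence:

1. Ingest the point cloud: Velodyne / Ouster / ROS
2. Preprocess: downsampling → normalization → voxelization
3. Infer: RandLA-Net / PointNet++ / SPConv
4. Optimize: frame skipping / asynchronous inference / batch accumulation

Key points:

- DVPP cannot process point clouds directly; CPU preprocessing is required
- For real-time performance, combine asynchronous inference with frame skipping
- Latency target: 30 FPS, about 33 ms per frame

Repository: https://atomgit.com/cann/cann-recipes-spatial-intelligence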
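
The voxelization step named in the preprocessing path above could look roughly like the following. This is a minimal NumPy sketch, not code from the repository: the helper name `voxel_downsample` and the default `voxel_size` of 0.1 m are assumptions chosen for illustration. It replaces every group of points that share a voxel with their centroid, which lowers the point count while keeping the spatial layout.

```python
import numpy as np


def voxel_downsample(xyz, voxel_size=0.1):
    """Replace all points that fall into the same voxel with their centroid.

    Hypothetical helper for illustration; not part of the repository.
    """
    # Integer voxel index of every point
    coords = np.floor(xyz / voxel_size).astype(np.int64)
    # inverse[i] is the row of unique_coords that point i belongs to
    unique_coords, inverse, counts = np.unique(
        coords, axis=0, return_inverse=True, return_counts=True
    )
    inverse = inverse.ravel()  # guard against a 2-D inverse array
    # Sum the points of each voxel, then divide by the voxel's point count
    sums = np.zeros((unique_coords.shape[0], 3), dtype=np.float64)
    np.add.at(sums, inverse, xyz)
    centroids = sums / counts[:, None]
    return centroids.astype(np.float32), unique_coords


points = np.array(
    [[0.01, 0.02, 0.03], [0.03, 0.04, 0.05], [1.01, 1.02, 1.03]],
    dtype=np.float32,
)
centroids, voxels = voxel_downsample(points, voxel_size=0.1)
print(centroids.shape)  # (2, 3): the first two points share voxel (0, 0, 0)
```

Unlike farthest point sampling, the output size here depends on the scene and the voxel size rather than on a fixed target count, so a model that expects exactly N points would still need a sampling or padding step afterwards.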