Python读取光谱仪数据的完整代码示例 一、前言上一篇文章我们介绍了光谱仪的工作原理本文将手把手教你用 Python 连接辰昶光纤光谱仪实现光谱数据的实时读取、显示和保存。本文亮点✅ 完整的设备连接代码✅ 实时光谱采集✅ 数据可视化✅ 文件保存与管理✅ 错误处理机制二、开发环境准备2.1 硬件要求辰昶光纤光谱仪本文以 EQ2000/ER4000 系列为例USB 数据线或以太网连接光纤跳线推荐 SMA905 接口2.2 软件依赖pip install numpy pandas matplotlib pyserial pyusb依赖说明包名用途numpy数值计算pandas数据存储matplotlib光谱可视化pyserial串口通信pyusbUSB设备连接三、基础连接代码3.1 USB连接方式import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import time class ChopticsSpectrometer: 辰昶光纤光谱仪 Python 控制类 支持 EQ2000、ER4000、EK2000 Pro 系列 def __init__(self, serial_portCOM3, baudrate115200): self.serial_port serial_port self.baudrate baudrate self.is_connected False self.serial None def connect(self): 连接光谱仪 import serial try: self.serial serial.Serial( portself.serial_port, baudrateself.baudrate, bytesizeserial.EIGHTBITS, parityserial.PARITY_NONE, stopbitsserial.STOPBITS_ONE, timeout1.0 ) self.is_connected True print(f✅ 光谱仪已连接: {self.serial_port}) # 获取设备信息 self._get_device_info() return True except serial.SerialException as e: print(f❌ 连接失败: {e}) return False def _get_device_info(self): 获取设备基本信息 # 发送查询命令具体命令格式请参考辰昶通信协议文档 self.serial.write(b*IDN?\r\n) response self.serial.readline().decode(utf-8).strip() print(f设备信息: {response}) def set_integration_time(self, time_ms): 设置积分时间毫秒 辰昶光谱仪范围: 1ms ~ 65000ms command fINTEGRATION,{time_ms}\r\n self.serial.write(command.encode()) response self.serial.readline().decode(utf-8).strip() print(f积分时间设置: {response}) def acquire_spectrum(self): 获取单次光谱数据 返回: (wavelengths, intensities) if not self.is_connected: raise RuntimeError(请先连接光谱仪) # 发送采集命令 self.serial.write(bSPECTRUM\r\n) # 读取波长数据 num_pixels 2048 # EQ2000系列 wavelength_data [] for _ in range(num_pixels): line self.serial.readline().decode(utf-8).strip() if line: wavelength_data.append(float(line)) # 读取强度数据 intensity_data [] for _ in range(num_pixels): line self.serial.readline().decode(utf-8).strip() if line: intensity_data.append(float(line)) return np.array(wavelength_data), np.array(intensity_data) def close(self): 关闭连接 if self.serial and self.serial.is_open: self.serial.close() self.is_connected False print( 光谱仪连接已关闭) # 使用示例 if __name__ __main__: spectrometer ChopticsSpectrometer(serial_portCOM3) if spectrometer.connect(): # 设置积分时间 100ms spectrometer.set_integration_time(100) # 获取光谱 wavelengths, intensities spectrometer.acquire_spectrum() # 绘制光谱图 plt.figure(figsize(12, 6)) plt.plot(wavelengths, intensities, b-, linewidth1) plt.xlabel(波长 (nm), fontsize12) plt.ylabel(强度 (counts), fontsize12) plt.title(光纤光谱仪实时数据, fontsize14) plt.grid(True, alpha0.3) plt.show() spectrometer.close()3.2 以太网连接方式工业级import socket import struct class EthernetSpectrometer: 以太网连接光谱仪适合工业在线检测 支持 TCP/IP 协议 def __init__(self, ip_address192.168.1.100, port5000): self.ip_address ip_address self.port port self.socket None def connect(self): 建立TCP连接 try: self.socket socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.ip_address, self.port)) self.socket.settimeout(5.0) print(f✅ 已连接到 {self.ip_address}:{self.port}) return True except Exception as e: print(f❌ 连接失败: {e}) return False def send_command(self, command): 发送命令 self.socket.sendall((command \r\n).encode()) def receive_data(self, expected_bytes): 接收原始数据 data b while len(data) expected_bytes: packet self.socket.recv(4096) if not packet: break data packet return data def acquire_spectrum(self): 采集光谱数据 self.send_command(ACQUIRE) # EQ2000: 2048像素16位 4096字节 raw_data self.receive_data(4096) # 解析数据 intensities np.array(struct.unpack(f{2048}H, raw_data)) # 生成波长数组根据校准参数 start_wavelength 200 # nm end_wavelength 1100 # nm wavelengths np.linspace(start_wavelength, end_wavelength, 2048) return wavelengths, intensities def close(self): if self.socket: self.socket.close()四、实时显示与数据处理4.1 实时采集动画class RealTimeSpectrometerDisplay: 实时光谱显示类 支持动态更新、峰值标记、数据统计 def __init__(self, spectrometer, interval100): Args: spectrometer: ChopticsSpectrometer 实例 interval: 采集间隔毫秒 self.spectrometer spectrometer self.interval interval self.spectra_history [] # 存储历史数据 def update_plot(self, frame): 更新光谱曲线 try: wavelengths, intensities self.spectrometer.acquire_spectrum() # 绘制当前光谱 self.ax.clear() self.ax.plot(wavelengths, intensities, b-, linewidth1.5, label实时光谱) # 标记峰值 peak_indices, _ find_peaks(intensities, height1000) if len(peak_indices) 0: peak_wavelengths wavelengths[peak_indices] peak_intensities intensities[peak_indices] self.ax.scatter(peak_wavelengths, peak_intensities, cred, s50, zorder5, label峰值) # 标注峰值波长 for w, i in zip(peak_wavelengths, peak_intensities): self.ax.annotate(f{w:.1f}nm, (w, i), textcoordsoffset points, xytext(0,10), hacenter, fontsize9) # 显示统计信息 stats_text f最大值: {intensities.max()}\n stats_text f平均值: {intensities.mean():.1f}\n stats_text f标准差: {intensities.std():.1f} self.ax.text(0.02, 0.98, stats_text, transformself.ax.transAxes, verticalalignmenttop, bboxdict(boxstyleround, facecolorwheat, alpha0.5)) self.ax.set_xlabel(波长 (nm), fontsize12) self.ax.set_ylabel(强度 (counts), fontsize12) self.ax.set_title(光纤光谱仪实时监测, fontsize14) self.ax.grid(True, alpha0.3) self.ax.legend(locupper right) self.ax.set_xlim(200, 1100) # 保存历史数据 self.spectra_history.append(intensities.copy()) except Exception as e: print(f采集异常: {e}) def start(self): 启动实时显示 fig, self.ax plt.subplots(figsize(14, 6)) ani FuncAnimation(fig, self.update_plot, intervalself.interval, save_count100) plt.show() def save_data(self, filenamespectrum_data.csv): 保存数据到CSV import pandas as pd if not self.spectra_history: print(无历史数据) return df pd.DataFrame(self.spectra_history) df.to_csv(filename, indexFalse) print(f✅ 数据已保存: {filename}) # 启动实时显示 if __name__ __main__: spec ChopticsSpectrometer(serial_portCOM3) spec.connect() spec.set_integration_time(50) # 50ms积分时间 display RealTimeSpectrometerDisplay(spec, interval100) display.start()4.2 数据平滑与去噪from scipy.signal import savgol_filter, medfilt from scipy.ndimage import gaussian_filter1d def preprocess_spectrum(intensities, methodsg): 光谱数据预处理 Parameters: intensities: 原始光谱强度数据 method: sg (Savitzky-Golay) 或 gauss (高斯) 或 median (中值) Returns: 平滑后的光谱数据 if method sg: # Savitzky-Golay滤波器保持峰形 return savgol_filter(intensities, window_length11, polyorder3) elif method gauss: # 高斯平滑 return gaussian_filter1d(intensities, sigma2) elif method median: # 中值滤波去除脉冲噪声 return medfilt(intensities, kernel_size5) else: return intensities def remove_baseline(intensities, lambda_param1e5): 基线校正AsLS算法 用于去除荧光背景、散射背景等 from scipy import sparse from scipy.sparse.linalg import spsolve n len(intensities) L sparse.diags([1, -2, 1], [0, -1, -2], shape(n, n-2)) L lambda_param * L.dot(L.transpose()) D sparse.diags([1, -1], [0, -1], shape(n, n)) w np.ones(n) W sparse.diags(w, 0) for _ in range(10): # 迭代优化 W.setdiag(w) Z W L baseline spsolve(Z, w * intensities) w 1 * (intensities baseline) 0.001 * (intensities baseline) return intensities - baseline五、数据保存与导出5.1 多种格式保存import json from datetime import datetime class SpectrumDataManager: 光谱数据管理器 def __init__(self, base_path./data): self.base_path base_path self.current_session datetime.now().strftime(%Y%m%d_%H%M%S) def save_csv(self, wavelengths, intensities, filenameNone): 保存为CSV格式 if filename is None: filename fspectrum_{self.current_session}.csv filepath os.path.join(self.base_path, filename) df pd.DataFrame({ Wavelength_nm: wavelengths, Intensity_counts: intensities }) df.to_csv(filepath, indexFalse) print(f✅ CSV已保存: {filepath}) return filepath def save_numpy(self, wavelengths, intensities, filenameNone): 保存为NumPy格式保留高精度 if filename is None: filename fspectrum_{self.current_session}.npz filepath os.path.join(self.base_path, filename) np.savez(filepath, wavelengthswavelengths, intensitiesintensities) print(f✅ NumPy已保存: {filepath}) return filepath def save_with_metadata(self, wavelengths, intensities, metadataNone): 保存带元数据的光谱文件 包含仪器参数、测量条件等信息 if metadata is None: metadata {} # 元数据 metadata.update({ timestamp: datetime.now().isoformat(), instrument: 辰昶光纤光谱仪, num_points: len(wavelengths), wavelength_range: f{wavelengths.min():.2f}-{wavelengths.max():.2f}nm }) filename fspectrum_{self.current_session}_meta.json filepath os.path.join(self.base_path, filename) data { metadata: metadata, wavelengths: wavelengths.tolist(), intensities: intensities.tolist() } with open(filepath, w, encodingutf-8) as f: json.dump(data, f, indent2, ensure_asciiFalse) print(f✅ 带元数据文件已保存: {filepath}) return filepath六、工业级应用示例6.1 在线浓度监测class ConcentrationMonitor: 在线浓度监测系统 基于Beer-Lambert定律: A ε·c·l def __init__(self, spectrometer, wavelength_absorbance): self.spectrometer spectrometer self.wavelength_absorbance wavelength_absorbance self.calibration_curve {} # 浓度标定曲线 def set_calibration(self, concentrations, absorbances): 设置标定曲线 concentrations: 标准浓度列表 absorbances: 对应的吸光度值 from scipy.optimize import curve_fit def linear(x, a, b): return a * x b popt, _ curve_fit(linear, concentrations, absorbances) self.calibration_curve {a: popt[0], b: popt[1]} print(f标定完成: A {popt[0]:.4f}×C {popt[1]:.4f}) def measure_concentration(self): 测量当前浓度 wavelengths, intensities self.spectrometer.acquire_spectrum() # 计算吸光度需要参考光谱 # A log10(I_ref / I_sample) absorbance np.log10(self.reference_intensity / intensities) # 在指定波长处读取吸光度 idx np.argmin(np.abs(wavelengths - self.wavelength_absorbance)) A absorbance[idx] # 根据标定曲线计算浓度 if self.calibration_curve: C (A - self.calibration_curve[b]) / self.calibration_curve[a] return C, A return None, A七、常见问题与解决方案问题可能原因解决方案连接超时串口被占用检查端口号更换USB口采集数据为0光纤未连接检查光纤接口确保光路导通噪声过大积分时间过短增加积分时间或使用平均采集基线漂移环境温度变化使用基线校正算法通信不稳定USB供电不足使用带供电的USB集线器辰昶仪器提示辰昶光谱仪提供完整的SDK开发包和技术支持包括Python、LabVIEW、C#、Java等多语言接口。八、总结本文提供了完整的 Python 光谱仪数据读取方案✅基础连接类- 支持USB和以太网两种连接方式✅实时显示- 动态更新、峰值标记、统计信息✅数据预处理- 平滑、去噪、基线校正✅多格式保存- CSV、NumPy、带元数据的JSON✅工业应用- 浓度监测、在线分析示例