编写程序,让智能香烟检测仪检测烟雾时,室内新风系统自动启动提示, 智能香烟烟雾检测与新风联动系统一、实际应用场景描述在现代办公场所、餐厅包间、家庭客厅等封闭或半封闭环境中二手烟已成为严重的室内空气污染问题。传统解决方案存在诸多局限- 被动式排风依赖人工发现烟雾后手动开启新风系统响应滞后- 定时通风无法根据实际污染情况动态调整要么过度通风浪费能源要么通风不足- 单一检测仅能检测烟雾存在与否无法量化污染程度- 无联动机制检测系统与通风设备分离形成信息孤岛- 误报漏报对烹饪油烟、香薰等类似气溶胶缺乏区分能力- 缺乏记录无法追溯污染事件不利于环境管理和健康评估本系统通过Python程序与MQ-2烟雾传感器、SGP30空气质量传感器、继电器模块、OLED显示屏、蜂鸣器集成实现- 实时监测室内烟雾浓度PPM和空气质量指数TVOC/eCO2- 智能判断污染等级自动触发新风系统- 多传感器数据融合提高检测准确性- 分级预警和自动联动控制- 事件日志记录和空气质量趋势分析- 可配置的灵敏度阈值和联动策略二、引入痛点1. 健康隐患二手烟中的PM2.5、尼古丁、一氧化碳等有害物质持续危害人体2. 响应延迟人工发现到开启新风存在时间差期间已造成污染扩散3. 能源浪费固定时长的通风模式在轻度污染时过度运行4. 误判频繁烹饪、焚香等活动触发误报影响系统可信度5. 管理盲区无法记录污染事件难以追溯和评估环境改善效果6. 设备割裂检测、报警、通风各自独立操作繁琐7. 法律合规部分场所对禁烟有严格规定缺乏有效监管手段三、核心逻辑讲解烟雾检测科学原理污染等级 PM2.5等效(μg/m³) 烟雾浓度(PPM) 系统响应 新风档位优 0-35 0-200 绿灯常亮 关闭良 35-75 200-400 绿灯闪烁 低速轻度污染 75-115 400-800 黄灯常亮 中速中度污染 115-150 800-1500 橙灯闪烁 高速重度污染 150 1500 红灯常亮蜂鸣 最大档MQ-2传感器特性曲线Rs/Ro比值 vs 气体浓度(LPG/烟雾)Rs/Ro 1 : 浓度≈200ppm (洁净空气基准)Rs/Ro 2 : 浓度≈400ppm (轻度污染)Rs/Ro 5 : 浓度≈1000ppm (中度污染)Rs/Ro 10 : 浓度≈2000ppm (重度污染)核心算法流程1. 多传感器数据采集- MQ-2模拟量读取烟雾浓度- SGP30数字量读取TVOC/eCO2- DHT22温湿度读取补偿因子2. 数据预处理- 滑动平均滤波消除ADC噪声- 温度湿度补偿MQ-2读数- SGP30基线校准和恢复3. 污染等级判定- 烟雾浓度阈值判断- TVOC超标检测烹饪油烟识别- 综合污染指数计算4. 联动决策引擎- 分级响应策略- 延时启动防误触- 渐进式风量调节5. 执行控制- 继电器控制新风电机- PWM调速实现无级变速- 状态指示灯和OLED显示6. 事件记录与分析- CSV日志存储- 污染趋势图表生成- 统计分析报告关键技术难点及解决方案难点 解决方案MQ-2预热时间长 启动时预热提示等待传感器稳定交叉敏感性 SGP30 TVOC检测区分烟雾vs油烟误报问题 多条件联合判断延时确认机制继电器抖动 软件消抖硬件RC滤波功耗优化 休眠唤醒机制动态采样频率四、代码模块化含安全设计项目结构smart_smoke_detector/├── main.py # 主程序入口├── sensors/ # 传感器模块│ ├── __init__.py│ ├── mq2_sensor.py # MQ-2烟雾传感器驱动│ ├── sgp30_sensor.py # SGP30空气质量传感器驱动│ ├── dht22_sensor.py # DHT22温湿度传感器驱动│ └── sensor_fusion.py # 多传感器数据融合├── actuators/ # 执行器模块│ ├── __init__.py│ ├── relay_controller.py # 继电器控制模块│ ├── pwm_fan.py # PWM风扇调速模块│ ├── oled_display.py # OLED显示屏控制│ ├── buzzer.py # 蜂鸣器控制│ └── led_indicator.py # RGB LED指示灯控制├── core/ # 核心逻辑模块│ ├── __init__.py│ ├── pollution_analyzer.py # 污染分析引擎│ ├── control_strategy.py # 联动控制策略│ └── event_manager.py # 事件管理器├── storage/ # 数据存储模块│ ├── __init__.py│ ├── data_logger.py # 数据记录器│ └── report_generator.py # 报表生成器├── config.py # 系统配置├── requirements.txt # 依赖清单├── README.md # 项目文档└── utils/├── __init__.py├── logger.py # 日志工具└── decorators.py # 装饰器工具核心代码实现1. config.py系统配置智能香烟烟雾检测与新风联动系统配置文件包含传感器、执行器、控制策略等所有配置参数from dataclasses import dataclass, fieldfrom typing import List, Optional, Dict, Any, Tuplefrom enum import Enum, autofrom pathlib import Pathimport osimport mathclass FanSpeed(Enum):风扇速度档位OFF 0LOW 1MEDIUM 2HIGH 3MAX 4class PollutionLevel(Enum):污染等级EXCELLENT auto() # 优GOOD auto() # 良LIGHT_POLLUTION auto() # 轻度污染MODERATE_POLLUTION auto() # 中度污染HEAVY_POLLUTION auto() # 重度污染HAZARDOUS auto() # 危险dataclassclass SensorConfig:传感器配置# MQ-2烟雾传感器mq2_analog_pin: int 0 # ADC通道mq2_heating_time: int 20 # 预热时间秒mq2_ro_clean_air: float 9.83 # 洁净空气中Ro值mq2_attenuation: float 0.5 # 信号衰减系数# SGP30空气质量传感器sgp30_i2c_address: int 0x58sgp30_i2c_bus: int 1sgp30_baseline_file: str sgp30_baseline.json# DHT22温湿度传感器dht22_gpio_pin: int 4dht22_retry_count: int 3# 采样配置sample_interval: float 1.0 # 采样间隔秒filter_window: int 10 # 移动平均窗口temperature_compensation: bool True# 模拟模式simulation_mode: bool True # 开发阶段使用dataclassclass ThresholdConfig:污染阈值配置# 烟雾浓度阈值 (PPM)smoke_excellent_max: float 200.0smoke_good_max: float 400.0smoke_light_max: float 800.0smoke_moderate_max: float 1500.0smoke_heavy_max: float 3000.0# TVOC阈值 (ppb)tvoc_excellent_max: float 50.0tvoc_good_max: float 200.0tvoc_light_max: float 500.0tvoc_moderate_max: float 1000.0# eCO2阈值 (ppm)eco2_excellent_max: float 400.0eco2_good_max: float 1000.0eco2_light_max: float 1500.0eco2_moderate_max: float 2000.0# 综合污染指数权重smoke_weight: float 0.5tvoc_weight: float 0.3eco2_weight: float 0.2# 确认延时防误报detection_delay: float 3.0 # 连续检测到污染才确认recovery_delay: float 10.0 # 连续清洁才恢复dataclassclass ControlConfig:控制策略配置# 继电器配置relay_pin: int 17relay_active_low: bool True # 低电平触发# 风扇PWM配置pwm_pin: int 18pwm_frequency: int 25000 # 25kHz# 各档位PWM占空比fan_speed_duty_cycles: Dict[FanSpeed, float] field(default_factorylambda: {FanSpeed.OFF: 0.0,FanSpeed.LOW: 30.0,FanSpeed.MEDIUM: 55.0,FanSpeed.HIGH: 75.0,FanSpeed.MAX: 100.0})# 联动策略enable_auto_control: bool Trueenable_manual_override: bool Truegradual_speed_change: bool True # 渐进式调速speed_transition_time: float 2.0 # 档位切换过渡时间秒# 延时启动给用户反应时间activation_delay: float 5.0 # 检测到污染后延时启动dataclassclass DisplayConfig:显示配置# OLED配置oled_width: int 128oled_height: int 64oled_i2c_address: int 0x3Coled_i2c_bus: int 1# 显示内容show_smoke_concentration: bool Trueshow_air_quality_index: bool Trueshow_fan_status: bool Trueshow_pollution_level: bool Trueshow_temperature_humidity: bool Trueshow_timestamp: bool True# 刷新率refresh_rate: float 1.0 # 刷新间隔秒dataclassclass AlertConfig:提醒配置# LED配置led_red_pin: int 22led_green_pin: int 27led_blue_pin: int 24# 蜂鸣器配置buzzer_pin: int 23buzzer_frequency_normal: int 1000buzzer_frequency_warning: int 1500buzzer_frequency_alert: int 2000# 提醒模式visual_alert_enabled: bool Trueaudio_alert_enabled: bool Truealert_cooldown: float 5.0 # 提醒冷却时间dataclassclass StorageConfig:存储配置# 数据记录log_enabled: bool Truelog_directory: str logslog_filename: str smoke_detection_log.csvlog_interval: float 60.0 # 记录间隔秒# 事件记录event_log_filename: str events_log.csv# 历史数据max_history_points: int 1000export_enabled: bool Trueexport_format: str csvdataclassclass SystemConfig:系统总配置# 子配置sensor: SensorConfig field(default_factorySensorConfig)thresholds: ThresholdConfig field(default_factoryThresholdConfig)control: ControlConfig field(default_factoryControlConfig)display: DisplayConfig field(default_factoryDisplayConfig)alert: AlertConfig field(default_factoryAlertConfig)storage: StorageConfig field(default_factoryStorageConfig)# 系统设置debug_mode: bool Falselog_level: str INFOlog_file: str smoke_detector.log# 安全限制max_adc_value: int 1023min_adc_value: int 0max_smoke_ppm: float 10000.0min_smoke_ppm: float 0.0max_tvoc_ppb: float 60000.0max_eco2_ppm: float 40000.0# 数据路径data_directory: str datadef __post_init__(self):# 确保目录存在for directory in [self.storage.log_directory, self.data_directory]:Path(directory).mkdir(parentsTrue, exist_okTrue)# 全局配置实例config SystemConfig()# 污染等级颜色映射POLLUTION_LEVEL_COLORS {PollutionLevel.EXCELLENT: (0, 255, 0), # 绿色PollutionLevel.GOOD: (0, 200, 100), # 浅绿PollutionLevel.LIGHT_POLLUTION: (255, 255, 0), # 黄色PollutionLevel.MODERATE_POLLUTION: (255, 165, 0), # 橙色PollutionLevel.HEAVY_POLLUTION: (255, 0, 0), # 红色PollutionLevel.HAZARDOUS: (128, 0, 128), # 紫色}# 风扇档位对应的风速描述FAN_SPEED_DESCRIPTIONS {FanSpeed.OFF: 关闭,FanSpeed.LOW: 低速,FanSpeed.MEDIUM: 中速,FanSpeed.HIGH: 高速,FanSpeed.MAX: 最大档}# 污染等级描述POLLUTION_LEVEL_DESCRIPTIONS {PollutionLevel.EXCELLENT: 空气质量优,PollutionLevel.GOOD: 空气质量良,PollutionLevel.LIGHT_POLLUTION: 轻度污染,PollutionLevel.MODERATE_POLLUTION: 中度污染,PollutionLevel.HEAVY_POLLUTION: 重度污染,PollutionLevel.HAZARDOUS: 危险级别}2. sensors/mq2_sensor.pyMQ-2烟雾传感器模块MQ-2烟雾传感器模块支持模拟量读取、温度补偿、数据滤波用于检测LPG、烟雾、酒精、甲烷等可燃气体import timeimport loggingimport randomimport mathfrom typing import Optional, List, Tuple, Dequefrom dataclasses import dataclass, fieldfrom enum import Enum, autofrom collections import dequefrom config import config, SafetyLimits, PollutionLevellogger logging.getLogger(__name__)class SensorError(Exception):传感器异常类passclass CalibrationError(SensorError):校准异常类passdataclassclass SmokeReading:烟雾读数数据类timestamp: floatraw_adc_value: intvoltage: floatrs_value: floatratio_rs_ro: floatsmoke_ppm: floattemperature: Optional[float] Nonehumidity: Optional[float] Noneis_valid: bool Truequality: float 1.0 # 数据质量分数 0-1class MQ2SmokeSensor:MQ-2烟雾传感器驱动特性- 模拟量ADC读取- 温度湿度补偿- 滑动平均滤波- 自动校准功能- 模拟模式支持- 预热状态管理- 多气体检测主要检测烟雾工作原理1. 加热元件使敏感层达到工作温度2. 目标气体吸附改变敏感层电阻3. 通过测量负载电阻分压计算气体浓度# MQ-2特性参数# 对数坐标下Rs/Ro与气体浓度的关系# 烟雾/LPG: log(y) a*log(x) bSMOKE_LPG_PARAMS {a: -0.42, # 斜率b: 1.98, # 截距ro_ref: 9.83 # 洁净空气中Ro值}def __init__(self, analog_pin: int 0, ro_clean_air: float 9.83):初始化MQ-2传感器Args:analog_pin: ADC通道号ro_clean_air: 洁净空气中的Ro值self.analog_pin analog_pinself.ro_clean_air ro_clean_air# 当前Ro值会随使用老化self._ro_current: float ro_clean_air# 滤波缓冲区self._filter_buffer: Deque[float] deque(maxlenconfig.sensor.filter_window)# 状态管理self._is_initialized Falseself._is_preheated Falseself._preheat_start_time: Optional[float] Noneself._error_count 0self._max_errors 10# 模拟模式self._simulation_mode config.simulation_modeself._simulation_base_ppm 50.0 # 模拟基础烟雾浓度# 统计信息self._readings_count 0self._calibration_count 0logger.info(fMQ-2烟雾传感器初始化: 模拟引脚{analog_pin}, fRo洁净空气{ro_clean_air})def initialize(self) - bool:初始化传感器硬件Returns:bool: 初始化是否成功try:if self._simulation_mode:logger.info(MQ-2传感器运行在模拟模式)self._is_initialized Trueself._start_preheat()return True# 实际硬件初始化# import spidev# self.spi spidev.SpiDev()# self.spi.open(0, 0)# self.spi.max_speed_hz 1350000self._is_initialized Trueself._start_preheat()logger.info(MQ-2烟雾传感器初始化成功)return Trueexcept Exception as e:logger.error(fMQ-2传感器初始化失败: {e})return Falsedef _start_preheat(self):启动预热过程self._preheat_start_time time.time()self._is_preheated Falselogger.info(fMQ-2传感器开始预热预计需要{config.sensor.mq2_heating_time}秒)def _check_preheat_status(self) - Tuple[bool, float]:检查预热状态Returns:Tuple[bool, float]: (是否预热完成, 剩余时间)if self._preheat_start_time is None:return True, 0.0elapsed time.time() - self._preheat_start_timeremaining config.sensor.mq2_heating_time - elapsedif remaining 0:self._is_preheated Truelogger.info(MQ-2传感器预热完成)return True, 0.0return False, remainingdef _read_raw_adc(self) - Optional[int]:读取原始ADC值Returns:Optional[int]: 10位ADC值0-1023if not self._is_initialized:logger.error(传感器未初始化)return Nonetry:if self._simulation_mode:# 模拟数据模拟吸烟过程和自然衰减elapsed time.time() % 300 # 5分钟周期# 模拟吸烟事件smoking_factor 0.0if 60 elapsed 120: # 第1-2分钟模拟吸烟smoking_factor math.sin((elapsed - 60) / 60 * math.pi) * 0.8elif 180 elapsed 210: # 第3-3.5分钟再次吸烟smoking_factor math.sin((elapsed - 180) / 30 * math.pi) * 0.6# 基础浓度 吸烟影响 随机噪声simulated_value (self._simulation_base_ppm smoking_factor * 1500 random.uniform(-20, 20))# 转换为ADC值假设3.3V供电MQ-2输出电压与浓度正相关adc_value int(min(1023, simulated_value / 3000 * 1023))return adc_value# 实际硬件读取# command 0x01 7 | self.analog_pin 4# response self.spi.xfer2([command, 0x00, 0x00])# adc_value ((response[1] 0x03) 8) | response[2]# return adc_valuereturn int(random.uniform(100, 600))except Exception as e:logger.error(f读取ADC失败: {e})self._error_count 1if self._error_count self._max_errors:logger.critical(传感器错误过多进入保护模式)self._is_initialized Falsereturn Nonedef read_smoke(self, temperature: Optional[float] None,humidity: Optional[float] None) - Optional[SmokeReading]:读取烟雾浓度Args:temperature: 环境温度用于补偿humidity: 环境湿度用于补偿Returns:Optional[SmokeReading]: 烟雾读数对象# 检查预热状态preheated, remaining self._check_preheat_status()if not preheated:logger.debug(f传感器预热中剩余{remaining:.1f}秒)# 预热期间返回最后一次有效读数或默认值return self._create_preheat_reading(remaining)raw_value self._read_raw_adc()if raw_value is None:return None# 重置错误计数self._error_count 0# 转换为电压voltage self._adc_to_voltage(raw_value)# 计算Rs值传感器电阻rs_value self._calculate_rs(voltage)# 计算Rs/Ro比值ratio_rs_ro rs_value / self._ro_current# 温度湿度补偿compensated_ratio self._apply_environmental_compensation(ratio_rs_ro, temperature, humidity)# 转换为烟雾浓度PPMsmoke_ppm self._ratio_to_smoke_ppm(compensated_ratio)# 滤波处理filtered_ppm self._apply_filter(smoke_ppm)# 创建读数对象reading SmokeReading(timestamptime.time(),raw_adc_valueraw_value,voltageround(voltage, 3),rs_valueround(rs_value, 2),ratio_rs_roround(ratio_rs_ro, 3),smoke_ppmround(filtered_ppm, 1),temperaturetemperature,humidityhumidity,is_validTrue,qualityself._calculate_data_quality(raw_value))self._last_reading readingself._readings_count 1logger.debug(f烟雾读数: ADC{raw_value}, 电压{voltage:.3f}V, fRs{rs_value:.2f}Ω, Rs/Ro{ratio_rs_ro:.3f}, f烟雾{filtered_ppm:.1f}PPM)return readingdef _adc_to_voltage(self, adc_value: int) - float:ADC值转电压return (adc_value / config.sensor.max_adc_value) * 3.3def _calculate_rs(self, voltage: float) - float:计算传感器电阻RsMQ-2电路Vcc - Ro - Rs - GND中间抽头接ADCVout Vcc * Rs / (Ro Rs)Args:voltage: ADC测量的电压值Returns:float: Rs电阻值欧姆# 假设负载电阻Ro为10K典型值load_resistance 10000.0 # 10KΩif voltage 0 or voltage 3.3:return load_resistance# Rs Ro * (Vcc - Vout) / Voutrs load_resistance * (3.3 - voltage) / voltagereturn rsdef _apply_environmental_compensation(self, ratio: float,temperature: Optional[float],humidity: Optional[float]) - float:应用温度湿度补偿MQ-2对温度和湿度敏感需要进行补偿Args:ratio: Rs/Ro比值temperature: 温度摄氏度humidity: 湿度%Returns:float: 补偿后的比值if not config.sensor.temperature_compensation:return ratioif temperature is None or humidity is None:return ratio# 温度补偿系数典型值需根据实际传感器校准temp_coefficient 0.005 # 每度温度变化的影响humidity_coefficient 0.002 # 每1%湿度变化的影响# 补偿公式# 温度升高Rs减小湿度升高Rs增大temp_compensation 1.0 temp_coefficient * (temperature - 20.0)humidity_compensation 1.0 - humidity_coefficient * (humidity - 50.0)compensated_ratio ratio * temp_compensation * humidity_compensationlogger.debug(f环境补偿: 温度{temperature}°C, 湿度{humidity}%, f补偿系数{temp_compensation*humidity_compensation:.4f})return compensated_ratiodef _ratio_to_smoke_ppm(self, ratio: float) - float:将Rs/Ro比值转换为烟雾浓度PPM使用对数线性关系log(PPM) a * log(Rs/Ro) bArgs:ratio: Rs/Ro比值Returns:float: 烟雾浓度PPMparams self.SMOKE_LPG_PARAMSif ratio 0:return 0.0# 对数转换log_ratio math.log10(ratio)log_ppm params[a] * log_ratio params[b]# 反对数ppm math.pow(10, log_ppm)# 限制范围ppm max(config.min_smoke_ppm, min(config.max_smoke_ppm, ppm))return ppmdef _apply_filter(self, ppm: float) - float:应用滑动平均滤波self._filter_buffer.append(ppm)if len(self._filter_buffer) 0:return ppm# 加权平均最近的值权重更高weights [i 1 for i in range(len(self._filter_buffer))]weighted_sum sum(p * w for p, w in zip(self._filter_buffer, weights))total_weight sum(weights)return weighted_sum / total_weightdef _calculate_data_quality(self, raw_value: int) - float:计算数据质量分数# 基于ADC值的合理性评估if raw_value 50 or raw_value 900:return 0.7 # 边缘值质量较低elif raw_value 100 or raw_value 800:return 0.85 # 接近边缘质量中等else:return 1.0 # 正常范围质量高def _create_preheat_reading(self, remaining_time: float) - SmokeReading:创建预热期间的读数利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛