python: Push  Pull Pattern 项目结构模拟5 个核心业务环节组成流水线Push1原料采购部推送原料订单Pull-Push加工车间拉取原料 → 推送加工完成的珠宝Pull-Push质检中心拉取成品 → 推送质检分级结果Pull-Push门店销售部拉取质检成品 → 推送销售数据Pull售后维保部拉取销售数据 → 提供维保服务# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:38 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py 全局配置端口、超时、珠宝行业固定常量、消息分隔符 # ZMQ 通信端口定义 PORT_RAW_MATERIAL 5555 PORT_PROCESS 5556 PORT_QUALITY 5557 PORT_SALE 5558 # 通信超时(ms) ZMQ_SOCKET_TIMEOUT 3000 # 珠宝行业基础枚举常量 JEWELRY_CATEGORY [ 钻石戒指, 黄金项链, 翡翠手镯, 铂金耳钉, 彩宝吊坠 ] QUALITY_GRADE [ S级(收藏), A级(精品), B级(常规), C级(特价) ] # 消息分隔符统一解析格式 MSG_SEP | KV_SEP : # 本地连接地址 LOCAL_TCP_ADDR tcp://localhost BIND_ADDR tcp://* # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:39 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py import logging import sys from PushPullPattern.config.settings import ZMQ_SOCKET_TIMEOUT def get_module_logger(module_name: str) - logging.Logger: 获取模块独立日志器区分业务模块输出 :param module_name: :return: logger logging.getLogger(module_name) logger.setLevel(logging.INFO) if not logger.handlers: formatter logging.Formatter( fmt%(asctime)s [%(name)s] %(levelname)s: %(message)s, datefmt%Y-%m-%d %H:%M:%S ) stream_handler logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) return logger # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:41 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : message.py 统一消息封装所有业务消息标准化序列化、解析 from typing import Dict from PushPullPattern.config.settings import MSG_SEP, KV_SEP class BaseMessage: 消息基类统一打包/解包逻辑 staticmethod def pack(data: Dict[str, str]) - str: 字典转消息字符串 :param data: :return: parts [] for k, v in data.items(): parts.append(f{k}{KV_SEP}{v}) return MSG_SEP.join(parts) staticmethod def unpack(raw_msg: str) - Dict[str, str]: 消息字符串转回字典 :param raw_msg: :return: result {} items raw_msg.split(MSG_SEP) for item in items: if KV_SEP in item: k, v item.split(KV_SEP, maxsplit1) result[k] v return result # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 pip install zmq # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:43 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : socket_factory.py import zmq from PushPullPattern.config.settings import ZMQ_SOCKET_TIMEOUT from PushPullPattern.utils.logger import get_module_logger logger get_module_logger(SocketFactory) class ZmqSocketFactory: 统一创建Push/Pull套接字资源统一管理职责单一 _context zmq.Context() classmethod def create_push_bind(cls, port: int) - zmq.Socket: 创建Push生产者绑定端口对外提供服务 :param port: :return: sock cls._context.socket(zmq.PUSH) sock.setsockopt(zmq.RCVTIMEO, ZMQ_SOCKET_TIMEOUT) sock.setsockopt(zmq.SNDTIMEO, ZMQ_SOCKET_TIMEOUT) bind_url ftcp://*:{port} sock.bind(bind_url) logger.info(fPush Socket bind success: {bind_url}) return sock classmethod def create_pull_connect(cls, port: int) - zmq.Socket: 创建Pull消费者连接上游Push端口 :param port: :return: sock cls._context.socket(zmq.PULL) sock.setsockopt(zmq.RCVTIMEO, ZMQ_SOCKET_TIMEOUT) sock.setsockopt(zmq.SNDTIMEO, ZMQ_SOCKET_TIMEOUT) connect_url ftcp://localhost:{port} sock.connect(connect_url) logger.info(fPull Socket connect success: {connect_url}) return sock classmethod def close_context(cls): 程序退出统一释放ZMQ资源 :return: cls._context.term() logger.info(ZMQ Context resource released)# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:45 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : raw_material_service.py import time import random from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_RAW_MATERIAL, JEWELRY_CATEGORY from PushPullPattern.utils.logger import get_module_logger logger get_module_logger(RawMaterialService) class RawMaterialService: 原料采购纯 Push 生产者 业务职责珠宝原料采购仅推送原料订单消息 def __init__(self): self.push_sock ZmqSocketFactory.create_push_bind(PORT_RAW_MATERIAL) def run_produce(self, total_count: int 10): :param total_count: :return: logger.info(原料采购服务启动开始生成原料订单) for seq in range(1, total_count 1): cat random.choice(JEWELRY_CATEGORY) order_id fRAW_{seq:03d} msg_data { type: 原料订单, order_id: order_id, category: cat, status: 已采购待加工 } msg_str BaseMessage.pack(msg_data) self.push_sock.send_string(msg_str) logger.info(f推送原料消息: {msg_str}) time.sleep(2) logger.info(全部原料订单推送完成) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:47 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : process_service.py import time import random import zmq from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_RAW_MATERIAL, PORT_PROCESS from PushPullPattern.utils.logger import get_module_logger logger get_module_logger(ProcessService) class ProcessService: 加工车间PullPush 中转 业务职责拉取原料加工生成成品向下游推送 def __init__(self): self.pull_sock ZmqSocketFactory.create_pull_connect(PORT_RAW_MATERIAL) self.push_sock ZmqSocketFactory.create_push_bind(PORT_PROCESS) def run_pipeline(self): logger.info(珠宝加工车间服务启动等待上游原料) while True: try: # 非阻塞拉取避免卡死 raw_msg self.pull_sock.recv_string(flagszmq.NOBLOCK) data BaseMessage.unpack(raw_msg) logger.info(f接收原料消息: {raw_msg}) # 模拟加工耗时 time.sleep(random.uniform(1, 3)) raw_order data[order_id] finish_order raw_order.replace(RAW, FIN) msg_data { type: 成品珠宝, order_id: finish_order, category: data[category], status: 已加工待质检 } send_msg BaseMessage.pack(msg_data) self.push_sock.send_string(send_msg) logger.info(f加工完成推送: {send_msg}\n) except zmq.Again: # 无消息时短暂休眠降低CPU占用 time.sleep(0.1) continue # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:49 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : quality_service.py import random import time import zmq from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_PROCESS, PORT_QUALITY, QUALITY_GRADE from PushPullPattern.utils.logger import get_module_logger logger get_module_logger(QualityService) class QualityService: 质检中心PullPush 中转 业务职责拉取加工成品质检分级推送可销售货品 def __init__(self): self.pull_sock ZmqSocketFactory.create_pull_connect(PORT_PROCESS) self.push_sock ZmqSocketFactory.create_push_bind(PORT_QUALITY) def run_pipeline(self): logger.info(质检中心服务启动等待加工成品) while True: try: raw_msg self.pull_sock.recv_string(flagszmq.NOBLOCK) data BaseMessage.unpack(raw_msg) logger.info(f接收成品消息: {raw_msg}) time.sleep(random.uniform(0.5, 2)) grade random.choice(QUALITY_GRADE) msg_data { type: 质检成品, order_id: data[order_id], category: data[category], grade: grade, status: 可销售 } send_msg BaseMessage.pack(msg_data) self.push_sock.send_string(send_msg) logger.info(f质检完成推送: {send_msg}\n) except zmq.Again: time.sleep(0.1) continue # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:52 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : sale_service.py import time import zmq import random from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_QUALITY, PORT_SALE from PushPullPattern.utils.logger import get_module_logger logger get_module_logger(SaleService) class SaleService: 门店销售PullPush 中转 业务职责拉取质检货品模拟销售推送销售单据给售后 def __init__(self): self.pull_sock ZmqSocketFactory.create_pull_connect(PORT_QUALITY) self.push_sock ZmqSocketFactory.create_push_bind(PORT_SALE) def run_pipeline(self): logger.info(门店销售服务启动等待质检货品) while True: try: raw_msg self.pull_sock.recv_string(flagszmq.NOBLOCK) data BaseMessage.unpack(raw_msg) logger.info(f接收质检货品: {raw_msg}) time.sleep(random.uniform(1, 2.5)) sale_order data[order_id].replace(FIN, SALE) price random.randint(1000, 50000) msg_data { type: 销售完成, order_id: sale_order, category: data[category], price: f{price}元, status: 已售出 } send_msg BaseMessage.pack(msg_data) self.push_sock.send_string(send_msg) logger.info(f销售单据推送: {send_msg}\n) except zmq.Again: time.sleep(0.1) continue # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:53 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : after_sale_service.py import time import zmq import random from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.model.message import BaseMessage from PushPullPattern.config.settings import PORT_SALE from PushPullPattern.utils.logger import get_module_logger logger get_module_logger(AfterSaleService) class AfterSaleService: 售后维保纯 Pull 末端消费者 业务职责接收销售单据开通终身维保档案流水线终点 def __init__(self): self.pull_sock ZmqSocketFactory.create_pull_connect(PORT_SALE) def run_consumer(self): logger.info(售后维保服务启动等待销售单据) while True: try: raw_msg self.pull_sock.recv_string(flagszmq.NOBLOCK) data BaseMessage.unpack(raw_msg) logger.info(f接收销售单据: {raw_msg}) logger.info( f维保档案创建成功 | 品类:{data[category]} f订单:{data[order_id]} | 终身维保已生效\n ) except zmq.Again: time.sleep(0.1) continue调用# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Push Pull Pattern推 - 拉模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/25 20:55 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : PushPullBll.py import time import threading import atexit from PushPullPattern.core.socket_factory import ZmqSocketFactory from PushPullPattern.service import ( RawMaterialService, ProcessService, QualityService, SaleService, AfterSaleService ) from PushPullPattern.utils.logger import get_module_logger class PushPullBll(object): logger get_module_logger(PushPullBll) # 注册退出钩子释放ZMQ资源 atexit.register def release_zmq(self): ZmqSocketFactory.close_context() self.logger.info(✅ ZMQ 资源已释放) def start_all_pipeline(self): 统一启动全链路多线程 # 实例化服务 raw_service RawMaterialService() process_service ProcessService() quality_service QualityService() sale_service SaleService() after_service AfterSaleService() # 创建线程 t_raw threading.Thread(targetraw_service.run_produce, args(10,), daemonTrue) t_process threading.Thread(targetprocess_service.run_pipeline, daemonTrue) t_quality threading.Thread(targetquality_service.run_pipeline, daemonTrue) t_sale threading.Thread(targetsale_service.run_pipeline, daemonTrue) t_after threading.Thread(targetafter_service.run_consumer, daemonTrue) # 启动 t_raw.start() t_process.start() t_quality.start() t_sale.start() t_after.start() self.logger.info( 珠宝全链路 Push-Pull 流水线启动完成) self.logger.info(流水线运行中按 Ctrl C 停止\n) # 主线程保持运行 try: t_raw.join() while True: time.sleep(1) except KeyboardInterrupt: self.logger.info( 程序正在退出...) def demo(self): :return: self.start_all_pipeline()输出