影刀RPA店群自动化教程:Python协同任务手动干预与安全暂停恢复机制 影刀RPA店群自动化教程Python协同任务手动干预与安全暂停恢复机制自动化流程跑到一半运营发现商品价格不对却不敢点暂停。店群矩阵自动化突破运营极限因为谁也不知道点了暂停之后系统会不会留下一笔无法清理的半成品数据。店群自动化做到一定规模就会面临一个无法回避的现实不是所有操作都适合全自动执行。平台大促期间运营可能想在自动上架商品之前做一次人工审核。客服自动回复流程跑到一半突然需要转入人工处理。甚至有时候某个店铺登录态即将过期运维需要暂停所有任务刷新登录态之后再继续。这些需求指向同一个能力——手动干预与安全暂停恢复。我们在早期架构中完全没有这个能力。暂停就意味着强制终止任务进度丢失已执行的步骤不可回退可能产生脏数据。temu店群自动化报活动案例后来我们花了相当精力设计了一套支持暂停、人工干预和断点续执的任务执行框架。这篇文章就完整展开这套机制的设计与实现。一、任务状态模型的扩展增加“暂停”与“等待人工”之前的任务状态机只有pending → queued → running → success/failed。这个模型无法表达“人为暂停”和“等待人工操作”。我们扩展了状态机加入了三个新状态PAUSED任务被主动暂停保留所有进度可恢复WAITING_MANUAL任务执行到需要人工决策的节点挂起并等待MANUAL_PROCESSING人工正在处理任务资源被锁定但未释放状态转换变得更为丰富pending → queued → running → PAUSED (人工暂停) → WAITING_MANUAL (等待审批) → success → failed PAUSED → running (恢复执行) WAITING_MANUAL → MANUAL_PROCESSING (人工接手) → running (放行) python from enum import Enum class TaskState(Enum): PENDING pending QUEUED queued RUNNING running PAUSED paused WAITING_MANUAL waiting_manual MANUAL_PROCESSING manual_processing SUCCESS success FAILED failed CANCELLED cancelled 状态转换由Python调度层严格控制不允许Worker自行改变任务的基础状态防止出现状态混乱。 --- ## 二、安全暂停不丢失进度不释放资源 实现安全暂停的难点在于任务可能正处于任何一步操作中甚至正在操作浏览器的DOM元素。 我们定义的暂停策略是 1. 收到暂停指令后调度器向对应Worker发送暂停信号通过gRPC或Redis Pub/Sub 2. 2. Worker在当前指令步骤执行完成后才响应暂停不中断正在执行的原子操作 3. 3. 保存当前任务上下文已完成的步骤序号、数据上下文、页面URL、Cookie快照 4. 4. 释放正在使用的浏览器实例但不清理用户数据标记实例为“暂停预留” 5. 5. 任务状态更新为 PAUSED上下文序列化到Redis python class TaskPauseHandler: def __init__(self, worker_agent, state_store): self.agent worker_agent self.state_store state_store async def request_pause(self, task_id: str, reason: str manual): # 通知Worker暂停 await self.agent.send_signal(task_id, pause) logger.info(fPause requested for {task_id}, reason: {reason}) async def handle_pause_signal(self, task_context: dict): # Worker端在当前步骤完成后执行暂停 current_step task_context.get(current_step, 0) completed_steps task_context.get(completed_steps, []) completed_steps.append(current_step) snapshot { task_id: task_context[task_id], shop_id: task_context[shop_id], flow_name: task_context[flow_name], completed_steps: completed_steps, remaining_steps: task_context[steps][current_step1:], data_context: task_context.get(data, {}), page_url: task_context.get(current_url, ), paused_at: time.time(), reason: task_context.get(pause_reason, ), } await self.state_store.save(task_context[task_id], snapshot) # 释放浏览器实例但标记为保留 await self.agent.release_browser(task_context[shop_id], preserve_stateTrue) # 更新状态 await self.agent.update_task_state(task_context[task_id], TaskState.PAUSED) 关键设计点**浏览器实例释放但保留状态**。User Data目录不清理Cookie和登录态完整保留。 这样恢复任务时只需重新获取该实例页面状态和登录状态都还在。 --- ## 三、人工审批节点让自动化流程等人 某些运营流程需要人工审核后才能继续。比如大促期间的批量改价运营希望在真正执行前看一眼价格。 我们在指令序列中增加了 manual_checkpoint 动作类型。 当执行引擎遇到这个指令时自动暂停流程保存上下文并向运营推送审批通知。 json { steps: [ {action: navigate, url: /product/list}, {action: collect_data, locator: .product-row}, {action: manual_checkpoint, checkpoint_id: approve_price_update, message: 即将批量更新10个商品价格请审核后确认执行, timeout: 1800}, {action: click, locator: #confirm-update-btn} ] } 审批通知通过企业微信发送包含操作详情和“通过/拒绝”按钮。 运营可以在电脑或手机上直接审批。审批通过后任务从断点恢复继续执行。 python class ManualCheckpointExecutor: def __init__(self, state_store, notification): self.state_store state_store self.notification notification async def execute_checkpoint(self, task_context, checkpoint_config): # 保存进度进入等待审批状态 await self.state_store.save(task_context[task_id], task_context) await self.agent.update_task_state(task_context[task_id], TaskState.WAITING_MANUAL) # 发送审批通知 approval_url fhttps://admin.internal/tasks/{task_context[task_id]}/approve await self.notification.send( task_context.get(operator_id), f自动化任务暂停需要审批{checkpoint_config[message]}, actions[{type: approve, url: approval_url}, {type: reject}], timeoutcheckpoint_config.get(timeout, 1800) ) async def on_approved(self, task_id): snapshot await self.state_store.load(task_id) # 恢复任务执行跳过checkpoint步骤从下一步继续 await self.agent.resume_task(snapshot, skip_checkpointTrue) async def on_rejected(self, task_id): snapshot await self.state_store.load(task_id) # 标记任务失败执行回滚补偿操作 await self.agent.fail_task(task_id, rejected by operator) 审批超时未处理时任务自动标记为失败并向运营发送超时告警。 --- ## 四、断点续执从暂停点精确恢复 任务恢复执行的可靠性是整个设计中最关键的一环。 恢复时可能面临的问题 - 浏览器页面是否还停留在暂停前的URL - - 已执行的步骤会产生页面状态变化恢复时如何避免重复操作 - - 如果暂停期间店铺登录态过期了怎么办 我们的恢复流程分以下几步 1. 从Redis加载任务快照 2. 2. 重新获取该店铺的浏览器实例优先使用之前保留的实例 3. 3. 导航到快照中记录的URL等待页面就绪 4. 4. 跳过所有已完成的步骤直接从未执行的下一步开始 5. 5. 已完成的写操作不重复执行但需校验操作结果是否仍然存在幂等性检测 python class TaskResumer: def __init__(self, state_store, browser_pool, smart_locator): self.state_store state_store self.browser_pool browser_pool self.smart_locator smart_locator async def resume(self, task_id: str): snapshot await self.state_store.load(task_id) if not snapshot: raise TaskSnapshotNotFound(task_id) # 获取浏览器实例 instance await self.browser_pool.acquire(snapshot[shop_id]) # 恢复页面状态 if snapshot.get(page_url): await instance.navigate(snapshot[page_url]) await instance.wait_for_stable(timeout15) # 恢复数据上下文 data_context snapshot.get(data_context, {}) await instance.restore_data_context(data_context) # 跳过已完成的步骤执行剩余步骤 completed set(snapshot[completed_steps]) remaining [s for i, s in enumerate(snapshot[remaining_steps]) if i not in completed] for step in remaining: await self.agent.execute_step(instance, step, data_context) await self.agent.update_task_state(task_id, TaskState.SUCCESS) await self.state_store.delete(task_id) 恢复时的一个细节如果发现暂停前最后一步是一个“写操作”且已完成我们需要验证该操作确实已生效。 比如检查商品是否确实已上架避免重复上架。这依赖之前设计的幂等机制。 --- ## 五、手动干预的三种模式 除了暂停和审批我们还支持三种不同粒度的手动干预 **任务级干预**暂停、恢复、取消整个任务。 **步骤级干预**在某个步骤执行前暂停允许运营修改该步骤的参数如修改上架价格。 **实例级干预**锁定某个店铺的浏览器实例禁止一切自动化操作完全交给人操作。 python class ManualInterventionAPI: async def step_intervene(self, task_id, step_index, new_params): # 修改特定步骤的参数然后继续 snapshot await self.state_store.load(task_id) snapshot[remaining_steps][step_index][params].update(new_params) await self.state_store.save(task_id, snapshot) async def lock_shop_for_manual(self, shop_id, operator_id): # 锁定店铺实例当前正在执行的任务安全暂停 await self.agent.pause_all_tasks_for_shop(shop_id) await self.browser_pool.mark_as_manual_locked(shop_id, operator_id) async def unlock_shop(self, shop_id): await self.browser_pool.mark_as_manual_unlocked(shop_id) # 恢复之前暂停的任务 await self.agent.resume_paused_tasks_for_shop(shop_id) 实例级锁定常用于运维排查浏览器问题或者运营需要临时手动操作某个店铺后台。 --- ## 六、暂停恢复的副作用处理 暂停可能跨越很长时间几小时甚至过夜。 这期间店铺的某些状态可能已经发生了变化登录态过期、商品被手动修改、平台页面结构变更。 我们在恢复任务时加入前置检查 - 检测登录态是否有效如果失效则暂停恢复并请求人工重新登录 - - 检测目标页面结构是否发生重大变化通过与快照中的DOM哈希比对如果变化则降级为智能元素定位或请求人工确认 - - 检测即将执行的写操作是否会覆盖人工已做的修改通过版本号对比如果冲突则暂停并通知 python async def pre_resume_checks(snapshot, instance): # 登录态检查 if not await instance.is_logged_in(): return ResumeAction.REQUEST_MANUAL_LOGIN # 页面结构检查 current_dom_hash await instance.get_dom_hash() if current_dom_hash ! snapshot.get(dom_hash): logger.warning(fPage structure changed for task {snapshot[task_id]}) return ResumeAction.FALLBACK_TO_SMART_LOCATE return ResumeAction.PROCEED 这些检查有效避免了“盲目恢复导致脏数据”的风险。 --- ## 七、审计与权限控制 所有手动干预操作暂停、审批通过/拒绝、参数修改、实例锁定都会生成审计记录记录操作人、时间、操作内容、操作前后的任务状态。 只有具备相应权限的运营人员才能进行干预。审批节点的通过/拒绝权限与运营角色绑定敏感操作如修改价格需要更高权限。 --- ## 八、监控与统计 手动干预的频率和类型反映了自动化流程的成熟度和运营信任度。 我们统计以下指标 - 每天的手动暂停次数、人工审批次数 - - 人工审批通过/拒绝率 - - 干预后任务的成功率 - - 从暂停到恢复的平均等待时间 如果某个流程的人工审批拒绝率持续很高说明策略配置可能有问题需要优化规则减少人工干预需求。 --- ## 九、写在最后 自动化不是“无人值守”的同义词。 真正成熟的自动化系统应该让人能够平滑地介入、干预、修正然后再优雅地退回到自动模式。 就像自动驾驶汽车在复杂路口让人接管通过后又悄无声息地切回自动驾驶。 **安全暂停和人工干预不是自动化的倒退而是自动化的安全气囊。** 它让运营团队敢于把更多业务交给机器因为他们知道任何时候他们都能踩下刹车。 --- *作者林焱*