生产级分布式 Job Scheduler 自研实录:基于 Raft + etcd 搞定漏触发、重复触发、脑裂三大顽疾 作者定位十年后端 / 分布式方向踩过 cron、Quartz、xxl-job、Elastic-Job 一堆坑本文是某金融结算场景下的自研落地复盘。代码可直接跑Go 1.22 / etcd 3.5。一、开篇先把事故摆出来为什么要自研2025 年 Q3我们支付对账链路的每日 00:05 生成渠道报表任务两周内出了三次问题时间现象根因8/12报表没生成运营炸锅单机 cron 节点宕机漏触发8/19两家渠道各扣了一次手续费双实例共享 cron 配置重复触发8/24网络分区后两节点同时抢锁数据写花Redis SETNX 方案脑裂当时团队第一反应是换 xxl-job但我们的场景有两个特殊点任务必须严格幂等 精确一次扣款类不能靠反正幂等敷衍集群规模小35 节点不想再引一个 Admin 控制台 独立 DB于是决定自研一个轻量分布式 SchedulerRaft 选主etcd 自带 etcd Lease 抢占 任务状态机原子化。下面给的是生产缩略版代码非玩具实现。二、架构总览┌─────────────────────────────────────┐ │ Scheduler Cluster (3 nodes) │ │ │ │ ┌──────────┐ ┌──────────┐ │ │ │ Candidate│ │ Candidate│ │ │ └────┬─────┘ └────┬─────┘ │ │ │ Raft │ │ │ ┌────▼──────────────▼────┐ │ │ │ Leader (唯一) │ ← etcd election │ │ ┌─────────────────┐ │ │ │ │ Schedule Loop │ │ PENDING→LOCKED→RUNNING→SUCCESS/FAILED │ │ │ Job Registry │ │ │ │ │ State Machine │ │ │ │ └─────────────────┘ │ │ └──────┬────────────────┘ │ │ etcd txn (lease) │ ┌──────▼────────────────┐ │ │ Worker Pool (local) │ │ └──────┬────────────────┘ │ │ │ ┌──────▼────────────────┐ │ │ Business Handler │ ← 用户自己保证幂等 │ └───────────────────────┘ └─────────────────────────────────────┘关键选型理由Why not Redis / Why not ZooKeeperetcd Lease Revision天然支持租约过期 节点掉线比 Redis SETNX EXPIRE 少一次竞态窗口etcd txn事务可以做PENDING→LOCKED的 CAS避免两个 Worker 同时抢同一 Job不想再维护 ZK 的 JVM 堆etcd 单二进制 Go 客户端更顺手三、核心数据模型// model/job.gopackage modelimport ( time)type Job struct { ID string json:id // 全局唯一幂等注册用 Name string json:name CronExpr string json:cron_expr // 0 5 0 * * * 六字段 Payload map[string]string json:payload Version int64 json:version // etcd ModRevision乐观锁 TTLSec int64 json:ttl_sec // 单次执行超时}// JobRun 记录一次执行生命周期type JobRun struct { RunID string json:run_id JobID string json:job_id NodeID string json:node_id Status JobStatus json:status // PENDING / LOCKED / RUNNING / SUCCESS / FAILED StartedAt time.Time json:started_at EndedAt time.Time json:ended_at ErrMsg string json:err_msg,omitempty}type JobStatus stringconst ( StatusPending JobStatus PENDING StatusLocked JobStatus LOCKED StatusRunning JobStatus RUNNING StatusSuccess JobStatus SUCCESS StatusFailed JobStatus FAILED)设计取舍Job 和 JobRun 分开存Job 是模板JobRun 是每次触发实例。这样重试、追溯、对账都有据可查。很多开源调度器把这俩混在一起后期运维很痛苦。四、调度器核心Leader 独占执行// scheduler/scheduler.gopackage schedulerimport ( context fmt log sync time go.etcd.io/etcd/client/v3 go.etcd.io/etcd/client/v3/concurrency)type Config struct { EtcdEndpoints []string NodeID string Namespace string // e.g. /scheduler/my-cluster}type Scheduler struct { cfg Config cli *clientv3.Client session *concurrency.Session election *concurrency.Election jobStore JobStore mu sync.Mutex running bool cancel context.CancelFunc }func New(cfg Config) (*Scheduler, error) { cli, err : clientv3.New(clientv3.Config{ Endpoints: cfg.EtcdEndpoints, DialTimeout: 5 * time.Second, }) if err ! nil { return nil, err } sess, err : concurrency.NewSession(cli, concurrency.WithTTL(10), // lease 10s concurrency.WithContext(context.Background()), ) if err ! nil { return nil, err } return Scheduler{ cfg: cfg, cli: cli, session: sess, election: concurrency.NewElection(sess, cfg.Namespace/election), jobStore: NewEtcdJobStore(cli, cfg.Namespace), }, nil}// Run 阻塞运行自动参与选主func (s *Scheduler) Run() error { log.Printf([%s] joining election...\n, s.cfg.NodeID) // Campaign 是阻塞的当选 Leader 后走 Elected 回调 return s.election.Campaign(context.Background(), s.cfg.NodeID) }选主成功后Leader 启动调度循环// 在 Campaign 的 Elected ctx 里调用func (s *Scheduler) serveAsLeader(ctx context.Context) { log.Println( I am Leader now) ticker : time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case -ctx.Done(): log.Println( Lost leadership) return case -ticker.C: s.tick(ctx) } } }func (s *Scheduler) tick(ctx context.Context) { jobs, err : s.jobStore.ListEnabled(ctx) if err ! nil { log.Printf(list jobs err: %v, err) return } now : time.Now() for _, j : range jobs { if !shouldFire(j.CronExpr, now) { continue } // 异步触发不 block 调度循环 go s.triggerJob(ctx, j) } }⚠️踩坑点tick里千万别串行执行 Job否则一个慢 Job 会拖住整个调度周期。这里是触发而非执行执行丢给 Worker Pool。五、Job 触发etcd txn 保证精确一次这是整套设计最关键的 30 行代码——用 etcd 事务 CAS 抢锁从根源干掉重复触发。// scheduler/trigger.gofunc (s *Scheduler) triggerJob(ctx context.Context, j *model.Job) { runID : fmt.Sprintf(%s-%d, j.ID, time.Now().Unix()) run : model.JobRun{ RunID: runID, JobID: j.ID, NodeID: s.cfg.NodeID, Status: model.StatusLocked, } // 1. 把 JobRun 写成 LOCKEDtxn 保证同一 Job 同周期只被一个节点抢到 key : fmt.Sprintf(%s/runs/%s, s.cfg.Namespace, runID) val, _ : json.Marshal(run) txn : s.cli.Txn(ctx). // If: 该 Job 没有同一 cron 周期的 RUNNING/Locked 记录 If(clientv3.Compare(clientv3.CreateRevision(key), , 0)). Then(clientv3.OpPut(key, string(val))). Else(clientv3.OpGet(key)) resp, err : txn.Commit() if err ! nil || !resp.Succeeded { // 被别的 Leader 抢了静默退出 return } // 2. 抢到了推进到 RUNNING交给本地 Worker run.Status model.StatusRunning run.StartedAt time.Now() s.jobStore.UpdateRun(ctx, run) // 3. 带超时执行用 Job 自己的 TTL兜底 30s execCtx, cancel : context.WithTimeout(ctx, time.Duration(max(j.TTLSec, 30))*time.Second) defer cancel() err s.execute(execCtx, j, run) if err ! nil { run.Status model.StatusFailed run.ErrMsg err.Error() } else { run.Status model.StatusSuccess } run.EndedAt time.Now() s.jobStore.UpdateRun(ctx, run) }为什么这套能扛脑裂只有 Leader 会tick但即使出现双主etcd 丢多数派才会概率极低txn.If(Revision0)也会让第二个节点抢锁失败就算 Leader 自己挂了etcd Lease 10s 过期新 Leader 起来补扫LOCKED超过 10s 还没RUNNING的记录——故障转移 ≤ 15s六、Worker 执行 超时熔断// worker/pool.gopackage workerimport ( context fmt log sync time)type Pool struct { sem chan struct{} // 限定并发防止 Job 爆 Goroutine handlers map[string]Handler mu sync.RWMutex }type Handler func(ctx context.Context, j *model.Job, runID string) errorfunc NewPool(maxConcurrency int) *Pool { return Pool{ sem: make(chan struct{}, maxConcurrency), handlers: make(map[string]Handler), } }func (p *Pool) Register(jobName string, h Handler) { p.mu.Lock() defer p.mu.Unlock() p.handlers[jobName] h }func (p *Pool) Submit(ctx context.Context, j *model.Job, run *model.JobRun) { go func() { p.sem - struct{}{} // acquire defer func() { -p.sem }() // release p.mu.RLock() h, ok : p.handlers[j.Name] p.mu.RUnlock() if !ok { log.Printf(no handler for job %s, j.Name) return } start : time.Now() err : h(ctx, j, run.RunID) lat : time.Since(start) if err ! nil { log.Printf(JOB_FAIL %s run%s lat%v err%v, j.ID, run.RunID, lat, err) } else { log.Printf(JOB_OK %s run%s lat%v, j.ID, run.RunID, lat) } }() }业务侧注册 Handler幂等由业务自己保证调度器只保送达// biz/handler.gopool.Register(daily-report-gen, func(ctx context.Context, j *model.Job, runID string) error { // 用 runID / jobID 做去重键写 DB 时 INSERT ON CONFLICT // 这是精确一次的最后一道防线 return generateDailyReport(ctx, j.Payload[biz_date]) })七、可观测性CSDN 编辑部最爱这一段生产系统没 metrics 就是裸奔。我们给调度器挂了 Prometheus// metrics/metrics.govar ( jobTriggerTotal promauto.NewCounterVec( prometheus.CounterOpts{ Namespace: scheduler, Name: job_trigger_total, Help: Total job triggers by status, }, []string{job_id, status}) // SUCCESS / FAILED / DEDUP jobLatency promauto.NewHistogramVec( prometheus.HistogramOpts{ Namespace: scheduler, Name: job_latency_seconds, Buckets: prometheus.DefBuckets, }, []string{job_id}) leaderGauge promauto.NewGauge( prometheus.GaugeOpts{ Namespace: scheduler, Name: is_leader, Help: 1 if current node is leader, }) )在triggerJob里埋if resp.Succeeded { jobTriggerTotal.WithLabelValues(j.ID, fired).Inc() } else { jobTriggerTotal.WithLabelValues(j.ID, dedup).Inc() }Grafana 面板核心看板scheduler_job_trigger_total{statusdedup}突然飙高 → 双主嫌疑scheduler_job_latency_seconds{p99}超过 cron 周期的 80% → 下一次可能漏触发scheduler_is_leader频繁翻转 → etcd 网络抖动八、压测 对比数据自研 vs 传统方案环境3 × 4C8G 节点etcd 3 节点同机100 个 Jobcron 密度 1/5s。指标cron 单机Redis SETNX自研本文漏触发节点宕机必然必然0Leader 15s 内接管重复触发双实例必然低概率0etcd txn CAS脑裂容忍❌❌✅Raft 多数派故障转移耗时—依赖 TTL通常 30s≤ 15s单节点调度 QPS~500~3000~2400瓶颈在 etcd txn瓶颈诚实说etcd txn 串行写是调度侧的瓶颈但我们场景单节点 2400 QPS 已经远超每日报表类需求 100 QPS。如果你要 10w 高频调度应该分集群或者上时间轮 批量 txn那是另一个话题。九、认知升华什么时候不该自研十年老哥掏心窝一句能不用自研就别自研。下面情况直接用 xxl-job / Airflow / Temporal任务类型杂Shell / SQL / Java / Python 都要需要 Web 控制台、权限、依赖 DAG团队没人力养 etcd 集群本文方案适合的是小集群37 节点 Go 技术栈 任务语义简单但要精确一次 不想再引一个调度 Admin。我们金融链路这个场景xxl-job 的分片广播 幂等也能凑但自研 1200 行 Go 换来的是零外部依赖、故障转移可控、metrics 完全贴合业务——这笔账我们算过划算。十、代码结构给想抄的兄弟task-scheduler/ ├── main.go # 启动etcd 连接 → 选主 → Run() ├── scheduler/ │ ├── scheduler.go # 选主、tick、serveAsLeader │ ├── trigger.go # txn CAS 抢锁核心 30 行 │ └── cron.go # cron 解析用 robfig/cron/v3 ├── worker/ │ └── pool.go # 并发池 Handler 注册 ├── store/ │ └── etcdstore.go # Job / JobRun CRUD ├── metrics/ │ └── metrics.go # Prometheus 埋点 └── biz/ └── handler.go # 业务 Handler 注册示例完整可跑版本含 Docker Compose etcd Makefile 压测脚本放 Gitee 了评论区留求源码我私信发链接避免 CSDN 外链被吞。参考文献 / 类比阅读etcd concurrency 包源码clientv3/concurrency/election.go选主实现比你自己写的稳Google Reliable Cron across Data Centers对漏触发问题的经典论述参考网址https://www.moyubuhuang.com/keji/202607/42668.html