分析 K8s CSI 控制器如何调度并保障 K8s CNI网络插件工作原理的持久化数据一致性流程 分析 K8s CSI 控制器如何调度并保障 K8s CNI网络插件工作原理的持久化数据一致性流程一、CSI 与 CNI 的数据一致性挑战1.1 问题的本质Kubernetes 的 CSI容器存储接口和 CNI容器网络接口在各自的标准规范中是独立的。然而在 Pod 的实际生命周期中存储的挂载与网络的配置必须是原子性操作——如果一个 Pod 需要挂载 PersistentVolume 并通过网络对外提供服务挂载与网络配置之间的时序错乱会导致数据不一致。Pod 创建时序中的一致性问题 时序 A正常 1. CNI 分配 IP 地址 2. CSI 挂载存储卷 3. Pod 启动应用读写正常 时序 B异常 1. CNI 分配 IP 地址 2. CSI 挂载失败 3. Pod 启动应用无法写数据脏数据风险 时序 C异常 1. CSI 挂载存储卷 2. CNI 分配 IP 失败 3. Pod 启动无网络但能写本地数据脑裂1.2 数据一致性保证的层级层级范围CSICNI一致性需求L0单 Pod本地挂载本地网络Mount 先于网络就绪L1同节点 Pod共享卷同子网读后写一致性L2跨节点 PodReadWriteMany跨节点路由分布式锁 fencingL3跨 AZ Pod跨 AZ 卷跨 AZ 路由全局 fencing 租约二、CSI 控制器的工作流与一致性保障2.1 CSI 标准一致性保障机制CSI 规范定义了一套完整的数据一致性保障协议核心是 ** fencing 机制**// CSI Controller 的 fencing 实现 package csi import ( context sync time k8s.io/apimachinery/pkg/util/wait ) type VolumeFence struct { mu sync.Mutex fencedVols map[string]*FenceEntry storage StorageBackend } type FenceEntry struct { VolumeID string NodeID string FencedAt time.Time ExpiresAt time.Time FenceToken string // 分布式 fence 令牌 } func (f *VolumeFence) FenceVolume(ctx context.Context, volID, nodeID string) error { f.mu.Lock() defer f.mu.Unlock() // 1. 生成 fence 令牌基于 etcd 的全局递增序列 token, err : f.storage.GenerateFenceToken(ctx, volID) if err ! nil { return err } // 2. 执行存储层 fenceSCSI-3 PR if err : f.storage.Reserve(ctx, volID, token); err ! nil { return err } // 3. 注册 fence 条目 f.fencedVols[volID] FenceEntry{ VolumeID: volID, NodeID: nodeID, FencedAt: time.Now(), ExpiresAt: time.Now().Add(30 * time.Second), FenceToken: token, } return nil } func (f *VolumeFence) ReleaseFence(ctx context.Context, volID, token string) error { f.mu.Lock() defer f.mu.Unlock() entry, ok : f.fencedVols[volID] if !ok { return nil // 已释放 } // 验证令牌一致性 if entry.FenceToken ! token { return ErrFenceTokenMismatch } // 释放存储层 reservation if err : f.storage.Release(ctx, volID, token); err ! nil { return err } delete(f.fencedVols, volID) return nil }2.2 Attach/Detach 的一致性保障# CSI Attach 操作的状态机 apiVersion: storage.k8s.io/v1 kind: VolumeAttachment metadata: name: csi-abcdef123456 spec: attacher: csi-driver.example.com nodeName: worker-node-1 source: persistentVolumeName: pvc-123456 status: attached: true attachmentMetadata: devicePath: /dev/nvme1n1 filesystemType: ext4 attachError: null detachError: nullCSI Attach 的原子性保障实现// CSI Attach Controller 核心逻辑 func (c *CSIAttachController) attachVolume(ctx context.Context, va *storagev1.VolumeAttachment) error { // 1. 乐观锁先读取当前状态 vaCopy : va.DeepCopy() // 2. 调用 CSI Driver 的 ControllerPublishVolume resp, err : c.csiClient.ControllerPublishVolume(ctx, csi.ControllerPublishVolumeRequest{ VolumeId: va.Spec.Source.PersistentVolumeName, NodeId: va.Spec.NodeName, VolumeCapability: va.Spec.VolumeCapability, Readonly: false, Secrets: nil, }) if err ! nil { // 3. 失败时更新 status 记录错误 vaCopy.Status.Attached false vaCopy.Status.AttachError storagev1.VolumeError{ Time: metav1.Now(), Message: err.Error(), } } else { // 4. 成功时记录 attachment 元数据 vaCopy.Status.Attached true vaCopy.Status.AttachmentMetadata resp.PublishContext } // 5. 原子更新 VolumeAttachment 状态 return c.client.Status().Update(ctx, vaCopy) }三、CNI 网络插件工作流程与状态一致性3.1 CNI 插件数据流与状态管理CNI 插件的工作流分为以下阶段每个阶段都有状态一致性要求CNI ADD 操作流程 [Kubelet] -- [CNI Plugin] -- [IPAM Plugin] -- [Network Backend] 阶段 1CNI 插件调用 kubelet 传递的 CNI_ARGS 阶段 2IPAM 分配 IP 地址存储在 etcd/文件 阶段 3配置网络接口veth pair / macvlan 阶段 4设置路由规则和 iptables 阶段 5返回 CNIResultIP、DNS、路由 状态一致性要点 - IPAM 分配必须在网络配置之前完成至少分配要先于配置 - 如果阶段 3 失败需要回滚阶段 2 的 IP 分配 - 如果阶段 5 返回失败kubelet 会重试整个流程3.2 CNI 操作的回滚机制package cni import ( github.com/containernetworking/cni/pkg/skel github.com/containernetworking/cni/pkg/types ) // CNI ADD 操作的回滚事务 type CNITransaction struct { Steps []func() error Rollback []func() State map[string]interface{} } func (t *CNITransaction) Execute() error { for i, step : range t.Steps { if err : step(); err ! nil { // 执行失败回滚所有已完成的步骤 for j : i - 1; j 0; j-- { t.Rollback[j]() } return err } } return nil } func CmdAdd(args *skel.CmdArgs) error { // 创建事务 txn : CNITransaction{ Steps: []func() error{ func() error { // Step 1: 检查网络命名空间 return validateNetNS(args.Netns) }, func() error { // Step 2: 从 IPAM 分配 IP ip, err : ipam.Allocate(args.ContainerID, args.IfName) if err ! nil { return err } txn.State[ip] ip return nil }, func() error { // Step 3: 创建 veth pair return setupVeth(args.ContainerID, args.Netns) }, func() error { // Step 4: 配置路由 return setupRouting(args.ContainerID, txn.State[ip].(*net.IPNet)) }, }, Rollback: []func(){ func() { cleanupVeth(args.ContainerID) }, func() { ipam.Release(args.ContainerID, args.IfName) }, func() { validateNetNS(args.Netns) }, }, State: make(map[string]interface{}), } // 执行事务 if err : txn.Execute(); err ! nil { return types.NewError(types.ErrInternal, transaction failed, err.Error()) } return nil }3.3 IPAM 的状态一致性# IPAM 状态存储设计 apiVersion: v1 kind: ConfigMap metadata: name: ipam-state namespace: kube-system data: allocations.json: | { container-abc: {ip: 10.244.1.5, ifname: eth0, netns: /proc/1234/ns/net}, container-def: {ip: 10.244.1.6, ifname: eth0, netns: /proc/5678/ns/net} } --- # 使用 Kubernetes CRD 存储 IPAM 状态 apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: name: ipallocations.cni.example.com spec: group: cni.example.com names: kind: IPAllocation plural: ipallocations scope: Cluster versions: - name: v1 served: true storage: true schema: openAPIV3Schema: type: object properties: spec: type: object properties: podRef: type: string ip: type: string subnet: type: string gateway: type: string status: type: string enum: [Allocated, InUse, Releasing, Released]四、CSI 与 CNI 的状态同步机制4.1 kubelet 的协调角色kubelet 是 CSI 和 CNI 操作的协调者它保证两者在 Pod 生命周期中的执行顺序// kubelet 的 Pod 创建状态机简化 func (kl *Kubelet) syncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error { // 阶段 1: CNI 网络设置 if err : kl.runtimeService.RunPodSandbox(pod, networkPlugin); err ! nil { return fmt.Errorf(failed to setup network: %v, err) } // 阶段 2: CSI 卷挂载 if err : kl.volumeManager.WaitForAttachAndMount(pod); err ! nil { // 回滚网络设置 kl.runtimeService.StopPodSandbox(pod) return fmt.Errorf(failed to mount volumes: %v, err) } // 阶段 3: 创建容器 if err : kl.runtimeService.CreateContainer(pod, container); err ! nil { // 回滚卷挂载和网络 kl.volumeManager.UnmountVolumes(pod) kl.runtimeService.StopPodSandbox(pod) return err } // 阶段 4: 启动容器 return kl.runtimeService.StartContainer(containerID) }4.2 分布式一致性协议在跨节点场景中CSI 和 CNI 的操作需要分布式一致性协议来保障// 基于 etcd 的分布式协调 package coordination import ( context encoding/json clientv3 go.etcd.io/etcd/client/v3 ) type PodStateMachine struct { etcdClient *clientv3.Client } type PodOperation struct { PodUID string json:podUID NodeName string json:nodeName Phase string json:phase // network, storage, ready CNIStatus string json:cniStatus // pending, done, failed CSIStatus string json:csiStatus // pending, done, failed Version int64 json:version } func (m *PodStateMachine) TransitionPhase(ctx context.Context, pod *PodOperation) error { key : fmt.Sprintf(/pods/%s/state, pod.PodUID) for { // 读取当前状态 resp, err : m.etcdClient.Get(ctx, key) if err ! nil { return err } var current PodOperation if len(resp.Kvs) 0 { json.Unmarshal(resp.Kvs[0].Value, current) } // 状态转换校验 if !isValidTransition(current.Phase, pod.Phase) { return ErrInvalidTransition } // CASCompare-And-Swap写入 tx : m.etcdClient.Txn(ctx) tx.If(clientv3.Compare(clientv3.ModRevision(key), , current.Version)) tx.Then(clientv3.OpPut(key, marshalJSON(pod))) txResp, err : tx.Commit() if err ! nil { return err } if !txResp.Succeeded { continue // 并发冲突重试 } return nil } }4.3 fencing 机制预防脑裂#!/bin/bash # CSI Fencing 脚本在 CNI 故障时保护存储 # 由 kubelet 在 CNI ADD 失败后调用 set -euo pipefail POD_UID${1} VOLUME_ID${2} NODE_NAME${3} # 1. 检查是否有其他节点已挂载此存储卷 for node in $(kubectl get nodes -o name | cut -d/ -f2); do if [ $node $NODE_NAME ]; then continue fi # 2. 验证目标节点的 CNI 状态 cni_status$(kubectl get pod -n kube-system -o wide \ --field-selector spec.nodeName$node \ -l k8s-appcilium \ -o jsonpath{.items[0].status.phase}) if [ $cni_status Running ]; then # 3. 执行存储 fence kubectl exec -n kube-system deploy/csi-controller -- \ csi-fence --volume-id$VOLUME_ID --node$node --actionschedule echo Fenced volume $VOLUME_ID on node $node fi done # 4. 确认 fence 完成 if csi-fence --volume-id$VOLUME_ID --check-fence; then echo Fence confirmed for volume $VOLUME_ID else echo Fence failed for volume $VOLUME_ID 2 exit 1 fi五、数据一致性验证与监控5.1 一致性验证工具// CSI × CNI 一致性验证器 type ConsistencyValidator struct { kubeClient kubernetes.Interface csiClient csi.NodeClient cniClient cni.CNIClient } type ValidationResult struct { Pod string Node string VolumeMounted bool NetworkReady bool DataIntegrity bool Error string } func (v *ConsistencyValidator) ValidatePod(ctx context.Context, pod *v1.Pod) (*ValidationResult, error) { result : ValidationResult{ Pod: pod.Name, Node: pod.Spec.NodeName, } // 1. 验证存储挂载 for _, vol : range pod.Spec.Volumes { if vol.PersistentVolumeClaim ! nil { pvc, err : v.kubeClient.CoreV1().PersistentVolumeClaims( pod.Namespace).Get(ctx, vol.PersistentVolumeClaim.ClaimName, metav1.GetOptions{}) if err ! nil { result.Error fmt.Sprintf(PVC %s not found: %v, vol.Name, err) return result, nil } if pvc.Status.Phase ! v1.ClaimBound { result.VolumeMounted false result.Error fmt.Sprintf(PVC %s not bound, vol.Name) return result, nil } // 检查 VolumeAttachment vaList, _ : v.kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{ LabelSelector: fmt.Sprintf(pv-name%s, pvc.Spec.VolumeName), }) if len(vaList.Items) 0 || !vaList.Items[0].Status.Attached { result.VolumeMounted false continue } result.VolumeMounted true } } // 2. 验证网络就绪 podIP : pod.Status.PodIP if podIP { result.NetworkReady false } else { result.NetworkReady true } // 3. 数据完整性测试需要在 Pod 内执行 if result.VolumeMounted result.NetworkReady { result.DataIntegrity true } return result, nil }5.2 一致性监控告警apiVersion: monitoring.coreos.com/v1 kind: PrometheusRule metadata: name: consistency-alerts spec: groups: - name: storage-network-consistency rules: - alert: VolumeMountedWithoutNetwork expr: | count by (node) ( kube_pod_volume_mount_info{statemounted} unless on (pod) kube_pod_info{pod_ip!} ) 0 for: 1m labels: severity: critical annotations: summary: 已挂载存储但无网络的 Pod - alert: CSIOperationStale expr: | time() - csi_volume_op_last_seconds 300 for: 5m labels: severity: warning annotations: summary: CSI 操作长时间未完成 - alert: CNIAllocationLeak expr: | count(cni_ipam_allocations{statusallocated}) - count(kube_pod_info{pod_ip!}) 100 for: 5m labels: severity: warning annotations: summary: IP 分配泄露已分配但未使用的 IP 超过 100 个六、最佳实践总结场景一致性策略技术实现单节点 Pod 创建串行化 回滚kubelet syncPod 状态机跨节点 AttachSCSI-3 PR etcd 租约CSI FencingIPAM 分配事务 CASetcd Txn / CRD 乐观锁故障恢复幂等重试 去重CNI DEL 幂等性集群搬迁全局 fencing 迁移 checkpointVolumeGroup 快照核心原则操作顺序不可逆CNI 网络配置 → CSI 存储挂载 → 容器启动回滚必须完整任一步骤失败所有已执行步骤都必须完整回滚状态必须持久化IPAM、VolumeAttachment 等状态存储在 etcd 而非内存Fencing 是最后防线网络分区时fencing 确保存储不会被两个节点同时写入可观测性全覆盖每个一致性检查点都暴露 Prometheus 指标CSI 和 CNI 的数据一致性不是靠运气实现的而是靠严谨的状态机、事务性回滚和全局 fencing 机制共同保障的。理解这套机制才能在生产集群中自信地运行有状态工作负载。