在 分布式事务 中,讨论了跨节点事务的提交问题——2PC 追求强一致但牺牲可用性,Saga 追求可用性但接受中间状态。但”一致性”这个词在分布式系统中有着远比事务提交更丰富的含义:线性化要求操作看起来在某一瞬间原子地发生,因果一致性只要求有因果关系的操作按序可见,而共识算法则是让一组节点对某个值达成不可撤销的一致。
本章从一致性模型的形式化定义出发,逐步深入线性化、因果一致性、全序广播,最终落地到 Raft 和 Paxos 两种共识算法的实现细节,并讨论 Fencing Token 如何防止脑裂——这是一致性与共识的完整理论链条。
前置知识
- Ch14 分布式事务:分布式事务的一致性需求是理解一致性模型的动机
- Ch12 数据复制:复制引入了一致性问题
- Ch04 事务与并发控制:单机事务的隔离级别是理解分布式一致性的基础
共识算法的历史是分布式系统最精彩的篇章之一。1989 年,Leslie Lamport 提出了 Paxos 算法——但论文以古希腊议会为隐喻写成,晦涩难懂,直到 2001 年 Lamport 发表”Paxos Made Simple”才被广泛理解。2013 年,Diego Ongaro 在其博士论文中提出了 Raft 算法,以”易于理解”为设计目标,迅速成为工业界的主流选择(etcd、Consul、TiKV 都用 Raft)。理解这段历史,你就会明白为什么 Raft 的论文标题是”In Search of an Understandable Consensus Algorithm”——它是对 Paxos 难以理解的直接回应。
一、一致性模型概述
1.1 为什么需要形式化定义
“一致性”是分布式系统中最容易引起混淆的术语。ACID 中的 C 指数据完整性约束,CAP 中的 C 指线性化,而复制延迟语境下的”一致性”可能指读写一致性或最终一致性——它们是完全不同的概念。
形式化定义的一致性模型回答一个核心问题:在并发和故障存在时,系统对外呈现的行为等价于哪种顺序执行? 不同的等价条件定义了不同的一致性级别。
1.2 一致性谱系
一致性模型构成一个从强到弱的谱系。更强的模型提供更直观的语义,但代价也更高:
线性化和串行化是正交的概念:线性化关注单对象的实时顺序保证,串行化关注多对象的事务隔离。严格可串行化是两者的组合,是分布式数据库能提供的最强一致性保证。
1.3 一致性模型对比
| 模型 | 保证范围 | 实时约束 | 性能代价 | 典型系统 |
|---|---|---|---|---|
| 严格可串行化 | 多对象事务 | 有 | 最高 | Spanner, FoundationDB |
| 线性化 | 单操作 | 有 | 高 | ZooKeeper (写), etcd |
| 串行化 | 多对象事务 | 无 | 高 | 单机 PostgreSQL SERIALIZABLE |
| 因果一致性 | 因果相关操作 | 无 | 中 | DynamoDB (DAX), COPS |
| 顺序一致性 | 全部操作 | 无 | 中 | ZooKeeper (读) |
| 最终一致性 | 无 | 无 | 最低 | Cassandra, Dynamo |
二、线性化
2.1 定义
线性化(Linearizability)也称为强一致性或原子一致性。其核心思想是:每个操作看起来在它的调用和响应之间的某个瞬间原子地生效。
具体来说,如果操作 A 的响应在操作 B 的调用之前发生,那么在线性化的执行中,A 的效果必须在 B 之前可见。这个”之前”是基于物理时间的——线性化是唯一引入实时约束的一致性模型。
# 线性化的直觉:一个寄存器的读写# 假设初始值 x = 0
# 客户端 1 客户端 2 客户端 3# write(x, 1) read(x) read(x)# --- 调用 --- --- 调用 ---
# --- 调用 ---
# --- 响应 OK ---
# 后续所有 read(x) 都必须返回 1(或更新的值)# 不允许出现 read→1, read→0 的回退2.2 线性化的代价
线性化不是免费的。它要求所有操作在全局时间线上有一个确定的顺序点,这意味着:
- 性能代价:每次读操作都可能需要与多数节点通信(Quorum 读),网络延迟成为瓶颈
- 可用性代价:网络分区时,无法同时保证线性化和可用性——这正是 CAP 定理的精确表述
// 线性化读的代价:Quorum 读必须联系多数节点func LinearizableRead(quorum int, nodes []*Node) (string, error) { // 必须从多数节点读取,才能确定最新值 results := make(chan ReadResult, len(nodes)) for _, node := range nodes { go func(n *Node) { val, ts, err := n.Read() results <- ReadResult{Value: val, Timestamp: ts, Err: err} }(node) }
// 收集多数响应 responses := make([]ReadResult, 0, quorum) for i := 0; i < quorum; i++ { r := <-results if r.Err != nil { continue } responses = append(responses, r) }
// 返回时间戳最大的值(最新写入) latest := responses[0] for _, r := range responses[1:] { if r.Timestamp > latest.Timestamp { latest = r } } return latest.Value, nil}2.3 CAP 定理的精确表述
CAP 定理常被简化为”三者选其二”,但这种表述是误导性的。精确的表述是:
在网络分区(Partition)发生时,系统只能在一致性(Consistency,即线性化)和可用性(Availability)之间二选一。 当网络正常时,两者可以同时满足。
| 场景 | C(线性化) | A(可用性) | P(分区) | 行为 |
|---|---|---|---|---|
| 网络正常 | 正常服务 | |||
| 网络分区 | CP 系统:拒绝部分请求 | |||
| 网络分区 | AP 系统:返回可能过时的数据 |
CAP 不是”三选二”的选择题,而是”当分区发生时如何抉择”的应急策略。现实中的系统设计更多是在 延迟与一致性之间做权衡——即使没有分区,线性化读的延迟也高于非线性化读。
2.4 线性化与串行化的区别
这是分布式系统中最常见的混淆之一:
| 维度 | 线性化 | 串行化 |
|---|---|---|
| 关注点 | 单对象操作的实时顺序 | 多对象事务的等价串行顺序 |
| 实时约束 | 有(操作必须在调用-响应间生效) | 无(只要等价于某个串行执行即可) |
| 范围 | 单个操作 | 事务(多个操作) |
| 来源 | 分布式系统/并发理论 | 数据库事务理论 |
在 事务与并发控制 中讨论的隔离级别就是串行化范畴——它们不关心操作的实时顺序,只关心事务的等价串行性。而线性化关心的是实时顺序,不关心事务。
三、因果一致性
3.1 因果关系与 Happens-Before
因果一致性(Causal Consistency)是比线性化弱但比最终一致性强的模型。它只要求有因果关系的操作按顺序被所有节点看到,没有因果关系的并发操作可以以任意顺序出现。
因果关系的核心是 happens-before 关系(Lamport, 1978):
- 同一进程内,操作 A 在操作 B 之前执行 → A happens-before B
- 如果操作 A 发送消息,操作 B 接收该消息 → A happens-before B
- 传递性:A happens-before B,B happens-before C → A happens-before C
# 因果一致性的经典场景:社交网络的评论## 用户 A 发帖:"今天天气真好!"# 用户 B 看到帖子后评论:"确实!"# 用户 C 看到评论后回复:"同意!"## 因果关系链:发帖 → 评论 → 回复# 因果一致性要求:任何节点必须先看到"发帖",才能看到"评论"# 先看到"评论",才能看到"回复"## 如果某个节点先看到"回复"而没看到"发帖",就违反了因果一致性
class CausalClock: """因果时钟:跟踪已知的因果前驱""" def __init__(self): self.clock = {} # {node_id: counter}
def increment(self, node_id): """本地事件:递增自己的计数器""" self.clock[node_id] = self.clock.get(node_id, 0) + 1
def merge(self, other_clock): """接收消息:合并对方的时钟,取每个分量的最大值""" for node_id, counter in other_clock.items(): self.clock[node_id] = max( self.clock.get(node_id, 0), counter )
def happens_before(self, other): """判断 self 是否 happens-before other""" all_less_or_equal = True has_strictly_less = False for node_id in set(list(self.clock.keys()) + list(other.keys())): a = self.clock.get(node_id, 0) b = other.get(node_id, 0) if a > b: all_less_or_equal = False break if a < b: has_strictly_less = True return all_less_or_equal and has_strictly_less3.2 因果一致性强于最终一致性
最终一致性只保证”如果没有新的写入,最终所有副本会收敛到相同的值”——但收敛之前,不同节点可能看到完全不同的顺序。因果一致性在此基础上增加了关键保证:
| 保证 | 最终一致性 | 因果一致性 |
|---|---|---|
| 收敛性 | 最终一致 | 最终一致 |
| 因果顺序 | 不保证 | 保证 |
| 并发顺序 | 不保证 | 不保证(允许不同节点看到不同顺序) |
| 回退读取 | 可能发生 | 不会发生(对因果相关的操作) |
3.3 Lamport 时间戳
Lamport 时间戳是捕获因果关系的经典方法。每个节点维护一个逻辑时钟,规则如下:
- 本地事件:递增自己的时钟
- 发送消息:递增时钟,将时钟值附在消息上
- 接收消息:将时钟设为
max(本地时钟, 消息时钟) + 1
class LamportClock: """Lamport 逻辑时钟""" def __init__(self, node_id: int): self.node_id = node_id self.counter = 0
def local_event(self) -> int: """本地事件""" self.counter += 1 return self.counter
def send_event(self) -> tuple[int, int]: """发送事件:返回 (counter, node_id) 作为时间戳""" self.counter += 1 return (self.counter, self.node_id)
def recv_event(self, msg_timestamp: tuple[int, int]) -> int: """接收事件:取 max 后 +1""" msg_counter, _ = msg_timestamp self.counter = max(self.counter, msg_counter) + 1 return self.counter
@staticmethod def compare(ts_a: tuple[int, int], ts_b: tuple[int, int]) -> int: """比较两个 Lamport 时间戳:先比 counter,再比 node_id""" if ts_a[0] != ts_b[0]: return -1 if ts_a[0] < ts_b[0] else 1 return -1 if ts_a[1] < ts_b[1] else (1 if ts_a[1] > ts_b[1] else 0)Lamport 时间戳能给出全序,但这个全序不等于因果顺序——两个并发事件可能被 Lamport 时间戳排了先后,但它们之间并没有因果关系。要精确捕获因果关系,需要使用向量时钟。
3.4 向量时钟
向量时钟(Vector Clock)是 Lamport 时间戳的扩展,为每个节点维护独立的计数器,能精确判断两个事件的因果关系:
from typing import Dict
class VectorClock: """向量时钟:精确捕获因果关系""" def __init__(self, node_id: int, all_nodes: list[int]): self.node_id = node_id self.clock: Dict[int, int] = {n: 0 for n in all_nodes}
def local_event(self): self.clock[self.node_id] += 1 return dict(self.clock)
def send_event(self): self.clock[self.node_id] += 1 return dict(self.clock)
def recv_event(self, msg_clock: Dict[int, int]): for node, counter in msg_clock.items(): self.clock[node] = max(self.clock.get(node, 0), counter) self.clock[self.node_id] += 1 return dict(self.clock)
@staticmethod def compare(a: Dict[int, int], b: Dict[int, int]) -> str: """ 比较两个向量时钟: - 'before': a happens-before b - 'after': b happens-before a - 'concurrent': 并发(无因果关系) - 'equal': 相同 """ a_less = any(a.get(n, 0) < b.get(n, 0) for n in set(a) | set(b)) b_less = any(b.get(n, 0) < a.get(n, 0) for n in set(a) | set(b))
if not a_less and not b_less: return 'equal' if a_less and not b_less: return 'before' if b_less and not a_less: return 'after' return 'concurrent' # 两个方向都有分量更小 → 并发四、全序广播
4.1 原子广播
全序广播(Total Order Broadcast)也称为原子广播(Atomic Broadcast),它保证:
- 全序性:所有节点以相同的顺序接收消息
- 无遗漏:如果某个节点接收了消息 m,则所有在 m 之前发送的消息都被接收
- 可靠性:消息不会丢失,也不会被重复接收
全序广播本质上就是复制日志——每条消息是一个日志条目,所有节点看到相同的日志序列。
4.2 全序广播 = 复制日志
// 全序广播的抽象接口type TotalOrderBroadcast interface { // 广播一条消息:所有节点以相同顺序接收 Broadcast(msg []byte) error
// 接收下一条消息:按全局确定的顺序返回 Receive() (msg []byte, err error)}
// 基于全序广播实现复制日志type ReplicatedLog struct { broadcast TotalOrderBroadcast log [][]byte commitIdx int}
func (rl *ReplicatedLog) Append(entry []byte) error { // 将条目广播到所有节点,全序广播保证顺序一致 return rl.broadcast.Broadcast(entry)}
func (rl *ReplicatedLog) Read(offset int) ([]byte, error) { // 所有节点在相同 offset 读到相同内容 if offset >= len(rl.log) { return nil, fmt.Errorf("offset %d not yet committed", offset) } return rl.log[offset], nil}4.3 用全序广播实现线性化
全序广播与线性化有微妙但重要的关系:
- 全序广播本身不是线性化的:它保证消息按相同顺序传递,但不保证消息何时被传递(没有实时约束)
- 但可以用全序广播实现线性化:通过在日志中追加一条”读请求”,等该条目被提交后返回结果
# 用全序广播实现线性化读写class LinearizableKVStore: def __init__(self, broadcast): self.broadcast = broadcast # 全序广播 self.state = {} # 本地状态 self.pending_reads = {} # 等待中的读请求
def write(self, key, value): """线性化写:将写操作追加到日志""" entry = {"op": "write", "key": key, "value": value} self.broadcast.broadcast(entry) # 等待条目被提交(简化示意)
def read(self, key): """线性化读:将读请求也追加到日志""" request_id = generate_uuid() entry = {"op": "read", "key": key, "request_id": request_id} self.broadcast.broadcast(entry) # 等待该条目被提交后返回结果 return self.pending_reads.pop(request_id)
def apply_entry(self, entry): """按日志顺序应用条目""" if entry["op"] == "write": self.state[entry["key"]] = entry["value"] elif entry["op"] == "read": # 读操作被排在日志中,保证了线性化语义 self.pending_reads[entry["request_id"]] = \ self.state.get(entry["key"])Zab(ZooKeeper 的广播协议)和 Raft 的日志复制本质上都是全序广播的实现。它们通过领导者确定消息顺序,再将该顺序复制到所有节点——领导者就是”顺序的决定者”。
五、Raft 共识算法
Raft 是以可理解性为核心设计目标的共识算法。与 Paxos 相比,Raft 通过将共识问题分解为三个相对独立的子问题,大幅降低了理解难度:
- 领导者选举:如何选出一个领导者
- 日志复制:领导者如何将日志复制到所有节点
- 安全性:如何保证任何提交的日志条目不会被覆盖
5.1 领导者选举
Raft 任何时候最多有一个领导者,所有写请求都由领导者处理。领导者通过心跳维持权威;跟随者检测到心跳超时后发起选举。
关键概念:
- 任期(Term):逻辑时钟,每次选举递增。一个任期最多一个领导者
- 心跳超时:跟随者在选举超时(election timeout)内未收到心跳,则发起选举
- 投票规则:每个节点在一个任期内最多投一票,先到先得
// Raft 节点状态const ( Follower = "Follower" Candidate = "Candidate" Leader = "Leader")
type RaftNode struct { nodeID int state string currentTerm int votedFor int // 当前任期投给了谁 log []LogEntry commitIndex int lastApplied int
// 选举相关 electionTimeout time.Duration heartbeatTimer *time.Timer
// 领导者相关(仅 Leader 使用) nextIndex []int // 每个跟随者的下一条日志索引 matchIndex []int // 每个跟随者的已匹配日志索引}
// 发起选举func (rn *RaftNode) startElection() { rn.state = Candidate rn.currentTerm++ rn.votedFor = rn.nodeID // 投给自己
votesReceived := 1 // 自己的一票
// 向所有其他节点请求投票 for _, peer := range rn.peers { go func(p int) { args := RequestVoteArgs{ Term: rn.currentTerm, CandidateID: rn.nodeID, LastLogIndex: len(rn.log) - 1, LastLogTerm: rn.log[len(rn.log)-1].Term, } reply := rn.sendRequestVote(p, args) if reply.VoteGranted { votesReceived++ if votesReceived > len(rn.peers)/2+1 && rn.state == Candidate { rn.becomeLeader() } } }(peer) }}5.2 日志复制
领导者收到客户端请求后,将操作追加到本地日志,然后通过 AppendEntries RPC 将日志条目复制到所有跟随者。当多数节点确认接收后,条目被视为已提交(committed)。
// AppendEntries RPC 处理(跟随者侧)func (rn *RaftNode) handleAppendEntries(args AppendEntriesArgs) AppendEntriesReply { reply := AppendEntriesReply{}
// 规则 1:任期检查 if args.Term < rn.currentTerm { reply.Success = false reply.Term = rn.currentTerm return reply }
// 规则 2:日志匹配检查 if args.PrevLogIndex >= 0 { if args.PrevLogIndex >= len(rn.log) || rn.log[args.PrevLogIndex].Term != args.PrevLogTerm { // 日志不一致,拒绝,让领导者回退 reply.Success = false reply.Term = rn.currentTerm return reply } }
// 规则 3:追加新条目(处理冲突:截断不一致的条目) for i, entry := range args.Entries { logIdx := args.PrevLogIndex + 1 + i if logIdx < len(rn.log) { if rn.log[logIdx].Term != entry.Term { // 冲突:截断从此处开始的所有条目 rn.log = rn.log[:logIdx] rn.log = append(rn.log, entry) } // 否则:已存在且一致,跳过 } else { rn.log = append(rn.log, entry) } }
// 规则 4:更新提交索引 if args.LeaderCommit > rn.commitIndex { lastNewIdx := args.PrevLogIndex + len(args.Entries) rn.commitIndex = min(args.LeaderCommit, lastNewIdx) }
reply.Success = true reply.Term = rn.currentTerm return reply}5.3 安全性保证
Raft 的安全性由两条核心规则保证:
选举限制:候选人必须拥有所有已提交的日志条目才能当选。RequestVote RPC 包含候选人的最后日志信息,投票者只在候选人的日志至少和自己一样新时才投票。
// 投票者的选举限制检查func (rn *RaftNode) handleRequestVote(args RequestVoteArgs) RequestVoteReply { reply := RequestVoteReply{Term: rn.currentTerm}
// 任期检查 if args.Term < rn.currentTerm { reply.VoteGranted = false return reply }
// 投票限制:每个任期最多投一票 if rn.votedFor != -1 && rn.votedFor != args.CandidateID { reply.VoteGranted = false return reply }
// 选举限制:候选人的日志必须至少和自己一样新 lastLogIdx := len(rn.log) - 1 lastLogTerm := rn.log[lastLogIdx].Term if args.LastLogTerm < lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex < lastLogIdx) { reply.VoteGranted = false return reply }
// 通过所有检查,授予投票 rn.votedFor = args.CandidateID rn.currentTerm = args.Term reply.VoteGranted = true return reply}日志匹配性质:如果两个日志在相同索引处有相同任期的条目,则该索引之前的所有条目也相同。这由 AppendEntries 的一致性检查保证——PrevLogIndex 和 PrevLogTerm 确保了日志的前缀一致。
六、Paxos 算法
6.1 Basic Paxos
Paxos 由 Leslie Lamport 于 1990 年提出,是共识算法的理论基石。Basic Paxos 解决的是:让一组节点对单个值达成一致。
算法涉及三种角色:提议者(Proposer)、接受者(Acceptor)、学习者(Learner),分为两个阶段:
阶段一:Prepare
- Proposer 选择提案号 n,向所有 Acceptor 发送
Prepare(n) - Acceptor 收到
Prepare(n)后:- 如果 n 大于已承诺的任何提案号,承诺不再接受编号小于 n 的提案,并返回已接受的编号最大的提案(如果有)
- 否则拒绝
阶段二:Accept
- Proposer 收到多数 Acceptor 的 Promise 后:
- 如果返回中有已接受的提案,用编号最大的提案值作为自己的提案值
- 否则可以使用任意值
- 向所有 Acceptor 发送
Accept(n, value) - Acceptor 收到
Accept(n, value)后:如果 n ≥ 已承诺的编号,则接受
# Basic Paxos 的 Acceptor 实现class Acceptor: def __init__(self, acceptor_id: int): self.acceptor_id = acceptor_id self.promised_n = -1 # 承诺过的最大提案号 self.accepted_n = -1 # 已接受的最大提案号 self.accepted_value = None # 已接受的值
def prepare(self, n: int) -> dict: """处理 Prepare 请求""" if n > self.promised_n: self.promised_n = n return { "ok": True, "accepted_n": self.accepted_n, "accepted_value": self.accepted_value } return {"ok": False} # 拒绝:已承诺更高编号
def accept(self, n: int, value) -> dict: """处理 Accept 请求""" if n >= self.promised_n: self.promised_n = n self.accepted_n = n self.accepted_value = value return {"ok": True} return {"ok": False} # 拒绝:已承诺更高编号
# Basic Paxos 的 Proposer 实现class Proposer: def __init__(self, proposer_id: int, acceptors: list): self.proposer_id = proposer_id self.acceptors = acceptors
def propose(self, value) -> bool: """两阶段提案""" n = self.generate_proposal_number()
# 阶段一:Prepare promises = [] for acc in self.acceptors: resp = acc.prepare(n) if resp["ok"]: promises.append(resp)
if len(promises) < len(self.acceptors) // 2 + 1: return False # 未获多数承诺
# 如果有已接受的值,必须使用编号最大的那个 max_accepted_n = -1 for p in promises: if p["accepted_n"] > max_accepted_n: max_accepted_n = p["accepted_n"] value = p["accepted_value"]
# 阶段二:Accept accepted = 0 for acc in self.acceptors: resp = acc.accept(n, value) if resp["ok"]: accepted += 1
return accepted >= len(self.acceptors) // 2 + 1
def generate_proposal_number(self) -> int: """生成全局唯一且递增的提案号""" # 简化实现:时间戳 * 节点数 + proposer_id import time return int(time.time() * 1000) * 100 + self.proposer_id6.2 Multi-Paxos
Basic Paxos 对每个值都需要两轮 RPC(Prepare + Accept),效率很低。Multi-Paxos 通过选出一个稳定的领导者来优化:
- 领导者可以直接跳过 Prepare 阶段,连续发送 Accept 请求
- 只有在领导者切换时才需要重新执行 Prepare 阶段
- 这本质上就是 Raft 的日志复制机制
6.3 Raft 与 Paxos 对比
| 维度 | Paxos (Basic/Multi) | Raft |
|---|---|---|
| 设计目标 | 最小化理论证明 | 可理解性 |
| 领导者 | Multi-Paxos 隐含领导者 | 显式领导者选举 |
| 日志管理 | 允许日志空洞 | 不允许日志空洞(简化实现) |
| 选举 | 无显式选举过程 | 任期制 + 随机超时选举 |
| 修改提议 | 允许覆盖未提交条目 | 领导者从不覆盖自己的日志 |
| 理论基础 | 数学证明完备 | 等价于 Paxos 的正确性 |
| 工程实现 | 难(论文未给出细节) | 相对容易(论文详细描述) |
| 典型系统 | Google Chubby, Spanner | etcd, Consul, TiKV, CockroachDB |
选择 Raft 还是 Paxos?如果你从零实现一个共识模块,Raft 几乎总是更好的选择——它的可理解性直接转化为更少的实现 bug。Google Spanner 和 CockroachDB 最初使用 Paxos 系变体,但 CockroachDB 后来切换到了 Raft。只有当你需要 Multi-Paxos 的某些高级特性(如并发日志空洞填充)时,Paxos 才有优势。
七、Fencing Token
7.1 脑裂问题
在 分布式事务 中讨论 2PC 时,提到协调者故障可能导致阻塞。但在基于领导者的系统中,还有一个更隐蔽的问题:脑裂(Split Brain)。
当网络分区发生时,旧领导者可能仍然认为自己是领导者,同时新领导者已经被选出——此时系统中有两个领导者同时接受写入,数据必然不一致。
7.2 Fencing Token 机制
Fencing Token 是解决脑裂问题的经典方案。核心思想:每次领导者变更时,颁发一个单调递增的 Token;所有对共享资源的访问必须携带 Token,资源方拒绝旧 Token 的请求。
// Fencing Token 防脑裂type FencingService struct { currentToken int64 mu sync.Mutex}
// 领导者获得新的 Fencing Token(由共识服务颁发)func (fs *FencingService) NewToken() int64 { fs.mu.Lock() defer fs.mu.Unlock() fs.currentToken++ return fs.currentToken}
// 共享资源(如分布式锁服务、存储系统)type SharedResource struct { lastSeenToken int64 mu sync.Mutex}
// 所有请求必须携带 Fencing Tokenfunc (sr *SharedResource) Write(token int64, key string, value string) error { sr.mu.Lock() defer sr.mu.Unlock()
if token < sr.lastSeenToken { return fmt.Errorf("fencing error: token %d < last seen %d, "+ "possible stale leader", token, sr.lastSeenToken) }
sr.lastSeenToken = token // 执行写入 sr.data[key] = value return nil}7.3 ZooKeeper 的实现
ZooKeeper 通过 临时顺序节点(Ephemeral Sequential Node) 实现了 Fencing Token:
- 客户端创建临时顺序节点
/lock-,ZooKeeper 自动追加序号 →/lock-0000000001 - 序号最小的客户端获得锁
- 客户端故障时,临时节点自动删除
- 序号天然单调递增,可作为 Fencing Token
# ZooKeeper 分布式锁 + Fencing Tokenclass ZKDistributedLock: def __init__(self, zk_client, lock_path="/locks"): self.zk = zk_client self.lock_path = lock_path self.my_node = None self.fencing_token = None
def acquire(self) -> int: """获取锁,返回 Fencing Token""" # 1. 创建临时顺序节点 self.my_node = self.zk.create( f"{self.lock_path}/lock-", ephemeral=True, sequential=True ) # my_node = "/locks/lock-0000000042"
# 2. 提取序号作为 Fencing Token self.fencing_token = int(self.my_node.split("-")[-1])
# 3. 检查是否获得锁(序号最小) children = self.zk.get_children(self.lock_path) children.sort()
if self.my_node == f"{self.lock_path}/{children[0]}": return self.fencing_token # 获得锁
# 4. 等待前一个节点删除(Watch 机制) prev_idx = children.index(self.my_node.split('/')[-1]) - 1 prev_node = f"{self.lock_path}/{children[prev_idx]}" self.zk.wait_for_deletion(prev_node) return self.fencing_token
def release(self): """释放锁:删除临时节点""" if self.my_node: self.zk.delete(self.my_node) self.my_node = None7.4 Fencing Token 与租约的对比
| 维度 | Fencing Token | 租约(Lease) |
|---|---|---|
| 机制 | 单调递增 Token,拒绝旧 Token | 时间窗口,过期自动失效 |
| 依赖 | 需要共享资源配合检查 | 依赖时钟同步 |
| 时钟敏感 | 否 | 是(时钟漂移导致问题) |
| 安全性 | 强(即使旧主不配合也无法写入) | 弱(时钟漂移可能导致旧租约未过期) |
| 典型应用 | ZooKeeper, etcd | GFS, HDFS NameNode |
八、总结
8.1 核心概念回顾
从一致性模型到共识算法,本章覆盖了分布式一致性的完整理论链条:
| 概念 | 核心思想 | 关键保证 |
|---|---|---|
| 线性化 | 操作在调用-响应间原子生效 | 实时顺序 |
| 因果一致性 | 因果相关操作按序可见 | 因果顺序 |
| 全序广播 | 所有节点按相同顺序接收消息 | 全序 + 可靠 |
| Raft | 领导者 + 日志复制 + 多数投票 | 共识(安全 + 活跃) |
| Paxos | Prepare/Accept 两阶段 | 共识(理论完备) |
| Fencing Token | 单调递增令牌拒绝旧主 | 防脑裂 |
8.2 一致性与性能的权衡
8.3 如何选择一致性方案
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 配置中心、分布式锁 | 线性化(Raft/Zab) | 配置不一致导致严重故障 |
| 金融交易 | 严格可串行化 | 事务正确性不可妥协 |
| 社交网络信息流 | 因果一致性 | 用户体验要求因果顺序,但允许并发乱序 |
| 电商库存 | 线性化(单键) | 超卖不可接受 |
| 日志收集、监控 | 最终一致性 | 延迟几秒可接受,吞吐优先 |
| 多活架构 | 因果一致性 + 冲突解决 | 可用性优先,最终收敛 |
8.4 与前后章的联系
一致性理论不是孤立存在的。在 事务与并发控制 中,讨论了单机事务的隔离级别——它们是串行化范畴的概念,不涉及实时约束。在 分布式事务 中,2PC 解决的是跨节点原子提交问题,而共识算法解决的是值的一致性问题——两者看似相似,但原子提交要求所有参与者都同意提交,而共识允许少数节点故障。在 分库分表与 NewSQL 中,可以看到 TiDB 和 CockroachDB 如何基于 Raft 共识构建分布式事务——正是本章理论在工程中的落地。
一致性与共识是分布式系统最核心的理论基础。理解线性化的代价、因果一致性的实用价值、Raft 的工程优势,以及 Fencing Token 的防脑裂机制,是设计任何分布式数据系统的必备能力。没有”万能”的一致性方案——只有适合业务场景的权衡。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






