etcd核心概念与MVCC机制
Raft共识算法详解
Raft核心原理
角色状态转换
stateDiagram-v2
[*] --> Follower : 启动
Follower --> Candidate : 选举超时
Candidate --> Leader : 获得多数投票
Candidate --> Follower : 发现更高任期
Leader --> Follower : 发现更高任期/失去多数支持
Follower --> Follower : 收到心跳
Candidate --> Candidate : 选票分裂,重新选举
note right of Follower
- 响应来自Leader和Candidate的RPC
- 不会主动发起请求
- 选举超时后成为Candidate
end note
note right of Candidate
- 增加当前任期
- 为自己投票
- 并行向其他服务器发送RequestVote RPC
end note
note right of Leader
- 接受客户端请求
- 向Follower发送心跳
- 复制日志条目
end note
Leader选举算法
function startElection():
    currentTerm++
    votedFor = self
    resetElectionTimeout()
    voteCount = 1
    for each server in cluster:
        send RequestVote RPC to server

function onReceiveVote(vote):
    if vote.granted:
        voteCount++
        if voteCount > (clusterSize / 2):
            becomeLeader()
    else:
        if vote.term > currentTerm:
            currentTerm = vote.term
            becomeFollower()
日志复制机制
type LogEntry struct {
	Term  uint64
	Index uint64
	Type  EntryType
	Data  []byte
}

type AppendEntriesRequest struct {
	Term         uint64
	LeaderID     uint64
	PrevLogIndex uint64
	PrevLogTerm  uint64
	Entries      []LogEntry
	LeaderCommit uint64
}

type AppendEntriesResponse struct {
	Term       uint64
	Success    bool
	MatchIndex uint64
}
Raft安全性保证
选举安全性
每个任期最多一个Leader: 通过多数投票机制保证
Leader完整性: 新Leader必须包含所有已提交的日志条目
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() if args.Term < rf.currentTerm { reply.Term = rf.currentTerm reply.VoteGranted = false return } if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.votedFor = -1 rf.state = Follower } reply.Term = rf.currentTerm if rf.votedFor != -1 && rf.votedFor != args.CandidateId { reply.VoteGranted = false return } lastLogIndex := len (rf.log) - 1 lastLogTerm := rf.log[lastLogIndex].Term logUpToDate := args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex) if logUpToDate { rf.votedFor = args.CandidateId rf.resetElectionTimeout() reply.VoteGranted = true } else { reply.VoteGranted = false } }
MVCC多版本并发控制
版本控制模型
全局版本号机制
Global Revision: 12345
├── Key: /registry/pods/default/pod-1
│   ├── CreateRevision: 12340
│   ├── ModRevision: 12345
│   ├── Version: 3
│   └── Value: {pod data}
├── Key: /registry/pods/default/pod-2
│   ├── CreateRevision: 12341
│   ├── ModRevision: 12341
│   ├── Version: 1
│   └── Value: {pod data}
MVCC存储结构
type KeyValue struct {
	Key            []byte
	CreateRevision int64
	ModRevision    int64
	Version        int64
	Value          []byte
	Lease          int64
}

type TreeIndex struct {
	tree BTree
}

type keyIndex struct {
	key         []byte
	modified    revision
	generations []generation
}

type generation struct {
	ver     int64
	created revision
	revs    []revision
}
事务隔离级别
快照隔离(Snapshot Isolation)
type RangeRequest struct {
	Key      []byte
	RangeEnd []byte
	Limit    int64
	Revision int64
}

func (s *store) Range(r RangeRequest) (*RangeResult, error) {
	if r.Revision <= 0 {
		r.Revision = s.currentRev()
	}

	kvs, rev, err := s.rangeKeys(r.Key, r.RangeEnd, r.Limit, r.Revision)
	if err != nil {
		return nil, err
	}

	return &RangeResult{
		KVs: kvs,
		Rev: rev,
	}, nil
}
乐观并发控制
type Compare struct {
	Result      Compare_CompareResult
	Target      Compare_CompareTarget
	Key         []byte
	TargetUnion interface{}
}

type TxnRequest struct {
	Compare []Compare
	Success []RequestOp
	Failure []RequestOp
}

func acquireLock(client *clientv3.Client, key string, value string) error {
	resp, err := client.Txn(context.Background()).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
		Then(clientv3.OpPut(key, value)).
		Else(clientv3.OpGet(key)).
		Commit()
	if err != nil {
		return err
	}

	if !resp.Succeeded {
		return errors.New("lock already held")
	}

	return nil
}
Watch机制原理
Watch事件模型
事件类型定义
type Event struct {
	Type   EventType
	Kv     *KeyValue
	PrevKv *KeyValue
}

type EventType int32

const (
	PUT    EventType = 0
	DELETE EventType = 1
)

type WatchRequest struct {
	RequestUnion interface{}
}

type WatchCreateRequest struct {
	Key            []byte
	RangeEnd       []byte
	StartRevision  int64
	ProgressNotify bool
	Filters        []WatchCreateRequest_FilterType
	PrevKv         bool
	WatchId        int64
}
Watch实现机制
sequenceDiagram
participant C as Client
participant WS as WatchableStore
participant S as Syncer
participant MVCC as MVCC Store
C->>WS: CreateWatch
WS->>WS: 创建Watcher
WS->>C: WatchID
Note over MVCC: 键值变更
MVCC->>WS: 触发事件
WS->>S: 发送事件到Syncer
S->>S: 批量处理事件
S->>C: 发送Watch响应
loop 持续监听
MVCC->>WS: 更多事件
WS->>S: 事件处理
S->>C: Watch响应
end
C->>WS: CancelWatch
WS->>WS: 清理Watcher
Watch性能优化
事件聚合机制
type syncer struct {
	watcherGroup watchergroup
	victims      []watcherBatch
}

type watcherBatch struct {
	watchers []watcher
	revs     []revision
}

func (s *syncer) sync() {
	var victims []watcherBatch

	s.watcherGroup.visit(func(w *watcher) bool {
		if w.needsNotification() {
			victims = append(victims, watcherBatch{
				watchers: []watcher{*w},
				revs:     w.getPendingRevisions(),
			})
		}
		return true
	})

	for _, victim := range victims {
		s.sendEvents(victim)
	}
}
Watch缓存策略
type watchableStore struct {
	store        *store
	mu           sync.RWMutex
	watchers     watcherGroup
	victims      []watcherBatch
	victimc      chan watcherBatch
	eventsBuffer []mvccpb.Event
	startRev     int64
}

func (ws *watchableStore) NewWatchStream() WatchStream {
	return &watchStream{
		watchable: ws,
		ch:        make(chan WatchResponse, chanBufLen),
		recvc:     make(chan *WatchRequest, recvBufLen),
	}
}
租约系统(Lease)
租约模型设计
租约生命周期
stateDiagram-v2
[*] --> Granted : Grant
Granted --> Active : KeepAlive
Active --> Active : Renew
Active --> Expired : Timeout
Granted --> Revoked : Revoke
Active --> Revoked : Revoke
Expired --> [*] : Cleanup
Revoked --> [*] : Cleanup
note right of Granted
初始状态
TTL倒计时开始
end note
note right of Active
正常状态
可以续租
end note
note right of Expired
过期状态
关联键被删除
end note
租约数据结构
type Lease struct {
	ID           LeaseID
	ttl          int64
	remainingTTL int64
	expiry       time.Time
	mu           sync.RWMutex
	itemSet      map[LeaseItem]struct{}
}

type LeaseItem struct {
	Key []byte
}

type lessor struct {
	mu                 sync.RWMutex
	leaseMap           map[LeaseID]*Lease
	leaseExpiredC      chan []*Lease
	stopC              chan struct{}
	doneC              chan struct{}
	checkpointInterval time.Duration
	expiredC           chan []*Lease
}

type Lessor interface {
	Grant(id LeaseID, ttl int64) (*Lease, error)
	Revoke(id LeaseID) error
	Renew(id LeaseID) (int64, error)
	Lookup(id LeaseID) *Lease
	ExpiredLeasesC() <-chan []*Lease
}
租约过期检测
TTL管理算法
type LeaseTimeoutChecker struct {
	lessor *lessor
	ticker *time.Ticker
	stopC  chan struct{}
}

func (ltc *LeaseTimeoutChecker) run() {
	defer ltc.ticker.Stop()

	for {
		select {
		case <-ltc.ticker.C:
			expired := ltc.findExpiredLeases()
			if len(expired) > 0 {
				ltc.lessor.expiredC <- expired
			}
		case <-ltc.stopC:
			return
		}
	}
}

func (ltc *LeaseTimeoutChecker) findExpiredLeases() []*Lease {
	ltc.lessor.mu.RLock()
	defer ltc.lessor.mu.RUnlock()

	now := time.Now()
	var expired []*Lease

	for _, lease := range ltc.lessor.leaseMap {
		if now.After(lease.expiry) {
			expired = append(expired, lease)
		}
	}

	return expired
}
租约与键值关联
func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
	le.mu.Lock()
	defer le.mu.Unlock()

	lease := le.leaseMap[id]
	if lease == nil {
		return ErrLeaseNotFound
	}

	lease.mu.Lock()
	defer lease.mu.Unlock()

	for _, item := range items {
		lease.itemSet[item] = struct{}{}
	}

	return nil
}

func (le *lessor) revokeExpiredLeases() {
	expired := <-le.ExpiredLeasesC()

	for _, lease := range expired {
		le.mu.Lock()
		delete(le.leaseMap, lease.ID)
		le.mu.Unlock()

		lease.mu.RLock()
		for item := range lease.itemSet {
			le.backend.DeleteRange(item.Key, nil)
		}
		lease.mu.RUnlock()
	}
}
存储引擎深入
BoltDB存储原理
B+树结构
graph TB
subgraph "BoltDB B+树"
ROOT[Root Page Branch Node]
BRANCH1[Branch Page 1]
BRANCH2[Branch Page 2]
LEAF1[Leaf Page 1 Key1:Value1 Key2:Value2]
LEAF2[Leaf Page 2 Key3:Value3 Key4:Value4]
LEAF3[Leaf Page 3 Key5:Value5 Key6:Value6]
ROOT --> BRANCH1
ROOT --> BRANCH2
BRANCH1 --> LEAF1
BRANCH1 --> LEAF2
BRANCH2 --> LEAF3
LEAF1 -.-> LEAF2
LEAF2 -.-> LEAF3
end
页面管理
const (
	branchPageFlag   = 0x01
	leafPageFlag     = 0x02
	metaPageFlag     = 0x04
	freelistPageFlag = 0x10
)

type page struct {
	id       pgid
	flags    uint16
	count    uint16
	overflow uint32
}

type Tx struct {
	writable       bool
	managed        bool
	db             *DB
	meta           *meta
	root           Bucket
	pages          map[pgid]*page
	stats          TxStats
	commitHandlers []func()
	WriteFlag      int
}
etcd存储抽象层
Backend接口设计
type Backend interface {
	ReadTx() ReadTx
	BatchTx() BatchTx
	ConcurrentReadTx() ReadTx
	Snapshot() Snapshot
	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
	Size() int64
	SizeInUse() int64
	OpenReadTxN() int64
	Defrag() error
	ForceCommit()
	Close() error
}

type ReadTx interface {
	Lock()
	Unlock()
	RLock()
	RUnlock()
	UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
	UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}

type BatchTx interface {
	ReadTx
	UnsafePut(bucket Bucket, key []byte, value []byte)
	UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
	UnsafeDelete(bucket Bucket, key []byte)
	Commit()
	CommitAndStop()
}
MVCC存储层
type store struct {
	cfg            StoreConfig
	mu             sync.RWMutex
	currentRev     int64
	b              backend.Backend
	kvindex        index
	le             lease.Lessor
	compactMainRev int64
	fifoSched      schedule.Scheduler
	stopc          chan struct{}
	donec          chan struct{}
}

type StoreConfig struct {
	CompactionBatchLimit    int
	CompactionSleepInterval time.Duration
}
网络通信协议
gRPC协议栈
服务定义
syntax = "proto3";

package etcdserverpb;

service KV {
	rpc Range(RangeRequest) returns (RangeResponse) {}
	rpc Put(PutRequest) returns (PutResponse) {}
	rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
	rpc Txn(TxnRequest) returns (TxnResponse) {}
	rpc Compact(CompactionRequest) returns (CompactionResponse) {}
}

service Watch {
	rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
}
负载均衡策略
type Balancer interface {
	Endpoint() (string, error)
	Endpoints() []string
	HostPortError(hostPort string, err error)
	HostPortSuccess(hostPort string)
	UpdateAddrs(addrs ...string) error
	Close() error
}

type roundRobinBalancer struct {
	mu        sync.RWMutex
	endpoints []string
	next      int
}

func (rrb *roundRobinBalancer) Endpoint() (string, error) {
	rrb.mu.Lock()
	defer rrb.mu.Unlock()

	if len(rrb.endpoints) == 0 {
		return "", ErrNoAvailableEndpoints
	}

	endpoint := rrb.endpoints[rrb.next]
	rrb.next = (rrb.next + 1) % len(rrb.endpoints)

	return endpoint, nil
}
这是etcd核心概念的深度技术解析,涵盖了Raft共识算法、MVCC多版本控制、Watch机制和租约系统的完整实现原理。理解这些概念是掌握etcd和分布式系统设计的关键基础。
系列文章导航: