本文主要介紹zookeeper中zookeeper Server leader的選舉,zookeeper在選舉leader的時候采用了paxos算法(主要是fast paxos),這里主要介紹其中兩種:LeaderElection 和FastLeaderElection.
我們先要清楚以下幾點
在zookeeper中,一個zookeeper集群有多少個Server是固定,每個Server用于選舉的IP和PORT都在配置文件中
- 除了IP和PORT能標識一個Server外,還有沒有別的方法
每一個Server都有一個數字編號,而且是唯一的,我們根據配置文件中的配置來對每一個Server進行編號,這一步在部署時需要人工去做,需要在存儲數據文件的目錄中創建一個文件叫myid的文件,并寫入自己的編號,這個編號在處理我提交的value相同很有用
獲得n/2 + 1個Server同意(這里意思是n/2 + 1個Server要同意擁有zxid是所有Server最大的哪個Server)
zookeeper中選舉主要是采用UDP,也一種實現是采用TCP,在這里介紹的兩種實現采用的是UDP
LOOKING 初始化狀態
LEADING 領導者狀態
FOLLOWING 跟隨者狀態
- 如果所有zxid都相同(例如: 剛初始化時),此時有可能不能形成n/2+1個Server,怎么辦
zookeeper中每一個Server都有一個ID,這個ID是不重復的,而且按大小排序,如果遇到這樣的情況時,zookeeper就推薦ID最大的哪個Server作為Leader
- zookeeper中Leader怎么知道Fllower還存活,Fllower怎么知道Leader還存活
Leader定時向Fllower發ping消息,Fllower定時向Leader發ping消息,當發現Leader無法ping通時,就改變自己的狀態(LOOKING),發起新的一輪選舉
名詞解釋
zookeeer Server: zookeeper中一個Server,以下簡稱Server
zxid(zookeeper transtion id): zookeeper 事務id,他是選舉過程中能否成為leader的關鍵因素,它決定當前Server要將自己這一票投給誰(也就是我在選舉過程中的value,這只是其中一個,還有id)
myid/id(zookeeper server id): zookeeper server id ,他也是能否成為leader的一個因素
epoch/logicalclock:他主要用于描述leader是否已經改變,每一個Server中啟動都會有一個epoch,初始值為0,當 開始新的一次選舉時epoch加1,選舉完成時 epoch加1。
tag/sequencer:消息編號
xid:隨機生成的一個數字,跟epoch功能相同
Fast Paxos消息流向圖與Basic Paxos的對比
消息流向圖
Client Proposer Acceptor Learner
| | | | | | |
X-------->| | | | | | Request
| X--------->|->|->| | | Prepare(N)//向所有Server提議
| |<---------X--X--X | | Promise(N,{Va,Vb,Vc})//向提議人回復是否接受提議(如果不接受回到上一步)
| X--------->|->|->| | | Accept!(N,Vn)//向所有人發送接受提議消息
| |<---------X--X--X------>|->| Accepted(N,Vn)//向提議人回復自己已經接受提議)
|<---------------------------------X--X Response
| | | | | | |
沒有沖突的選舉過程
Client Leader Acceptor Learner
| | | | | | | |
| X--------->|->|->|->| | | Any(N,I,Recovery)
| | | | | | | |
X------------------->|->|->|->| | | Accept!(N,I,W)//向所有Server提議,所有Server收到消息后,接受提議
| |<---------X--X--X--X------>|->| Accepted(N,I,W)//向提議人發送接受提議的消息
|<------------------------------------X--X Response(W)
| | | | | | | |
第一種實現: LeaderElection
LeaderElection是Fast paxos最簡單的一種實現,每個Server啟動以后都詢問其它的Server它要投票給誰,收到所有Server回復以后,就計算出zxid最大的哪個Server,并將這個Server相關信息設置成下一次要投票的Server
每個Server都有一個response線程和選舉線程,我們先看一下每個線程是做一些什么事情
response線程
它主要功能是被動的接受對方法的請求,并根據當前自己的狀態作出相應的回復,每次回復都有自己的Id,以及xid,我們根據他的狀態來看一看他都回復了哪些內容
LOOKING狀態:
自己要推薦的Server相關信息(id,zxid)
LEADING狀態
myid,上一次推薦的Server的id
FLLOWING狀態:
當前Leader的id,以及上一次處理的事務ID(zxid)
選舉線程
選舉線程由當前Server發起選舉的線程擔任,他主要的功能對投票結果進行統計,并選出推薦的Server。選舉線程首先向所有Server發起一次詢問(包括自己),被詢問方,根據自己當前的狀態作相應的回復,選舉線程收到回復后,驗證是否是自己發起的詢問(驗證 xid是否一致),然后獲取對方的id(myid),并存儲到當前詢問對象列表中,最后獲取對方提議的leader相關信息(id,zxid),并將這些 信息存儲到當次選舉的投票記錄表中,當向所有Server都詢問完以后,對統計結果進行篩選并進行統計,計算出當次詢問后獲勝的是哪一個 Server,并將當前zxid最大的Server設置為當前Server要推薦的Server(有可能是自己,也有可以是其它的Server,根據投票 結果而定,但是每一個Server在第一次投票時都會投自己),如果此時獲勝的Server獲得n/2 + 1的Server票數, 設置當前推薦的leader為獲勝的Server,將根據獲勝的Server相關信息設置自己的狀態。每一個Server都重復以上流程,直到選出 leader
了解每個線程的功能以后,我們來看一看選舉過程
當一個Server啟動時它都會發起一次選舉,此時由選舉線程發起相關流程,那么每個Server都會獲得當前zxid最大的哪個Server是誰,如果當次最大的Server沒有獲得n/2+1個票數,那么下一次投票時,他將向zxid最大的Server投票,重復以上流程,最后一定能選舉出一個Leader
只要保證n/2+1個Server存活就沒有任何問題,如果少于n/2+1個Server存活就沒辦法選出Leader
當選舉出Leader以后,此時每個Server應該是什么狀態(FLLOWING)都已經確定,此時由于Leader已經死亡我們就不管它,其它的Fllower按正常的流程繼續下去,當完成這個流程以后,所有的Fllower都會向Leader發送Ping消息,如果無法ping通,就改變自己的狀態為(FLLOWING ==> LOOKING),發起新的一輪選舉
這個過程的處理跟選舉過程中Leader死亡處理方式一樣,這里就不再描述
第二種實現: FastLeaderElection
fastLeaderElection是標準的fast paxos的實現,它首先向所有Server提議自己要成為leader,當其它Server收到提議以后,解決epoch和zxid的沖突,并接受對方的提議,然后向對方發送接受提議完成的消息
數據結構
本地消息結構:
static public class Notification {
long leader; //所推薦的Server id
long zxid; //所推薦的Server的zxid(zookeeper transtion id)
long epoch; //描述leader是否變化(每一個Server啟動時都有一個logicalclock,初始值為0)
QuorumPeer.ServerState state; //發送者當前的狀態
InetSocketAddress addr; //發送者的ip地址
}
網絡消息結構:
static public class ToSend {
int type; //消息類型
long leader; //Server id
long zxid; //Server的zxid
long epoch; //Server的epoch
QuorumPeer.ServerState state; //Server的state
long tag; //消息編號
InetSocketAddress addr;
}
Server具體的實現
每個Server都一個接收線程池(3個線程)和一個發送線程池 (3個線程),在沒有發起選舉時,這兩個線程池處于阻塞狀態,直到有消息到來時才解除阻塞并處理消息,同時每個Server都有一個選舉線程(可以發起 選舉的線程擔任);我們先看一下每個線程所做的事情,如下:
被動接收消息端(接收線程池)的處理:
notification: 首先檢測當前Server上所被推薦的zxid,epoch是否合法(currentServer.epoch <= currentMsg.epoch && (currentMsg.zxid > currentServer.zxid || (currentMsg.zxid == currentServer.zxid && currentMsg.id > currentServer.id))) 如果不合法就用消息中的zxid,epoch,id更新當前Server所被推薦的值,此時將收到的消息轉換成Notification消息放入接收隊列中,將向對方發送ack消息
ack: 將消息編號放入ack隊列中,檢測對方的狀態是否是LOOKING狀態,如果不是說明此時已經有Leader已經被選出來,將接收到的消息轉發成Notification消息放入接收對隊列
主動發送消息端(發送線程池)的處理:
notification: 將要發送的消息由Notification消息轉換成ToSend消息,然后發送對方,并等待對方的回復,如果在等待結束沒有收到對方法回復,重做三次,如果重做次還是沒有收到對方的回復時檢測當前的選舉(epoch)是否已經改變,如果沒有改變,將消息再次放入發送隊列中,一直重復直到有Leader選出或者收到對方回復為止
ack: 主要將自己相關信息發送給對方
主動發起選舉端(選舉線程)的處理:
首先自己的epoch 加1,然后生成notification消息,并將消息放入發送隊列中,系統中配置有幾個Server就生成幾條消息,保證每個Server都能收到此消息,如果當前Server的狀態是LOOKING就一直循環檢查接收隊列是否有消息,如果有消息,根據消息中對方的狀態進行相應的處理。
LOOKING狀態:
首先檢測消息中epoch是否合法,是否比當前Server的大,如果比較當前Server的epoch大時,更新epoch,檢測是消息中的zxid,id是否比當前推薦的Server大,如果是更新相關值,并新生成notification消息放入發關隊列,清空投票統計表; 如果消息小的epoch則什么也不做; 如果相同檢測消息中zxid,id是否合法,如果消息中的zxid,id大,那么更新當前Server相關信息,并新生成notification消息放入發送隊列,將收到的消息的IP和投票結果放入統計表中,并計算統計結果,根據結果設置自己相應的狀態
LEADING狀態:
將收到的消息的IP和投票結果放入統計表中(這里的統計表是獨立的),并計算統計結果,根據結果設置自己相應的狀態
FOLLOWING狀態:
將收到的消息的IP和投票結果放入統計表中(這里的統計表是獨立的),并計算統計結果,根據結果設置自己相應的狀態
了解每個線程的功能以后,我們來看一看選舉過程,選舉過程跟第一程一樣
當一個Server啟動時它都會發起一次選舉,此時由選舉線程發起相關流程,通過將自己的zxid和epoch告訴其它Server,最后每個Server都會得zxid值最大的哪個Server的相關信息,并且在下一次投票時就投zxid值最大的哪個Server,重復以上流程,最后一定能選舉出一個Leader
只要保證n/2+1個Server存活就沒有任何問題,如果少于n/2+1個Server存活就沒辦法選出Leader
當選舉出Leader以后,此時每個Server應該是什么狀態 (FLLOWING)都已經確定,此時由于Leader已經死亡我們就不管它,其它的Fllower按正常的流程繼續下去,當完成這個流程以后,所有的 Fllower都會向Leader發送Ping消息,如果無法ping通,就改變自己的狀態為(FLLOWING ==> LOOKING),發起新的一輪選舉
這個過程的處理跟選舉過 程中Leader死亡處理方式一樣,這里就不再描述