Zookeeper 源码分析

Zookeeper 源码分析,第1张

Zookeeper 源码分析

Zookeeper 源码分析
  • 算法基础
    • 拜占庭将军问题
    • Paxos 算法
      • Paxos算法描述:
      • Paxos算法流程
    • ZAB 协议
      • ZAB算法
      • Zab协议内容
        • 消息广播
        • 崩溃恢复
          • 崩溃恢复——Leader选举
          • 崩溃恢复——数据恢复
    • CAP
      • CAP理论概述
      • 用CAP理论来分析ZooKeeper
  • 源码详解
    • 辅助源码
      • 持久化源码
      • 序列化源码
    • ZK 服务端初始化源码解析
      • ZK服务端启动脚本分析
      • ZK服务端启动入口
      • 解析参数zoo.cfg和myid
      • 过期快照删除
      • 初始化通信组件
    • ZK 服务端加载数据源码解析
      • 冷启动数据恢复快照数据
      • 冷启动数据恢复编辑日志
      • 冷启动数据恢复快照数据
      • 冷启动数据恢复编辑日志
    • ZK 选举源码解析
      • Zookeeper选举机制——第一次启动
      • Zookeeper选举机制——非第一次启动
      • ZK选举源码解析
      • 选举准备
      • 选举执行
    • Follower 和 Leader 状态同步源码
      • Leader.lead()等待接收follower的状态同步申请
      • Follower.lead()查找并连接Leader
      • Leader.lead()创建LearnerHandler
      • Follower.lead()创建registerWithLeader
      • Leader.lead()接收Follwer状态,根据同步方式发送同步消息
      • Follower.lead()应答Leader同步结果
      • Leader.lead()应答Follower
    • 服务端 Leader 启动
    • 服务端 Follower 启动
    • 客户端启动
      • 客户端初始化源码解析
      • 创建ZookeeperMain
      • 初始化监听器
      • 解析连接地址
      • 创建通信
      • 执行run()

算法基础 拜占庭将军问题

拜占庭将军问题是一个协议问题, 拜占庭帝国军队的将军们必须全体一致的决定是否攻击某一支敌军。 问题是这些将军在地理上是分隔开来的, 并且将军中存在叛徒。 叛徒可以任意行动以达到以下目标: 欺骗某些将军采取进攻行动; 促成一个不是所有将军都同意的决定, 如当将军们不希望进攻时促成进攻行动; 或者迷惑某些将军, 使他们无法做出决定。 如果叛徒达到了这些目的之一, 则任何攻击行动的结果都是注定要失败的, 只有完全达成一致的努力才能获得胜利。

Paxos 算法

Paxos 算法: 一种基于消息传递且具有高度容错特性的一致性算法

Paxos 算法解决的问题: 就是如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性

Paxos算法描述:

在一个Paxos系统中,首先将所有节点划分为 Proposer(提议者), Acceptor(接受者) ,和 Learner(学习者) 。(注意:每个节点都可以身兼数职)

一个完整的Paxos算法流程分为三个阶段:

  • Prepare 准备阶段

Proposer向多个Acceptor发出Propose请求Promise(承诺)
Acceptor针对收到的Propose请求进行Promise(承诺)

  • Accept 接受阶段

Proposer收到多数Acceptor承诺的Promise后,向Acceptor发出Propose请求
Acceptor针对收到的Propose请求进行Accept处理

  • Learn 学习阶段:

Proposer将形成的决议发送给所有Learners

Paxos算法流程
  1. Prepare : Proposer 生成全局唯一且递增的 Proposal ID , 向所有 Acceptor 发送 Propose 请求, 这里无需携带提案内容, 只携带 Proposal ID 即可

  2. Promise : Acceptor 收到 Propose 请求后, 做出 " 两个承诺, 一个应答 "

不再接受 Proposal ID <= 当前请求的 Propose 请求
不再接受Proposal ID < 当前请求的 Accept 请求
不违背以前做出的承诺下, 回复已经 Accept 过的提案中 Proposal ID 最大的那个提案的 Value 和 Proposal ID, 没有则返回空值

  1. Propose : Proposer 收到多数 Acceptor 的 Promise 应答后, 从应答中选择 Proposal ID 最大的提案的Value, 作为本次要发起的提案。 如果所有应答的提案Value均为空值, 则可以自己随意决定提案 Value。 然后携带当前 Proposal ID, 向所有 Acceptor 发送 Propose 请求

  2. Accept : Acceptor 收到 Propose 请求后, 在不违背自己之前做出的承诺下, 接受并持久化当前 Proposal ID 和提案 Value

  3. Learn : Proposer 收到多数 Acceptor 的 Accept 后, 决议形成, 将形成的决议发送给所有Learner

下面我们针对上述描述做三种情况的推演举例:为了简化流程,我们这里不设置 Learner

情况1:

有A1, A2, A3, A4, A5 5位议员,就税率问题进行决议

情况2:

Paxos 算法缺陷:在网络复杂的情况下,一个应用 Paxos 算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况

情况3:

现在我们假设在A1提出提案的同时, A5决定将税率定为20%

  • A1, A5同时发起Propose(序号分别为1, 2)
  • A2承诺A1, A4承诺A5, A3行为成为关键
  • 情况2: A3先收到A1消息,承诺A1。之后立刻收到A5消息,承诺A5。
  • A1发起Proposal(1, 10%),无足够响应, A1重新Propose (序号3), A3再次承诺A1。
  • A5发起Proposal(2, 20%),无足够相应。 A5重新Propose (序号4), A3再次承诺A5。

造成这种情况的原因是系统中有一个以上的 Proposer,多个 Proposers 相互争夺 Acceptor,造成迟迟无法达成一致的情况。 针对这种情况,一种改进的 Paxos 算法被提出:从系统中选出一个节点作为 Leader,只有 Leader 能够发起提案。 这样,一次 Paxos 流程中只有一个 Proposer,不会出现活锁的情况,此时只会出现例子中第一种情况

ZAB 协议 ZAB算法

Zab 借鉴了 Paxos 算法,是特别为 Zookeeper 设计的支持崩溃恢复的原子广播协议。基于该协议, Zookeeper 设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后 Leader 客户端将数据同步到其他 Follower 节点。 即 Zookeeper 只有一个 Leader 可以发起提案。

Zab协议内容

Zab 协议包括两种基本的模式: 消息广播、 崩溃恢复

消息广播

ZAB协议针对事务请求的处理过程 类似于一个两阶段提交过程
(1) 广播事务阶段
(2) 广播提交 *** 作
这两阶段提交模型如下, 有可能因 为Leader宕机带来数据不一致, 比如
( 1 ) Leader 发 起 一 个 事 务
Proposal1 后 就 宕 机 , Follower 都 没 有 Proposal1
( 2) Leader收到半数ACK宕机, 没来得及向Follower发送Commit

(1) 客户端发起一个写 *** 作请求。
(2) Leader服务器将客户端的请求转化为事务Proposal 提案, 同时为每个Proposal 分配一个全局的ID, 即zxid。
(3) Leader服务器为每个Follower服务器分配一个单独的队列, 然后将需要广播的 Proposal依次放到队列中去, 并且根据FIFO策略进行消息发送。
(4) Follower接收到Proposal后, 会首先将其以事务日志的方式写入本地磁盘中, 写入成功后向Leader反馈一个Ack响应消息。
(5) Leader接收到超过半数以上Follower的Ack响应消息后, 即认为消息发送成功, 可以发送commit消息。
(6) Leader向所有Follower广播commit消息, 同时自身也会完成事务提交。 Follower 接收到commit消息后, 会将上一条事务提交。

(7) Zookeeper采用Zab协议的核心, 就是只要有一台服务器提交了Proposal, 就要确保所有的服务器最终都能正确提交Proposal。

崩溃恢复

崩溃恢复主要包括两部分: Leader 选举和数据恢复

一旦 Leader 服务器出现崩溃或者由于网络原因导致 Leader 服务器失去了与过半 Follower 的联系,那么就会进入崩溃恢复模式

假设两种服务器异常情况:

  • 假设一个事务在 Leader 提出之后, Leader 挂了。

  • Zab协议崩溃恢复要求满足以下两个要求:

确保已经被Leader提交的提案Proposal, 必须最终被所有的Follower服务器提交。 (已经产生的提案, Follower必须执行)

确保丢弃已经被Leader提出的, 但是没有被提交的Proposal。 (丢弃胎死腹中的提案)

崩溃恢复——Leader选举

Leader选举: 根据上述要求, Zab协议需要保证选举出来的Leader需要满足以下条件:

  • 新选举出来的 Leader 不能包含未提交的 Proposal 。 即新 Leader 必须都是已经提交了 Proposal 的 Follower 服务器节点
  • 新选举的 Leader 节点中含有最大的 zxid 。 这样做的好处是可以避免Leader 服务器检查 Proposal 的提交和丢弃工作
崩溃恢复——数据恢复

Zab如何数据同步:

(1) 完成 Leader 选举后, 在正式开始工作之前(接收事务请求, 然后提出新的 Proposal ) , Leader 服务器会首先确认事务日志中的所有的Proposal 是否已经被集群中过半的服务器 Commit

2) Leader服务器需要确保所有的Follower服务器能够接收到每一条事务的Proposal, 并且能将所有已经提交的事务Proposal 应用到内存数据中。 等到Follower将所有尚未同步的事务Proposal都从Leader服务器上同步过, 并且应用到内存数据中以后, Leader才会把该Follower加入到真正可用的Follower列表中

CAP CAP理论概述

一个分布式系统不可能同时满足以下三种

  • 一致性(C : Consistency)
  • 可用性(A : Available)
  • 分区容错性( P : Partition Tolerance)

这三个基本需求, 最多只能同时满足其中的两项, 因为P是必须的, 因此往往选择就在 CP 或者 AP 中

  • 一致性( C : Consistency)

在分布式环境中, 一致性是指数据在多个副本之间是否能够保持数据一致的特性。 在一致性的需求下, 当一个系统在数据一致的状态下执行更新 *** 作后, 应该保证系统的数据仍然处于一致的状态

  • 可用性(A: Available)

可用性是指系统提供的服务必须一直处于可用的状态, 对于用户的每一个 *** 作请求总是能够在有限的时间内返回结果

  • 分区容错性( P : Partition Tolerance)

分布式系统在遇到任何网络分区故障的时候, 仍然需要能够保证对外提供满足一致性和可用性的服务, 除非是整个网络环境都发生了故障

用CAP理论来分析ZooKeeper

ZooKeeper 保证的是 CP

  • ZooKeeper 不能保证每次服务请求的可用性。 (注:在极端环境下, ZooKeeper 可能会丢弃一些请求, 消费者程序需要重新请求才能获得结果) 。 所以说, ZooKeeper 不能保证服务可用性

  • 进行Leader选举时集群都是不可用

源码详解 辅助源码 持久化源码

Leader 和 Follower 中的数据会在内存和磁盘中各保存一份。所以需要将内存中的数据持久化到磁盘中

在 org.apache.zookeeper.server.persistence 包下的相关类都是序列化相关的代码

快照

public interface SnapShot {
	// 反序列化方法
	long deserialize(DataTree dt, Map sessions) throws IOException;
	
	// 序列化方法
	void serialize(DataTree dt, Map sessions, File name) throws IOException;
	
	
	File findMostRecentSnapshot() throws IOException;
	
	// 释放资源
	void close() throws IOException;
}
public interface TxnLog {
	// 设置服务状态
	void setServerStats(ServerStats serverStats);
	// 滚动日志
	void rollLog() throws IOException;
	// 追加
	boolean append(TxnHeader hdr, Record r) throws IOException;
	// 读取数据
	TxnIterator read(long zxid) throws IOException;
	// 获取最后一个 zxid
	long getLastLoggedZxid() throws IOException;
	// 删除日志
	boolean truncate(long zxid) throws IOException;
	// 获取 DbId
	long getDbId() throws IOException;
	// 提交
	void commit() throws IOException;
	// 日志同步时间
	long getTxnLogSyncElapsedTime();
	// 关闭日志
	void close() throws IOException;
	// 读取日志的接口
	public interface TxnIterator {
		// 获取头信息
		TxnHeader getHeader();
		// 获取传输的内容
		Record getTxn();
		// 下一条记录
		boolean next() throws IOException;
		// 关闭资源
		void close() throws IOException;
		// 获取存储的大小
		long getStorageSize() throws IOException;
	}
}

处理持久化的核心类

序列化源码

zookeeper-jute 代码是关于 Zookeeper 序列化相关源码

序列化和反序列化方法

public interface Record {
	// 序列化方法
	public void serialize(OutputArchive archive, String tag) throws IOException;
	
	// 反序列化方法
	public void deserialize(InputArchive archive, String tag) throws IOException;
}

迭代

public interface Index {
	// 结束
	public boolean done();
	// 下一个
	public void incr();
}

序列化支持的数据类型

public interface OutputArchive {
	public void writeByte(byte b, String tag) throws IOException;
	public void writeBool(boolean b, String tag) throws IOException;
	public void writeInt(int i, String tag) throws IOException;
	public void writeLong(long l, String tag) throws IOException;
	public void writeFloat(float f, String tag) throws IOException;
	public void writeDouble(double d, String tag) throws IOException;
	public void writeString(String s, String tag) throws IOException;
	public void writeBuffer(byte buf[], String tag) throws IOException;
	public void writeRecord(Record r, String tag) throws IOException;
	public void startRecord(Record r, String tag) throws IOException;
	public void endRecord(Record r, String tag) throws IOException;
	public void startVector(List v, String tag) throws IOException;
	public void endVector(List v, String tag) throws IOException;
	public void startMap(TreeMap v, String tag) throws IOException;
	public void endMap(TreeMap v, String tag) throws IOException;
}

反序列化支持的数据类型

public interface InputArchive {
	public byte readByte(String tag) throws IOException;
	public boolean readBool(String tag) throws IOException;
	public int readInt(String tag) throws IOException;
	public long readLong(String tag) throws IOException;
	public float readFloat(String tag) throws IOException;
	public double readDouble(String tag) throws IOException;
	public String readString(String tag) throws IOException;
	public byte[] readBuffer(String tag) throws IOException;
	public void readRecord(Record r, String tag) throws IOException;
	public void startRecord(String tag) throws IOException;
	public void endRecord(String tag) throws IOException;
	public Index startVector(String tag) throws IOException;
	public void endVector(String tag) throws IOException;
	public Index startMap(String tag) throws IOException;
	public void endMap(String tag) throws IOException;
}
ZK 服务端初始化源码解析 ZK服务端启动脚本分析

Zookeeper 服务的启动命令是 zkServer.sh start

ZK服务端启动入口

ctrl + n,查找 QuorumPeerMain

解析参数zoo.cfg和myid

QuorumPeerConfig.java

过期快照删除

可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的

初始化通信组件 ZK 服务端加载数据源码解析 冷启动数据恢复快照数据
  • zk 中的数据模型,是一棵树, DataTree,每个节点,叫做 DataNode
  • zk 集群中的 DataTree 时刻保持状态同步
  • Zookeeper 集群中每个 zk 节点中,数据在内存和磁盘中都有一份完整的数据

内存数据: DataTree
磁盘数据:快照文件 + 编辑日志

冷启动数据恢复编辑日志

冷启动数据恢复快照数据 冷启动数据恢复编辑日志 ZK 选举源码解析 Zookeeper选举机制——第一次启动

Zookeeper选举机制——非第一次启动

ZK选举源码解析

选举准备 选举执行

Follower 和 Leader 状态同步源码

Leader.lead()等待接收follower的状态同步申请 Follower.lead()查找并连接Leader Leader.lead()创建LearnerHandler Follower.lead()创建registerWithLeader Leader.lead()接收Follwer状态,根据同步方式发送同步消息 Follower.lead()应答Leader同步结果 Leader.lead()应答Follower 服务端 Leader 启动

服务端 Follower 启动

客户端启动 客户端初始化源码解析

创建ZookeeperMain 初始化监听器 解析连接地址 创建通信 执行run()

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5610327.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-15
下一篇2022-12-15

发表评论

登录后才能评论

评论列表(0条)

    保存