
方法实现junit junitRELEASE org.apache.logging.log4j log4j-core2.8.2 org.apache.zookeeper zookeeper3.5.7
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
// zookeeper server 列表
private String connectString =
"hadoop102:2181,hadoop103:2181,hadoop104:2181";
// 超时时间
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String rootNode = "locks";
private String subNode = "seq-";
// 当前client 等待的子节点
private String waitPath;
//ZooKeeper 连接
private CountDownLatch connectLatch = new CountDownLatch(1);
//ZooKeeper 节点等待
private CountDownLatch waitLatch = new CountDownLatch(1);
// 当前client 创建的子节点
private String currentNode;
// 和zk 服务建立连接,并创建根节点
public DistributedLock() throws IOException,
InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new
Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时, 打开latch, 唤醒wait 在该latch 上的线程
if (event.getState() ==
Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了waitPath 的删除事件
if (event.getType() ==
Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
{
waitLatch.countDown();
}
}
});
// 等待连接建立
connectLatch.await();
//获取根节点状态
Stat stat = zk.exists("/" + rootNode, false);
//如果根节点不存在,则创建根节点,根节点类型为永久节点
if (stat == null) {
System.out.println("根节点不存在");
zk.create("/" + rootNode, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 加锁方法
public void zkLock() {
try {
//在根节点下创建临时顺序节点,返回值为创建的节点路径
currentNode = zk.create("/" + rootNode + "/" + subNode,
null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait 一小会, 让结果更清晰一些
Thread.sleep(10);
// 注意, 没有必要监听"/locks"的子节点的变化情况
List childrenNodes = zk.getChildren("/" +
rootNode, false);
// 列表中只有一个子节点, 那肯定就是 currentNode , 说明
client 获得锁
if (childrenNodes.size() == 1) {
return;
} else {
//对根节点下的所有临时顺序节点进行从小到大排序
Collections.sort(childrenNodes);
//当前节点名称
String thisNode = currentNode.substring(("/" +
rootNode + "/").length());
//获取当前节点的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// index == 0, 说明 thisNode 在列表中最小, 当前
client 获得锁
return;
} else {
// 获得排名比currentNode 前1 位的节点
this.waitPath = "/" + rootNode + "/" +
childrenNodes.get(index - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的process 方法
zk.getData(waitPath, true, new Stat());
//进入等待锁状态
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁方法
public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
测试
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws
InterruptedException, IOException, KeeperException {
// 创建分布式锁1
final DistributedLock lock1 = new DistributedLock();
// 创建分布式锁2
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock1.zkLock();
System.out.println("线程1 获取锁");
Thread.sleep(5 * 1000);
lock1.zkUnlock();
System.out.println("线程1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.zkLock();
System.out.println("线程2 获取锁");
Thread.sleep(5 * 1000);
lock2.zkUnlock();
System.out.println("线程2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
方法讲解
当线程获取到zookeeper得节点创建一个临时节点,作为锁标志,并获取当前节点在zookeeper得位置,如果不为0,会监听这个节点得前置节点,并进入等待状态,直到获取锁。如果为-1 则出现错误,其他状态会等待锁。当方法完成之后会进行删除当前线程持有的节点,以达到释放锁的目的。
通过Curator 框架实现分布式锁导入依赖原生的Java API 开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
org.apache.curator curator-framework4.3.0 org.apache.curator curator-recipes4.3.0 org.apache.curator curator-client4.3.0
代码演示
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
private String rootNode = "/locks";
// zookeeper server 列表
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
// connection 超时时间
private int connectionTimeout = 2000;
// session 超时时间
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
// 测试
private void test() {
// 创建分布式锁1
final InterProcessLock lock1 = new InterProcessMutex(getCuratorframework(), rootNode);
// 创建分布式锁2
final InterProcessLock lock2 = new InterProcessMutex(getCuratorframework(), rootNode);
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock1.acquire();
System.out.println("线程1 获取锁");
// 测试锁重入
lock1.acquire();
System.out.println("线程1 再次获取锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程1 释放锁");
lock1.release();
System.out.println("线程1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.acquire();
System.out.println("线程2 获取锁");
// 测试锁重入
lock2.acquire();
System.out.println("线程2 再次获取锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程2 释放锁");
lock2.release();
System.out.println("线程2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 分布式锁初始化
public Curatorframework getCuratorframework (){
//重试策略,初试时间3 秒,重试3 次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
//通过工厂创建Curator
Curatorframework client =
CuratorframeworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
//开启连接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)