Zookeeper的四种节点类型

1、持久化节点 :所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。

2、持久化顺序节点:这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。基于持久顺序节点原理的经典应用-分布式唯一ID生成器。

3、临时节点:和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点,集群zk环境下,同一个路径的临时节点只能成功创建一个,利用这个特性可以用来实现master-slave选举。

4、临时顺序节点:相对于临时节点而言,临时顺序节点比临时节点多了个有序,也就是说,没创建一个节点都会加上节点对应的序号,先创建成功,序号越小。其经典应用为实现分布式锁。  

原理

Curator内部是通过InterProcessMutex(可重入锁)来在zookeeper中创建临时有序节点实现的,如果通过临时节点及watch机制实现锁的话,这种方式存在一个比较大的问题:所有取锁失败的进程都在等待、监听创建的节点释放,很容易发生"惊群效应",zookeeper的压力是比较大的,而临时有序节点就很好的避免了这个问题,Curator内部就是创建的临时有序节点。

原理

创建临时有序节点,每个线程均能创建节点成功,但是其序号不同,只有序号最小的可以拥有锁,其它线程只需要监听比自己序号小的节点状态即可

基本思路如下:

1、在你指定的节点下创建一个锁目录lock;

2、线程X进来获取锁在lock目录下,并创建临时有序节点;

3、线程X获取lock目录下所有子节点,并获取比自己小的兄弟节点,如果不存在比自己小的节点,说明当前线程序号最小,顺利获取锁;

4、此时线程Y进来创建临时节点并获取兄弟节点 ,判断自己是否为最小序号节点,发现不是,于是设置监听(watch)比自己小的节点(这里是为了发生上面说的惊群效应);

5、线程X执行完逻辑,删除自己的节点,线程Y监听到节点有变化,进一步判断自己是已经是最小节点,顺利获取锁。

 代码

org.apache.curator

curator-recipes

5.2.0

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**

* classname:DistributedLock

* desc:基于zookeeper的开源客户端Cruator实现分布式锁

*/

public class DistributedLock {

public static Logger log = LoggerFactory.getLogger(DistributedLock.class);

private InterProcessMutex interProcessMutex; //可重入排它锁

private String lockName; //竞争资源标志

private String root = "/distributed/lock/";//根节点

private static CuratorFramework curatorFramework;

private static String ZK_URL = "zookeeper1.tq.master.cn:2181,zookeeper3.tq.master.cn:2181,zookeeper2.tq.master.cn:2181,zookeeper4.tq.master.cn:2181,zookeeper5.tq.master.cn:2181";

static{

//设置重试策略,每隔一秒,最多重试3次

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

curatorFramework= CuratorFrameworkFactory.builder()

.connectString(ZK_URL)

.retryPolicy(retryPolicy)

.build();

curatorFramework.start();

}

/**

* 实例化

* @param lockName

*/

public DistributedLock(String lockName){

try {

this.lockName = lockName;

interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName);

}catch (Exception e){

log.error("initial InterProcessMutex exception="+e);

}

}

/**

* 获取锁

*/

public void acquireLock(){

int flag = 0;

try {

//重试2次,每次最大等待2s,也就是最大等待4s

while (!interProcessMutex.acquire(2, TimeUnit.SECONDS)){

flag++;

if(flag>1){ //重试两次

break;

}

}

} catch (Exception e) {

log.error("distributed lock acquire exception="+e);

}

if(flag>1){

log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock fail");

}else{

log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock success");

}

}

/**

* 释放锁

*/

public void releaseLock(){

try {

if(interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()){

interProcessMutex.release();

curatorFramework.delete().inBackground().forPath(root+lockName);

log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock success");

}

}catch (Exception e){

log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock exception="+e);

}

}

}

技巧 -- 添加idea,zookeeper插件

 

 

 

 

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: