zookeeper学习 1.Zookeeper简介 Zookeeper是一个分布式的,开源的分布式应用协调服务。可用于同步,配置维护,群组,和命名。并且是一个常见的文件系统的树型结构的数据模型,运行在java中。

它允许程序通过一个共享的类似于标准文件系统的有组织的分层明明空间分布式处理协调。它和一般的文件系统不同在于,它的目的是为了存储,zk的数据保持在内存中,所以它具有高吞吐和低延迟的效果。

1.1 Zk的角色

1.2 Zk的目的

组成zk的各个服务器必须能相互通信。他们在内存中保存了服务器状态,也保存了操作日志,并且持久化快照。只要大多数服务器是可用的,那么zk就是可用的。 客户端连接到zk服务器,并保存了一个tcp连接,通过tcp连接发送请求,获取响应,获取watch时间和发送心跳。如果连接断了,自动连接到其他不同的服务器。

1.3 Zk是有序的 序列。Zk用数字标记每一个更新,用它来反射出所有的事务顺序,随后的操作可以通过这个顺序实现更高级的抽象。例如同步原件。

1.4 Zk是高校的。 Zk的高效在于以读为主的系统上。Zk可以在千台服务器组成的读写比例大约为10:1的分布式系统上表现优异。

1.5 Zk存储结构 Zk提供的命名空间非常像一个标准的文件系统。一个名字是一系列的以‘/’隔开的。Zk命名空间中的所有的节点都是通过路径识别的。且每个路径都唯一。 Znode:树型结构。其上每个节点成为znode

不同于标准的文件系统,zk的每个节点都i可以又数据也可以有子目录。它就像一个即可以是文件也可以是目录的文件系统。 Zk还有一个临时节点的概念。这些znode和session存活的一样长。Session创建时存活,结束时也跟着删除。

1.6 Znode中的存在类型 1)持久化节点:客户端和zk服务端断开后,节点仍然存在 2)持久化顺序编号节点:有编号的持久化节点 3)临时节点:客户端和zk服务端断开后,节点被删除 4)临时顺序编号节点:有编号的临时节点。

条件的更新和watches Zk支持watches。客户端可以在znode上配置一个watch。当znode发生变化时触发并移除watch。

2.监听机制 监听就是对zk的znode节点的数据,结构变化进行简体那个。用户对哪个znode节点监听,一旦数据发生编号就会触发监控。 监听原理:

3.Zk的应用场景 由于zk的特点-zookeeper=文件系统+监听通知机制 Zookeeper作用 服务注册与订阅(共用节点) 分布式通知(监听znode)分布式配置中心 服务命名(znode特性) 数据订阅、发布(watcher) 分布式锁(临时节点)

4.zk实现分布式锁 Zookeeper实现分布式锁原理: Zookeeper节点路径不能重复 保证唯一性。 临时节点+事件通知

1.获取锁方法: 多个jvm同时在zk上创建一个临时节点/lockPath, 最终只能够有一个jvm创建临时节点成功,如果能够创建 临时节点成功jvm 表示获取锁成功能够正常执行业务逻辑, 如果没有创建临时节点成功的jvm,则表示获取锁失败。 获取锁失败之后,可以采用不断重试策略,重试多次 获取锁失败之后,当前的jvm就进入到阻塞状态。

2.释放锁方法: 直接调用.close();释放锁 因为采用临时节点,当我们调用close()方法的时候 该临时节点会自动被删除。 其他没有获取到锁的jvm,就会从新进入到获取锁的状态。

3.被唤醒的方法:

被阻塞的jvm(没有获取锁成功的jvm),采用事件监听的方式 监听到节点已经被删除的情况下,则开始从新进入到获取锁的状态。

Zookeeper实现分布式锁代码 /**

@ClassName Lock @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com @Version V1.0 **/ public interface Lock { /**

获取锁 */ void getLock(); /**

释放锁 */ void unlock(); }

import com.mayikt.task.lock.Lock; import com.mayikt.task.utils.ZkClientUtils; import lombok.extern.slf4j.Slf4j; import org.I0Itec.zkclient.ZkClient; import org.springframework.beans.factory.annotation.Value;

/**

@ClassName ZookeeperAbstractTemplateLock @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com @Version V1.0 **/ @Slf4j public abstract class ZookeeperAbstractTemplateLock implements Lock { @Value(“${server.port}”) private String serverPort; // // @Override // public void getLock() { // // 获取锁 // if (tryLock()) { // log.info(“>>{}服务获取成功…<<<”, serverPort); // } else { // // 阻塞等待 // waitLock(); // // 被唤醒后 从新获取锁 // getLock(); // } // } protected abstract void waitLock(); protected abstract boolean tryLock(); @Override public void unlock() { ZkClientUtils.getZkClient().close(); log.info(“>>[{}]服务释放了锁<<<”, serverPort); } /**

重试机制版 */ @Override public void getLock() { // 自旋重试版本 for (int i = 0; i < 5; i++) { boolean tryLock = tryLock(); if (tryLock) { log.info(“>>{}服务获取成功…<<<”, serverPort); return; } } // 重试5初次还是失败,则开始阻塞。 // 阻塞等待 waitLock(); // 被唤醒后 从新获取锁 getLock(); }

}

import com.mayikt.task.utils.ZkClientUtils; import lombok.extern.slf4j.Slf4j; import org.I0Itec.zkclient.IZkDataListener;

import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

/**

@ClassName ZookeeperTemplateLock @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com @Version V1.0 **/ @Slf4j @Component public class ZookeeperTemplateLock extends ZookeeperAbstractTemplateLock { private String lockPath = “/lockPath”; private CountDownLatch countDownLatch = new CountDownLatch(1); @Value(“${server.port}”) private String serverPort; @Override protected boolean tryLock() { try { // String s = zkClient.create(lockPath, lockPath, CreateMode.EPHEMERAL); ZkClientUtils.newZkClient().createEphemeral(lockPath); return true; } catch (Exception e) { e.printStackTrace(); return false; } } @Override protected void waitLock() { // 注册一个监听事件 IZkDataListener iZkDataListener = new IZkDataListener() {

@Override

public void handleDataChange(String s, Object o) throws Exception {

}

@Override

public void handleDataDeleted(String s) throws Exception {

log.info(">>[{}],其他节点已经释放锁<<", serverPort);

countDownLatch.countDown();

}

};

ZkClientUtils.getZkClient().subscribeDataChanges(lockPath, iZkDataListener);

try {

boolean exists = ZkClientUtils.getZkClient().exists(lockPath);

if (exists) {

log.info(">>[{}],其他节点已经获取到锁,我要等待啦;<<", serverPort);

countDownLatch.await();

}

} catch (Exception e) {

e.printStackTrace();

}

// 唤醒成功之后,则移除该事件

ZkClientUtils.getZkClient().unsubscribeDataChanges(lockPath, iZkDataListener);

}

}

Zk重试机制 在zk获取锁失败后,若直接阻塞,消耗过大,可以通过重试机制,尝试再次获取锁。 若获取锁的次数超过设定次数后,再阻塞。防止因为网络等原因,若去锁失败。

Zk避免羊群效应 若jvm服务器较多,zk在唤醒阻塞中的jvm时,可能会导致zk阻塞问题 被称为羊群效应。 解决方法:zk创建临时顺序节点。唤醒时只会唤醒当前最小的节点。这时的重试机制中,获取的必须时上一个节点。

获取锁的流程: Jvm01---- 1.获取当前jvm创建的临时编号节点lockPath01 2.查询到lockPath 下所有的子节点,实现排序 查找到最小的。 3.最小的临时顺序编号节点:lockPath01 lockPath01==lockPath01 如果是小的节点情况下,则表示获取锁成功。 Jvm02---- 1.获取当前jvm创建的临时编号节点lockPath02 2.查询到lockPath 下所有的子节点,实现排序 查找到最小的。 3.最小的临时顺序编号节点:lockPath01 4.lockPath02!=lockPath01 5.如果当前自己创建的临时顺序编号节点不是最小的情况下,则会直接阻塞。 6.Jvm02 订阅到 lockPath01该临时顺序编号节点。

唤醒之后: Zk服务器端,当lockPath01节点被删除之后,会通知给jvm02 Jvm02从阻塞状态到运行状态。 1.获取当前jvm创建的临时编号节点lockPath02 2.查询到lockPath 下所有的子节点,实现排序 查找到最小的。 3.最小的临时顺序编号节点:lockPath02 4.lockPath02==lockPath02 属于最小节点 5.获取锁成功。

怎么提高zk效率? 1)设置重试机制,在最大次数内,可以重试,超过次数则等待。 2)通过创建zk临时顺序节点,避免羊群效应。

参考阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: