zookeeper第七课-分布式锁实现

之前学习redis的时候,我们也提及过分布式锁的概念。
但也谈及了redis想要实现分布式锁的复杂度,虽然redis的确很快,但其实并不是那么的可靠,而想要达到可靠,就需要落地持久化,反而降低了性能。。。
而基于zookeeper,其实是可以比较简单的实现一个比较可靠的分布式锁。
下面我们来聊一聊,想要实现一个分布式锁,我们基于zookeeper应该注意些什么?
在这里插入图片描述
1、作为锁来说,肯定是需要保证同一时刻只有一个人可以获得锁。
2、一个获得锁的人出现了问题,我们需要保证不能造成死锁,因此zookeeper的临时节点非常合适。
3、一个获得锁的人在操作完成后,会将锁进行释放,但其他的人是怎么知道它什么时候释放的呢? 轮询定时探测? 不不不,存在时效性。zookeeper有更高效的做法:watch监听回调。
4、watch应该watch谁呢? 如果1000个线程都想要得到一把锁,当一个获得锁的线程释放锁时,1000个线程蜂拥而至,势必会对zookeeper服务造成巨大压力。
那如何有效避免高密度的争抢? 选用队列锁。 当锁释放时, 如果可以使每个线程有序的排列,每个线程分别去监听它前面的线程 。永远由队列第一个线程获得锁。
这样的话,每当一个线程释放锁,只有监听这个线程锁的线程会得到通知,顺位获得锁,而不会唤醒其他线程的争抢。

大致模型:

在这里插入图片描述

代码实现

还是先准备一个zookeeper的连接工具类

public class ZkUtil {
    //使用/testConfig作为我们的配置工作路径
    private static final String CONNECT_ZK_URL = "192.168.221.66:2181,192.168.221.68:2181,192.168.221.70:2181,192.168.221.72:2181/testConfig";
    private static final Integer SESSION_TIME_OUT = 3000; //会话超时时间
    private static final DefaultWatcher watcher = new DefaultWatcher(); //会话级别watcher
    private static ZooKeeper zkInstance;

    static {
        try {
            zkInstance = new ZooKeeper(CONNECT_ZK_URL,SESSION_TIME_OUT,watcher);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static ZooKeeper getZkInstance() {
        try {
            watcher.countDownLatch.await();//阻塞直到zookeeper连接成功后
            return zkInstance;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;

    }
    //会话级别watcher
    private static class DefaultWatcher implements Watcher{
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        @Override
        public void process(WatchedEvent event) {
            Event.KeeperState state = event.getState();
            switch (state) {
                case Unknown:
                    break;
                case Disconnected:
                    break;
                case NoSyncConnected:
                    break;
                case SyncConnected:
                    System.out.println("连接ZK成功。。");
                    countDownLatch.countDown();
                    break;
                case AuthFailed:
                    break;
                case ConnectedReadOnly:
                    break;
                case SaslAuthenticated:
                    break;
                case Expired:
                    break;
            }

        }
    }
}
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 *  使用zk监听器实现分布式锁
 */
public class LockWatcher implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zk;
    private String pathName;
    private String threadName;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public LockWatcher(ZooKeeper zk) {
        this.zk = zk;
    }

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    /**
     * watcher自身监听事件
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        Event.EventType type = event.getType();
        switch (type) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                //当有节点挂了的时候,会触发监听这个节点的监听器,然后重新获取当前zk该路径下的所有节点,
                // 1.如果此时当前节点是第一个节点,则可以得到执行机会进行处理
                // 2.否则的话,将当前节点的监听改为之前监听节点的前一个节点
                zk.getChildren("/",false,this,"getChildrenNodes");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }

    public void tryLock() {
//        if(zk.getData("/"))  //此处可获取当前工作根节点的数据,如果和当前线程是一样的,直接通过。 也就是实现了重入锁的概念
        zk.create("/lock","zk_lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"tryToLock");
        try {
            this.countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void unLock() {
        try {
            zk.delete("/"+pathName,-1);
            this.countDownLatch = new CountDownLatch(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    /**
     *  创建节点回调
     * @param rc
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println(threadName);
        this.pathName = name.substring(1);
        zk.getChildren("/",false,this,"getChildrenNodes");

    }

    /**
     *  获取子节点列表回调
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        //后面监听前面的   例如有三个zk节点同时被创建(代表三个同时的事务线程),  创建的顺序是  a,b,c     那 每一个后面的会依次监听前面的
        // 如:  a <- b <- c  只有第一个节点才是可以进行操作的。这样的话,前面的节点删除了(锁释放),后面的可以立马监听到,这样就可以保证事务锁的效果
        Collections.sort(children);
        int indexOf = children.indexOf(pathName);
        if(indexOf==0){
            try {
                //TODO 此处可记录当前持有锁的线程信息,进而实现重入锁
                zk.setData("/",pathName.getBytes(),-1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
            return;
        }

        zk.exists("/"+children.get(indexOf-1),this,this,"exist");


    }

    /**
     * exist回调
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //防止监听的前一个节点还未存在,或者已经挂掉, 此时需要重新获取节点
        if(stat==null){
            zk.getChildren("/",false,this,"getChildrenNodes");
        }

    }
}

程序入口:

@Test
    public void test1(){
        ZooKeeper zkInstance = ZkUtil.getZkInstance();
        //模拟10个线程抢锁的过程
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                LockWatcher lockWatcher = new LockWatcher(zkInstance);
                lockWatcher.setThreadName(Thread.currentThread().getName());
                lockWatcher.tryLock();
                System.out.println(lockWatcher.getThreadName()+":to do something~");
//                System.out.println(lockWatcher.getThreadName()+":to do something2~");
                lockWatcher.unLock();

            }).start();
        }
        
        //同步阻塞住
        while (true){

        }
    }

热门文章

暂无图片
编程学习 ·

如何用JUnit单元测试List

问题 JUnit测试List时差强人意。 解法 引入依赖 hamcrest-library包含许多有用方法来测试List数据类型。 <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version>&l…
暂无图片
编程学习 ·

嵌入式linux读取sht20温湿度传感器例程

sht20主要是i2c总线接口 设备地址0x40 读取温度原理&#xff1a;读取0xe3寄存器地址两个字节 读取温度原理&#xff1a;读取0xe5寄存器地址两个字节 以下是代码 sht20.c #include <stdio.h> #include <fcntl.h> #include <unistd.h> #include <sys/type…
暂无图片
编程学习 ·

实用frida进阶:内存漫游、hook anywhere、抓包

目录1 内存漫游1.1 获取基本信息1.2 提取内存信息1.3 内存堆搜索与执行1.4 启动activity或service2 Frida hook anywhere2.1 objection&#xff08;内存漫游&#xff09;2.2 objection&#xff08;hook&#xff09;2.3 ZenTracer&#xff08;hook&#xff09;3 Frida用于抓包3.…
暂无图片
编程学习 ·

冰河木马的使用实验

前言&#xff1a; 作者是一个普通高校学生&#xff0c;在博客上开通了[网络安全学习]专栏&#xff0c;以此激励自己坚持学习。由于是初次进行博客创作、经验不足、可能比较粗糙&#xff0c;如有错漏之处希望大家能够指正、也欢迎大家一起交流学习。 如需查看完整学习博文&#…
暂无图片
编程学习 ·

贷款 银行一二三类账户区别是什么?

文章目录I类账户定位&#xff1a;II类账户定位&#xff1a;III类账户定位&#xff1a;账户功能特点使用限制账户形式开户渠道可办理账户数目自2016年12月1日起&#xff0c;银行在现有个人银行账户基础上&#xff0c;将个人银行账户分为一类银行账户、二类银行账户和三类银行账户…
暂无图片
编程学习 ·

java实现分布式项目搭建的方法

1 分布式 1.1 什么是分布式 分布式系统一定是由多个节点组成的系统。其中&#xff0c;节点指的是计算机服务器&#xff0c;而且这些节点一般不是孤立的&#xff0c;而是互通的。这些连通的节点上部署了我们的节点&#xff0c;并且相互的操作会有协同。分布式系统对于用户而言…
暂无图片
编程学习 ·

torch.nn.CrossEntropyLoss()用法

CLASS torch.nn.CrossEntropyLoss(weight: Optional[torch.Tensor] None, size_averageNone, ignore_index: int -100, reduceNone, reduction: str mean) 这个评价损失将 nn.LogSoftmax() 和 nn.NLLLoss() 结合在一个类中。 在训练带有C类的分类问题时很有用。 如果提供&…
暂无图片
编程学习 ·

项目开发过程复盘

前言 2020年5月至2020年11月&#xff0c;我参与了公司一个燃油防盗系统开发的小项目&#xff0c;在这里复盘一下过程中的一些问题及我的一些个人看法&#xff0c;希望在以后的项目中能有所借鉴意义。 项目简介 该项目旨在为商用车开发一套燃油防盗系统&#xff0c;在用户离开…
暂无图片
编程学习 ·

自主数据类型:在TVM中启用自定义数据类型探索

自主数据类型&#xff1a;在TVM中启用自定义数据类型探索 介绍 在设计加速器时&#xff0c;一个重要的决定是如何在硬件中近似地表示实数。这个问题有一个长期的行业标准解决方案&#xff1a;IEEE 754浮点标准.1。然而&#xff0c;当试图通过构建高度专业化的设计来最大限度地利…
暂无图片
编程学习 ·

php 面向对象_练习

插入usb行为 interface usb{function cha();function check(); } class port{protected $state 0;protected $thing "";public function __construct($thing) {$this->thing $thing;}public function cha() {$this->state 1;}public function check() {if…
暂无图片
编程学习 ·

rosconsole 设定

rosconsole 设定verbose 介绍BaseNamedverbose设置改变方式1: 通过configuration文件方式2: 通过 rqt_logger_level / rqt_console tool方式3: 直接在code中设定verbose 介绍 这是一个rospackage&#xff0c;可以用来控制console output方式, log的详尽程度。它有8种logging s…
暂无图片
编程学习 ·

11. JUC阻塞队列

栈与队列 栈就&#xff1a;先进后出&#xff0c;后进先出 队列&#xff1a;先进先出 阻塞队列 必须要阻塞/不得不阻塞 线程1往阻塞队列里添加元素&#xff0c;线程2从阻塞队列里移除元素 以蛋糕店为例&#xff0c;假设去买蛋糕的时候&#xff0c;如果柜子里没有蛋糕&#x…
暂无图片
编程学习 ·

SQL学习___06:

1.数据类型 其它数据库数据类型Oracle补充 Oracle常见数据类型&#xff1a;&#xff08;其中数字不代表大小&#xff0c;只代表数据长度&#xff09; char(n)&#xff1a;定长数据类型&#xff0c;长度固定。当"abc"存在char(20)中时&#xff0c;包含17个空字符。…
暂无图片
编程学习 ·

linux下解压rar文件

1、在home目录下打开终端&#xff0c;执行wget http://www.rarlab.com/rar/rarlinux-3.8.0.tar.gz 2、 tar zxvf rar rarlinux-3.8.0.tar.gz cd rar make make install 3、 将xxx目录压缩为xxx.rar指令 rar a xxx.rar /xxx将xxx.rar解压的命令 unrar -e xxx.rar 附&…
暂无图片
编程学习 ·

Hi3861致敬hello world

说明&#xff1a;文章是在电脑笔记上完成&#xff0c;复制到博客上来无图片&#xff1b;带有操作截图的文章已经以PDF的形式通过资源上传博客&#xff0c;链接如下&#xff1a; https://mp.csdn.net/console/upDetailed 通过致敬hello world&#xff0c;编写简单业务程序了解H…
暂无图片
编程学习 ·

【力扣-中等】649. Dota2 参议院

Dota2 的世界里有两个阵营&#xff1a;Radiant(天辉)和 Dire(夜魇) Dota2 参议院由来自两派的参议员组成。现在参议院希望对一个 Dota2 游戏里的改变作出决定。他们以一个基于轮为过程的投票进行。在每一轮中&#xff0c;每一位参议员都可以行使两项权利中的一项&#xff1a; 禁…