1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| package com.matrix.eshop.cache.zk;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper;
/** * ZookeeperSession * * @author matrix * */ public class ZooKeeperSession {
// 声明一个静态同步类 private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zookeeper;
public ZooKeeperSession() { // 去连接Zookeeper server,创建会话的时候,是异步去进行的 // 所以需要一个监听器,告诉什么时候是真正完成跟zk server的连接 // 创建zk客户端 try { this.zookeeper = new ZooKeeper("192.168.31.231:2181,192.168.31.232:2181,192.168.31.232:2181", 50000, new ZooKeeperWatcher()); // 给一个状态CONNECTING,连接中 System.out.println(zookeeper.getState());
try { // CountDownLatch // java多线程并发同步的一个工具类 // 会传递进去一些数字,比如说1,2,3 都可以 // 然后await(),如果数字不是0,那么就卡住,等待
// 其他的线程可以调用coutnDown(),减1 // 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态 // 继续向下运行
connectedSemaphore.await(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("ZooKeeper session established......"); } catch (Exception e) { e.printStackTrace(); } }
/** * 获取分布式锁 * * @param productId */ public void acquireDistributedLock(Long productId) { String path = "/product-lock-" + productId;
try { zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success to accquire lock for product[id=" + productId + "]"); } catch (Exception e) { // 如果商品对应的锁的node,已经存在,就是已经被别人加锁了,那么这里就会报错 // NodeExistException
int count = 0; while (true) { try { Thread.sleep(20); zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e2) { e2.printStackTrace(); count++; continue; } System.out.println( "success to accquire lock for product[id=" + productId + "] after " + count + " times "); break; } } }
/** * 释放掉一个分布式锁 * * @param productId */ public void releaseDistributedLock(Long productId) { String path = "/product-lock-" + productId; try { zookeeper.delete(path, -1); } catch (Exception e) { e.printStackTrace(); } }
/** * 建立zk session的watcher * * @author Administrator * */ private class ZooKeeperWatcher implements Watcher {
public void process(WatchedEvent event) { System.out.println("Receive watched event: " + event.getState()); if (KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } }
}
/** * 封装单例的静态内部类 * * @author matrix * */ private static class Singleton { private static ZooKeeperSession instance;
static { instance = new ZooKeeperSession(); }
public static ZooKeeperSession getInstance() { return instance; } }
/** * 获取单例 * * @return */ public static ZooKeeperSession getInstance() { return Singleton.getInstance(); }
/** * 初始化单例便捷方法 */ public static void init() { getInstance(); } }
|