Zookeeper分布式锁在缓存数据生产服务中的应用

发布 : 2017-07-14 分类 : 大数据 浏览 :

ZK分布式锁原理

1
2
3
4
5
6
7
8
9
创建zk的一个临时node,来模拟给摸一个商品id加锁

zk会给你保证,只会创建一个临时node,其他请求过来如果再要创建临时node,就会报错,NodeExistsException

所谓上锁,其实就是去创建某个product id对应的一个临时node

如果临时node创建成功了,说明成功加锁了,此时就可以去执行对redis里面数据的操作

如果临时node创建失败了,说明其他进程已经拿到锁了,在操作reids中的数据,那么就不断的等待,直到自己可以获取到锁为止

服务分布在不同机器上面,通过分布式锁让线程串行化运行

代码实现

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
</dependency>

ZookeeperSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package com.matrix.eshop.cache.zk;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
* ZookeeperSession
*
* @author matrix
*
*/
public class ZooKeeperSession {

// 声明一个静态同步类
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private ZooKeeper zookeeper;

public ZooKeeperSession() {
// 去连接Zookeeper server,创建会话的时候,是异步去进行的
// 所以需要一个监听器,告诉什么时候是真正完成跟zk server的连接
// 创建zk客户端
try {
this.zookeeper = new ZooKeeper("192.168.31.231:2181,192.168.31.232:2181,192.168.31.232:2181", 50000,
new ZooKeeperWatcher());
// 给一个状态CONNECTING,连接中
System.out.println(zookeeper.getState());

try {
// CountDownLatch
// java多线程并发同步的一个工具类
// 会传递进去一些数字,比如说1,2,3 都可以
// 然后await(),如果数字不是0,那么就卡住,等待

// 其他的线程可以调用coutnDown(),减1
// 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态
// 继续向下运行

connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 获取分布式锁
*
* @param productId
*/
public void acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;

try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success to accquire lock for product[id=" + productId + "]");
} catch (Exception e) {
// 如果商品对应的锁的node,已经存在,就是已经被别人加锁了,那么这里就会报错
// NodeExistException

int count = 0;
while (true) {
try {
Thread.sleep(20);
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
e2.printStackTrace();
count++;
continue;
}
System.out.println(
"success to accquire lock for product[id=" + productId + "] after " + count + " times ");
break;
}
}
}

/**
* 释放掉一个分布式锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 建立zk session的watcher
*
* @author Administrator
*
*/
private class ZooKeeperWatcher implements Watcher {

public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event.getState());
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}

}

/**
* 封装单例的静态内部类
*
* @author matrix
*
*/
private static class Singleton {
private static ZooKeeperSession instance;

static {
instance = new ZooKeeperSession();
}

public static ZooKeeperSession getInstance() {
return instance;
}
}

/**
* 获取单例
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}

/**
* 初始化单例便捷方法
*/
public static void init() {
getInstance();
}
}

ZookeeperTest

1
2
3
4
5
6
7
8
9
10
package com.matrix.eshop.cache;

import com.matrix.eshop.cache.zk.ZooKeeperSession;

public class ZookeeperTest {

public static void main(String[] args){
ZooKeeperSession zkSEssion = ZooKeeperSession.getInstance();
}
}

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2017/07/14/Zookeeper分布式锁在缓存数据生产服务中的应用/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹