更新時(shí)間:2020-11-04 來源:黑馬程序員 瀏覽量:
1. Redis分布式鎖實(shí)現(xiàn)原理
分布式鎖本質(zhì)上要實(shí)現(xiàn)的目標(biāo)就是在Redis里面占一個(gè)“茅坑”,當(dāng)別的進(jìn)程也要來占時(shí),發(fā)現(xiàn)已經(jīng)有人蹲在那里了,就只好放棄或者稍后再試。占坑一般是使用setnx(set if not exists)指令,只允許被一個(gè)客戶端占坑。先來先占,用完了,再調(diào)用del指令釋放茅坑。
死鎖問題:如果邏輯執(zhí)行到中間出現(xiàn)異常了,可能會(huì)導(dǎo)致del指令沒有被調(diào)用,這樣就會(huì)陷入死鎖,鎖永遠(yuǎn)得不到釋放,解決這個(gè)問題我們?cè)谀玫芥i之后,再給鎖加上一個(gè)過期時(shí)間,比如 5s,這樣即使中間出現(xiàn)異常也可以保證 5 秒之后鎖會(huì)自動(dòng)釋放。
2. 普通非阻塞鎖實(shí)現(xiàn)
public class RedisLock {
private Jedis jedis;
public RedisLock(Jedis jedis) {
this.jedis = jedis;
}
public boolean lock(String key) {
return jedis.set(key, "", "nx", "ex", 5L) != null;
}
public void unlock(String key) {
jedis.del(key);
}
}
2.1 存在問題
如果某一個(gè)進(jìn)程沒有拿到鎖得到了false的結(jié)果那么次進(jìn)程是否執(zhí)行當(dāng)前任務(wù)?顯然對(duì)于一般情況來說我們的任務(wù)都是必須執(zhí)行的那么此時(shí)我們就要考慮該何時(shí)執(zhí)行了,在傳統(tǒng)的鎖中我們?nèi)绻麤]有拿到鎖線程就進(jìn)入了阻塞狀態(tài)那么此處我們是否可以改進(jìn)同樣實(shí)現(xiàn)阻塞喚醒機(jī)制。
3. 分布式阻塞鎖具體實(shí)現(xiàn)
3.1 解決思路
(1)首先我們改造lock鎖,當(dāng)不能創(chuàng)建key時(shí)就利用當(dāng)前key阻塞當(dāng)前線程
(2)當(dāng)某一個(gè)線程釋放鎖時(shí)通過redis的pub/sub發(fā)送一個(gè)消息消息內(nèi)容為key
(3)所有使用鎖的應(yīng)用監(jiān)聽lock通道的消息,在收到消息時(shí)通過key喚醒對(duì)應(yīng)線程
3.2具體實(shí)現(xiàn)
package com.hgy.common.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import java.util.HashMap;
public class RedisLock extends JedisPubSub {
//是否已經(jīng)初始化監(jiān)聽
private static volatile boolean isListen = false;
//每一個(gè)redis的key對(duì)應(yīng)一個(gè)阻塞對(duì)象
private HashMap<String, Object> blockers = new HashMap<>();
private Jedis jedis;
//當(dāng)前獲得鎖的線程
private Thread curThread;
public RedisLock(Jedis jedis) {
this.jedis = jedis;
//保證沒一個(gè)應(yīng)用只初始化一次監(jiān)聽
if (!isListen) {
synchronized (RedisLock.class) {
if (!isListen) {
// 啟動(dòng)一個(gè)線程做消息監(jiān)聽
new Thread(()->{
new Jedis("192.168.200.128", 6379).subscribe(this,"lock");
}).start();
isListen = true;
}
}
}
}
public void lock(String key) throws InterruptedException {
//循環(huán)判斷是否能夠創(chuàng)建key, 不能則直接wait釋放CPU執(zhí)行權(quán)
while (jedis.set(key, "", "nx", "ex", 20L) == null) {
synchronized (key) {
System.out.println(Thread.currentThread().getName() + "======="+ key);
blockers.put(key, key);
key.wait();
}
}
blockers.put(key, key);
//能夠成功創(chuàng)建,獲取鎖成功記錄當(dāng)前獲取鎖線程
curThread = Thread.currentThread();
}
public void unlock(String key) {
//判斷是否為加鎖的線程執(zhí)行解鎖, 不是則直接忽略
if( curThread == Thread.currentThread()) {
jedis.del(key);
//刪除key之后需要notifyAll所有的應(yīng)用, 所以這里采用發(fā)訂閱消息給所有的應(yīng)用
jedis.publish("lock", key);
}
}
/**
* 所有應(yīng)用接收到消息后在當(dāng)前應(yīng)用中執(zhí)行對(duì)應(yīng)key的notifyAll方法
* @param channel
* @param message
*/
@Override
public void onMessage(String channel, String message) {
Object lock = blockers.get(message);
if(lock != null) {
synchronized (lock) {
lock.notifyAll();
}
}
}
}
4.測(cè)試
目標(biāo): 開啟兩個(gè)mian線程, 在第一個(gè)中首先暫停3秒然后打印1-100然后線程休眠5秒釋放鎖并打印最后的毫秒數(shù);main1在執(zhí)行的同時(shí)執(zhí)行main2,在2中打印開始時(shí)間;最后比對(duì)1和2的開始時(shí)間即可驗(yàn)
證。
注意: 先啟動(dòng)1然后啟動(dòng)2
·main1
package com.hgy;
import com.hgy.common.redis.RedisLock;
import redis.clients.jedis.Jedis;
public class RedisLockApp1 {
private static RedisLock redisLock;
public static void main(String[] args) throws InterruptedException {
Jedis client = new Jedis("192.168.200.128", 6379);
redisLock = new RedisLock(client);
redisLock.lock("demo");
Thread.sleep(3000);
for (int i = 0; i < 100; i++) {
System.out.println("app1" + i);
}
Thread.sleep(5000);
redisLock.unlock("demo");
System.out.println("App1==> end:" + System.currentTimeMillis());
}
}
·main2
package com.hgy;
import com.hgy.common.redis.RedisLock;
import redis.clients.jedis.Jedis;
public class RedisLockApp2 {
private static RedisLock redisLock;
public static void main(String[] args) throws InterruptedException {
Jedis client = new Jedis("192.168.200.128", 6379);
redisLock = new RedisLock(client);
redisLock.lock("demo");
System.out.println("App2==> start:" + System.currentTimeMillis());
for (int i = 0; i < 100; i++) {
System.out.println("app2" + i);
}
redisLock.unlock("demo");
}
}
注意
如果細(xì)心的小伙伴兒可能已經(jīng)發(fā)現(xiàn)了unlock其實(shí)不是一個(gè)原子操作,可能在未發(fā)布消息但刪除key之后的這段時(shí)間如果有人此時(shí)執(zhí)行l(wèi)ock那么可以直接拿到鎖;但是影響不大因?yàn)槟玫芥i之后其他被阻塞的線程被喚醒之后將會(huì)繼續(xù)阻塞。
猜你喜歡