Redisson重入锁是通过setnx命令实现的?别再云了

一:简述

问过很多面试者,redisson的可重复锁是怎么实现的,很多面试者都会不假思索的回答是通过redis的setnx命令来实现的,那么真的是这样吗?今天我们就一起来看下redisson分布式可重入锁到底是怎么实现的。

二:分布式锁的实现需要满足什么条件

首先我们在研究redisson分布式重入锁的实现原理之前,我们首先要知道,设计一个分布式锁需要满足什么条件。

1.需要有一个进程级别的共享资源,并且能够实现互斥

2.需要满足原子性

接下来我们通过源码来了解redisson是怎么实现分布式重入锁的。

三:redisson原理分析

我们从tryLock(),lock(),unlock()三个重要的方法为入口对相关代码进行分析。

首先看tryLock()方法。

tryLock()方法有三个重载方法。

代码语言:javascript
复制
//第一个参数表示获取锁的超时时间 第二个参数表示锁自动释放的时间,第三个参数是时间的单位
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
//第一个参数表示获取锁的超时时间,第二个参数是时间单位
boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException;

boolean tryLock();

先看三个参数的方法

首先调用tryAcquire()方法获取锁,如果返回null,代表获取到了锁,那么直接返回true,否则计算时间,看是否获取锁超时,超时的话返回false。如果没有超时,订阅锁释放的事件。(这个事件我们在分析unlock()的时候可以看到),收到锁释放的事件之后,再试尝试获取锁。

代码语言:javascript
复制
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//等待获取锁的时间
long time = unit.toMillis(waitTime);
//当前时间
long current = System.currentTimeMillis();
//获取当前线程id
long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//如果返回null 那么代表获取到锁
if (ttl == null) {
//获取到锁 返回true
return true;
}
// 计算时间 看是否已经获取锁超时 如果已经超时 返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}

    current = System.currentTimeMillis();
    //订阅一个redis的事件 这个事件会在unlock的时候发布 
    RFuture&lt;RedissonLockEntry&gt; subscribeFuture = subscribe(threadId);
    //如果在超时时间内没收到了锁释放的消息 表示锁没有释放 锁获取失败,所以直接返回false
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -&gt; {
                if (e == null) {
                    //没有出现异常 取消订阅事件 因为已经获取锁超时了 不需要再订阅了
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }


    try {
        //再次校验 是否获取锁超时
        time -= System.currentTimeMillis() - current;
        if (time &lt;= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        while (true) {
            long currentTime = System.currentTimeMillis();
            //尝试获取锁 返回null代表获取到锁 否则返回的是锁的剩余时间
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                //获取到锁 直接返回true
                return true;
            }
            //校验获取锁是否超时 超时返回false
            time -= System.currentTimeMillis() - currentTime;
            if (time &lt;= 0) {
                acquireFailed(threadId);
                return false;
            }


            // waiting for message
            currentTime = System.currentTimeMillis();
            //如果锁没有释放 并且锁剩余的时间比超时的时间小的话 可以阻塞等待锁释放然后再次尝试获取锁 这里阻塞是通过Semaphore来实现的
            if (ttl &gt;= 0 &amp;&amp; ttl &lt; time) {
                subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }


            time -= System.currentTimeMillis() - currentTime;
            if (time &lt;= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        //跳出了while循环表示获取到了锁 或者出现了异常 取消订阅释放锁的事件
        unsubscribe(subscribeFuture, threadId);
    }

// return get(tryLockAsync(waitTime, leaseTime, unit));
}

tryAcquire()方法很关键,所以我们接下来看tryAcquire()的实现

tryAcquire()

我们可以看到tryAcquire()方法主要调用了tryAcquireAsync(),而tryAcquireAsync()方法返回的是一个RFuture,get()方法获取这个Future的结果。

代码语言:javascript
复制
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}

tryAcquireAsync()

tryAcquireAsync()方法的作用是调用tryLockInnerAsync()方法异步加锁,如果设置的leaseTime不是-1,那么在异步加锁完成之后,还会调用scheduleExpirationRenewal()方法加入一个延迟执行的任务,用来自动延期锁的时间(也就是看门狗机制)。所以我们重点看tryLockInnerAsync()异步加锁方法和scheduleExpirationRenewal()的续期机制。

注意:值得一提的是,如果我们传入了leaseTime,并且传入的值不是-1,那么就不会有自动延期锁时间的机制,到了leaseTime的时间,就会自动释放锁。

代码语言:javascript
复制
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
//如果是leaseTime不是-1,那么就异步加锁 然后返回结果 不会有刷新锁时间的机制
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
//出现异常就返回
return;
}

        // lock acquired
        if (ttlRemaining == null) {
            //创建一个刷新锁时间的任务
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}</code></pre></div></div><p><strong>tryLockInnerAsync()</strong></p><p>从这里我们就可以知道redisson加锁机制并不是通过setnx命令,而是通过lua脚本来实现的。简单说下这段lua脚本的意思,首先判断传入的key是否存在,如果不存在,那么就将hash的值自增1,并且设置锁的过期时间,然后返回null;其次如果key已经存在,并且是当前线程获取的锁,就将hash的值自增1,并且重新设置过期时间,然后返回null;最后没有获取到锁的话返回锁的过期时间。</p><p>注: 这个hash的key是锁的名称,hashkey是当前线程id生成的,为什么需要用到hash的结构,是因为要支持重入,需要记录获取锁的线程以及重入的次数,最好的结构应该就是hash。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">&lt;T&gt; RFuture&lt;T&gt; tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand&lt;T&gt; command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    
    //异步执行lua脚本命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              //如果key不存在,那么直接设置key 并且设置过期时间,然后返回null
              &#34;if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then &#34; +
                  &#34;redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); &#34; +
                  &#34;redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); &#34; +
                  &#34;return nil; &#34; +
              &#34;end; &#34; +
              //如果已经存在的锁是当前线程获取的,那么将hash的value自增1,并且重新设置过期时间,然后返回null
              &#34;if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then &#34; +
                  &#34;redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); &#34; +
                  &#34;redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); &#34; +
                  &#34;return nil; &#34; +
              &#34;end; &#34; +
              //没有获取到锁 返回锁的剩余过期时间
              &#34;return redis.call(&#39;pttl&#39;, KEYS[1]);&#34;,
                Collections.&lt;Object&gt;singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}</code></pre></div></div><p><strong>scheduleExpirationRenewal()</strong></p><p>这个方法是用来自动刷新锁的过期时间的,也就是我们常说的看门狗机制。首先会把需要刷新的线程封装成ExpirationEntry,如果ExpirationEntry已经存在,那么就只需要把线程加入进去即可,否则需要调用renewExpiration()方法初始化一个由时间轮去执行的延时任务。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}</code></pre></div></div><p><strong>renewExpiration()</strong></p><p>renewExpiration()方法的作用就是初始化一个延时任务并且加入到时间轮中,我们可以看到延时任务是在延迟(锁超时时间/3)之后执行,而刷新锁的过期时间的逻辑主要是renewExpirationAsync()方法来完成。</p><p>注:如果有同学不懂时间轮,可以百度了解一下,也可以暂时把它看成是一个延时任务,如果有需要可以在评论区留言,后续我会出文章一起讨论,这里由于篇幅原因就不仔细分析了。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">    private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //创建一个任务并且加入到时间轮中
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //调用renewExpirationAsync()方法去刷新锁的过期时间
            RFuture&lt;Boolean&gt; future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -&gt; {
                if (e != null) {
                    log.error(&#34;Can&#39;t update lock &#34; + getName() + &#34; expiration&#34;, e);
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    //如果刷新成功  继续调用renewExpiration()添加延时任务 用于下次刷新 
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}</code></pre></div></div><p><strong>renewExpirationAsync()</strong></p><p>可以看到为了保证原子性,锁的续期也是通过执行lua脚本来实现的,简单说下这段lua脚本的意思,首先判断锁的key是否存在,如果存在,那么就将锁续期,然后返回1,否则key不存在的话返回0。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">protected RFuture&lt;Boolean&gt; renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            &#34;if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then &#34; +
                &#34;redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); &#34; +
                &#34;return 1; &#34; +
            &#34;end; &#34; +
            &#34;return 0;&#34;,
        Collections.&lt;Object&gt;singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}</code></pre></div></div><p><strong>tryLock(long waitTime, TimeUnit unit)</strong></p><p>这个方法没有设置leaseTime,那么leaseTime默认就是-1。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);

}

接下来看tryLock()方法。

tryLock()

调用tryLockAsync()方法来加锁,并且返回加锁结果。我们重点分析下tryAcquireOnceAsync()方法。

代码语言:javascript
复制
public boolean tryLock() {
return get(tryLockAsync());
}
代码语言:javascript
复制
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
代码语言:javascript
复制
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, null, threadId);
}

tryAcquireOnceAsync()

tryAcquireOnceAsync()方法同样是通过tryLockInnerAsync()方法异步加锁,并且通过scheduleExpirationRenewal()方法创建锁延期的延迟任务。这两个方法在上文我们已经分析过了,这里就不再重复分析了。

代码语言:javascript
复制
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
//如果leaseTime不是-1 那么就不会刷新锁的过期时间
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

        // lock acquired
        if (ttlRemaining) {
            //创建锁延期的延时任务 加入到时间轮中
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}</code></pre></div></div><p>接下来看lock()方法</p><p><strong>lock()</strong></p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">public void lock() {
    try {
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}</code></pre></div></div><p>首先通过tryAcquire()方法获取锁,如果获取成功直接返回。没有获取锁成功的话,订阅锁释放的事件,然后进入while死循环,如果获取到锁就跳出循环返回,否则会通过Semaphore将当前线程阻塞,直到其他线程调用unlock()方法。tryAcquire()方法我们前面已经分析过了,这里就不再重复分析。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    //尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    //返回null 表示获取锁成功
    if (ttl == null) {
        return;
    }


    //没有获取锁成功的话 订阅锁释放的事件
    RFuture&lt;RedissonLockEntry&gt; future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }


    try {
        while (true) {
            //再次尝试获取锁
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                //成功就返回
                break;
            }


            // waiting for message
            if (ttl &gt;= 0) {
                try {
                    //如果锁没有过去,那么通过Semaphore阻塞线程 阻塞时间为锁的剩余过期时间
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    //如果锁已经过期,那么尝试竞争锁,没有抢到锁,则会一直阻塞,直到其他线程调用unlock()方法
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }

// get(lockAsync(leaseTime, unit));
}

小结

至此,加锁的逻辑我们已经分析完。我们可以总结以下几点

1. 加锁是通过lua脚本来实现的,并且使用hash的结构来存储。

2. 如果我们不设置leaseTime参数,那么redisson会自动延期锁的过期时间,直到线程调用unlock()方法锁才会过期,如果我们设置了leaseTime参数,并且值不是-1,那么redisson不会自动续期,到了过期时间,锁会自动释放。

3.自动刷新锁的过期时间是通过netty的时间轮机制,创建延时任务完成的,延时任务在(过期时间/3) 之后执行,每次续期成功之后,又会继续加入新延时任务,这样就可以达到不断刷新锁过期时间的目的。

4.tryLock()超过等待获取锁时间后不会阻塞没有获取到锁的线程,而是返回false。lock()方法如果没有获取到锁,那么线程会被阻塞。

接下来我们看redisson的unlock方法。

unlock()

unlock()方法用于主动释放锁,主要通过调用unlockAsync()方法异步释放锁,然后获取结果,

代码语言:javascript
复制
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}

unlockAsync()

首先调用unlockInnerAsync()方法释放锁,释放成功之后调用cancelExpirationRenewal()方法取消刷新锁过期时间的任务。我们主要看unlockInnerAsync()和cancelExpirationRenewal()这两个方法。

代码语言:javascript
复制
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);

future.onComplete((opStatus, e) -&gt; {
    cancelExpirationRenewal(threadId);


    if (e != null) {
        result.tryFailure(e);
        return;
    }


    if (opStatus == null) {
        IllegalMonitorStateException cause = new IllegalMonitorStateException(&#34;attempt to unlock lock, not locked by current thread by node id: &#34;
                + id + &#34; thread-id: &#34; + threadId);
        result.tryFailure(cause);
        return;
    }


    result.trySuccess(null);
});


return result;

}

unlockInnerAsync()

真正用于解锁的方法,同样是通过lua脚本来实现,简单说下这段lua脚本的意思。

首先看key是否还存在,不存在的话直接返回null。如果key存在,那么将重入次数-1,重入次数不为0,重新刷新锁过期时间,不需要释放锁。如果重入次数已经为0,那么就将key删除,并且发布一个释放锁的消息。

代码语言:javascript
复制
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

订阅了消息的线程必然会执行相关的回调,前面我们已经分析了,线程如果没有获取到锁,那么就会调用subscribe()方法订阅锁释放的消息。subscribe()方法代码很长,重点就是通过createListener()创建一个RedisPubSubListener的监听,收到消息之后会调用RedisPubSubListener中的onMessage()方法。

代码语言:javascript
复制
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
代码语言:javascript
复制
public RFuture<E> subscribe(String entryName, String channelName) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};

    Runnable listener = new Runnable() {


        @Override
        public void run() {
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getPromise().onComplete(new TransferListener&lt;E&gt;(newPromise));
                return;
            }
            
            E value = createEntry(newPromise);
            value.acquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.acquire();
                semaphore.release();
                oldValue.getPromise().onComplete(new TransferListener&lt;E&gt;(newPromise));
                return;
            }
            
            RedisPubSubListener&lt;Object&gt; listener = createListener(channelName, value);
            service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}</code></pre></div></div><p><strong>onMessage()</strong></p><p>在onMessage()方法中会调用Semaphore的release()方法释放阻塞的线程。让阻塞的线程可以重新开始竞争锁资源。</p><p>注:关于Semaphore,后续肯定会有相关文章对其进行分析,JUC包中的常用工具后续都会有文章进行分享。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">protected void onMessage(RedissonLockEntry value, Long message) {
    //如果是锁释放消息
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        //恢复阻塞的线程
        value.getLatch().release();
       //下面是读写锁的逻辑 可以不管
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }


        value.getLatch().release(value.getLatch().getQueueLength());
    }
}</code></pre></div></div><p>至此解锁的逻辑我们也就分析完了。</p><p><strong>四:小结</strong></p><p>到这里相信我们对redisson的重入锁已经非常了解了,如果遇到面试官再问redisson重入锁的话,相信你已经可以从容面对了。</p><p>如果嫌弃写的太多太长(要分析清楚的话确实没办法的),可以直接看这两张流程图帮助你理解。</p><p><strong>tryLock流程图</strong></p><figure class=""><div class="rno-markdown-img-url" style="text-align:center"><div class="rno-markdown-img-url-inner" style="width:100%"><div style="width:100%"><img src="https://cdn.static.attains.cn/app/developer-bbs/upload/1723327113963297960.png" /></div></div></div></figure><p><strong>lock()与unLock()流程图</strong></p><figure class=""><div class="rno-markdown-img-url" style="text-align:center"><div class="rno-markdown-img-url-inner" style="width:100%"><div style="width:100%"><img src="https://cdn.static.attains.cn/app/developer-bbs/upload/1723327114049212614.png" /></div></div></div></figure><p>小知识:为什么lua脚本能保证原子性,因为redis在执行lua脚本的时候,如果这时候有其他命令请求,会直接返回服务繁忙,直到lua脚本执行完成。所以lua脚本也要谨慎使用。</p><p>例如写一个死循环的lua脚本:</p><figure class=""><div class="rno-markdown-img-url" style="text-align:center"><div class="rno-markdown-img-url-inner" style="width:100%"><div style="width:100%"><img src="https://cdn.static.attains.cn/app/developer-bbs/upload/1723327114425930117.png" /></div></div></div></figure><p>那么其他连接在lua脚本执行完之前执行命令就会报错。</p><figure class=""><div class="rno-markdown-img-url" style="text-align:center"><div class="rno-markdown-img-url-inner" style="width:100%"><div style="width:100%"><img src="https://cdn.static.attains.cn/app/developer-bbs/upload/1723327114707212274.png" /></div></div></div></figure><p><strong>五:最后</strong></p><p>如果有什么疑问,欢迎在下方留言或者私聊我。</p>