概述

举例:解决 MySQL 库存超卖问题。

方案 1单机时可以使用 JVM 本地锁来解决,但要注意事务、多例模式导致的锁失效。应在事务之外加解锁,否则由于事务隔离级别 (一般是 RC、RR),MVCC 会导致读到的是快照数据。

方案 2:使用单 sql 语句解决,如 update stock set cnt=cnt-1 where product_id=1,优点是并发量比使用 JVM 本地锁要高,但需要注意:

  1. 只适用于简单业务场景、无法记录库存变化状态;
  2. 期望锁的粒度为行级锁,参考 MySQL事务与并发控制和锁,查询字段必须使用索引,否则可能会锁表;

方案 3:使用 MySQL 悲观锁 select xxx from stock for update,可以用于复杂业务场景。需要注意代码中应使用手动事务,并采用合适的索引。存在问题如下:

  1. 性能问题,并发量稍优于 JVM 本地锁,劣于方案 2 单 sql 方式;
  2. 对多条数据的加锁顺序可能导致死锁问题。例如事务 A 锁定了行 1,然后尝试锁定行 2,同时,事务 B 锁定了行 2,然后尝试锁定行 1。应当利用数据库的死锁检测机制 (自动回滚其中一个消耗小的事务),在编码时就考虑因死锁导致的事务回滚,并实现相应的重试逻辑。

方案 4乐观锁,使用 CAS (Compare And Swap) 的思想不加锁。

-- 先查询
select id, cnt, version from stock where product_id=1;
-- 再更新库存与版本号,根据影响条数判断成功失败
update stock set cnt=xx, version=version+1 where id=xx and version=xx;

优点是减少了锁创建释放的开销,适合读多写少场景,缺点:

  1. 高并发下有大量的重试,会浪费 CPU、降低吞吐量;
  2. ABA 问题,通过使用版本号或时间戳解决,这就是为什么不直接使用库存字段判断;
  3. 读写分离情况下,由于主从数据同步延迟,导致乐观锁不可靠。

在分布式微服务高并发、业务复杂的情况下,基于数据库实现乐观锁/悲观锁会有较大压力,因此需要使用分布式锁。分布式锁可以基于 MySQL、Redis 或分布式协调服务 ZooKeeper、etcd 等来实现。

实现分布式锁需要考虑:

  • 原子性,获取锁的行为不可再拆分;
  • 互斥,不能同时被多方持有;
  • 防误删(只有持有锁的人才可以释放锁);
  • 防止死锁:
    • 可重入(引入计数机制);
    • 超时自动释放;
  • 自动续期;
  • 极端场景锁失效问题。

可重入的必要性:例如 methodA() 调用 methodB(),而这两个方法都需要对共享资源进行操作,因此它们需要获取同一把锁来保证数据一致性。

基于 MySQL 实现

分析

使用悲观锁来实现原子性;使用唯一键实现互斥;添加持有者标识字段实现防误删;添加计数字段实现可重入;重置获取锁的时间来实现自动续期;对于超时自动释放比较难实现,可以考虑获取锁时比对该锁的上次获取时间字段,判断是否超过阈值,若超时则直接剥夺。

表结构

create table distributed_lock
(
    lock_key        varchar(255)  not null comment '锁标识(主键)'
        primary key,
    holder_id       varchar(255)  not null comment '持有者标识 uuid',
    reentrant_count int default 0 not null comment '重入次数',
    expire_time     datetime      not null comment '过期时间'
) comment '分布式锁表';

思路

INSERT INTO ... ON DUPLICATE KEY UPDATE 无论有没有冲突,都至少返回 1,若发生了修改则返回 2,即没有返回 0 的情况,无法判断是否获取锁,即无法使用单条语句来实现需求。

使用 SELECT ... FOR UPDATE 添加锁来解决多条语句执行的原子性问题,隔离级别 RC、RR 区别是是否有间隙锁,为了不锁住其他 lock_key,使用 RC 避免间隙锁 (也减小了死锁概率)。MySQL 8.x 版本可以使用 FOR UPDATE NOWITE 来快速失败,避免阻塞。

  • 先 select for update 申请写锁,判断是否有数据:
    • 无数据,则插入,RC 情况下不锁间隙,先插入先成功返回 true,后插入报唯一键冲突返回 false;
    • 有数据,加行锁,判断是否是自己持有的锁:
      • 是,则重入并更新时间,返回 true;
      • 否,则判断是否超时,超时则直接剥夺 (避免上一个持有者异常未释放造成死锁),未超时则返回 false;

其他问题

  • 使用 ThreadLocal 保存线程的持有者标识,确保每次 new 的时候都是相同的,所以不支持多线程环境下调用;
  • 使用 ConcurrentHashMap 保存自动续期任务的引用,防止重入时重复注册自动续期任务;
    • 自动续期任务中应持有业务线程的引用,判断如果业务线程挂了,停止续期;
    • unlock 时判断锁最终被释放时 (即没有重入了) 停止续期任务;
  • 锁过期时间,目前设计是写死的,如果通过构造方法传参的话,无法保 new 多个相同锁的时候传的过期时间相同,影响续期时间。

实现

代码如下

@Slf4j
public class MySqlDsLock implements Lock {
 
    /** 使用 ThreadLocal 确保每个线程创建的锁的持有者标识相同 */
    private static final ThreadLocal<String> threadHolderId = ThreadLocal.withInitial(
        () -> Thread.currentThread().getId() + "@" + UUID.randomUUID()
    );
 
    private final TransactionTemplate transactionTemplate;
    private final JdbcTemplate jdbcTemplate;
 
    /** 锁名称 */
    private final String lockKey;
    /** 持有者标识 */
    private final String holderId;
    /** 过期时间,秒数 */
    private final long expireSeconds;
 
    /** 自动续期任务线程池 */
    private final ScheduledExecutorService RENEWAL_TASK_SCHEDULER = Executors.newScheduledThreadPool(2);
    /** 自动续期任务引用,key 为 lockKey + holderId,防止重入时重复注册续期任务 */
    private static final ConcurrentMap<String, ScheduledFuture<?>> RENEWAL_TASKS_MAP = new ConcurrentHashMap<>();
 
    /**
     * 创建一个分布式锁,默认30秒过期
     *
     * @param lockKey      锁名称
     * @param jdbcTemplate 操作数据库
     */
    public MySqlDsLock(String lockKey, JdbcTemplate jdbcTemplate, PlatformTransactionManager transactionManager) {
        this.transactionTemplate = new TransactionTemplate(transactionManager);
        this.transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
        this.lockKey = lockKey;
        this.jdbcTemplate = jdbcTemplate;
        this.holderId = threadHolderId.get();
        this.expireSeconds = 30; // 默认 n 秒过期
    }
 
    /**
     * 阻塞获取锁,直到成功
     */
    @Override
    public void lock() {
        while (!tryLock()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                log.debug("获取锁时线程被中断,重置中断标志", e);
                Thread.currentThread().interrupt();
            }
        }
    }
 
    /**
     * 尝试获取锁,立即返回
     */
    @Override
    public boolean tryLock() {
        return tryLockInternal(0, null);
    }
 
    /**
     * 尝试在给定时间内获取锁
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time <= 0) {
            throw new IllegalArgumentException("参数 time 必须 > 0");
        }
        if (unit == null) {
            throw new NullPointerException("参数 unit 不能位空");
        }
 
        return tryLockInternal(time, unit);
    }
 
    private boolean tryLockInternal(long time, TimeUnit unit) {
        long start = System.currentTimeMillis();
        long timeout = unit != null ? unit.toMillis(time) : Long.MAX_VALUE;
 
        do {
            if (acquireLock()) {
                startRenewalTask(); // 启动自动续期
                return true;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                log.debug("获取锁时线程被中断,重置中断标志", e);
                Thread.currentThread().interrupt();
            }
        } while (System.currentTimeMillis() - start < timeout);
 
        return false;
    }
 
    /**
     * <p>INSERT INTO ... ON DUPLICATE KEY UPDATE,无论有没有冲突,都至少返回 1,若发生了修改则返回 2,即没有返回 0 的情况,无法判断是否获取锁;
     * <p>所以要使用 SELECT ... FOR UPDATE 添加锁来解决多条语句执行的原子性问题,隔离级别 RC、RR 区别是是否有间隙锁,为了不锁住其他锁,使用 RC;
     * <p>先 select for update 申请写锁,判断是否有数据:
     * <p>  无数据则插入,RC 情况下不锁间隙,先插入先成功,后插入报唯一键冲突;
     * <p>  有数据则判断是否是自己的:
     * <p>      是则重入并更新时间;
     * <p>      否则判断是否超时,超时则直接剥夺(避免上一个持有者异常未释放造成死锁),未超时则返回 false
     *
     * @return boolean
     */
    private boolean acquireLock() {
        Boolean result = transactionTemplate.execute(status -> {
            try {
                String sql = "SELECT lock_key, holder_id, reentrant_count, expire_time FROM distributed_lock WHERE lock_key = ? FOR UPDATE";
                MySqlDsLockEntity entity = jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(MySqlDsLockEntity.class), lockKey).stream().findFirst().orElse(null);
 
                // 锁未被持有,尝试插入新锁记录
                if (entity == null) {
                    int update = jdbcTemplate.update(
                        "INSERT INTO distributed_lock (lock_key, holder_id, reentrant_count, expire_time) " +
                            "VALUES (?, ?, 1, NOW() + INTERVAL ? SECOND) ",
                        lockKey, holderId, expireSeconds
                    );
                    return update > 0;
                }
                // 锁已被持有
                else {
                    long currentTimeMillis = System.currentTimeMillis();
                    // 自己持有,重入并且更新时间
                    if (Objects.equals(entity.getHolderId(), holderId)) {
                        int update = jdbcTemplate.update(
                            "UPDATE distributed_lock SET " +
                                "reentrant_count = reentrant_count + 1, " +
                                "expire_time = NOW() + INTERVAL ? SECOND " +
                                "WHERE lock_key = ? AND holder_id = ?",
                            expireSeconds, lockKey, holderId
                        );
                        return update > 0;
                    }
                    // 非自己持有,判断是否超时,若超时则剥夺
                    else {
                        if (entity.getExpireTime().getTime() > currentTimeMillis) return false;
                        int update = jdbcTemplate.update(
                            "UPDATE distributed_lock SET " +
                                "holder_id = ?, " +
                                "reentrant_count = 1, " +
                                "expire_time = NOW() + INTERVAL ? SECOND " +
                                "WHERE lock_key = ?",
                            holderId, expireSeconds, lockKey
                        );
                        return update > 0;
                    }
                }
            } catch (Exception e) {
                status.setRollbackOnly();
                // 忽略主键冲突和死锁
                if (e instanceof DuplicateKeyException || e instanceof DeadlockLoserDataAccessException) return false;
                log.debug("获取锁,操作数据库时出现异常", e);
                throw e;
            }
        });
        return Boolean.TRUE.equals(result);
    }
 
    @Override
    public void unlock() {
        int u1 = 0;
        int u2 = 0;
        try {
            // 释放 reentrant_count <= 1
            u1 = jdbcTemplate.update(
                "DELETE FROM distributed_lock " +
                    "WHERE lock_key = ? AND holder_id = ? AND reentrant_count <= 1",
                lockKey, holderId
            );
            // u1 等于 0 说明锁被重入,则执行可重入次数 - 1
            if (u1 == 0) {
                u2 = jdbcTemplate.update(
                    "UPDATE distributed_lock SET reentrant_count = reentrant_count - 1 " +
                        "WHERE lock_key = ? AND holder_id = ? AND reentrant_count > 1",
                    lockKey, holderId
                );
            }
 
            if (u1 == 0 && u2 == 0) {
                this.stopRenewalTask();
                throw new IllegalStateException("锁已释放或非当前持有者");
            }
        } finally {
            // 说明锁已释放,停止自动续期任务
            if (u1 > 0) this.stopRenewalTask();
        }
    }
 
    /**
     * 自动续期任务
     */
    private void startRenewalTask() {
        // 避免重复启动
        ScheduledFuture<?> future = RENEWAL_TASKS_MAP.get(lockKey + holderId);
        if (future != null && !future.isDone()) return;
 
        Thread currentThread = Thread.currentThread();
 
        // 启动任务并添加到 MAP
        future = RENEWAL_TASK_SCHEDULER.scheduleAtFixedRate(() -> {
            // 如果业务线程挂了,就停止续期任务
            if (!currentThread.isAlive()) {
                this.stopRenewalTask();
                return;
            }
            // 更新过期时间
            int update = jdbcTemplate.update(
                "UPDATE distributed_lock SET expire_time = NOW() + INTERVAL ? SECOND " +
                    "WHERE lock_key = ? AND holder_id = ?",
                expireSeconds, lockKey, holderId
            );
            if (update == 0) {
                this.stopRenewalTask();
            }
        }, expireSeconds / 3, expireSeconds / 3, TimeUnit.SECONDS);// 每 1/3 过期时间续期一次
 
        RENEWAL_TASKS_MAP.put(lockKey + holderId, future);
    }
 
    private void stopRenewalTask() {
        ScheduledFuture<?> future = RENEWAL_TASKS_MAP.get(lockKey + holderId);
        if (future != null) {
            future.cancel(true);
            RENEWAL_TASKS_MAP.remove(lockKey + holderId);
        }
    }
 
    // 以下方法在分布式锁中不实现
    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException();
    }
 
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
 
    @Data
    private static class MySqlDsLockEntity {
 
        private String lockKey;
        private String holderId;
        private Integer reentrantCount;
        private Date expireTime;
 
    }
 
}
 

基于 Redis 实现

setnx + Lua

只适用于单机 Redis,主从/集群状态下锁可能会失效

例如:

  1. 客户端 A 获取 master 中的锁;
  2. master 在将对 key 的写入同步到 slave 前宕机;
  3. slave 提升为 master;
  4. 客户端 B 获取 A 已经持有的锁。❌

不可重入

  • 加锁:使用 UUID 作为身份标识
// set lock uuid ex 3 nn
redisTemplate.opsForValue.setIfAbsent("lock", "${uuid}", 3, TimeUnit.SECONDS)
  • 解锁:使用 Lua 脚本实现原子性对比身份标识释放锁
-- 解锁脚本.lua
if redis.call('get', KEYS[1]) == ARGV[1] then 
  return redis.call('del', KEYS[1])
else 
  return 0
end
// lock 锁的 key
// requestId 身份标识
DefaultRedisScript<Long> script = new DefaultRedisScript<>("解锁脚本.lua");
script.setResultType(Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(lock), requestId);

可重入与续期

解决可重入问题,除了身份标识,还需要锁状态,需要使用 hash 结构

  • 加锁.lua
-- 参数
-- key: lockKey
-- arg: requestId expireSeconds
 
-- 如果锁不存在、或存在且是当前请求人持有的
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1
then
    -- 则锁标识 state++
    redis.call('hincrby', KEYS[1], ARGV[1], 1)
    -- 更新过期时间
    redis.call('expire', KEYS[1], ARGV[2])
    return 1
else
    return 0
end
  • 释放锁.lua
-- 参数
-- key: lockKey
-- arg: requestId
 
--  如果当前请求人没有持有锁,返回 nil
if redis.call('hexists', KEYS[1], ARGV[1]) == 0
then return nil
-- 否则判断 state-- 是否等于 0
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0
-- 等于 0 可以释放锁
then return redis.call('del', KEYS[1])
-- 否则返回 0
else return 0
end
  • 续期.lua
-- 参数
-- key: lockKey
-- arg: requestId expireSeconds
if redis.call('hexists', KEYS[1], ARGV[1]) == 1
then return redis.call('expire', KEYS[1], ARGV[2])
else return 0
end

Java 代码中续期使用 Timer,每隔一段时间 (例如 1/3 过期时间),判断当前业务线程存活状态决定是否续期。

代码实现

@Slf4j
public class LuaRedisDsLock implements Lock {
 
    // lua 脚本
    private static final DefaultRedisScript<Long> LOCK_SCRIPT;
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    private static final DefaultRedisScript<Long> RENEWAL_SCRIPT;
 
    static {
        LOCK_SCRIPT = new DefaultRedisScript<>();
        LOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lock.lua")));
        LOCK_SCRIPT.setResultType(Long.class);
 
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/unlock.lua")));
        UNLOCK_SCRIPT.setResultType(Long.class);
 
        RENEWAL_SCRIPT = new DefaultRedisScript<>();
        RENEWAL_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/renewal.lua")));
        RENEWAL_SCRIPT.setResultType(Long.class);
    }
 
    /** 使用 ThreadLocal 确保每个线程创建的锁的持有者标识相同 */
    private static final ThreadLocal<String> threadHolderId = ThreadLocal.withInitial(
        () -> Thread.currentThread().getId() + "@" + UUID.randomUUID()
    );
    /** 自动续期任务线程池 */
    private final ScheduledExecutorService RENEWAL_TASK_SCHEDULER = Executors.newSingleThreadScheduledExecutor();
    /** 自动续期任务引用,key 为 lockKey + holderId,防止重入时重复注册续期任务 */
    private static final ConcurrentMap<String, ScheduledFuture<?>> RENEWAL_TASKS_MAP = new ConcurrentHashMap<>();
 
    private final RedisTemplate<String, Object> redisTemplate;
    /** 锁名称 */
    private final String lockKey;
    /** 持有者标识 */
    private final String holderId;
    /** 过期时间,秒数 */
    private final long expireSeconds = 30;
 
    public LuaRedisDsLock(String lockKey, RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.holderId = threadHolderId.get();
    }
 
    /**
     * 阻塞获取锁,直到成功
     */
    @Override
    public void lock() {
        while (!tryLock()) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                log.debug("获取锁时线程被中断,重置中断标志", e);
                Thread.currentThread().interrupt();
            }
        }
    }
 
    /**
     * 尝试获取锁,立即返回
     */
    @Override
    public boolean tryLock() {
        return tryLockInternal(0, null);
    }
 
    /**
     * 尝试在给定时间内获取锁
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time <= 0) {
            throw new IllegalArgumentException("参数 time 必须 > 0");
        }
        if (unit == null) {
            throw new NullPointerException("参数 unit 不能位空");
        }
 
        return tryLockInternal(time, unit);
    }
 
 
    @Override
    public void unlock() {
        Long result = redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(lockKey), holderId);
        if (result == null) throw new IllegalStateException("锁已释放或非当前持有者");
        if (result > 0) this.stopRenewalTask();
    }
 
    private boolean tryLockInternal(long time, TimeUnit unit) {
        long start = System.currentTimeMillis();
        long timeout = unit != null ? unit.toMillis(time) : Long.MAX_VALUE;
 
        do {
            if (acquireLock()) {
                startRenewalTask(); // 启动自动续期
                return true;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                log.debug("获取锁时线程被中断,重置中断标志", e);
                Thread.currentThread().interrupt();
            }
        } while (System.currentTimeMillis() - start < timeout);
 
        return false;
    }
 
    private boolean acquireLock() {
        Long result = redisTemplate.execute(LOCK_SCRIPT, Collections.singletonList(lockKey), holderId, expireSeconds);
        return Objects.equals(result, 1L);
    }
 
    /**
     * 自动续期任务
     */
    private void startRenewalTask() {
        // 避免重复启动
        ScheduledFuture<?> future = RENEWAL_TASKS_MAP.get(lockKey + holderId);
        if (future != null && !future.isDone()) return;
 
        Thread currentThread = Thread.currentThread();
 
        // 启动任务并添加到 MAP
        future = RENEWAL_TASK_SCHEDULER.scheduleAtFixedRate(() -> {
            // 如果业务线程挂了,就停止续期任务
            if (!currentThread.isAlive()) {
                this.stopRenewalTask();
                return;
            }
            Long result = redisTemplate.execute(RENEWAL_SCRIPT, Collections.singletonList(lockKey), holderId, expireSeconds);
 
            if (result == 0) {
                this.stopRenewalTask();
            }
        }, expireSeconds / 3, expireSeconds / 3, TimeUnit.SECONDS);// 每 1/3 过期时间续期一次
 
        RENEWAL_TASKS_MAP.put(lockKey + holderId, future);
    }
 
    private void stopRenewalTask() {
        ScheduledFuture<?> future = RENEWAL_TASKS_MAP.get(lockKey + holderId);
        if (future != null) {
            future.cancel(true);
            RENEWAL_TASKS_MAP.remove(lockKey + holderId);
        }
    }
 
 
    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException();
    }
 
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
 
}

RedLock(了解)

RedLock 是 Redis 官方提供的分布式锁方案,以解决 Redis 单点故障导致分布式锁失效的问题。需要部署 N 个完全独立的 Redis 实例(无主从、无集群关联),成本较高。

RedLock 获取锁时,客户端执行步骤如下:

  1. 获取当前时间;
  2. 使用相同的键名称和随机值,依次尝试获取所有 N 个实例中的锁。要给获取锁的动作设置一个小于锁过期时间的超时时间,避免客户端长时间与已宕机的实例通信;
  3. 当能够在半数以上 (N/2+1) 实例上获取锁时,计算获取锁花费的时间(当前时间减去步骤一的时间)进行判断,如果获取锁花费的时间小于锁过期时间,则认为获取锁成功;
  4. 如果获取锁成功,则锁的有效期为设置的有效期减去获取锁花费的时间;
  5. 如果获取锁失败(未达到半数以上实例或锁过期),则尝试在所有 N 个实例上释放锁。

注意:某个客户端获取锁失败后,应当在随即延迟后重试。如果是未能在半数以上实例获取锁的情况,则应当尽快释放锁

How to do distributed locking — Martin Kleppmann’s blog 这篇博客提到了分布式锁的一些问题,指出不推荐 RedLock,讨论了 RedLock 失效的一些情况,例如:

1. 客户端 1 获取节点 A、B、C 上的锁。由于网络问题,无法访问 D 和 E;
2. 节点 C 上的时钟向前跳转或宕机重启,导致锁过期;
3. 客户端 2 获取节点 C、D、E 上的锁。由于网络问题,无法访问 A 和 B;
4. 客户 1 和 2 现在都认为他们掌握了锁;

如何解决上面的情形,RedisLock 文档建议一个方法是延时启动,即当一个实例下线后,保证其在大于我们所使用的最大过期时间之后,再上线。那还有下面一种情形:

1. 客户端 1 请求节点 A、B、C、D、E 上的锁;
2. 当对客户端 1 的响应正在传输时,客户端 1 进入停止世界 GC;
3. 所有 Redis 节点上的锁都会过期;
4. 客户端 2 获取节点 A、B、C、D、E 上的锁;
5. 客户端 1 完成 GC,并从 Redis 节点接收响应,表明它成功获取了锁(当进程暂停时,它们保存在客户端 1 的内核网络缓冲区中);
6. 客户 1 和 2 现在都认为他们掌握了锁;

即使没有 GC,较长的网络时延或硬件问题也可能导致类似的效果。所以 RedLock 的准确性建立在三个条件上:有限的网络延迟、有限的进程暂停、有限的时钟错误 (指 NTP 同步无问题)。

Redisson

Redisson 是一个开源的基于 Redis 实现的 Java 工具类库,除了分布式锁实现,还包含了许多如分布式 Java 常用对象 (List、Map、Set…) 等工具类。

集成 SpringBoot 参考:Integration with Spring - Redisson Reference Guide

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.41.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-data-34</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- redisson-spring-data-xx 要与 spring-data-redis 版本对应 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-26</artifactId>
    <version>3.41.0</version>
</dependency>
// 同一个 redissonClient 创建的锁,持有人id为 redissonClient实例ID:线程id
@Autowired
RedissonClient redissonClient;
 
RLock lock = redisson.getLock("myLock");
// traditional lock method
lock.lock();
// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);
// or wait for lock acquisition up to 100 seconds and auto-unlock after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
    try {
        // ...
    } finally {
        lock.unlock();
    }
}

RedissonLock 在获取锁时,使用了 Redis 的发布订阅来解决轮询获取浪费 CPU 的问题。

基于 ZooKeeper 实现

Curator 是 Netflix 开源的 ZooKeeper 客户端,封装了分布式锁实现。

其中 InterProcessMutex 类是一个基于 ZooKeeper 临时顺序节点实现的可重入公平分布式锁,它使用本地 ConcurrentHashMap 保存重入次数,每个请求者只操作自己获取的节点从而解决误删问题。zk 天然支持原子性和互斥,临时节点的特性满足了自动续期和释放。

InterProcessMutex 获取锁的流程如下图所示,相关代码分析可以参考 这篇文章

sequenceDiagram
    participant B as 业务线程
    participant C as Curator
    participant Z as ZooKeeper
    B->>C: acquire()
    C->>Z: 创建临时有序节点
    Z-->>C: 返回节点路径
    C->>Z: 获取所有子节点
    alt 是最小节点
        C-->>B: 立即返回(获得锁)
    else 非最小节点
        C->>Z: 在前驱节点设置Watcher
        B->>B: 线程阻塞等待
        Z->>C: 前驱节点删除通知
        C-->>B: 唤醒线程重新检查
    end
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.7.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.7.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.9.3</version>
</dependency>
// 创建 Curator Client
@Bean
public CuratorFramework curatorFramework() {
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181")
        .retryPolicy(new ExponentialBackoffRetry(3000, 3))
        .build();
    // 必须调用 start()
    client.start();
    return client;
}
 
@Autowired
private CuratorFramework curatorFramework;
@Test
public void testCurator1() throws Exception {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
    // InterProcessMutex 可复用,路径参数必须唯一
    InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/curator/locks/test");
 
    for (int i = 0; i < 1000; i++) {
        executor.submit(() -> {
            try {
                try {
                    lock.acquire();
                    // 临界区操作
                    cnt -= 1;
                } finally {
                    lock.release();
                }
            } catch (Exception e) {
                log.error("锁操作异常", e);
            }
        });
    }
 
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.MINUTES);
    log.info("最终cnt值: {}", cnt);
}