Redis 分布式锁以及 Redisson 的使用

chan 作者
阅读 8254 喜欢 2

场景设计:购买车票,扣减库存

假设存在一个 SpringBoot 的控制器,其扣减库存的业务逻辑如下:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "buy")
public String but() {
    // 将商品库存获取出来
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判定库存是否能扣减
    if (stock > 0) {
        // 扣减库存并设置进缓存
        int resultStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(resultStock));
        log.info("扣减库存成功,剩余库存:" + resultStock);
    } else {
        log.info("库存不足,已售罄");
        return "fail";
    }
    return "success";
}

不难看出,在应用服务器运行这段代码的时候就会有线程安全性问题。因为多个线程同时去修改 Redis 服务中的数据。因此考虑给这段代码加上一把锁:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "buy")
public String but() {
    synchronized(this) {
        // 将商品库存获取出来
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判定库存是否能扣减
        if (stock > 0) {
            // 扣减库存并设置进缓存
            int resultStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(resultStock));
            log.info("扣减库存成功,剩余库存:" + resultStock);
        } else {
            log.info("库存不足,已售罄");
            return "fail";
        }
        return "success";
    }
}

这样一来,当多个 HTTP 请求来请求数据的时候,多个线程去修改同一数据会有 JVM 本地锁来进行合理的资源限制。虽然这样解决了线程安全性问题,但是这仅仅是 JVM 级别的锁,在分布式的环境下,由于像这样的 Web 应用随时会进行动态扩容,因此当多个应用的时候,同样会有线程安全性问题,当上面这段代码遇到类似下面的架构时还是会有各种各样的问题:

对于上述的情况,我们可以使用 redis api 提供的 setnx 方法解决:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "buy")
public String but() {
    // 通过 Redis setnx 设置锁,只有在 key 不存在的时候才会设置成功,返回 1
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "buy");

    // 锁设置失败,表面存在锁,有进程在使用
    if (!flag) {
        return "fail";
    }

    // 将商品库存获取出来
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判定库存是否能扣减
    if (stock > 0) {
        // 扣减库存并设置进缓存
        int resultStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(resultStock));
        log.info("扣减库存成功,剩余库存:" + resultStock);
    } else {
        log.info("库存不足,已售罄");
        // 删除锁
        stringRedisTemplate.delete("lock");
        return "fail";
    }
    // 删除锁
    stringRedisTemplate.delete("lock");
    return "success";
}

这样的话,首先尝试获取锁,然后当业务执行完成的时候再删除锁。但是还是有问题的,当获取锁的时候抛出异常或者业务执行抛出异常怎么办,所以加入异常处理逻辑:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "buy")
public String but() {
    try {
        // 通过 Redis setnx 设置锁,只有在 key 不存在的时候才会设置成功,返回 1
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "buy");

        // 锁设置失败,表面存在锁,有进程在使用
        if (!flag) {
            return "fail";
        }

        // 将商品库存获取出来
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判定库存是否能扣减
        if (stock > 0) {
            // 扣减库存并设置进缓存
            int resultStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(resultStock));
            log.info("扣减库存成功,剩余库存:" + resultStock);
        } else {
            log.info("库存不足,已售罄");
            return "fail";
        }
    } finally {
        stringRedisTemplate.delete("lock");
    }
    return "success";
}

经过这样的修改,看起来没什么问题了。但是当程序获得锁并且开始执行业务逻辑的时候,突然程序挂掉了或者被一些粗暴的运维工程师给kill,在finally中删除锁的逻辑就会得不到执行,因此就会产生死锁。对于这种情况,我们可以给这个锁设置一个超时时间:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "buy")
public String but() {
    try {
        // 通过 Redis setnx 设置锁,只有在 key 不存在的时候才会设置成功,返回 1
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "buy");

        // 设置超时时间, 根据业务场景估计超时时长
        stringRedisTemplate.expire("lock", 10, TimeUnit.SECONDS);

        // 锁设置失败,表面存在锁,有进程在使用
        if (!flag) {
            return "fail";
        }

        // 将商品库存获取出来
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判定库存是否能扣减
        if (stock > 0) {
            // 扣减库存并设置进缓存
            int resultStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(resultStock));
            log.info("扣减库存成功,剩余库存:" + resultStock);
        } else {
            log.info("库存不足,已售罄");
            return "fail";
        }
    } finally {
        stringRedisTemplate.delete("lock");
    }
    return "success";
}

如果程序这么来写,相对来说安全一些了,但是还是存在问题。试想一下,当获取锁成功时,正想给这把锁设置超时的时候,程序挂掉了,还是会出现死锁的,因此在redis较高的版本中提供的setIfAbsent方法中可以同时设置锁的超时时间:

Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", "buy", 10, TimeUnit.SECONDS);

这样一来,尝试获取锁和设置锁的超时时间就具备原子性了。实际上经过我们这一番改造,这在小型企业已经没有太大的问题, 因为像这种代码每天也就执行几百次,并不算做高并发的场景。当这样的代码被暴露在超高并发场景下的时候,还是会存在各种各样的问题。假设场景:当第一个请求获取锁并成功设置超时时间,但执行业务逻辑的时长超过了锁的超时时间,此时,第一个请求的业务逻辑在锁过期后还没执行完毕,第二个请求又获取了锁,而这时第二个请求刚获取了锁,第一个请求执行完毕并删除了锁,这个时候 redis 中又没有锁了,这样第三个HTTP请求又会获得锁,所以情况就不妙了。
为了解决上面的问题,我们可以将代码优化为下面的样子:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "buy")
public String but() {
    // 为每个请求的客户端设置 uuid
    String client = UUID.randomUUID().toString();
    Delay delay = null;
    try {
        // 获取锁同时设置锁超时时间,具有原子性
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("lock", client, 10, TimeUnit.SECONDS);

        // 锁设置失败,表面存在锁,有进程在使用
        if (!flag) {
            return "fail";
        }

        // 开启守护线程,定时续期锁,防止业务未执行完毕锁过期
        delay = new Delay("lock");
        delay.setDaemon(true);
        delay.start();

        // 将商品库存获取出来
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判定库存是否能扣减
        if (stock > 0) {
            // 扣减库存并设置进缓存
            int resultStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(resultStock));
            log.info("扣减库存成功,剩余库存:" + resultStock);
        } else {
            log.info("库存不足,已售罄");
            return "fail";
        }
    } finally {
        // 删除属于自己的锁
        if (client.equals(stringRedisTemplate.opsForValue().get("lock"))) {
            delay.stop();
            stringRedisTemplate.delete("lock");
        }
    }
    return "success";
}

class Delay extends Thread {
    String lock;

    public Delay(String lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(9000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 为锁续期
            stringRedisTemplate.expire(lock, 10, TimeUnit.SECONDS);
        }
    }
}

以上基本可以解决分布式锁的问题,但是这样程序的复杂性就会增加,每个业务逻辑都要写好多的代码,因此这里推荐在分布式环境下使用 redisson 实现分支线程的代码:

  • 引入依赖
<!-- Redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.0</version>
</dependency>
  • 配置 redisson
@Bean
public RedissonClient redissonClient(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    return Redisson.create(config);
}
  • 封装一个获取锁、释放锁的类
/**
 * @author CHAN
 * @date 2020/03/15
 */
@Component
@Slf4j
public class DistributeRedisLock {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 获取锁
     * @param lockName 锁名
     * @return -
     */
    public boolean lock(String lockName) {
        try {
            RLock lock = redissonClient.getLock(lockName);
            lock.lock(30, TimeUnit.SECONDS);
            return true;
        } catch (Exception ex) {
            log.info("加锁失败");
            return false;
        }
    }

    /**
     * 释放锁
     * @param lockName 锁名
     * @return -
     */
    public boolean unLock(String lockName) {
        try {
            RLock lock = redissonClient.getLock(lockName);
            if (lock != null) {
                lock.unlock();
                return true;
            }
            log.info("没有锁");
            return false;
        } catch (Exception ex) {
            log.info("加锁失败");
            return false;
        }
    }
}
  • 最终的库存业务代码
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Autowired
private DistributeRedisLock distributeRedisLock;

@RequestMapping(value = "buy")
public String but() {
    try {
        // 获取锁
        boolean flag = distributeRedisLock.lock("lock");

        // 锁设置失败,表面存在锁,有进程在使用
        if (!flag) {
            return "fail";
        }

        // 将商品库存获取出来
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判定库存是否能扣减
        if (stock > 0) {
            // 扣减库存并设置进缓存
            int resultStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(resultStock));
            log.info("扣减库存成功,剩余库存:" + resultStock);
        } else {
            log.info("库存不足,已售罄");
            return "fail";
        }
    } finally {
        // 释放锁
        distributeRedisLock.unLock("lock");
    }
    return "success";
}

但是这个架构还是存在问题的,因为redis服务器是主从的架构,当在master节点设置锁之后,slave节点会立刻同步。但是如果刚在master节点设置上了锁,slave节点还没来得及设置,master节点就挂掉了。还是会产生上同样的问题,新的线程获得锁。

因此使用redis构建高并发的分布式锁,仅适合单机架构,当使用主从架构的redis时还是会出现线程安全性问题。

全部评论0