锁是我们在实现大多数系统时绕不过的话题。一旦有竞态条件出现,任何不经保护的操作,都可能带来问题。而现代系统大多为分布式系统,这就引入了分布式锁,要求具有在分布各处的服务上保护资源的能力。而实现分布式锁,目前大多有以下 3 种方式:
- 使用数据库实现
- 使用 Redis 等缓存系统实现
- 使用 ZooKeeper 等分布式协调系统实现
其中 Redis 简便灵活,高可用分布式,且支持持久化。本文即介绍基于 Redis 实现分布式锁。
SETNX 语义
使用 Redis 实现分布式锁,根本原理是 SETNX 指令。其语义如下:
1 2 |
SETNX key value |
如果 key 不存在,则设置 key 值为 value(同 set);如果 key 已经存在,则不执行赋值操作。并使用不同的返回值标识。官网文档。
还可以通过 SET 命令的 NX 选项使用:
1 2 |
SET key value [expiration EX seconds|PX milliseconds] [NX|XX] |
NX – 仅在 key 不存在时执行赋值操作。官网文档。
而如下文所述,通过 SET 的 NX 选项使用,可同时使用其它选项,如 EX/PX 设置超时时间,是更好的方式。
SETNX 实现分布式锁
下面我们对比下几种具体实现方式。
方案1:SETNX + Delete
伪代码如下:
1 2 3 4 |
setnx lock_a random_value // do sth delete lock_a |
此实现方式的问题在于:一旦服务获取锁之后,因某种原因挂掉,则锁一直无法自动释放。从而导致死锁。
方案2:SETNX + SETEX
伪代码如下:
1 2 3 4 5 |
setnx lock_a random_value setex lock_a 10 random_value // 10s超时 // do sth delete lock_a |
按需设置超时时间。此方案解决了方案 1 死锁的问题,但同时引入了新的死锁问题:如果 SETNX 之后、SETEX 之前服务挂掉,会陷入死锁。根本原因为 SETNX/SETEX 分为了两个步骤,非原子操作。
方案3:SET NX PX
伪代码如下:
1 2 3 4 |
SET lock_a random_value NX PX 10000 // 10s超时 // do sth delete lock_a |
此方案通过 SET 的 NX/PX 选项,将加锁、设置超时两个步骤合并为一个原子操作,从而解决方案 1、2 的问题。( PX 与 EX 选项的语义相同,差异仅在单位。)此方案目前大多数SDK、Redis 部署方案都支持,因此是推荐使用的方式。
但此方案也有如下问题:
- 如果锁被错误的释放(如超时),或被错误的抢占,或因redis问题等导致锁丢失,无法很快的感知到。
方案4:SET Key RandomValue NX PX
方案 4 在 3 的基础上,增加对 value 的检查,只解除自己加的锁。类似于 CAS,不过是 compare-and-delete。此方案 Redis 原生命令不支持,为保证原子性,需要通过 Lua 脚本实现。
伪代码如下:
1 2 3 4 |
SET lock_a random_value NX PX 10000 // do sth eval "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 lock_a random_value |
此方案更严谨:即使因为某些异常导致锁被错误的抢占,也能部分保证锁的正确释放。并且在释放锁时能检测到锁是否被错误抢占、错误释放,从而进行特殊处理。
注意事项
超时时间
从上述描述可看出,超时时间是一个比较重要的变量:
- 超时时间不能太短,否则在任务执行完成前就自动释放了锁,导致资源暴露在锁保护之外。
- 超时时间不能太长,否则会导致意外死锁后长时间的等待。除非人为接入处理。
因此建议是根据任务内容,合理衡量超时时间,将超时时间设置为任务内容的几倍即可。如果实在无法确定而又要求比较严格,可以采用定期 SETEX/Expire 更新超时时间实现。
重试
如果拿不到锁,建议根据任务性质、业务形式进行轮询等待。等待次数需要参考任务执行时间。
与 Redis 事务的比较
SETNX 使用更为灵活方案。Multi/Exec 的事务实现形式更为复杂。且部分 Redis 集群方案 ( 如 Codis ),不支持 Multi/Exec 事务。
Golang Demo
基于 Redigo 简单实例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
package main import ( "fmt" "sync" "time" "github.com/garyburd/redigo/redis" ) func getLock(redisAddr, lockKey string, ex uint, retry int) error { if retry <= 0 { retry = 10 } conn, err := redis.DialTimeout("tcp", redisAddr, time.Minute, time.Minute, time.Minute) if err != nil { fmt.Println("conn to redis failed, err:%v", err) return err } defer conn.Close() ts := time.Now() // as random value for i := 1; i <= retry; i++ { if i > 1 { // sleep if not first time time.Sleep(time.Second) } v, err := conn.Do("SET", lockKey, ts, "EX", retry, "NX") if err == nil { if v == nil { fmt.Println("get lock failed, retry times:", i) } else { fmt.Println("get lock success") break } } else { fmt.Println("get lock failed with err:", err) } if i >= retry { err = fmt.Errorf("get lock failed with max retry times.") return err } } return nil } func unLock(redisAddr, lockKey string) error { conn, err := redis.DialTimeout("tcp", redisAddr, time.Minute, time.Minute, time.Minute) if err != nil { fmt.Println("conn to redis failed, err:%v", err) return err } defer conn.Close() v, err := redis.Bool(conn.Do("DEL", lockKey)) if err == nil { if v { fmt.Println("unLock success") } else { fmt.Println("unLock failed") return fmt.Errorf("unLock failed") } } else { fmt.Println("unLock failed, err:", err) return err } return nil } const ( RedisAddr = "127.0.0.1:3000" ) func main() { var wg sync.WaitGroup key := "lock_demo" for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() time.Sleep(time.Second) // getLock err := getLock(RedisAddr, key, 10, 10) if err != nil { fmt.Println(fmt.Sprintf("worker[%d] get lock failed:%v", id, err)) return } // sleep for random for j := 0; j < 5; j++ { time.Sleep(time.Second) fmt.Println(fmt.Sprintf("worker[%d] hold lock for %ds", id, j+1)) } // unLock err = unLock(RedisAddr, key) if err != nil { fmt.Println(fmt.Sprintf("worker[%d] unlock failed:%v", id, err)) } fmt.Println(fmt.Sprintf("worker[%d] done", id)) }(i) } wg.Wait() fmt.Println("demo is done!") } |
参考
SETEX 命令
SET 命令
Distributed locks with Redis
本文作者:王斌