[date: 2018-12-17 20:49] [visits: 54]

利用Redis实现分布式锁

在之前的答题对战项目中,游戏规则是根据双方回答同一道题所花时间长短判胜负,但测试过程中遇到一个偶现BUG:“对战开始后双方收到的题目不一致”,经过分析代码,发现问题原因在于发题逻辑没有加锁,所以写篇文章聊聊这个话题。

什么是锁

在编程领域,锁是一种用来做独占代码执行的方式,通俗来讲就是在打算做某件事情之前要申请许可证,如果没有得到许可证则无法做这件事。需要许可证的原因是因为并发现象:“同时出现多个人做这件事”,这种并发现象可能导致逻辑错误。在许可证这个比喻中,“人”是计算机领域的线程、进程等,要做的“事情”则是执行代码。

线程锁

假设有这样一段Java代码:

class Account {
    private int money;

    Account(int init) {
        this.money = init;
    }

    // 购买一个价格为price的商品,成功返回true,否则返回false
    public boolean buyWithPrice(int price) {
        if (this.money < price) {
            return false;
        }

        this.money -= price;
        return true;
    }

    public static void main(String[] args) {
        Account account = new Account(100); 

        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (account.buyWithPrice(100)) {
                        System.out.println("buy success");
                    }
                }
            }).start();
        }
    }
}

因为Javascript是单线程模型,不方便直接模拟线程并发,所以这一段示例代码用的是Java语言。示例中模拟了一个最简单的账户购物场景:“用账户余额(money)购买指定价格(price)的商品,购买成功返回true,余额不够则返回false”,场景中初始化一个余额为100的账户,然后创建两个线程去执行购买操作。

上述场景期望的正确结果是只有一个线程购买成功,另外一个会由于余额不足购买失败,所以我们执行代码只会打印一行'buy success'。但实际运行结果却可能与我们所期望的不同,出现两个线程都购买成功,而出现两者都成功的原因是因为buyWithPrice方法没有“加锁”(Java中称为非线程同步方法),在并发情况下,线程双方在this.money -= price没有执行之前,都执行到if (this.money < price)这一行,判断条件不成立紧接着也就购买成功了,所以最终打印两次'buy success',余额变成了-100。

tips: 这个例子由于代码从判断余额充足到扣除余额执行过快,运行多次也难遇到一次两者都购买成功,但如果在this.money -= price前添加一个短时间的sleep,则很容易出现两者都购买成功

导致两个并行线程能够同时进入buyWithPrice方法的原因是没有锁机制,在Java中添加锁机制避免这种行为的最简单办法是用synchronized关键词将方法声明为“同步方法”,这样就能自动利用内置锁机制保证同时只有一个线程进入到方法执行,可以有效避免出现余额被扣成负数。

tips: 出现线程并发执行的原因不只多核CPU,理论上单核CPU线程上下文切换同样可以出现场景中一样的并发问题

以上内容便是线程锁的概念,用于避免多个线程同时执行某一段代码的锁,线程锁的实现依赖编程语言层面提供的解决方案,在Java中有synchronized,而在Javascript中则不存在,因为Javascript的执行是单线程的。

分布式锁

解释完线程锁,来看看什么是分布式锁?我认为可简单理解为进程锁,线程锁解决的是线程并发所带来的问题,而分布式锁解决的是进程并发所带来的问题,它们之间“锁”是一样的概念,但scope不同。使用分布式锁的进程可以在同一台机器上,也可以在不同机器上,可能正是因为能跨机器的特点,让它被称为分布式锁。

对于使用Node.js的朋友而言,线程锁难以接触,但分布式锁或者说进程锁则相对常见,尤其是做HTTP Server,假设有一个使用余额购买并支付指定商品的接口,这个接口的处理步骤大致如下:

用户发起一个购买请求,服务器就按上述四个步骤去处理,假设某用户只有100的余额,他同时发起两个请求去购买价格为100的商品,服务器应该如何保证他只成功购买一个?因为判断余额充足与扣除余额是两个操作,在余额没被扣除之前,两个请求可能同时执行到判断余额是否充足的步骤,然后再分别执行后续步骤(扣除余额、创建订单),最终导致用户用100余额购买了200的商品。

要避免并发所带来的问题,有多种方式,利用分布式锁就是其中一种比较常见的方式,通过给“查余额”与“扣余额”步骤进行加锁,这个锁能保证同时只有一个请求能处于这两个步骤中,当一个请求释放锁之后,其他的请求再次获得锁并查询余额时会发现余额不足,从而购买失败。

上述的查余额、扣余额绝大数情况下是数据库操作,以MySQL举例,上述场景的对应锁的实现,参考以下代码:

function buyWithGoodsIdAndUserId(goodsId, userId) {
    // 执行SQL
    return mysql.execSql('select price from goods where id = :goodsId', {
        goodsId
    }).then(ret => {
        let goods = ret;

        if (_.isEmpty(goods)) {
            return Promise.reject(new Error('not found'));
        }

        let user = {};

        // 事务封装
        return mysql.transactionTask([
            query => {
                // 查询用户余额,锁的开始
                return query('select balance from user where id = :userId for update', {
                    userId
                }).then(ret => user = ret);
            },
            query => {
                if (user.balance < goods.price) {
                    return Promise.reject('insufficient balance');
                }

                // 扣除余额
                return query('update user set balance = balance - :price where id = :userId', {
                    userId,
                    price: goods.price
                });
            },
            query => {
                return query('insert order ...');
            }
        ]); // 事务提交,锁的结束
    }); 
}

以上代码片段描述了购买商品4个步骤对应的数据库操作,看起来好像未体现分布式锁的有关内容,但其中确实隐含着分布式锁。mysql.transactionTask事务操作中的第一句SQL是:“select ... for update”,这句SQL是MySQL中锁的一个体现,具体细节有机会写MySQL相关文章的时候再细聊,在这个代码示例中由于transactionTask中用到的“select for update”,会导致buyWithGoodsIdAndUserId方法相对对于同一个用户,不论同时有多少调用者,只会有一个调用者处于事务里的三个步骤中,其他调用者需要等待那个唯一正在处理的完成之后才可能进行处理。

分布式锁的实现

利用数据库的锁机制来实现业务的分布式锁是一个比较常见的做法,前文的代码示例是其中隐含有一个分布式锁,而稍加修改则可以变成一个较为通用的分布式锁,实现原理是借助数据库本身的锁机制在代码内实现lock与unlock两个操作,一种方案:

其中lock表的id字段上有唯一约束,可以保证同时多个client去尝试拿锁时,只有一个可以成功

除了用MySQL实现分布式锁之外,我们还可以借助Redis实现分布式锁,原理是利用的Reids的SETNX命令,SETNX在给指定key设置value时会判断key是否已经存在,如果已存在直接返回0,否则设置key=value并返回1。利用这个特性,当出现多个client同时设置同一个key的值时,仅有一个client会得到返回值1,表明拿到了锁,其他client则未拿到。

借助Redis实现分布式锁的代码示例如下:

// key是锁名称,fn是需要做的事情,只支持Promise形式的异步,timeout是锁自动释放的超时间隔
function lockCallWithKey(key, fn, timeout) {
    timeout = timeout || 30000;

    let lockKey = `redis-lock:${key}`;
    let value = uuid.v4();

    return client.setAsync(lockKey, value, 'NX', 'PX', timeout).then(ret => {
        if (ret !== 'OK') {
            return new Promise((resolve, reject) => {
                setTimeout(() => {
                    lockCallWithKey(key, fn, timeout).then(resolve).catch(reject);
                }, Math.ceil(100 + Math.random() * 100)); // try after 100-200ms 
            });
        }

        return fn();
    }).finally(() => {
        let unlockScript = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
        client.evalAsync(unlockScript, 1, lockKey, value).catch(err => {
            logger.error(err);
        });
    });
}

// 使用
lockCallWithKey('example', () => {
    // 同时多个client调用,只会有一个处于执行这个代码块的过程中
    return Promise.resolve('ok');
});

上述代码借助Redis实现了一个分布式锁的机制,其中使用lua脚本而不是Redis del命令释放锁,是为了避免释放不属于自己的锁,假设client-A拿到锁在timeout之后才处理完成,如果使用del命令,则有可能释放了其他client获得的锁,从而可能导致多个client同时持有锁,而使用lua脚本利用value相同才释放的原则可以保证任意client只释放属于自己的锁。

利用Redis实现分布式锁,是一个比较常见的话题,Redis有官方说明,有兴趣的朋友可以参考,Node.js也有已实现的redlock,有需要可以直接拿来使用。

解决问题

写了这么多,回到自己项目中遇到的问题上:“对战双方收到相同题目”,对战流程是收到客户端的ready消息后派发该次对战的题目,BUG出现在两个人同时ready,服务器派发题目的逻辑判断该次对战的题目还未确定,双方都去随机挑选题目然后发放,导致题目不同。而如果双方非同时ready,则是较早的一方为该次对战随机挑选题目,另外一方发现对战题目已经确定,故不会重新挑选。

这个问题与余额的例子是一样的,客户端读某个值然后依赖这个值做某件事继而更新这个值,在并发且没有锁的情况下就容易出现逻辑BUG,修改的方法就是借助上面Redis实现的锁机制为这三个步骤加锁,代码如下:

function dispatchQuestionById(matchId, userId) {
    // avoid client receive different question
    return lib.redis.lockCallWithKey(`zslt:question:${matchId}`, () => {
        return cache.match.questionById(matchId).then(ret => {
            if (!_.isEmpty(ret)) {
                return ret;
            }

            return proxy.question.randomItem().then(ret => {
                ret.timestamp = Date.now();

                return cache.match.setQuestion(matchId, ret, 15).then(() => ret);
            });
        });
    }).then(ret => {
        return tunnel.sendMessageByUserId(userId, {
            id: uuid.v4(),
            name: C.MESSAGE.QUESTION,
            content: {
                matchId,
                question: ret
            }
        });
    }).catch(err => {
        if (err && err.isBreak) {
            return err.result;
        }

        logger.error(err);
        return {};
    });
}