事务和锁
Redis的事务定义
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis事务的主要作用就是串联多个命令防止别的命令插队。
Multi、Exec、discard
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。
组队的过程中可以通过discard来放弃组队。
比如在shell中组队成功,提交成功如下:
在shell中组队阶段报错,提交失败如下:
事务的错误处理
组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。
特别的说明,只有执行到错误的命令后才会造成队列销毁取消。这与mysql的事务并不太一样,mysql的事务是要么全部成功,要么全部失败。 redis的事务是如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。
为什么要有事务
想想一个场景:有很多人有你的账户,同时去参加双十一抢购
事务冲突
事务冲突的问题
例子
一个请求想给金额减8000
一个请求想给金额减5000
一个请求想给金额减1000
解决方案
悲观锁
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。
WATCH key
执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
unwatch
取消 WATCH 命令对所有 key 的监视。 如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。 http://doc.redisfans.com/transaction/exec.html
事务三特性(Redis特有原子性)
- 单独的隔离操作
事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。 - 没有隔离级别的概念 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
- 不保证原子性(一般解决原子性方案使用lua脚本即可)
事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
Redis实现秒杀例子
秒杀例子
书写一个秒杀例子
此例子,存在一定的问题,后面会解决。
使用spring boot 和 Jedis,
开启一个controller,路径是/Seckill/doseckill
public class SecKill_redis {
//秒杀过程
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1 uid和prodid非空判断
if(uid == null || prodid == null) {
return false;
}
//2 连接redis
Jedis jedis = new Jedis("xxxxx",6379);
//3 拼接key
// 3.1 库存key
String kcKey = "sk:"+prodid+":qt";
// 3.2 秒杀成功用户key
String userKey = "sk:"+prodid+":user";
//4 获取库存,如果库存null,秒杀还没有开始
String kc = jedis.get(kcKey);
if(kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}
// 5 判断用户是否重复秒杀操作
if(jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}
//6 判断如果商品数量,库存数量小于1,秒杀结束
if(Integer.parseInt(kc)<=0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}
//7 秒杀过程
//7.1 库存-1
jedis.decr(kcKey);
//7.2 把秒杀成功用户添加清单里面
jedis.sadd(userKey,uid);
System.out.println("秒杀成功了..");
jedis.close();
return true;
}
}
一个商品开展了秒杀活动,如图左侧,该商品此次参与秒杀的数量假定10,右侧最后的结果肯定是10个人秒杀成功。同时,库存的值变成0.
秒杀模拟工具
可以使用ab工具,其中,如果linux有网络的话,安装命令为yum install httpd-tools
;否则,linux没有网络的话,可以用系统自带的工具解包后安装,
- 进入cd /run/media/root/CentOS 7 x86_64/Packages(路径跟centos6不同)
- 顺序安装
apr-1.4.8-3.el7.x86_64.rpm
apr-util-1.5.2-6.el7.x86_64.rpm
httpd-tools-2.4.6-67.el7.centos.x86_64.rpm
ab测试
ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://127.0.0.1:8080/Seckill/doseckill
测试结果
超卖,库存变成了负数
超卖问题
用上面的例子和ab工具测试,发现了新问题,如图所示、
已经秒杀结束了,后面又出现了秒杀成功,同时查看redis里面,发现库存为负数了。
期望是,秒杀结束后,不允许出现秒杀结束,同时库存为0。
分析问题: 如图,图中,展示,用户每次拿到的初始值都是10个,因此造成了库存减少的错误。
乐观锁解决超卖
因为redis的事务,默认用的是乐观锁,因此,我们不需要在程序中使用version等常见乐观锁方案去实现事务,因此我们 只需要使用redis的watch加multi既能使用redis的乐观锁。 修改程序如下
增加乐观锁jedis.watch(kcKey)
, 然后把流程7的减库存,存秒杀成功用户的操作修改为multi的事务组队形式。
public class SecKill_redis {
//秒杀过程
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1 uid和prodid非空判断
if(uid == null || prodid == null) {
return false;
}
//2 连接redis
Jedis jedis = new Jedis("xxxxx",6379);
//3 拼接key
// 3.1 库存key
String kcKey = "sk:"+prodid+":qt";
// 3.2 秒杀成功用户key
String userKey = "sk:"+prodid+":user";
//监视库存
jedis.watch(kcKey);
//4 获取库存,如果库存null,秒杀还没有开始
String kc = jedis.get(kcKey);
if(kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}
// 5 判断用户是否重复秒杀操作
if(jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}
//6 判断如果商品数量,库存数量小于1,秒杀结束
if(Integer.parseInt(kc)<=0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}
//7 秒杀过程
//使用事务
Transaction multi = jedis.multi();
//组队操作
multi.decr(kcKey);
multi.sadd(userKey,uid);
//执行
List<Object> results = multi.exec();
if(results == null || results.size()==0) {
System.out.println("秒杀失败了....");
jedis.close();
return false;
}
//7.1 库存-1
//jedis.decr(kcKey);
//7.2 把秒杀成功用户添加清单里面
//jedis.sadd(userKey,uid);
System.out.println("秒杀成功了..");
jedis.close();
return true;
}
}
使用ab工具再次测试。截图结果如下。
发现,左边的程序的命令行还使用问题的,但是右边的redis的库存确实变成了0。
那么左边的问题里面出现了2次秒杀失败,那么这就是程序的问题了。其实这是乐观锁的通病,大并发情况下,乐观锁容易造成超时,以至于秒杀失败。
如果ab工具的并发更多的话,那么库存也有可能会有很多剩余。
增大ab工具并发
在程序中把库存改大,同时增大ab的并发数和cpu数。
ab -n 2000 -c 200 -k -p postfile -T 'application/x-www-form-urlencoded' http://127.0.0.1:8080/seckill/doseckill
因为ab默认对出口有限制,防止资源暂用过多问题,因此会有此警告报错,可以添加-r参数,强制使用,移除socket限制。
ab -n 2000 -c 200 -k -r -p postfile -T 'application/x-www-form-urlencoded' http://127.0.0.1:8080/seckill/doseckill
测试结果
已经秒光,查看redis还有库存。
其次就是,在命令行还有超时问题。
使用lua脚本解决遗留库存
已经秒光,可是还有库存。原因,就是乐观锁导致很多请求都失败。先点的没秒到,后点的可能秒到了。
那么如果把一个完整的请求作为一个整体,就能解决问题。这里使用lua脚本。
LUA脚本
Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。 很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。 这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。
LUA脚本与Redis
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。
利用lua脚本淘汰用户,解决超卖问题。
redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。
修改代码如下; 主要是把整体的代码逻辑,修改为lua脚本。
public class SecKill_redisByScript {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SecKill_redisByScript.class);
static String secKillScript = "local userid=KEYS[1];\r\n" +
"local prodid=KEYS[2];\r\n" +
"local qtkey='sk:'..prodid..\":qt\";\r\n" +
"local usersKey='sk:'..prodid..\":usr\";\r\n" +
"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" +
"if tonumber(userExists)==1 then \r\n" +
" return 2;\r\n" +
"end\r\n" +
"local num= redis.call(\"get\" ,qtkey);\r\n" +
"if tonumber(num)<=0 then \r\n" +
" return 0;\r\n" +
"else \r\n" +
" redis.call(\"decr\",qtkey);\r\n" +
" redis.call(\"sadd\",usersKey,userid);\r\n" +
"end\r\n" +
"return 1";
static String secKillScript2 = "local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
" return 1";
public static boolean doSecKill(String uid, String prodid) throws IOException {
Jedis jedis = Jedis jedis = new Jedis("127.0.0.1",6379);
String sha1 = jedis.scriptLoad(secKillScript);
Object result = jedis.evalsha(sha1, 2, uid, prodid);
String reString = String.valueOf(result);
if ("0".equals(reString)) {
System.err.println("已抢空!!");
} else if ("1".equals(reString)) {
System.out.println("抢购成功!!!!");
} else if ("2".equals(reString)) {
System.err.println("该用户已抢过!!");
} else {
System.err.println("抢购异常!!");
}
jedis.close();
return true;
}
}
使用连接池解决超时
每次请求都创建一个redis链接实例后,这样子容易造成redis大量实例创建,资源不够。如果能够反复使用redis的实例,那么就可以反复循环使用redis实例,节约资源。
在redis中,常用如下的链接池参数
MaxTotal:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted。
maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;
MaxWaitMillis:表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛JedisConnectionException;
testOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;
使用代码封装连接池
使用的单例模式,主要是这行代码jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379, 60000 );
public class JedisPoolUtil {
private static volatile JedisPool jedisPool = null;
private JedisPoolUtil() {
}
public static JedisPool getJedisPoolInstance() {
if (null == jedisPool) {
synchronized (JedisPoolUtil.class) {
if (null == jedisPool) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(200);
poolConfig.setMaxIdle(32);
poolConfig.setMaxWaitMillis(100*1000);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true); // ping PONG
jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379, 60000 );
}
}
}
return jedisPool;
}
public static void release(JedisPool jedisPool, Jedis jedis) {
if (null != jedis) {
jedisPool.returnResource(jedis);
}
}
}
连接池使用
在前面的lua代码的基础上,把redis的初始化,改为使用这个连接处的类即可
JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedispool.getResource();
完整代码如下
public class SecKill_redisByScript {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SecKill_redisByScript.class);
public static void main(String[] args) {
JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedispool.getResource();
System.out.println(jedis.ping());
Set<HostAndPort> set = new HashSet<HostAndPort>();
// doSecKill("201","sk:0101");
}
static String secKillScript = "local userid=KEYS[1];\r\n" +
"local prodid=KEYS[2];\r\n" +
"local qtkey='sk:'..prodid..\":qt\";\r\n" +
"local usersKey='sk:'..prodid..\":usr\";\r\n" +
"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" +
"if tonumber(userExists)==1 then \r\n" +
" return 2;\r\n" +
"end\r\n" +
"local num= redis.call(\"get\" ,qtkey);\r\n" +
"if tonumber(num)<=0 then \r\n" +
" return 0;\r\n" +
"else \r\n" +
" redis.call(\"decr\",qtkey);\r\n" +
" redis.call(\"sadd\",usersKey,userid);\r\n" +
"end\r\n" +
"return 1";
static String secKillScript2 = "local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
" return 1";
public static boolean doSecKill(String uid, String prodid) throws IOException {
JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedispool.getResource();
// String sha1= .secKillScript;
String sha1 = jedis.scriptLoad(secKillScript);
Object result = jedis.evalsha(sha1, 2, uid, prodid);
String reString = String.valueOf(result);
if ("0".equals(reString)) {
System.err.println("已抢空!!");
} else if ("1".equals(reString)) {
System.out.println("抢购成功!!!!");
} else if ("2".equals(reString)) {
System.err.println("该用户已抢过!!");
} else {
System.err.println("抢购异常!!");
}
jedis.close();
return true;
}
}
总结
- 最简单的版本
使用程序一步一步的思维写代码,请求一步,减去一次库存。
问题是使用工具ab模拟并发测试,会出现超卖情况。查看库存会出现负数。
- 使用redis的事务乐观锁
添加了 事务乐观锁,在并发少的情况下,一切正常,但是如果并发超级多,那么会出现两个问题,一种是redis实例连接超时,另一种是库存遗留问题。
- 库存遗留
因为乐观锁会控制每次请求的时候版本对比,会造成多次一样的版本乐观锁处理,容易造成部分人请求失败。先点的没有秒杀到库存,后点的秒杀到库存了。
这里使用lua脚本,把多个操作控制为一个操作,利用redis的单线程特性,每次只执行一个lua脚本,这样子就完成了每次处理一个用户的请求。
local userid=KEYS[1];
local prodid=KEYS[2];
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid.":usr';
local userExists=redis.call("sismember",usersKey,userid);
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("get" ,qtkey);
if tonumber(num)<=0 then
return 0;
else
redis.call("decr",qtkey);
redis.call("sadd",usersKey,userid);
end
return 1;
- 超时问题
因此每次创建一个redis实例,过分的依赖服务器资源了,所以说可以使用redis的连接池反复的使用资源,一定程度上节约资源。
解决就是创建一个单例模式的redis连接池,可以是懒汉式也可以是饿汉式,不过懒汉式有多线程问题,如果要使用懒汉式单例模式的话,需要对线程锁要有一定的认识。