ACID是指数据库管理系统(DBMS)在写入或更新资料的过程中,为保证事务(transaction)是正确可靠的,所必须具备的四个特性:原子性(atomicity,或称不可分割性)、一致性(consistency)、隔离性(isolation,又称独立性)、持久性(durability)。
单线程+多路复用
准备
安装
下载redis以及编译安装
1.官网下载redis文件
2.复制到/opt目录下cp /home/gaokaoli/Downloads/redis-6.2.3.tar.gz /opt
3.查看是否安装了gcc编译输入gcc --version
如果没有安装gcc,则输入yum install -y gcc
,或者是apt-get install -y gcc
4.进入目录号
cd /opt/redis-6.2.3
进行make
编译以及make install
启动
拷贝一份redis.conf到其他目录
设置redis.conf的daemonize值为yes
cp /opt/redis-6.2.3/redis.conf /etc/redis.conf
cd /usr/local/bin
redis-server /etc/redis.conf
使用redis-cli
登录服务(正常安装过程的话在/usr/local/bin
下)
1.基本操作
key值的操作:
keys *
查看当前库所有keyset key value
设置key值与valueexists key
判断key是否存在type key
查看key是什么类型del key
删除指定的key数据unlink key
根据value选择非阻塞删除
------仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作。expire key 10
10秒钟:为给定的key设置过期时间ttl key
查看还有多少秒过期,-1表示永不过期,-2表示已过期
库的选择:
select
命令切换数据库dbsize
查看当前数据库的key数量flushdb
清空当前库flushall
通杀全部库
配置文件详解
\#bind 127.0.0.1 -::1 //给本地远程注释,可以远程
protected-mode no //保护模式改为no,可以远程访问
daemonize yes //后台启动为yes
2.变量操作
2.1 string字符串
- 一个key对应一个value
- 二进制安全的,即可包含任何数据
- value最多可以是512m
参数设置:
set key value
设置key值get key
查询key值append key value
将给定的value追加到原值末尾strlen key
获取值的长度setnx key value
只有在key不存在的时候,设置key值incr key
将key值存储的数字增1,只对数字值操作,如果为空,新增值为1decr key
将key值存储的数字减1,只对数字值操作,如果为空,新增值为1decr key
将key值存储的数字减1incrby/decrby key <步长>
将key值存储的数字增减如步长
补充:
原子操作
不会被打断,从开始到结束
单线程不会被打断
多线程很难说,被打断的就不是原子操作
补充额外的字符串参数:
mset key value key value..
同时设置一个或者多个key-valuemget key key...
同时获取一个或多个valuemsetnx key value key value..
同时设置一个或者多个key-value.当且仅当所有给定key都不存在getrange key <起始位置> <结束位置>
获取key的起始位置和结束位置的值setrange key <起始位置> value
将value的值覆盖起始位置开始setex key [time] value
设置键值的同时,设置过期时间(可以用于验证码过期时间)getset key value
用新值换旧值
2.2 list列表
常用命令:
lpush/rpush key value value...
从左或者右插入一个或者多个值(头插与尾插)lpop/rpop key
从左或者右吐出一个或者多个值(值在键在,值都没,键都没)rpoplpush key1 key2
从key1列表右边吐出一个值,插入到key2的左边lrange key start stop
按照索引下标获取元素(从左到右)(0 -1代表取出所有)lrange key 0 -1
获取所有值lindex key index
按照索引下标获得元素llen key
获取列表长度linsert key before/after value newvalue
在value的前面插入一个新值lrem key n value
从左边删除n个value值lset key index value
在列表key中的下标index中修改值value
2.3 set集合
字典,哈希表
自动排重且为无序的
常用命令:
sadd key value value...
将一个或者多个member元素加入集合key中,已经存在的member元素被忽略smembers key
取出该集合的所有值sismember key value
判断该集合key是否含有改值scard key
返回该集合的元素个数srem key value value
删除集合中的某个元素spop key
随机从集合中取出一个元素srandmember key n
随即从该集合中取出n个值,不会从集合中删除smove <一个集合a><一个集合b>value
将一个集合a的某个value移动到另一个集合bsinter key1 key2
返回两个集合的交集元素sunion key1 key2
返回两个集合的并集元素sdiff key1 key2
返回两个集合的差集元素(key1有的,key2没有)
2.4 hash哈希
键值对集合,特别适合用于存储对象类型
常用命令:
hset key field value
给key集合中的filed键赋值value (在java中,field value类似于Map)hget key1 field
集合field取出valuehmset key1 field1 value1 field2 value2
批量设置hash的值hexists key1 field
查看哈希表key中,给定域field是否存在hkeys key
列出该hash集合的所有fieldhvals key
列出该hash集合的所有valuehincrby key field increment
为哈希表key中的域field的值加上增量1 -1hsetnx key field value
将哈希表key中的域field的值设置为value,当且仅当域field不存在
例如 hset user:1000 id 1
2.5 Zset有序集合
(有点像TreeSet)(底层使用跳跃表)
没有重复元素的字符串集合,按照相关的分数进行排名,排名从低到高,排名可重复
有序集合 zset 与普通集合 set 非常相似,是一个没有重复元素的字符串集合
常用命令:
zadd key score1 value1 score2 value2
将一个或多个member元素及其score值加入到有序key中zadd topn 200 java 300 c++
zrange key start stop (withscores)
返回有序集key,下标在start与stop之间的元素,带withscores,可以让分数一起和值返回到结果集。zrangebyscore key min max(withscores)
返回有序集key,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score的值递增次序排列zrevrangebyscore key max min (withscores)
同上,改为从大到小排列zincrby key increment value
为元素的score加上增量zrem key value
删除该集合下,指定值的元素zcount key min max
统计该集合,分数区间内的元素个数zrank key value
返回该值在集合中的排名,从0开始
2.6 bitmaps
1.合理使用操作位可以有效地提高内存使用率和开发使用率
2.本身是一个字符串,不是数据类型,数组的每个单元只能存放0和1,数组的下标在Bitmaps叫做偏移量
3.节省空间,一般存储活跃用户比较多
- setbit key offset value 值设置
- getbit key offset 值提取
- bitcount key [startIndex end]] 数量统计
bitop and(or/not/xor)destkey key1 [key2] ... 对key 进行与(异或非)操作,统计出1的数量
- 复合操作,它可以对多个bitmap做and or not xor,结果保存在destkey,常用来统计xxx用户是否访问
2.7HyperLogLog
用于统计基数,也就是不重复数
- pfadd key element 向key中加入element,成功则返回1,不成功返回0
- pfcount key 查看key中的element数量
- pfmerge destkey sourcekey1 sourcekey2 将和sourcekey1 sourcekey2合并成destkey
2.8 Geographic
提供经纬度设置,查询范围,距离查询等
- geoadd key longitude latitude member 向key中增加名为membe的经纬度,
- geopos key member 取出坐标位置
- geodist key member1 member2 [m km ft mi] 取出两member的直线距离
- georadius key longitude latitude radius [m km ft mi] 取出以longitude和latitude为中心,radius为半径范围内的member
3.应用
利用jedis访问redis
在maven中引入
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.6.1</version>
</dependency>
</dependencies>
利用jedis对象连接,以及可以执行上诉的变量对应的方法的操作
public class JedisDemo1 {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.242.110", 6379);
System.out.println(jedis.getClient().getPort());
System.out.println("连接本地的Redis服务器成功");
//查看服务是否运行
System.out.println("服务正在运行:" + jedis.ping());
//记得关闭
jedis.close();
}
}
springboot整合
1.直接使用redis类
1.1配置
1、在maven中的dependences标签中配置
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
2、利用properties或者yml配置连接池启动信息
#Redis服务器地址
spring.redis.host=172.22.109.205
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
3、配置Redis设置类
可以当作固定配置用
在classpath:config/RedisConfig.java
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
1.2使用
直接注入RedisTemplate
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping
public String testRedis() {
//设置值到redis
redisTemplate.opsForValue().set("name","学习java可看码农研究僧的博客地址,https://blog.csdn.net/weixin_47872288");
//从redis获取值
String name = (String)redisTemplate.opsForValue().get("name");
return name;
}
}
2.使用jedis连接池(建议)
2.1设置工具类配置连接池
public class JedisPoolUtil {
private static volatile JedisPool jedisPool = null;
private JedisPoolUtil() {
}
public static JedisPool getJedisPoolInstance() {
if (null == jedisPool) {
synchronized (JedisPoolUtil.class) {
if (null == jedisPool) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(200);
poolConfig.setMaxIdle(32);
poolConfig.setMaxWaitMillis(100*1000);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true); // ping PONG,判断是否还存在
jedisPool = new JedisPool(poolConfig, "114.114.114.114", 6379, 60000 );
}
}
}
return jedisPool;
}
public static void release(JedisPool jedisPool, Jedis jedis) {
if (null != jedis) {
jedisPool.returnResource(jedis);
}
}
}
2.2 类似jdbc连接池,直接使用
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();
4.事务
Redis执行命令仍然是单线程顺序执行
- 单独的隔离操作(不会被打断)
- 没有隔离级别(任何事务提交之前都不会被执行)
- 不保证原子性(就算有一条命令执行失败,其他命令也会执行)
主要三个命令为multi、exec和discard,它执行方法为串联多个命令
在multi中如果出现了语法或者编译时就出现的错误,exec会整个执行不成功
如果multi中出现数值或运行时的错误,除了出错的语句外,其他的语句exec会照样执行
悲观锁:(适用于多写数据)
操作之前先加锁,直到本线程执行完成
乐观锁:(适用于多读数据)
利用版本号操作数据,每次操作时检查版本号是否符合,若不正确则重新取数据
乐观锁可以解决库存超卖的问题,但是同时也会出现库存遗留的问题
可以使用悲观锁和增加连接时间解决库存遗留问题,但是会导致大量用户卡死
利用watch实现乐观锁
watch key1 [key2]
当两端都同时监视了key的时候,任何一端改变了key的值,另外一边就会更改失败
WATCH 操作只是 session 级别的,不会影响其他 session,在同一个 session 中,对于同一个 Key ,在事务执行之前最多只能加一个 WATCH,WATCH 监视的 Key 在事务内部被修改后,无论这个事务是否执行成功,这个 Key 上的 WATCH 就会自动取消
关于ab并发模拟
使用apt install apache2-utils或者是
apt install httpd-tools安装
在postfile文件下输入body的内容:
prodid=0101&
&代表连接符
执行:ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://172.22.109.30:8081/Seckill/doseckill
- -n 请求次数
- -c 当前请求次数的并发请求
- -T 设计的类型,可以是post,get
- -p 提交的参数文件,内容是body中的内容
5.持久化
5.1 RDB
在指定的时间间隔内将内存中的数据集快照写入磁盘
具体的备份流程如下:
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能
备份文件默认在/usr/local/bin
目录下生成一个dum.rdb的文件
关于save和bgsave的比较
save
:save时只管保存,其它不管,全部阻塞。手动保存。不建议。bgsave
:Redis会在后台异步进行快照操作, 快照同时还可以响应客户端请求。
可以通过lastsave 命令获取最后一次成功执行快照的时间
优点:
- 适合大规模的数据恢复
- 对数据完整性和一致性要求不高更适合使用
- 节省磁盘空间
- 恢复速度快
缺点:
- Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
- 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
- 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
5.2 AOF
以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件
- redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
默认是不开启AOF,开启RDB
可以在redis.conf
将appendonly no
,改为yes来启动
可以在redis.conf中配置文件名称,默认为 appendonly.aof
AOF文件的保存路径,同RDB的路径一致
重写流程:
(1)bgrewriteaof触发重写,判断是否当前有bgsave或bgrewriteaof在运行,如果有,则等待该命令结束后再继续执行。
(2)主进程fork出子进程执行重写操作,保证主进程不会阻塞。
(3)子进程遍历redis内存中数据到临时文件,客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失。
(4)1).子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。2).主进程把aof_rewrite_buf中的数据写入到新的AOF文件。
(5)使用新的AOF文件覆盖旧的AOF文件,完成AOF重写
优点:
- 备份机制更稳健,丢失数据概率更低
- 可读的日志文本,通过操作AOF稳健,可以处理误操作
缺点:
- 比起RDB占用更多的磁盘空间。
- 恢复备份速度要慢。
- 每次读写都同步的话,有一定的性能压力。
6.主从复制
主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主
Slave服务器配置参考:slave6379.conf
include /myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb
使用redis-server <配置文件>
启动后,使用redis-cli <端口号>
连接
slaveof <ip><port>
设定为
使用info replication
可以查看当前redis信息
原理是设为从服务器时,从服务器会请求主服务器发送rdp文件来同步自己的数据库
其他时候都是主服务器发起的同步请求
在主服务器down时,可以使用salveof no one
使从服务器 能自动变成另一个主服务器,并自动将其他的从服务器收入靡下
哨兵模式
当监控主机宕机之后,从机可以立马变为主机,就和上面的反客为主一样,不用手动设置
能够后台监控主机是否故障,
如果主机此时重新上线,也会自动成为新主机的从机。
从机们依靠以下规则成为主机:
- 优先级在redis.conf中默认:slave-priority 100,值越小优先级越高
- 偏移量是指获得原主机数据最全的,也就是数据越多,变主机的机会越大
- 每个redis实例启动后都会随机生成一个40位的runid
使用
配置sentinel.conf
文件
sentinel monitor <哨兵名字> <ip> <port> <可以成为主机的从机数>
哨兵会检测主服务器,当主服务器down时自动将从服务器上位
在Java中使用jedisSentinelPool
创建对象即可
private static JedisSentinelPool jedisSentinelPool=null;
public static Jedis getJedisFromSentinel(){
if(jedisSentinelPool==null){
Set<String> sentinelSet=new HashSet<>();
sentinelSet.add("192.168.11.103:26379");
JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10); //最大可用连接数
jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
jedisPoolConfig.setMinIdle(5); //最小闲置连接数
jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
return jedisSentinelPool.getResource(); //mymaster为哨兵名
}else{
return jedisSentinelPool.getResource();
}
}
7. 集群
redis6.0提供了,无中心化集群配置,即任何一个服务器都能作为代理服务器访问其他的服务器
无中心代表了数据会被分布式存储
概念
插槽
redis分布式存储中有16384 个插槽(hash slot),集群服务器们均分这些插槽,存储数据时,通过CRC16(key) % 16384 来计算键 key 属于哪个槽
组
在集群中不能直接使用mset
这类命令来同时添加多个数据,因为集群无法计算出插槽位置(slot),必须利用分组
mset <k1>{<groupName1>} <v1> <k2>{<groupName2>} <v2>
j集群会利用组来计算slot值
故障恢复
如果某个主机宕机了,从机上位变主机,之前那个主机上线之后,就会变成从机
那如果主从都宕机了,也就是负责该服务的主从都宕机了
就看具体的配置
cluster-require-full-coverage
- 为yes ,那么 ,整个集群都挂掉
- 为no ,那么,该插槽数据全都不能使用,也无法存储。
使用:
配置文件参考
include /myredis/redis.conf
pidfile "/var/run/redis_6391.pid"
port 6391
dbfilename "dump6391.rdb"
cluster-enabled yes
cluster-config-file nodes-6391.conf
cluster-node-timeout 15000
全部服务器开启,确认正常生成了cluster-config-file
的配置文件后
使用redis-cli --cluster create --cluster-replicas 1 <ip1>:<port1> <ip2>:<port2>...
将集群合成
ps 这里的redis-cli
启用集群功能需要有ruby环境,所以上面的命令可以切到redis源码目录下的src文件下执行,因为在redis6.0后集成了redis-trib.rb
这个环境
-replicas 1
代表简单方式启动,1台主机配1台从机
利用集群连接模式-c
任选一个服务器连接即可
redis-cli -c -p 6379
在redis中,使用cluster nodes
可以查看集群信息
在java中使用JedisCluster
类实现
public class JedisClusterTest {
public static void main(String[] args) {
HostAndPort hostAndPort = new HostAndPort("192.168.242.110", 6381);
JedisCluster jedisCluster = new JedisCluster(hostAndPort);
jedisCluster.set("k5","v5");
String k5 = jedisCluster.get("k5");
System.out.println(k5);
jedisCluster.close()
}
}
应用问题
1.缓存穿透
由于当缓存无法命中时会去访问数据库,若是过多请求不命中就会导致过多请求直接访问数据库,导致数据库崩溃
解决方案:
(1)对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
(2)设置可访问的名单(白名单):
使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
(3)采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)
将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
(4)进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务
2.缓存击穿
当redis某一个key过期时,刚刚好有大量的请求访问这个key,但是由于redis中这个key已经过期,这些请求就会都发送给数据库,有可能造成数据库崩溃
解决方法
(1)预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
(2)实时调整:现场监控哪些数据热门,实时调整key的过期时长
(3)使用锁:先判断值是否为空再让他进来与否
3.缓存雪崩
key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key正常访问
解决方案:
(1)构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)
(2)使用锁或队列:
用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
(3)设置过期标志更新缓存:
记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
(4)将缓存失效时间分散开:
比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
分布式锁
指的是一把锁可以锁整个集群里的某个key
解决方案如下:
- 基于数据库redis实现分布式锁
- 基于缓存(Redis等)
- 基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
- 性能:redis最高
- 可靠性:zookeeper最高
基于数据库redis实现分布式锁
- 使用setnx上锁,通过del释放
- 给key设置上过期时间,到时间自动释放
- 使用结合语句保证原子性
set <key> <value> nx ex <time>
nx ex
代表上锁 和过期时间
- 设置UUID解决锁误释放的问题
java中利用setIfAbsent
方法实现:
@GetMapping("testLock")
public void testLock(){
// 设置UUID
String uuid = UUID.randomUUID().toString();
//1获取锁,setne ,以及设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3,TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4 判断UUID并释放锁,del
String lockUuid = (String)redisTemplate.opsForValue().get("lock");
if(uuid.equals(lockUuid)){
redisTemplate.delete("lock");
}
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
番外:lua脚本保证原子性
redis支持通过提交lua脚本来保证每次执行的原子性,所以我们可以利用lua脚本来给值判断锁和解锁
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
锁的小结
互斥性。在任意时刻,只有一个客户端能持有锁。
setIfAbsent
的实现
不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- redis
expire
的体现
- redis
解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
- 必须是原来线程解锁
加锁和解锁必须具有原子性
- lua脚本的原子性可以实现
杂项:
vim操作
- :/<字符> 可以搜索文档
- :%s/<字符1>/<字符2> 可以将字符1全部替换为字符2
1 条评论
感谢分享