在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。那具体什么是分布式锁,分布式锁应用在哪些业务场景、如何来实现分布式锁呢?
我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,并且可以完美的运行,毫无Bug!
注意这是单机应用,后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图:
上图可以看到,变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象),如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的!
如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题!
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
在分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些条件:
1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。
基于数据库实现分布式锁;
基于缓存(Redis等)实现分布式锁;
基于Zookeeper实现分布式锁;
基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
(1)创建一个表:
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
`desc` varchar(255) NOT NULL COMMENT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
(2)想要执行某个方法,就使用这个方法名向表中插入数据:
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');
因为我们对method_name
做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
(3)成功插入则获取锁,执行完成后删除对应的行数据释放锁:
delete from method_lock where method_name ='methodName';
注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法!
使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:
1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;
2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,
解决方案: ,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,
解决方案: ,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;(数据库的定时任务,或者代码中的都行)
4、不具备阻塞锁特性,获取不到锁直接返回失败,
解决办法: 看门狗机制: 循环(自旋)多次去获取-直到获取成功。
5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。
1、选用Redis实现分布式锁原因:
(1)Redis有很高的性能;
(2)Redis命令对此支持较好,实现起来比较方便
2、使用命令介绍:
(1)SETNX
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
(2)expire
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
(3)delete
delete key:删除key
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
3、实现思想:
(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
可以参考SpringBoot-AOP里的案例(但是这种方式有缺陷,只能redis单机模式下使用)
ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:
(1)创建一个目录mylock;
(2)线程A想获取锁就在mylock目录下创建临时顺序节点;
(3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
(4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
(5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
这里推荐一个Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。
优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。
缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。
上面的三种实现方式,没有在所有场合都是完美的,所以,应根据不同的应用场景选择最适合的实现方式。
在分布式环境中,对资源进行上锁有时候是很重要的,比如抢购某一资源,这时候使用分布式锁就可以很好地控制资源。
当然,在具体使用中,还需要考虑很多因素,比如超时时间的选取,获取锁时间的选取对并发量都有很大的影响,上述实现的分布式锁也只是一种简单的实现,主要是一种思想,但是如果真的需要使用分布式锁的话,这里推荐使用Redisson的方式来实现,
好的,接下来就通过一张手绘图,给大家说说Redisson这个开源框架对Redis分布式锁的实现原理。
原理可参考 https://www.cnblogs.com/AnXinliang/p/10019389.html
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
可以理解为主节点掉了,子节点自动补充为主节点
Config config = new Config();
config.useSentinelServers().addSentinelAddress(
"redis://172.29.3.245:26378","redis://172.29.3.245:26379", "redis://172.29.3.245:26380")
.setMasterName("mymaster")
.setPassword("a123456");
可以理解为redis 的读写分离,但是主节点挂掉了那么, 子节点是不能向哨兵模式一样自动升级为主节点的
Config config = new Config();
config.useMasterSlaveServers()
//可以用"rediss://"来启用SSL连接
.setMasterAddress("redis://192.168.81.145:6379")//主节点
//从节点
.addSlaveAddress("redis://192.168.81.146:6379")
.addSlaveAddress("redis://192.168.81.148:6379")
.setPassword("123456");
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
.addNodeAddress("redis://127.0.0.1:7000")
.addNodeAddress("redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002")
.setPassword("123456");
红锁模式的节点前提必须都是主节点,或者都是单机Redis
解决: 哨兵/主从/集群 , 这些模式,出现的问题
有时候程序就是这么巧,比如说正好一个节点挂掉的时候,多个客户端同时取到了锁。如果你可以接受这种小概率错误,那用之前的复制方案就完全没有问题。否则的话,我们建议你实现下面描述的解决方案。
假设有5个redis节点,这些节点之间既没有主从,也没有集群关系。客户端用相同的key和随机值在5个节点上请求锁,请求锁的超时时间应小于锁自动释放时间。当在3个(超过半数)redis上请求到锁的时候,才算是真正获取到了锁。如果没有获取到锁,则把部分已锁的redis释放掉。
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.29.1.180:5378")
.setPassword("123456");
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.29.1.180:5379")
.setPassword("123456");
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.29.1.180:5380")
.setPassword("123456");
RedissonClient redissonClient3 = Redisson.create(config3);
String resourceName = "REDLOCK";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
// 同时给3个redis上锁
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
isLock = redLock.tryLock(500, 30000, TimeUnit.MILLISECONDS);
System.out.println("isLock = "+isLock);
if (isLock) {
//TODO if get lock success, do something;
Thread.sleep(30000);
}
} catch (Exception e) {
} finally {
// 无论如何, 最后都要解锁
System.out.println("");
redLock.unlock();
}
到时候自己可以根据情况该代码这里,就把关键核心代码放上去了
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<!--驱动包-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.45</version>
</dependency>
<!--Mybati 和Spring boot 自动整合依赖 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.3</version>
</dependency>
<!-- 分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 实现SpringBoot AOP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
</dependencies>
<!-- 自动查找主类 用于打包 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
CREATE TABLE `t_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`userId` varchar(255) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
`pass` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`sex` varchar(255) DEFAULT NULL,
`address` varchar(255) DEFAULT NULL,
`phone` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8;
自己添加一条记录就行
一般适用于: 商品秒杀, 限制一个商品,在抢购的时候,程序未处理完成,被同一个用户重复抢购的情况,或者限制重复接口提交情况
好处就是,加快程序速度,防止多次提交导致的数据错误
server:
port: 10101
spring:
# 开发阶段关闭thymeleaf的模板缓存 否则每次修改都需要重启引导
thymeleaf:
cache: false
datasource:
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/voidme?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&serverTimezone=UTC&useSSL=false
username: root
password: root
hikari.idle-timeout: 60000
hikari.maximum-pool-size: 30
hikari.minimum-idle: 10
# redis配置
redis:
host: 192.168.81.144
port: 6379
password: 123456
#spring集成Mybatis环境
#实体类别名扫描包 如果使用接口的话这个可以省略
# 加载Mybatis映射文件 classpath:mapper/*Mapper.xml 代表是 resources 下mapper包里所有是 ..Mapper.xml结尾的xml配置文件 如果使用接口的话这个可以省略
mybatis:
type-aliases-package: com.aop.pojo
mapper-locations: classpath:mybaitis/*Mapper.xml
configuration:
map-underscore-to-camel-case: true
package com.aop;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class,args);
}
}
package com.redis.acpect;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoRepeatSubmit {
/**
* 设置请求锁定时间 默认3秒 ,如果请求内容过长这个需要调整大
*
* @return
*/
int lockTime() default 3;
String doc() default "重复提交请求,请稍后再试";
}
package com.redis.acpect;
import com.redis.utils.ApiResult;
import com.redis.utils.DistributedRedisLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
@Aspect
@Component
public class RepeatSubmitAspect {
@Autowired
private DistributedRedisLock redisLock;
@Pointcut("@annotation(noRepeatSubmit)")
public void pointCut(NoRepeatSubmit noRepeatSubmit) {
}
public static HttpServletRequest getRequest() {
ServletRequestAttributes ra= (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
return ra.getRequest();
}
@Around("pointCut(noRepeatSubmit)")
public Object around(ProceedingJoinPoint pjp, NoRepeatSubmit noRepeatSubmit) throws Throwable {
int lockSeconds = noRepeatSubmit.lockTime();
String doc = noRepeatSubmit.doc();
HttpServletRequest request = getRequest();
Assert.notNull(request, "request can not null");
// 此处可以用token或者JSessionId
// String token = request.getHeader("Authorization");
String token = "admin-小明";
String path = request.getServletPath();
String key = getKey(token, path);
//加锁
boolean isSuccess = redisLock.acquire(key, lockSeconds);
if (isSuccess) {
// 获取锁成功
Object result;
try {
// 执行进程
result = pjp.proceed();
} finally {
// 解锁
redisLock.release(key);
}
return result;
} else {
// 获取锁失败,认为是重复提交的请求
return new ApiResult(200, doc, null);
}
}
private String getKey(String token, String path) {
return token + path;
}
}
package com.redis.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
package com.redis.utils;
public class ApiResult {
private Integer code;
private String message;
private Object data;
public ApiResult(Integer code, String message, Object data) {
this.code = code;
this.message = message;
this.data = data;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message == null ? null : message.trim();
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
@Override
public String toString() {
return "ApiResult{" +
"code=" + code +
", message='" + message + '\'' +
", data=" + data +
'}';
}
}
package com.redis.utils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class DistributedRedisLock {
//从配置类中获取redisson对象
@Autowired
private RedissonClient redissonClient;
private final String LOCK_TITLE = "redisLock_";
//加锁 , 线程加锁的时候发现,锁有人用了 ,那么就会进入自旋等待
public boolean acquire(String lockKey, long seconds){
//获取锁对象
RLock mylock = redissonClient.getLock((LOCK_TITLE+lockKey));
try {
boolean b = mylock.tryLock(0, seconds, TimeUnit.SECONDS);
if (b) {
System.out.println("获取锁成功");
return true;
} else {
System.out.println("重复提交");
return false;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
//锁的释放
public void release(String lockKey){
//获取所对象
RLock mylock = redissonClient.getLock((LOCK_TITLE+lockKey));
// 这里判断下当前key是否上锁,不然业务执行时间大于锁自动释放时间后,解锁报异常
if(mylock.isLocked()){ // 是否还是锁定状态
if(mylock.isHeldByCurrentThread()){ // 时候是当前执行线程的锁
mylock.unlock(); // 释放锁
System.out.println("解锁:"+lockKey);
}
}
}
}
package com.redis.controller;
import com.redis.acpect.NoRepeatSubmit;
import com.redis.service.UserService;
import com.redis.utils.ApiResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author huanmin
*/
@RestController
@RequestMapping("/user")
@CrossOrigin
public class UserController {
@Autowired
private UserService userService;
//http://127.0.0.1:10102/user/updateAgeAsyncLock
/***
* 加锁
* @return List<User>
*/
@NoRepeatSubmit(lockTime = 15)
@GetMapping(value = "/updateAgeAsyncLock" )
public ApiResult updateAgeAsyncLock() {
return userService.updateAgeAsyncLock();
}
//http://127.0.0.1:10102/user/updateAgeAsyncNotLock
/***
* 不加锁
* @return List<User>
*/
@GetMapping(value = "/updateAgeAsyncNotLock" )
public void updateAgeAsyncNotLock(){
userService.updateAgeAsyncNotLock();
}
}
package com.redis.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
/**
* @Author huanmin
*/
@Mapper
public interface UserDao {
@Update("UPDATE t_user SET age=#{add} WHERE id=#{id}")
Integer upDateAge(int add,int id);
@Select("SELECT age FROM t_user WHERE id=#{id}")
Integer getdateAge(int id);
}
package com.redis.service;
import com.redis.utils.ApiResult;
/**
* @Author huanmin
*/
public interface UserService {
ApiResult updateAgeAsyncNotLock();
ApiResult updateAgeAsyncLock();
}
package com.redis.service.impl;
import com.redis.dao.UserDao;
import com.redis.service.UserService;
import com.redis.utils.ApiResult;
import com.redis.utils.DistributedRedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Author huanmin
*/
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserDao userDao;
@Autowired
private DistributedRedisLock distributedRedisLock;
//不加锁
@Override
public ApiResult updateAgeAsyncNotLock() {
updateAge();
return new ApiResult(200,"ok","true");
}
//加锁
@Override
public ApiResult updateAgeAsyncLock() {
updateAge();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ApiResult(200,"ok","true");
}
public void updateAge(){
// 实际开发中 我们可以直接这样 update t_user set age=age+1 WHERE id=4
//下面是为了测试,才这样写的
int i = userDao.getdateAge(4) + 1;
userDao.upDateAge(i, 4);
}
}
一个单服务可以直接使用 synchronized 或者lock 这些同步锁来实现但是,如果是多个服务呢?
变量存在数据库中A和B同时去修改数据库的age就会发送数据错误的情况,因为现在微服务的流行
一个服务会因为负载均衡来搭建多个,这样就肯定会出现上面这样的问题的.
注意: 悲观锁尽量少用,大量并发情况下速度会很慢的,因为同一时间只能有一个执行成功的线程其他线程都会进入自旋
package com.aop.config;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
//线程池设置
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(8);//核心线程 和当前cpu的线程数量保持一致
taskExecutor.setMaxPoolSize( 10000 );//最大线程
taskExecutor.setQueueCapacity( 2000 );//队列大小
taskExecutor.setThreadNamePrefix( "async-service-" );
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
//直接返回 祖籍 AsyncUncaughtExceptionHandler 对象
return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
}
}
package com.aop.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
可以自由控制,加锁和关锁
package com.aop.utils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class DistributedRedisLock {
//从配置类中获取redisson对象
@Autowired
private RedissonClient redissonClient;
private final String LOCK_TITLE = "redisLock_";
//加锁 , 线程加锁的时候发现,锁有人用了 ,那么就会进入自旋等待
public boolean acquire(String lockName){
//声明key对象
String key = LOCK_TITLE + lockName;
//获取锁对象
RLock mylock = redissonClient.getLock(key);
//加锁,并且设置锁过期时间,防止死锁的产生
mylock.lock(3, TimeUnit.SECONDS);//表示3秒后过期
System.err.println("======lock======"+Thread.currentThread().getName());
//加锁成功
return true;
}
//锁的释放
public void release(String lockName){
//必须是和加锁时的同一个key
String key = LOCK_TITLE + lockName;
//获取所对象
RLock mylock = redissonClient.getLock(key);
// 这里判断下当前key是否上锁,不然业务执行时间大于锁自动释放时间后,解锁报异常
if(mylock.isLocked()){ // 是否还是锁定状态
if(mylock.isHeldByCurrentThread()){ // 时候是当前执行线程的锁
mylock.unlock(); // 释放锁
System.err.println("======unlock======"+Thread.currentThread().getName());
}
}
}
}
package com.aop.service;
/**
* @Author huanmin
*/
public interface UserService {
void updateAgeAsyncNotLock();
void updateAgeAsyncLock() ;
}
package com.aop.service.impl;
import com.aop.dao.UserDao;
import com.aop.service.UserService;
import com.aop.utils.DistributedRedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @Author huanmin
*/
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserDao userDao;
@Autowired
private DistributedRedisLock distributedRedisLock;
//不加锁
@Override
@Async //异步
public void updateAgeAsyncNotLock() {
updateAge();
}
//加锁
@Override
@Async//异步
public void updateAgeAsyncLock() {
String test = "test";
distributedRedisLock.acquire(test);
updateAge();
distributedRedisLock.release(test);
}
public void updateAge(){
// 实际开发中 我们可以直接这样 update t_user set age=age+1 WHERE id=4
//下面是为了测试,才这样写的
int i = userDao.getdateAge(4) + 1;
userDao.upDateAge(i, 4);
}
}
package com.aop.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Service;
/**
* @Author huanmin
*/
@Mapper
public interface UserDao {
@Update("UPDATE t_user SET age=#{add} WHERE id=#{id}")
Integer upDateAge(int add,int id);
@Select("SELECT age FROM t_user WHERE id=#{id}")
Integer getdateAge(int id);
}
package com.aop.controller;
import com.aop.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author huanmin
*/
@RestController
@RequestMapping("/user")
@CrossOrigin
public class UserController {
@Autowired
private UserService userService;
//http://127.0.0.1:10101/user/updateAgeAsyncLock
/***
* 加锁
* @return List<User>
*/
@GetMapping(value = "/updateAgeAsyncLock" )
public void updateAgeAsyncLock() throws Exception {
for (int i = 0; i < 10000; i++) {
userService.updateAgeAsyncLock();
}
}
//http://127.0.0.1:10101/user/updateAgeAsyncNotLock
/***
* 不加锁
* @return List<User>
*/
@GetMapping(value = "/updateAgeAsyncNotLock" )
public void updateAgeAsyncNotLock(){
for (int i = 0; i < 10000; i++) {
userService.updateAgeAsyncNotLock();
}
}
}
package com.aop.config;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
//线程池设置
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(8);//核心线程 和当前cpu的线程数量保持一致
taskExecutor.setMaxPoolSize( 10000 );//最大线程
taskExecutor.setQueueCapacity( 2000 );//队列大小
taskExecutor.setThreadNamePrefix( "async-service-" );
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
//直接返回 祖籍 AsyncUncaughtExceptionHandler 对象
return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
}
}
package com.aop.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
package com.aop.utils;
public interface LockBusiness<T> {
T Business();
String setName();
}
package com.aop.utils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class DistributedRedisLock {
//从配置类中获取redisson对象
@Autowired
private RedissonClient redissonClient;
private final String LOCK_TITLE = "redisLock_";
//加锁 , 线程加锁的时候发现,锁有人用了 ,那么就会进入自旋等待
public <T> T acquire(LockBusiness<T> lockBusiness){
//声明key对象
String key = LOCK_TITLE + lockBusiness.setName();
//获取锁对象
RLock mylock = redissonClient.getLock(key);
//加锁,并且设置锁过期时间,防止死锁的产生
mylock.lock(3, TimeUnit.SECONDS);//3秒过期
System.err.println("======lock======"+Thread.currentThread().getName());
try {
return lockBusiness.Business();
} catch (Exception e) {
e.printStackTrace();
} finally {
mylock.unlock(); // 释放锁
System.err.println("======unlock======"+Thread.currentThread().getName());
}
}
return null;
}
}
package com.aop.service;
/**
* @Author huanmin
*/
public interface UserService {
void updateAgeAsyncNotLock();
void updateAgeAsyncLock() ;
}
package com.aop.service.impl;
import com.aop.dao.UserDao;
import com.aop.service.UserService;
import com.aop.utils.DistributedRedisLock;
import com.aop.utils.LockBusiness;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @Author huanmin
*/
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserDao userDao;
@Autowired
private DistributedRedisLock distributedRedisLock;
//不加锁
@Override
@Async //异步
public void updateAgeAsyncNotLock() {
updateAge();
}
//加锁
@Override
@Async//异步
public void updateAgeAsyncLock() {
Object acquire = distributedRedisLock.acquire(new LockBusiness<Object>() {
@Override
public Object Business() {
updateAge();
return true;
}
@Override
public String setName() {
return "test";
}
});
System.out.println(acquire);
}
public void updateAge(){
// 实际开发中 我们可以直接这样 update t_user set age=age+1 WHERE id=4
//下面是为了测试,才这样写的
int i = userDao.getdateAge(4) + 1;
userDao.upDateAge(i, 4);
}
}
package com.aop.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Service;
/**
* @Author huanmin
*/
@Mapper
public interface UserDao {
@Update("UPDATE t_user SET age=#{add} WHERE id=#{id}")
Integer upDateAge(int add,int id);
@Select("SELECT age FROM t_user WHERE id=#{id}")
Integer getdateAge(int id);
}
package com.aop.controller;
import com.aop.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author huanmin
*/
@RestController
@RequestMapping("/user")
@CrossOrigin
public class UserController {
@Autowired
private UserService userService;
//http://127.0.0.1:10101/user/updateAgeAsyncLock
/***
* 加锁
* @return List<User>
*/
@GetMapping(value = "/updateAgeAsyncLock" )
public void updateAgeAsyncLock() throws Exception {
for (int i = 0; i < 10000; i++) {
userService.updateAgeAsyncLock();
}
}
//http://127.0.0.1:10101/user/updateAgeAsyncNotLock
/***
* 不加锁
* @return List<User>
*/
@GetMapping(value = "/updateAgeAsyncNotLock" )
public void updateAgeAsyncNotLock(){
for (int i = 0; i < 10000; i++) {
userService.updateAgeAsyncNotLock();
}
}
}
这种方式一般用于Controller接口的方式
package com.aop.acpect;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoRepeatSubmit {
/**
* 设置请求锁定时间 默认3秒,如果并发多的话这个需要调大
*
* @return
*/
int lockTime() default 3;
}
package com.aop.acpect;
import com.aop.acpect.NoRepeatSubmit;
import com.aop.utils.DistributedRedisLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
@Aspect
@Component
public class RepeatSubmitAspect {
@Autowired
private DistributedRedisLock redisLock;
@Pointcut("@annotation(noRepeatSubmit)")
public void pointCut(NoRepeatSubmit noRepeatSubmit) {
}
public static HttpServletRequest getRequest() {
ServletRequestAttributes ra= (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
return ra.getRequest();
}
@Around("pointCut(noRepeatSubmit)")
public Object around(ProceedingJoinPoint pjp, NoRepeatSubmit noRepeatSubmit) throws Throwable {
int lockSeconds = noRepeatSubmit.lockTime();
HttpServletRequest request = getRequest();
Assert.notNull(request, "request can not null");
String path = request.getServletPath();//多个服务-此接口同一时间只能有一个人访问
System.out.println(path);
//加锁
redisLock.acquire(path, lockSeconds);
// 获取锁成功
Object result;
try {
// 执行进程
result = pjp.proceed();
return result;
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
redisLock.release(path);
}
return null;
}
}
package com.aop.config;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
//线程池设置
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(8);//核心线程 和当前cpu的线程数量保持一致
taskExecutor.setMaxPoolSize( 10000 );//最大线程
taskExecutor.setQueueCapacity( 2000 );//队列大小
taskExecutor.setThreadNamePrefix( "async-service-" );
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
//直接返回 祖籍 AsyncUncaughtExceptionHandler 对象
return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
}
}
package com.aop.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
package com.aop.utils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class DistributedRedisLock {
//从配置类中获取redisson对象
@Autowired
private RedissonClient redissonClient;
private final String LOCK_TITLE = "redisLock_";
//加锁 , 线程加锁的时候发现,锁有人用了 ,那么就会进入自旋等待
public boolean acquire(String lockName,int lockSeconds){
//声明key对象
String key = LOCK_TITLE + lockName;
//获取锁对象
RLock mylock = redissonClient.getLock(key);
//加锁,并且设置锁过期时间,防止死锁的产生
mylock.lock(lockSeconds, TimeUnit.SECONDS);//表示毫秒后过期
System.err.println("======lock======"+Thread.currentThread().getName());
//加锁成功
return true;
}
//锁的释放
public void release(String lockName){
//必须是和加锁时的同一个key
String key = LOCK_TITLE + lockName;
//获取所对象
RLock mylock = redissonClient.getLock(key);
// 这里判断下当前key是否上锁,不然业务执行时间大于锁自动释放时间后,解锁报异常
if(mylock.isLocked()){ // 是否还是锁定状态
if(mylock.isHeldByCurrentThread()){ // 是当前执行线程的锁
mylock.unlock(); // 释放锁
System.err.println("======unlock======"+Thread.currentThread().getName());
}
}
}
}
package com.aop.service;
/**
* @Author huanmin
*/
public interface UserService {
void updateAgeAsyncNotLock();
void updateAgeAsyncLock() ;
}
package com.aop.service.impl;
import com.aop.dao.UserDao;
import com.aop.service.UserService;
import com.aop.utils.DistributedRedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @Author huanmin
*/
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserDao userDao;
@Autowired
private DistributedRedisLock distributedRedisLock;
//不加锁
@Override
@Async //异步
public void updateAgeAsyncNotLock() {
updateAge();
}
//加锁
@Override
public void updateAgeAsyncLock() {
updateAge();
}
public void updateAge(){
// 实际开发中 我们可以直接这样 update t_user set age=age+1 WHERE id=4
//下面是为了测试,才这样写的
int i = userDao.getdateAge(4) + 1;
userDao.upDateAge(i, 4);
}
}
package com.aop.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Service;
/**
* @Author huanmin
*/
@Mapper
public interface UserDao {
@Update("UPDATE t_user SET age=#{add} WHERE id=#{id}")
Integer upDateAge(int add,int id);
@Select("SELECT age FROM t_user WHERE id=#{id}")
Integer getdateAge(int id);
}
package com.aop.controller;
import com.aop.acpect.NoRepeatSubmit;
import com.aop.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author huanmin
*/
@RestController
@RequestMapping("/user")
@CrossOrigin
public class UserController {
@Autowired
private UserService userService;
//http://127.0.0.1:10101/user/updateAgeAsyncLock
/***
* 加锁
* @return List<User>
*/
@GetMapping(value = "/updateAgeAsyncLock" )
@NoRepeatSubmit
public void updateAgeAsyncLock() {
userService.updateAgeAsyncLock();
}
//http://127.0.0.1:10101/user/updateAgeAsyncNotLock
/***
* 不加锁
* @return List<User>
*/
@GetMapping(value = "/updateAgeAsyncNotLock" )
public void updateAgeAsyncNotLock(){
userService.updateAgeAsyncNotLock();
}
}
自行使用jmeter进行测试,反正我测试是没啥问题的, 开了10000个线程的0秒并发,数据库数据稳稳的没啥问题
点赞 -收藏-关注-便于以后复习和收到最新内容有其他问题在评论区讨论-或者私信我-收到会在第一时间回复如有侵权,请私信联系我感谢,配合,希望我的努力对你有帮助^_^
与50位技术专家面对面
20年技术见证,附赠技术全景图
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_45203607/article/details/124096414
内容来源于网络,如有侵权,请联系作者删除!