线程池也是一种池化思想的实现,常见的有 线程池、字符串常量池、数据库连接池 等。
线程池就是一个存放线程的容器,使得每次需要使用线程的时候,直接获取到线程对象,执行任务即可。
先看一下没有线程池时使用线程:
每一次使用到线程,都需要执行一遍以上的步骤,这使得效率很低下。
这就需要考虑到线程池的使用了,线程池的优点如下(池化容器优点):
先看一个简单的例子:
有客户来到银行办理业务,假设有五个柜台,三个等待位置。当来了一个客户,就开启一个柜台办理业务。当某个柜台一定的时间没有用户来办理业务,就关闭此柜台。
当来了8个客户,5个客户在办理,3个客户在等待区等待,而这时又来了一个客户,银行就告知此客户这里没有位置办理他的业务。
以上例子正是线程池的思想:
线程池位于java.util.concurrent
包下。Executors 的 4 个功能线程池虽然方便,但现在已经不建议使用了,而是建议直接通过使用 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
下面就介绍一下ThreadPoolExecutor类线程池,这一个线程池更加的稳定简明。下面是它的一个构造方法(总共有四个,这个最全)。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 下面是参数判断,可以不看
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中:
下面进行简单的使用演示。代码测试(使用JUC工具包)
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(
2,// 线程初始数
4,// 线程池最大容量
10,// 线程空闲存活时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),// 等待队列
Executors.defaultThreadFactory(), // 生成线程的策略
new ThreadPoolExecutor.AbortPolicy()// 异常处理
);
// 在短时间内,for循环所消耗时间不记,默认为同时发起指定的请求
for (int i = 0; i < 5; i++) {
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "===> 办理业务");
});
}
}
}
以上代码,核心线程数为2,最大容量是4,等待队列可容纳1,则此线程池一次性可并发执行5个任务,如果有六个任务需要执行,就会走异常。
输出:
pool-1-thread-1===> 办理业务
pool-1-thread-3===> 办理业务
pool-1-thread-1===> 办理业务
pool-1-thread-2===> 办理业务
pool-1-thread-4===> 办理业务
不看也影响线程池的使用,内部引用二进制运算符,提高了运行效率。
ThreadPoolExecutor类的execute方法(执行任务入口):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取clt,clt记录着线程池状态和运行线程数。
int c = ctl.get();
//运行线程数小于核心线程数时,创建线程放入线程池中,并且运行当前任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//创建线程失败,重新获取clt。
c = ctl.get();
}
//线程池是运行状态并且运行线程大于核心线程数时,把任务放入队列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新检查线程池不是运行状态时,
//把任务移除队列,并通过拒绝策略对该任务进行处理。
if (! isRunning(recheck) && remove(command))
reject(command);
//当前运行线程数为0时,创建线程加入线程池中。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//运行线程大于核心线程数时并且队列已满时,
//创建线程放入线程池中,并且运行当前任务。
else if (!addWorker(command, false))
reject(command);
}
在execute方法中,多次调用的addWorker方法,再看一下这个方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
//线程池处于关闭状态,或者当前任务为null
//或者队列不为空,则直接返回失败。
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
//线程数超过线程池最大容量,则返回false;
//这里的core是addWorker方法的第二个参数,
//如果为true则根据核心线程数进行比较,
//如果为false则根据最大线程数进行比较。
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//尝试增加线程数,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
//如果增加线程数失败,则重新获取ctl
c = ctl.get(); // Re-read ctl
//如果当前的运行状态不等于SHUTDOWN,说明状态已被改变,
//返回第一个for循环继续执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
在SpringBoot项目中,可以用Spring提供的对ThreadPoolExecutor
封装的线程池ThreadPoolTaskExecutor
,直接使用注解启用。它的内部也是通过ThreadPoolExecutor实现。
本地环境:
IDEA、JDK1.8+、Maven、Win10,还有网络。
话不多说,【点击我查看如何快速搭建一个SpringBoot项目】
pom.xml导入swagger2、lombok依赖【Maven点击我查询】
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
在application.yml文件中配置参数:
server:
port: 8082
# thread pool
async:
executor:
thread:
core_pool_size: 3
max_pool_size: 5
queue_capacity: 3
name:
prefix: async-service-
让SpringBoot加载此配置类
package com.pdh.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Data
@Slf4j
@Configuration
@EnableAsync // 开启多线程
public class ThreadPoolConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean("asyncTaskExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
编写两个Service,一个是ThreadService,作为异步线程执行服务。另一个是OtherService,是用户具体的服务类
ThreadService
package com.pdh.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ThreadService {
@Async("asyncTaskExecutor") // 在Spring容器中使用 asyncTaskExecutor对象执行以下代码块
public void executeAsync(){
log.info("start executeAsync");
System.out.println("异步线程要做的事情");
try {
// 睡眠10s
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("可以在这里执行批量插入等耗时的事情");
log.info("end executeAsync");
}
}
OtherService
package com.pdh.service;
import org.springframework.stereotype.Service;
@Service
public class OtherService {
public String test(){
return "我是OtherService的test方法";
}
}
TestController响应前端请求
package com.pdh.controller;
import com.pdh.service.OtherService;
import com.pdh.service.ThreadService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/** * @Author: 彭_德华 * @Date: 2021-10-19 16:00 */
@RestController
public class TestController {
@Autowired
private ThreadService threadService;
@Autowired
private OtherService otherService;
@GetMapping("/async")
public String async(){
String str = otherService.test();
// 如果是异步执行,就会不会等待10s(因为executeAsync方法设置了sleep 10s)
threadService.executeAsync();
return str;
}
}
使用Swagger2进行接口测试,【点击我跳转 快速使用Swagger2】,编写Swagger2配置类
@Configuration
@EnableSwagger2
public class Swagger2Config {
@Bean
public Docket docket(Environment environment){
// 设置要显示的swagger环境
Profiles profiles = Profiles.of("dev","test");
// 判断是否处在自己设定的环境中
boolean flag = environment.acceptsProfiles(profiles);
return new Docket(DocumentationType.SWAGGER_2)
.groupName("ThreadPool")
.apiInfo(apiInfo2())
.enable(true)
.select()
.apis(RequestHandlerSelectors.basePackage("com.pdh.controller"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo(){
return new ApiInfoBuilder()
.title("线程池异步执行测试")
.description("测试后端移步进行相应的操作")
.termsOfServiceUrl("https://blog.csdn.net/yeahPeng11")
.contact(new Contact("彭德华","https://blog.csdn.net/yeahPeng11","pdhcool@163.com"))
.version("v1.0")
.build();
}
}
启动项目后,访问 http://localhost:8082/swagger-ui.html
。端口改成application.yml中配置的端口,默认为 8080 端口(我是8082端口)。
swagger2中测试 /async
请求:
2021-10-19 16:58:34.768 INFO 77324 --- [async-service-1] com.pdh.service.ThreadService : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2021-10-19 16:58:44.775 INFO 77324 --- [async-service-1] com.pdh.service.ThreadService : end executeAsync
2021-10-19 17:15:14.768 INFO 77324 --- [async-service-2] com.pdh.service.ThreadService : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2021-10-19 17:15:24.775 INFO 77324 --- [async-service-2] com.pdh.service.ThreadService : end executeAsync
至此,线程池的使用都已经涉及到了。
对于以上所有的功能确实都能实现,那么出现问题了我需要怎么排查?下面提供两种解决思路:
setUncaughtExceptionHandler
方法,自定义异常处理方法。例如:ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("judge-pool-%d")
.setUncaughtExceptionHandler((thread, throwable)-> logger.error("ThreadPool {} got exception", thread,throwable))
.build();
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/yeahPeng11/article/details/120851158
内容来源于网络,如有侵权,请联系作者删除!