线程池作用就是限制系统中执行线程的数量。
根 据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排 队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池 中有等待的工作线程,就可以开始运行了;否则进入等待队列。
我们先来了解一下Executors 和 ExecutorService
下表列出了 Executor 和 ExecutorService 的区别:
Executor | ExecutorService |
---|---|
Executor 是 Java 线程池的核心接口,用来并发执行提交的任务 | ExecutorService 是 Executor 接口的扩展,提供了异步执行和关闭线程池的方法 |
提供execute()方法用来提交任务 | 提供submit()方法用来提交任务 |
execute()方法无返回值 | submit()方法返回Future对象,可用来获取任务执行结果 |
不能取消任务 | 可以通过Future.cancel()取消pending中的任务 |
没有提供和关闭线程池有关的方法 | 提供了关闭线程池的方法 |
简单来说ExecutorService 继承了 Executors 多了些东西 那么我们在下面创建线程池时候使用的对象记得是 ExecutorService
ExecutorService对象内常用的方法
void execute(Runnable task); 提交线程到线程池无返回值 (最常用)
executorService.submit() 提交线程到线程池有返回值
注意提交线程到线程池后会自动运行线程了
void shutdown(); 关闭线程池 在关闭前创建的线程会继续执行 而关闭后创建的线程将不会执行将会报错 (一般不建议使用)
还有其他的方法 比如 shutdownNow(); 关闭线程池不管是否有线程还未执行完成
注意的是shutdownNow这个方法底层会将所有线程的中断标识符设置为中断状态(true) 可以自己看源码
…自己看ExecutorService 类里的源码
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
通过Executors提供四种线程池,分别为:
public static ExecutorService newFiexedThreadPool(int Threads)
创建一个固定长度的线程池,超出的线程会在队列中等待
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public static ExecutorService newCachedThreadPool():
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
//添加一个线程
int finalI = i;
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(finalI);
}
});
}
//Executors.newSingleThreadExecutor() //
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
简单来说就是保证所有使用SingleThreadExecutor创建的线程的执行顺序
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
});
}
结果依次输出,相当于顺序执行各个任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
延迟执行示例代码如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS); //延迟3秒后执行
定期执行示例代码如下:
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS); //表示延迟1秒后每3秒执行一次。 无限执行下去
}
我们之所以要手动创建线程池,是因为 JDK 自带的工具类所创建的线程池存在一定的弊端
/** * 各参数含义 * corePoolSize : 线程池中常驻的线程数量。核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会 * 受存活时间 keepAliveTime 的限制,除非将 allowCoreThreadTimeOut 设置为 true。 * maximumPoolSize : 线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的 * LinkedBlockingQueue时,这个值无效。 * keepAliveTime : 当线程数量多于 corePoolSize 时,空闲线程的存活时长,超过这个时间就会被回收 * unit : keepAliveTime 的时间单位 * workQueue : 存放待处理任务的队列 * threadFactory : 线程工厂 * handler : 拒绝策略,拒绝无法接收添加的任务 */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... ... }
使用 JDK 自带的 Executors工具类 类似于 Collections 和 Arrays) 可以直接创建以上面几种线程池:
JDK 自带工具类创建的线程池存在的问题
有的线程池可以无限添加任务或线程,容易导致 OOM;就拿我们最常用FixedThreadPool和 CachedThreadPool来说,
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
可见其任务队列用的是LinkedBlockingQueue,且没有指定容量,相当于无界队列,这种情况下就可以添加大量的任务,甚至达到Integer.MAX_VALUE的数量级,如果任务大量堆积,可能会导致 OOM。
…
还有一个问题就是这些线程池的线程都是使用 JDK 自带的线程工厂 (ThreadFactory)创建的,线程名称都是类似pool-1-thread-1的形式,第一个数字是线程池编号,第二个数字是线程编号,这样很不利于系统异常时排查问题。
如果你安装了“阿里编码规约”的插件,在使用Executors创建线程池时会出现以下爆红警告信息:
为避免这些问题,我们最好还是手动创建线程池。
首先要说明一点,定制线程池的线程数并不是多么高深的学问,也不是说一旦线程数设定不合理,你的程序就无法运行,而是要尽量避免以下两种极端条件:
所以在实际开发中我们需要根据实际的业务场景合理设定线程池的线程数量,那又如何分析业务场景呢?我们的业务场景大致可以分为以下两大类:
这样为了让 CPU 达到期望的使用率,最优的线程数量计算公式如下:
Nthreads = Ncpu Ucpu ( 1 + W / C )
CPU 核心数可以通过以下方法获取:
int N_CPUS = Runtime.getRuntime().availableProcessors();
当然,除了 CPU,线程数量还会受到很多其他因素的影响,比如内存和数据库连接等,需要具体问题具体分析。
使用可自定义线程名称的线程工厂
这个就简单多了,可以借助大名鼎鼎的谷歌开源工具库 Guava,首先引入如下依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
然后我就可以使用其提供的ThreadFactoryBuilder类来创建线程工厂了,Demo 如下:
public class ThreadPoolDemo {
// 线程数
public static final int THREAD_POOL_SIZE = 16;
public static void main(String[] args) throws InterruptedException {
// 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("hyn-demo-pool-%d").build();
// 创建线程池,其中任务队列需要结合实际情况设置合理的容量
ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE,
THREAD_POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
// 新建 1000 个任务,每个任务是打印当前线程名称
for (int i = 0; i < 1000; i++) {
executor.execute(() -> System.out.println(Thread.currentThread().getName()));
}
// 优雅关闭线程池
executor.shutdown();
executor.awaitTermination(1000L, TimeUnit.SECONDS);
// 任务执行完毕后打印"Done"
System.out.println("Done");
}
}
package cn.mq.rabbitmqconfig;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.*;
/** * @author HuAnmin * @version 1.0 * @email 3426154361@qq.com * @date 2021/4/15-3:41 * @description 类描述.... */
@Component
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConnection {
// 线程数/连接池数量
public static final int THREAD_POOL_SIZE = 100;
ArrayBlockingQueue<Connection> list = new ArrayBlockingQueue<Connection>(THREAD_POOL_SIZE);
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("hyn-demo-pool-%d").build();
// 创建线程池,其中任务队列需要结合实际情况设置合理的容量
ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE,
THREAD_POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(Math.round(THREAD_POOL_SIZE/2)),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
public String host;
private int port;
private String virtualHost;
private String username;
private String password;
@PostConstruct //加上该注解表明该方法会在bean初始化后调用
private void init() throws InterruptedException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址 如果是本机就是localhost 如果在其他 地方比如:虚拟机中 那么就是ip地址
connectionFactory.setHost(host);
//连接端口;默认为 5672
connectionFactory.setPort(port);
//虚拟主机名称 就是和你用户绑定的虚拟机 在创建用户时候就指定了
connectionFactory.setVirtualHost(virtualHost);
//连接用户名
connectionFactory.setUsername(username);
//连接密码
connectionFactory.setPassword(password);
for (int i = 0; i < THREAD_POOL_SIZE - 1; i++) {
executor.execute(() -> {
while (true) {
try {
list.put(connectionFactory.newConnection());
// System.out.println("创建连接到连接池中:"+list.size());
} catch (InterruptedException | TimeoutException | IOException e) {
e.printStackTrace();
}
}
});
}
}
public Connection getNewConnection() throws InterruptedException {
Connection take = list.take();
while (true) {
if (take.isOpen()) {
return take;
} else {
take = list.take();
}
}
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_45203607/article/details/120236969
内容来源于网络,如有侵权,请联系作者删除!