线程池
1815字约6分钟
2024-08-08
池化技术
程序运行的本质:占用系统的资源
优化资源的使用 => 池化技术(线程池、内存池、对象池)
池化技术:提前准备好一些资源,用的时候直接取,用完直接归还
线程池好处
降低资源的消耗
提高响应速度
线程复用,可以控制最大并发数,管理线程
线程池
alibaba
java
开发规约
【强制】线程池不允许使用 Executors
去创建,而是通过 ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
说明:Executors
返回的线程池对象的弊端如下:
FixedThreadPool
和SingleThreadPool
:允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致OOM
CachedThreadPool
和ScheduledThreadPool
:允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,从而导致OOM
三大方法
newSingleThreadExecutor
:单个线程的线程池newFixedThreadPool
:固定线程数量的线程池newCachedThreadPool
:可缓存线程池,线程数量大小不限制
public static void main(String[] args) {
// 单个线程的线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 5; i++) {
// 使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " SingleThread");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,关闭线程池
threadPool.shutdown();
}
}
// pool-1-thread-1 SingleThread
// pool-1-thread-1 SingleThread
// pool-1-thread-1 SingleThread
// pool-1-thread-1 SingleThread
// pool-1-thread-1 SingleThread
public static void main(String[] args) {
// 固定线程数量的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
try {
for (int i = 0; i < 5; i++) {
// 使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " FixedThread");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,关闭线程池
threadPool.shutdown();
}
}
// pool-1-thread-1 SingleThread
// pool-1-thread-2 SingleThread
// pool-1-thread-1 SingleThread
// pool-1-thread-2 SingleThread
// pool-1-thread-1 SingleThread
public static void main(String[] args) {
// 可缓存线程池,线程数量大小不限制
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 5; i++) {
// 使用线程池来创建线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " CachedThread");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,关闭线程池
threadPool.shutdown();
}
}
// pool-1-thread-1 SingleThread
// pool-1-thread-3 SingleThread
// pool-1-thread-2 SingleThread
// pool-1-thread-4 SingleThread
// pool-1-thread-5 SingleThread
七大参数
int corePoolSize
:核心线程池大小int maximumPoolSize
:线程池最大线程数量long keepAliveTime
:空闲线程存活时间TimeUnit unit
:存活时间单位BlockingQueue<Runnable> workQueue
:工作队列ArrayBlockingQueue
:基于数组的有界阻塞队列,按FIFO
排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize
后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize
,则会执行拒绝策略LinkedBlockingQuene
:基于链表的无界阻塞队列(其实最大容量为Interger.MAX
),按照FIFO
排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize
后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize
,因此使用该工作队列时,参数maxPoolSize
其实是不起作用的SynchronousQuene
:一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize
,则执行拒绝策略PriorityBlockingQueue
:具有优先级的无界阻塞队列,优先级通过参数Comparator
实现
ThreadFactory threadFactory
:线程工厂RejectedExecutionHandler handler
:拒绝策略
===== 源码 =====
// Executors
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// Executors
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// Executors
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
四种拒绝策略
AbortPolicy
:中止策略。默认的拒绝策略,直接抛出RejectedExecutionException
。调用者可以捕获这个异常,然后根据需求编写自己的处理代码DiscardPolicy
:抛弃策略。什么都不做,直接抛弃被拒绝的任务DiscardOldestPolicy
:抛弃最老策略。抛弃阻塞队列中最老的任务,相当于就是队列中下一个将要被执行的任务,然后重新提交被拒绝的任务。如果阻塞队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将该策略和优先级队列放在一起使用CallerRunsPolicy
:调用者运行策略。在调用者线程中执行该任务。该策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将任务回退到调用者(调用线程池执行任务的主线程),由于执行任务需要一定时间,因此主线程至少在一段时间内不能提交任务,从而使得线程池有时间来处理完正在执行的任务
// RejectedExecutionHandler
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// ThreadPoolExecutor
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
// ThreadPoolExecutor
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
// ThreadPoolExecutor
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
// ThreadPoolExecutor
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
最大线程数(调优)
CPU
密集型
通过代码 Runtime.getRuntime().availableProcessors()
获取服务器的核数,是多少就设置多少,可以保持 CPU
的效率最高
IO
密集型
判断你程序中十分耗 IO
的线程,比如有 15
个,那么大于他就行,设置 30
个