线程池

线程池

为什么要用线程池?

池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池在实际应用中广泛应用于以下场景:

Web服务器:Web服务器通常需要处理多个并发的客户端请求。通过使用线程池,可以为每个请求分配一个线程,提高服务器的并发处理能力。

数据库连接池:数据库连接是一种宝贵的资源,每次请求都需要建立和释放连接会产生较大的开销。通过使用线程池管理数据库连接,可以避免频繁的连接建立和释放操作,提高数据库访问的效率。

多线程任务调度:某些应用场景下需要执行大量的异步任务或定时任务。通过使用线程池,可以方便地管理和调度这些任务,将任务分配给线程池中的线程进行执行。

并行计算任务:在一些需要进行大规模并行计算的场景中,可以利用线程池将计算任务分配给多个线程并发执行,从而提高计算性能和吞吐量。

后台任务处理:应用程序可能需要处理一些耗时的后台任务,如数据导入、文件处理、图片压缩等。通过将这些任务提交给线程池,可以在后台异步进行处理,不阻塞主线程的执行。

网络编程:在网络编程中,通常需要处理多个客户端连接,每个连接可能对应一个长期运行的任务。通过使用线程池,可以为每个连接分配一个线程来处理请求,实现并发的网络通信。

Executors

java.util.concurrent.Executors:线程池的工厂类,用来生成线程池

我们可以创建多种类型的 ThreadPoolExecutor

  • FixedThreadPool该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool 该方法返回一个可根据实际情况调整线程数量的线程池。初始大小为 0。当有新任务提交时,如果当前线程池中没有线程可用,它会创建一个新的线程来处理该任务。如果在一段时间内(默认为 60 秒)没有新任务提交,核心线程会超时并被销毁,从而缩小线程池的大小。
  • **ScheduledThreadPool**:该方法返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池。

为什么不推荐使用内置线程池?

Executors 返回线程池对象的弊端如下

  • FixedThreadPoolSingleThreadExecutor:使用的是无界的 LinkedBlockingQueue任务队列最大长度为 Integer.MAX_VALUE,FixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的程,从而导致 OOM。
  • ScheduledThreadPoolSingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueueDelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。**DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,**所以最多只能创建核心线程数的线程。可能堆积大量的请求,从而导致 OOM

ExecutorService

java.util.concurrent.ExecutorService 是 Java 并发编程中用于执行异步任务的高级接口,它扩展了 Executor 接口。ExecutorService 提供了一系列方法用于提交任务、管理任务的执行以及关闭服务。以下是一些主要的方法:

  1. submit(Callable<T> task): Future<T>
    • 提交一个带有返回值的任务,并返回一个 Future 对象,用于获取任务的执行结果。
  2. submit(Runnable task): Future<?>
    • 提交一个不带返回值的任务,并返回一个 Future 对象。
  3. submit(Runnable task, T result): Future<T>
    • 提交一个不带返回值的任务,并返回一个 Future 对象,该对象会在任务完成时持有给定的结果。
  4. invokeAll(Collection<? extends Callable<T>> tasks): List<Future<T>>
    • 提交一组带有返回值的任务,并返回一个包含所有 Future 对象的列表。该方法会阻塞,直到所有任务完成。
  5. invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): List<Future<T>>
    • 提交一组带有返回值的任务,并返回一个包含所有 Future 对象的列表。该方法会阻塞,直到所有任务完成,或者超过指定的超时时间。
  6. invokeAny(Collection<? extends Callable<T>> tasks): T
    • 提交一组带有返回值的任务,返回其中一个成功执行的任务的结果。该方法会阻塞,直到至少有一个任务完成。
  7. invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): T
    • 提交一组带有返回值的任务,返回其中一个成功执行的任务的结果。该方法会阻塞,直到至少有一个任务完成,或者超过指定的超时时间。
  8. shutdown(): void
    • 优雅地关闭 ExecutorService,不再接受新的任务,但会等待已经提交的任务完成。
  9. shutdownNow(): List<Runnable>
    • 立即关闭 ExecutorService,尝试停止所有正在执行的任务,不等待任务完成,并返回尚未开始执行的任务列表。
  10. isShutdown(): boolean
    • 判断 ExecutorService 是否已经调用了 shutdown 方法。
  11. isTerminated(): boolean
    • 判断 ExecutorService 是否已经完全终止,即所有任务已经完成。
  12. awaitTermination(long timeout, TimeUnit unit): boolean
    • 阻塞当前线程,直到 ExecutorService 中所有任务完成或超时,或者当前线程被中断。
1
2
3
4
5
6
public class RunnableImpl implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"创建了一个新的线程执行");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Demo01ThreadPool {
public static void main(String[] args) {
//使用线程池的工厂类Executors里边提供的静态方法newFixedThreadPool生产一个指定线程数量的线程池
ExecutorService es = Executors.newFixedThreadPool(2);
//调用ExecutorService中的方法submit,传递线程任务(实现类),开启线程,执行run方法
es.submit(new RunnableImpl());//pool-1-thread-1创建了一个新的线程执行
//线程池会一直开启,使用完了线程,会自动把线程归还给线程池,线程可以继续使用
es.submit(new RunnableImpl());//pool-1-thread-1创建了一个新的线程执行
es.submit(new RunnableImpl());//pool-1-thread-2创建了一个新的线程执行

//调用ExecutorService中的方法shutdown销毁线程池(不建议执行)
es.shutdown();
es.submit(new RunnableImpl());//抛异常,线程池都没有了,就不能获取线程了
}
}

ThreadPoolExecutor

ThreadPoolExecutor 是 Java 中 ExecutorService 接口的一个实现,它提供了一个可灵活配置的线程池。ThreadPoolExecutor 允许你在创建时指定线程池的大小、任务队列、拒绝策略等参数,以满足不同应用场景的需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor核心参数

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数

  • keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,多余的空闲线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁,线程池回收线程时,会对核心线程和非核心线程一视同仁,直到线程池中线程的数量等于 corePoolSize ,回收过程才会停止。
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :饱和策略。

线程池的生命周期

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

1
2
3
private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态有5种,分别为:

image-20240309200040813

其生命周期转换如下入所示:

image-20240309200103832

线程池的执行机制

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。

其执行流程如下图所示:

image-20240309202334728

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:

image-20240309202047180

核心线程的创建和销毁时机

初始化时预创建调用 prestartAllCoreThreads 方法:可以显式地调用 ThreadPoolExecutorprestartAllCoreThreads 方法来预先启动所有核心线程,即使当前没有任务需要执行。

懒创建:在某些实现中,核心线程可能会在第一个任务到来时才被创建,而不是在线程池初始化时。这意味着核心线程的数量会随着任务的到来而逐渐增加,直到达到核心线程数的限制。当线程池接收到一个新任务,并且工作队列为空,同时当前运行的线程数量小于核心线程数时,线程池会创建一个新的核心线程来执行这个任务。

空闲线程复用:如果线程池中的某个核心线程空闲了,它会等待新的任务。如果新的任务到达,这个空闲的核心线程会被复用,而不会立即被销毁。

线程空闲超时(如果允许):如果线程池配置了允许核心线程超时(通过 allowCoreThreadTimeOut(true)),并且核心线程在一定时间内没有任务执行,那么这些核心线程可能会被终止。

线程池的保活机制了解吗?

线程保活是指线程池在空闲时保持一定数量的线程以便快速响应新任务。线程保活主要通过以下参数控制:

  1. **keepAliveTime**:线程空闲后的存活时间。
  2. **allowCoreThreadTimeOut**:是否允许核心线程超时。
  3. **timeUnit**:keepAliveTime的时间单位。

当线程池的活动线程数大于核心线程数时,超出核心线程数的线程会在空闲一定时间后被终止,以节省资源

如何保活

while (task != null || (task = getTask()) != null)

线程池中的线程通过while循环不停的getTask获取任务,保证线程需要一直接活

  • 如果获取到任务,就执行任务中我们自己写的代码。
  • 如果获取的是null,则退出while循环,该线程消失。

getTask

是否允许回收线程

1
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

当allowCoreThreadTimeOut为true或者工作线程数量大于核心线程数量timed为true

保活的关键在于阻塞队列

1
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

根据timed 的状态判断,当工作线程数量大于核心线程数量时调用poll方法获取任务,当工作线程数量小于和核心线程数量时调用take方法。

对于阻塞队列的介绍如下:

  • poll 方法如果队列不为空,返回头部元素。如果队列为空会将线程阻塞在此处,阻塞时间是keepAliveTime。当时间到了获取不到任务时返回null。
  • take方法如果队列不为空返回头部元素。如果队列为空会将线程阻塞在此处,直到队列中有元素可供使用。

以上代码不管执行poll还是take,在空队列的情况下,线程都会阻塞在这

对于非核心线程,当线程池中的线程数量超过核心线程数量且空闲时间超过keepAliveTime时,非核心线程会被回收。

通过这种方式,线程在没有任务时就阻塞在队列上,从而实现保活。

这就是线程保活的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
....代码省略
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

但在执行我们自己的代码逻辑时,可能会抛出异常,这就导致跳出了while循环。

注:completedAbruptly变量很关键,表示当前线程是否是正常退出while循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final void runWorker(Worker w) {
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
.....
// 执行我们写的代码逻辑
task.run();
....
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

如果是核心线程,就直接消失不接活了,这并不是我们想看到的结果。所以会在processWorkerExit中会进行补救,补救的措施也很简单,根据completedAbruptly变量判断,如果是异常跳出循环,那就再新增一个,这也算是保活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void processWorkerExit(Worker w, boolean completedAbruptly) {
......
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

如何回收

核心线程

一般情况,核心线程是不需要被回收的,如果想让核心线程也需要等待之后被回收,执行以下代码。

1
threadPoolExecutor.allowCoreThreadTimeOut(true);

非核心线程

在创建线程池的时候,会设置一个空闲时间,这个空闲时间也就是获取任务时的阻塞时间。

1
2
3
4
5
6
7
8
9
10
11
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}

如果不是核心线程,会执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果阻塞时间到了,还没有任务,则会返回null,执行timedOut = true,然后进入下一个循环。在下个循环中,会判断timedOut 的值,如果等于true ,表示这个线程的空闲时间到了,该消失了。此时会返回null,使当前线程退出外面的while循环,不再获取任务。由于是正常跳出,所以在最后的processWorkerExit方法中并不会补一个线程。

1
2
3
4
5
6
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

总结

  • 线程池根据阻塞队列的阻塞原理进行线程保活。
  • 线程池根据阻塞队列的两种不同的阻塞方式进行回收。如果是非核心线程,如果阻塞时间到了,则退出消失。
    如果线程因异常情况要消失,会在线程池中补一个线程。

总结
线程池根据阻塞队列的阻塞原理进行线程保活。
线程池根据阻塞队列的两种不同的阻塞方式进行回收。如果是非核心线程,如果阻塞时间到了,则退出消失。
如果线程因异常情况要消失,会在线程池中补一个线程。

线程池的拒绝策略有哪些?

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy 抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉,但不抛出异常
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求,然后重新提交该任务
  • ThreadPoolExecutor.CallerRunsPolicy 让调用线程(提交任务的线程)自己执行被拒绝的任务。也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • 重写拒接策略:你也可以通过实现 RejectedExecutionHandler 接口来创建自己的拒绝策略。

image-20240309195014584

线程池常用的阻塞队列有哪些?

新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。

image-20240309200811416

如何设计一个能够根据任务的优先级来执行的线程池?如何解决OOM和饥饿问题

这是一个常见的面试问题,本质其实还是在考察求职者对于线程池以及阻塞队列的掌握。

我们上面也提到了,不同的线程池会选用不同的阻塞队列作为任务队列,比如FixedThreadPool 使用的是LinkedBlockingQueue(无界队列),由于队列永远不会被放满,因此FixedThreadPool最多只能创建核心线程数的线程。

假如我们需要实现一个优先级任务线程池的话,那可以考虑使用 PriorityBlockingQueue (优先级阻塞队列)作为任务队列(ThreadPoolExecutor 的构造函数有一个 workQueue 参数可以传入任务队列)。

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,可以看作是线程安全的 PriorityQueue,两者底层都是使用小顶堆形式的二叉堆,即值最小的元素优先出队。不过,PriorityQueue 不支持阻塞操作。

要想让 PriorityBlockingQueue 实现对任务的排序,传入其中的任务必须是具备排序能力的,方式有两种:

  1. 提交到线程池的任务实现 Comparable 接口,并重写 compareTo 方法来指定任务之间的优先级比较规则。
  2. 创建 PriorityBlockingQueue 时传入一个 Comparator 对象来指定任务之间的排序规则(推荐)。

不过,这存在一些风险和问题,比如:

  • PriorityBlockingQueue 是无界的,可能堆积大量的请求,从而导致 OOM。
  • 可能会导致饥饿问题,即低优先级的任务长时间得不到执行。
  • 由于需要对队列中的元素进行排序操作以及保证线程安全(并发控制采用的是可重入锁 ReentrantLock),因此会降低性能。

对于 OOM 这个问题的解决比较简单粗暴,就是继承PriorityBlockingQueue 并重写一下 offer 方法(入队)的逻辑,当插入的元素数量超过指定值就返回 false 。

饥饿问题这个可以通过优化设计来解决(比较麻烦),比如等待时间过长的任务会被移除并重新添加到队列中,但是优先级会被提升。

对于性能方面的影响,是没办法避免的,毕竟需要对任务进行排序操作。并且,对于大部分业务场景来说,这点性能影响是可以接受的。

如何设定线程池的大小?(核心线程数的大小)

如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。

如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式

  • CPU 密集型任务(N+1): 在CPU密集型任务中,大部分时间都是由CPU执行计算操作,而不涉及等待外部资源的读取或写入。在这种情况下,将更多的线程分配给CPU密集型任务可能不会带来性能的显著提升,反而可能引入额外的线程管理开销和竞争条件。

    这个+1有什么讲究?

    这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

  • I/O 密集型任务(2N): I/O任务相对于CPU密集型任务而言,通常涉及到等待外部资源(如磁盘、网络)的读取或写入操作,这些操作需要较长的时间来完成,期间CPU可以执行其他任务。因此,将更多的线程分配给I/O任务可以提高系统的并发性和整体的吞吐量。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

线程池保证任务执行的顺序性吗?

Java中的线程池本身并不提供内置的方式来保证任务的顺序执行的,因为线程池的设计目的是为了提高并发性能和效率,如果顺序执行的话,那就和单线程没区别了

如果想要实现这个功能该怎么做,有以下两种方式。

使用单线程的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Main {
public static void main(String[] args) {
//创建一个新的单线程执行器
ExecutorService executor = Executors.newSingleThreadExecutor();//创建三个简单的Runnable任务
Runnable task1 = () -> System.out.println("1");
Runnable task2 = () -> System.out.println("2");
Runnable task3 = () -> System.out.println("3");
//提交任务到执行器
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
//关闭执行器
executor.shutdown();
}
}

使用定时的线程池并设置任务之间的依赖关系

可以使用Executors.newScheduledThreadPool(1)来创建这样一个线程池,并使用ScheduledFuture.get()来设置任务之间的依赖关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Main {
public static void main(String[] args) {
try {
//创建一个新的定时线程池执行器
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
//创建三个简单的callable任务
Callable<Integer> task1 = () -> {
System.out.println("1");
return 0; };
Callable<Integer> task2 = () -> {
System.out.println("2");
return 0; };
Callable<Integer> task3 = () -> {
System.out.println("3");
return 0; };
//提交任务到任务执行器,并获取Futu对象
//这段代码表示将任务task1提交给executor,立即执行(延迟时间为0毫秒)。
ScheduledFuture<Integer> future1 = executor.schedule(task1, 0, TimeUnit.MILLISECONDS);
future1.get();//等待task1完成
ScheduledFuture<Integer> future2 = executor.schedule(task2, 0, TimeUnit.MILLISECONDS);
future2.get();//等待task2完成
ScheduledFuture<Integer> future3 = executor.schedule(task3, 0, TimeUnit.MILLISECONDS);
future3.get();//等待task3完成
//关闭执行器
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

我们创建了一个定时线程池执行器,并提交了三个任务。每个任务都是一个打印一个数字并返回0的简单Callable对象。我们使用ScheduledFuture.get()方法来等待前一个任务完成,然后提交下一个任务。这样,我们可以确保任务按照依赖关系顺序执行。

不过要注意,这种方法的缺点是,如果前一个任务没有正常结束(例如抛出了异常),那么所有依赖它的后续任务都无法开始。同时,也需要处理可能的InterruptedException和ExecutionException。


线程池
http://example.com/2023/05/13/Java/Java多并发/线程池/
作者
PALE13
发布于
2023年5月13日
许可协议