java线程池(一) 简述线程池的几种使用方式

java线程池(一) 简述线程池的几种使用方式,第1张

首先说明下java线程是如何实现线程重用的

1. 线程执行完一个Runnable的run()方法后,不会被杀死

2. 当线程被重用时,这个线程会进入新Runnable对象的run()方法12

java线程池由Executors提供的几种静态方法创建线程池。下面通过代码片段简单介绍下线程池的几种实现方式。后续会针对每个实现方式做详细的说明

newFixedThreadPool

创建一个固定大小的线程池

添加的任务达到线程池的容量之后开始加入任务队列开始线程重用总共开启线程个数跟指定容量相同。

@Test

public void newFixedThreadPool() throws Exception {

ExecutorService executorService = Executors.newFixedThreadPool(1)

executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().build())

RunThread run1 = new RunThread("run 1")

executorService.execute(run1)

executorService.shutdown()

}12345678

newSingleThreadExecutor

仅支持单线程顺序处理任务

@Test

public void newSingleThreadExecutor() throws Exception {

ExecutorService executorService = Executors.newSingleThreadExecutor()

executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build())

executorService.execute(new RunThread("run 1"))

executorService.execute(new RunThread("run 2"))

executorService.shutdown()

}123456789

newCachedThreadPool

这种情况跟第一种的方式类似,不同的是这种情况线程池容量上线是Integer.MAX_VALUE 并且线程池开启缓存60s

@Test

public void newCachedThreadPool() throws Exception {

ExecutorService executorService = Executors.newCachedThreadPool()

executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().build())

executorService.execute(new RunThread("run 1"))

executorService.execute(new RunThread("run 2"))

executorService.shutdown()

}123456789

newWorkStealingPool

支持给定的并行级别,并且可以使用多个队列来减少争用。

@Test

public void newWorkStealingPool() throws Exception {

ExecutorService executorService = Executors.newWorkStealingPool()

executorService = Executors.newWorkStealingPool(1)

RunThread run1 = new RunThread("run 1")

executorService.execute(run1)

executorService.shutdown()

}123456789

newScheduledThreadPool

看到的现象和第一种相同,也是在线程池满之前是新建线程,然后开始进入任务队列,进行线程重用

支持定时周期执行任务(还没有看完)

@Test

public void newScheduledThreadPool() throws Exception {

ExecutorService executorService = Executors.newScheduledThreadPool(1)

executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build())

executorService.execute(new RunThread("run 1"))

executorService.execute(new RunThread("run 2"))

executorService.shutdown()

}

使用线程池的好处

1、降低资源消耗

可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

2、提高响应速度

当任务到达时,任务可以不需要等到线程创建就能立即执行。

3、提高线程的可管理性

线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

线程池的工作原理

首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

线程池饱和策略

这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

AbortPolicy

为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。

DiscardPolicy

直接抛弃,任务不执行,空方法

DiscardOldestPolicy

从队列里面抛弃head的一个任务,并再次execute 此task。

CallerRunsPolicy

在调用execute的线程里面执行此command,会阻塞入口

用户自定义拒绝策略(最常用)

实现RejectedExecutionHandler,并自己定义策略模式

下我们以ThreadPoolExecutor为例展示下线程池的工作流程图

1.jpg

2.jpg

1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇

3.jpg

关键方法源码分析

我们看看核心方法添加到线程池方法execute的源码如下:

//     //Executes the given task sometime in the future.  The task     //may execute in a new thread or in an existing pooled thread.     //     // If the task cannot be submitted for execution, either because this     // executor has been shutdown or because its capacity has been reached,     // the task is handled by the current {@code RejectedExecutionHandler}.     //     // @param command the task to execute     // @throws RejectedExecutionException at discretion of     //         {@code RejectedExecutionHandler}, if the task     //         cannot be accepted for execution     // @throws NullPointerException if {@code command} is null     //    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException()       //         // Proceed in 3 steps:         //         // 1. If fewer than corePoolSize threads are running, try to         // start a new thread with the given command as its first         // task.  The call to addWorker atomically checks runState and         // workerCount, and so prevents false alarms that would add         // threads when it shouldn't, by returning false.         // 翻译如下:         // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,         // 如果能完成新线程创建exexute方法结束,成功提交任务         // 2. If a task can be successfully queued, then we still need         // to double-check whether we should have added a thread         // (because existing ones died since last checking) or that         // the pool shut down since entry into this method. So we         // recheck state and if necessary roll back the enqueuing if         // stopped, or start a new thread if there are none.         // 翻译如下:         // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要         // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;         // 3. If we cannot queue task, then we try to add a new         // thread.  If it fails, we know we are shut down or saturated         // and so reject the task.         // 翻译如下:         // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池         // 已经达到饱和状态,所以reject这个他任务         //        int c = ctl.get()       // 工作线程数小于核心线程数        if (workerCountOf(c) <corePoolSize) {            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize            if (addWorker(command, true))                return           c = ctl.get()       }        // 如果工作线程数大于等于核心线程数        // 线程的的状态未RUNNING并且队列notfull        if (isRunning(c) &&workQueue.offer(command)) {            // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除            int recheck = ctl.get()           if (! isRunning(recheck) &&remove(command))                // 移除成功,拒绝该非运行的任务                reject(command)           else if (workerCountOf(recheck) == 0)                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务                addWorker(null, false)       }        // 如果队列满了或者是非运行的任务都拒绝执行        else if (!addWorker(command, false))            reject(command)   }


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/bake/11699822.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-18
下一篇2023-05-18

发表评论

登录后才能评论

评论列表(0条)

    保存