
此次需求做的是同步功能,将第三方数据拉取到本地
1、同步将由多线程进行,节省时间
2、拉取过来后将与本地表进行对比:add、update、delete三种情况
3、本地数据修改后,将通过rabbitmq形式发送消息到其他服务进行业务处理。
通过submit 方法能够获取线程的返回结果
通过execute方法不会获取线程返回
在此 我使用了submit方法来执行多线程任务
一般来说Future和Callable是搭配在一起使用的
get方法为阻塞式超过最大等待时间将抛出异常
很多场景下应用程序必须能够处理一系列传入请求,简单的处理方式是通过一个线程顺序的处理这些请求,如下图:
单线程策略的优势和劣势都非常明显:
优势:设计和实现简单;劣势:这种方式会带来处理效率的问题,单线程的处理能力是有限,不能发挥多核处理器优势。
在这种场景下我们就需要考虑并发,一个简单的并发策略就是Thread-Per-Message模式,即为每个请求使用一个新的线程。
Thread-Per-Message策略的优势和劣势也非常明显:
优势:设计和实现比较简单,能够同时处理多个请求,提升响应效率;
劣势:主要在两个方面
1资源消耗 引入了在串行执行中所没有的开销,包括线程创建和调度,任务处理,资源分配和回收以及频繁上下文切换所需的时间和资源。2安全
有没有一种方式可以并发执行又可以克服Thread-Per-Message的问题?
采用线程池的策略,线程池通过控制并发执行的工作线程的最大数量来解决Thread-Per-Message带来的问题。可见下图,请求来临时先放入线程池的队列
线程池可以接受一个Runnable或Callable<T>任务,并将其存储在临时队列中,当有空闲线程时可以从队列中拿到一个任务并执行。
反例(使用 Thread-Per-Message 策略)
正例(使用 线程池 策略)
JAVA 中(JDK 15+)线程池的种类:
程序不能使用来自有界线程池的线程来执行依赖于线程池中其他任务的任务。
有两个场景:
要缓解上面两个场景产生的问题有两个简单的办法:
真正解决此类方法还是需要梳理线程池执行业务流程,不要在有界线程池中执行相互依赖的任务,防止出现竞争和死锁。
向线程池提交的任务需要支持中断。从而保证线程可以中断,线程池可以关闭。线程池支持 javautilconcurrentExecutorServiceshutdownNow() 方法,该方法尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务的列表。
但是 shutdownNow() 除了尽力尝试停止处理主动执行的任务之外不能保证一定能够停止。例如,典型的实现是通过Threadinterrupt()来停止,因此任何未能响应中断的任务可能永远不会终止,也就造成线程池无法真正的关闭。
反例:
正例:
线程池中的所有任务必须提供机制,如果它们异常终止,则需要通知应用程序
如果不这样做不会导致资源泄漏,但由于池中的线程仍然被会重复使用,使故障诊断非常困难或不可能。
在应用程序级别处理异常的最好方法是使用异常处理。异常处理可以执行诊断 *** 作,清理和关闭Java虚拟机,或者只是记录故障的详细信息。
也就是说在线程池里执行的任务也需要能够抛出异常并被捕获处理。
任务恢复或清除 *** 作可以通过重写 javautilconcurrentThreadPoolExecutor 类的 afterExecute() 钩子来执行。
当任务通过执行其 run() 方法中的所有语句并且成功结束任务,或者由于异常而导致任务停止时,将调用此钩子。
可以通过自定义 ThreadPoolExecutor 服务来重载 afterExecute()钩子。
还可以通过重载 terminated() 方法来释放线程池获取的资源,就像一个finally块。
反例:
任务意外终止时作为一个运行时异常,无法通知应用程序。此外,它缺乏恢复机制。因此,如果Task抛出一个NullPointerException ,异常将被忽略。
正例:
另外一种方式是使用 ExecutorServicesubmit() 方法(代替 execute() 方法)将任务提交到线程池并获取 Future 对象。
当通过 ExecutorServicesubmit() 提交任务时,抛出的异常并未到达未捕获的异常处理机制,因为抛出的异常被认为是返回状态的一部分,因此被包装在ExecutionException ,并由Futureget() 返回。
javalangThreadLocal 类提供线程内的本地变量。根据Java API
ThreadLocal对象需要关注那些对象被线程池中的多个线程执行的类。
线程池缓存技术允许线程重用以减少线程创建开销,或者当创建无限数量的线程时可以降低系统的可靠性。
当 ThreadLocal 对象在一个线程中被修改,随后变得可重用时,在重用的线程上执行的下一个任务将能看到该线程上执行过的上一个任务修改的ThreadLocal 对象的状态。
所以要在使用线程池时重新初始化的ThreadLocal对象实例。
反例:
DiaryPool类创建了一个线程池,它可以通过一个共享的无界的队列来重用固定数量的线程。
在任何时候,不超过numOfThreads个线程正在处理任务。如果在所有线程都处于活动状态时提交其他任务,则 它们在队列中等待,直到线程可用。
当线程循环时,线程的线程局部状态仍然存在。
下表显示了可能的执行顺序:
时间任务线程池提交方法日期1t11doSomething1()星期五2t22doSomething2()星期一3t31doSomething3()星期五
在这个执行顺序中,期望从doSomething2() 开始的两个任务( t 2和t 3 doSomething2() 将当天视为星 期一。然而,因为池线程1被重用,所以t 3观察到星期五。
解决方案(try-finally条款)
符合规则的方案removeDay() 方法添加到Diary类,并在try‐finally 块中的实现doSomething1() 类的doSomething1() 方法的语句。finally 块通过删除当前线程中的值来恢复threadlocal类型的days对象的初始状态。
如果threadlocal变量再次被同一个线程读取,它将使用initialValue()方法重新初始化 ,除非任务已经明确设置了变量的值。这个解决方案将维护的责任转移到客户端( DiaryPool ),但是当Diary类不能被修改时是一个好的选择。
解决方案(beforeExecute())
使用一个自定义ThreadPoolExecutor 来扩展 ThreadPoolExecutor并覆盖beforeExecute() 方法。beforeExecute() 方法在Runnable 任务在指定线程中执行之前被调用。该方法在线程 “t” 执行任务 “r” 之前重新初始化 threadlocal 变量。
ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务
corePoolSize 核心线程数
maximumPoolSize 最大线程数
线程池中允许的最大线程数。
keepAliveTime 线程空闲时的存活时间
当线程没有执行任务时,继续存活的时间。当线程池中的线程数量大于核心线程数时,即时没有新任务提交,核心线程外的线程也不会立即销毁,而是等待keepAliveTime才会销毁。
unit 线程空闲时的存活时间单位
workQueue 阻塞队列
阻塞队列:1)支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满;2)支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空
生产者和消费者模式 能够解决并发问题,通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。通过阻塞队列来进行通信,生产者生产完数据不用等待消费者处理,直接扔给阻塞队列,消费者也直接从阻塞队列中取数据,既能够解耦,又平衡两者的处理能力。
常用阻塞队列:
threadFactory 创建线程的工厂
handler 拒绝策略
当阻塞队列满了,且没有空闲的工作线程,继续提交任务会采取一种策略处理新任务。线程池提供了4中策略:
1)AbortPolicy:直接抛出异常,默认策略
2)CallerRunsPolicy:用调用者所在的线程来执行任务
3)DiscardPolicy:直接丢弃任务
4)DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务
也可以自己根据应用场景(如记录日志或持久化储存不能处理的任务)实现RejectedExecutionHandler接口,自定义拒绝策略。
看下ThreadPoolExecutor的execute方法:
流程图:
提交任务:
关闭线程池:遍历线程池中的工作线程,然后逐个调用interrupt方法来中断线程。
1ThreadPoolTaskExecutor的基本参数:因为它是基于ThreadPoolExecutor来实现的,我们可以参考ThreadPoolExecutor的构造函数
2线程池的执行流程:当提前任务到当前线程池时,先判断当前线程池中线程数量是否小于corePoolSize,如果小于创建新的线程处理请求,不管当前有没有线程闲置;如果大于等于,则将线程想放入阻塞队列workQueue中,线程池中存在空闲的线程后会去处理workQueue中任务;如果workQueue也满了,则会新建工作线程处理任务,当线程池中的线程大于最大线程数maxPoolSize时,则会用选定的拒绝策略来处理新的线程
下面是源码部分(使用execute()方法):
注意:ThreadPoolTaskExecutor中execute()有重载方法,但最后都是调用同一方法,处理逻辑相同
从上看出其实线程池执行任务的主要方法是addWorker()方法,execute()方法只是将任务提交以及做一些判断,我们看一下addWorker()方法:
上面源码中调用start()方法启动线程,其实是调用Worker中run()方法来启动,因为Worker实现实现了Runnable接口,如下
调用Worker的run()方法,从图中可以看出本质执行的方法是runWorker()方法,源码如下:
源码执行流程图如下:
主线程抛出一个子线程异步处理一些东西,这时主线程要等待子线程运行完成再完成(其实我是为了统计运行时间的)。这里抛出的子线程可能递归的调用自己,就是再抛一个他的子线程出来,但是到底一共抛多少,事先是不知道的。应用场景:1)多线程扫描文件夹内的文件,遇到文件夹内有子文件夹,要递归调用扫描线程的,等到全部扫描完成后,返回结果,显示;2)多线程快速排序,第一次肯定是单线程的,第一次排序完成后,会分两半,这两半多线程排,递归调用了这个排序线程,这两半很有可能,极大有可能再各分两半,也就是会有4个子线程的子线程再排序。我试过网上的那个CountDownLatch,但是他只能实现定义好子线程的数量,但是在以上两种情景下,事先你是不知道会有多少个子线程的!PS:在某种需求中,比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择CyclicBarrier了。这个貌似也要在开始的时候设定总线程数:CyclicBarrier(intparties)这个和countDownLatch就差不多了呢!你觉得呢问题补充:niuzai写道亲,CyclicBarrier可能是你想要的。PS:在某种需求中,比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择CyclicBarrier了。我再来看看~~试试看!问题补充:niuzai写道亲,CyclicBarrier这个东东是可以动态重置个数的,而countDownLatch是一次性的。只不过大多数例子CyclicBarrier初始化了个数罢了,实质上它是可以动态改变的~嗯我试了下,多线程快排,小数据量还好,顺利执行了,但是数多了后,会建N多线程等待,会outofmemory,呵呵!不过证明这个方法是可以的!问题补充:niuzai写道亲,那你就结合线程池 *** 作,设置线程数目上限。不要每个任务就产生一个线程咯~产生新的线程是很耗内存的,线程太多当然就内存溢出咯~嗯你说的很对!要结合线程池的!
文章框架
线程开启方式
--1通过异步委托实现线程
----11定义线程
----12检测委托线程结束,通过while循环,等待句柄,函数回调
--2通过thread类开启线程
----21定义线程
----22如何传递参数
----23线程优先级
----24线程控制
--3、通过线程池开启线程
--4、通过任务开启线程
----41通过任务或任务工厂
----42连续任务
----43任务的层次结构
----44任务的执行结果
调用线程控制方法启动:ThreadStart();停止:ThreadAbort();暂停:ThreadSuspend();继续:ThreadResume();
值得注意的是: 通过 ThreadAbort() 停下来的线程(或自行运行结束的线程),都无法直接通过 ThreadStart() 方法再次启动,必须重新创建一个线程启动。
注意:线程池中的线程都是后台线程,不能修改为前台线程,不能设置优先级
如果一个任务的执行依赖于另一个任务,即任务的执行有先后顺序。此时,我们可以使用连续任务。
taskContinueWith(ReadNews)表示一个任务task结束后,才开始执行另一个任务。
在一个任务中可以启动子任务,两个任务异步执行。默认情况下,子任务(即由外部任务创建的内部任务)将独立于其父任务执行。使用TaskCreationOptionsAttachedToParent显式指定将任务附加到任务层次结构中的某个父级。
如果父任务执行完了但是子任务没有执行完,则父任务的状态会被设置为WaitingForChildrenToComplete,只有子任务也执行完了,父任务的状态才会变成RunToCompletion。
使用Task的泛型版本,可以返回任务的执行结果。
下面例子中的TaskWithResult的输入为object类型,返回一个元组Tuple<int, int>。
定义调用TaskWithResult的任务时,使用泛型类Task<Tuple<int, int>>,泛型的参数定义了返回类型。通过构造函数,传递TaskWithResult,构造函数的第二个参数定义了TaskWithResult的输入值。
任务完成后,通过Result属性获取任务的结果。
可以弄个List存放所有任务,任务完成了就删掉。 大概代码如下,你可以调试看看 private ExecutorService threadPool; private List<Callable<Boolean>> loadTasks; private List<Future<Boolean>> loadResults; private int numberOfTasks; public void load() { threadPool = ExecutorsnewCachedThreadPool(); loadTasks = new ArrayList<Callable<Boolean>>(); // loadTasksadd( new Callable<Boolean>() { public Boolean call() throws Exception { //add one task } }); //add some other tasks loadResults = new ArrayList<Future<Boolean>>(); for( Callable<Boolean> task : loadTasks ) { loadResultsadd( threadPoolsubmit( task ) ); } numberOfTasks = loadResultssize(); } public float getLoadPercent() { Iterator<Future<Boolean>> it = loadResultsiterator(); while (ithasNext()) { Future<Boolean> next = itnext(); if (nextisDone()) { try { if (nextget()) { itremove(); } } catch (Exception ex) { exprintStackTrace(); } } } float percent = (numberOfTasks - loadResultssize()) / (float) numberOfTasks; return percent; }
在 *** 作系统中,线程是 *** 作系统调度的最小单元,同时线程又是一种受限的系统资源,即线程不可能无限制地产生,并且 线程的创建和销毁都会有相应的开销。 当系统中存在大量的线程时,系统会通过会时间片轮转的方式调度每个线程,因此线程不可能做到绝对的并行。
如果在一个进程中频繁地创建和销毁线程,显然不是高效的做法。正确的做法是采用线程池,一个线程池中会缓存一定数量的线程,通过线程池就可以避免因为频繁创建和销毁线程所带来的系统开销。
AsyncTask是一个抽象类,它是由Android封装的一个轻量级异步类(轻量体现在使用方便、代码简洁),它可以在线程池中执行后台任务,然后把执行的进度和最终结果传递给主线程并在主线程中更新UI。
AsyncTask的内部封装了 两个线程池 (SerialExecutor和THREAD_POOL_EXECUTOR)和 一个Handler (InternalHandler)。
其中 SerialExecutor线程池用于任务的排队,让需要执行的多个耗时任务,按顺序排列 , THREAD_POOL_EXECUTOR线程池才真正地执行任务 , InternalHandler用于从工作线程切换到主线程 。
1AsyncTask的泛型参数
AsyncTask是一个抽象泛型类。
其中,三个泛型类型参数的含义如下:
Params: 开始异步任务执行时传入的参数类型;
Progress: 异步任务执行过程中,返回下载进度值的类型;
Result: 异步任务执行完成后,返回的结果类型;
如果AsyncTask确定不需要传递具体参数,那么这三个泛型参数可以用Void来代替。
有了这三个参数类型之后,也就控制了这个AsyncTask子类各个阶段的返回类型,如果有不同业务,我们就需要再另写一个AsyncTask的子类进行处理。
2AsyncTask的核心方法
onPreExecute()
这个方法会在 后台任务开始执行之间调用,在主线程执行。 用于进行一些界面上的初始化 *** 作,比如显示一个进度条对话框等。
doInBackground(Params)
这个方法中的所有代码都会 在子线程中运行,我们应该在这里去处理所有的耗时任务。
任务一旦完成就可以通过return语句来将任务的执行结果进行返回,如果AsyncTask的第三个泛型参数指定的是Void,就可以不返回任务执行结果。 注意,在这个方法中是不可以进行UI *** 作的,如果需要更新UI元素,比如说反馈当前任务的执行进度,可以调用publishProgress(Progress)方法来完成。
onProgressUpdate(Progress)
当在后台任务中调用了publishProgress(Progress)方法后,这个方法就很快会被调用,方法中携带的参数就是在后台任务中传递过来的。 在这个方法中可以对UI进行 *** 作,在主线程中进行,利用参数中的数值就可以对界面元素进行相应的更新。
onPostExecute(Result)
当doInBackground(Params)执行完毕并通过return语句进行返回时,这个方法就很快会被调用。返回的数据会作为参数传递到此方法中, 可以利用返回的数据来进行一些UI *** 作,在主线程中进行,比如说提醒任务执行的结果,以及关闭掉进度条对话框等。
上面几个方法的调用顺序:
onPreExecute() --> doInBackground() --> publishProgress() --> onProgressUpdate() --> onPostExecute()
如果不需要执行更新进度则为onPreExecute() --> doInBackground() --> onPostExecute(),
除了上面四个方法,AsyncTask还提供了onCancelled()方法, 它同样在主线程中执行,当异步任务取消时,onCancelled()会被调用,这个时候onPostExecute()则不会被调用 ,但是要注意的是, AsyncTask中的cancel()方法并不是真正去取消任务,只是设置这个任务为取消状态,我们需要在doInBackground()判断终止任务。就好比想要终止一个线程,调用interrupt()方法,只是进行标记为中断,需要在线程内部进行标记判断然后中断线程。
3AsyncTask的简单使用
这里我们模拟了一个下载任务,在doInBackground()方法中去执行具体的下载逻辑,在onProgressUpdate()方法中显示当前的下载进度,在onPostExecute()方法中来提示任务的执行结果。如果想要启动这个任务,只需要简单地调用以下代码即可:
4使用AsyncTask的注意事项
①异步任务的实例必须在UI线程中创建,即AsyncTask对象必须在UI线程中创建。
②execute(Params params)方法必须在UI线程中调用。
③不要手动调用onPreExecute(),doInBackground(Params params),onProgressUpdate(Progress values),onPostExecute(Result result)这几个方法。
④不能在doInBackground(Params params)中更改UI组件的信息。
⑤一个任务实例只能执行一次,如果执行第二次将会抛出异常。
先从初始化一个AsyncTask时,调用的构造函数开始分析。
这段代码虽然看起来有点长,但实际上并没有任何具体的逻辑会得到执行,只是初始化了两个变量,mWorker和mFuture,并在初始化mFuture的时候将mWorker作为参数传入。mWorker是一个Callable对象,mFuture是一个FutureTask对象,这两个变量会暂时保存在内存中,稍后才会用到它们。 FutureTask实现了Runnable接口,关于这部分内容可以看这篇文章。
mWorker中的call()方法执行了耗时 *** 作,即result = doInBackground(mParams);,然后把执行得到的结果通过postResult(result);,传递给内部的Handler跳转到主线程中。在这里这是实例化了两个变量,并没有开启执行任务。
那么mFuture对象是怎么加载到线程池中,进行执行的呢?
接着如果想要启动某一个任务,就需要调用该任务的execute()方法,因此现在我们来看一看execute()方法的源码,如下所示:
调用了executeOnExecutor()方法,具体执行逻辑在这个方法里面:
可以 看出,先执行了onPreExecute()方法,然后具体执行耗时任务是在execexecute(mFuture),把构造函数中实例化的mFuture传递进去了。
exec具体是什么?
从上面可以看出具体是sDefaultExecutor,再追溯看到是SerialExecutor类,具体源码如下:
终于追溯到了调用了SerialExecutor 类的execute方法。SerialExecutor 是个静态内部类,是所有实例化的AsyncTask对象公有的,SerialExecutor 内部维持了一个队列,通过锁使得该队列保证AsyncTask中的任务是串行执行的,即多个任务需要一个个加到该队列中,然后执行完队列头部的再执行下一个,以此类推。
在这个方法中,有两个主要步骤。
①向队列中加入一个新的任务,即之前实例化后的mFuture对象。
②调用 scheduleNext()方法,调用THREAD_POOL_EXECUTOR执行队列头部的任务。
由此可见SerialExecutor 类仅仅为了保持任务执行是串行的,实际执行交给了THREAD_POOL_EXECUTOR。
THREAD_POOL_EXECUTOR又是什么?
实际是个线程池,开启了一定数量的核心线程和工作线程。然后调用线程池的execute()方法。执行具体的耗时任务,即开头构造函数中mWorker中call()方法的内容。先执行完doInBackground()方法,又执行postResult()方法,下面看该方法的具体内容:
该方法向Handler对象发送了一个消息,下面具体看AsyncTask中实例化的Hanlder对象的源码:
在InternalHandler 中,如果收到的消息是MESSAGE_POST_RESULT,即执行完了doInBackground()方法并传递结果,那么就调用finish()方法。
如果任务已经取消了,回调onCancelled()方法,否则回调 onPostExecute()方法。
如果收到的消息是MESSAGE_POST_PROGRESS,回调onProgressUpdate()方法,更新进度。
InternalHandler是一个静态类,为了能够将执行环境切换到主线程,因此这个类必须在主线程中进行加载。所以变相要求AsyncTask的类必须在主线程中进行加载。
到此为止,从任务执行的开始到结束都从源码分析完了。
AsyncTask的串行和并行
从上述源码分析中分析得到,默认情况下AsyncTask的执行效果是串行的,因为有了SerialExecutor类来维持保证队列的串行。如果想使用并行执行任务,那么可以直接跳过SerialExecutor类,使用executeOnExecutor()来执行任务。
四、AsyncTask使用不当的后果
1)生命周期
AsyncTask不与任何组件绑定生命周期,所以在Activity/或者Fragment中创建执行AsyncTask时,最好在Activity/Fragment的onDestory()调用 cancel(boolean);
2)内存泄漏
3) 结果丢失
屏幕旋转或Activity在后台被系统杀掉等情况会导致Activity的重新创建,之前运行的AsyncTask(非静态的内部类)会持有一个之前Activity的引用,这个引用已经无效,这时调用onPostExecute()再去更新界面将不再生效。
自己是从事了七年开发的Android工程师,不少人私下问我,2019年Android进阶该怎么学,方法有没有?
没错,年初我花了一个多月的时间整理出来的学习资料,希望能帮助那些想进阶提升Android开发,却又不知道怎么进阶学习的朋友。 包括高级UI、性能优化、架构师课程、NDK、Kotlin、混合式开发(ReactNative+Weex)、Flutter等架构技术资料 ,希望能帮助到您面试前的复习且找到一个好的工作,也节省大家在网上搜索资料的时间来学习。
以上就是关于Future配合线程池进行多线程任务并返回结果全部的内容,包括:Future配合线程池进行多线程任务并返回结果、使用线程池时一定要注意的五个点、线程池工作机制等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)