
- 并发整体框架图
- 实现一个计数器(体现原子性)
- 方式1:使用count++
- 方式2:加synchronized 和 volatile
- 方式3:AtomicInteger
- 线程的并发和同步
- Fork 和 Join
- 第一个阶段:分解任务ForkJoinPool
- 第二个阶段:合并结果
- 代码思路
- 案例应用:斐波那契数列
- 阻塞队列BlockingQueue
- 概念
- 常用实现类
- 常用方法
- LinkedBlockingQueue
- Future
- 概述
- Future模式组成部分
- 简单案例
- java中的Future模式
- happens-before
public class Counter{
private int count;
public void increase(){
count++;
}
问题
该代码可能在多线程条件下出现问题,因为count++是非原子的。
count++ 实际上等于三个 *** 作:读数据,加1,写回数据。
解决
为了防止多线程下访问increase方法会报错,所以给increase方法加锁。
count变量修改了其他线程可能看不到,所以就加个volatile关键字吧。
public class LockCounter{
private volatile count;
public synchronized void increase(){
count++;
}
}
问题
加锁会影响效率,可以考虑使用原子 *** 作类的形式。
public class AtomicCounter{
private AtomicInteger count = new AtomicInteger(0);
public void increase(){
count.incrementAndGet();
}
}
线程的并发和同步
| 区别 | 同步 | 异步 |
|---|---|---|
| 概念 | 同步过程调用发出后,在没有得到结果之前,该调用不返回或不继续执行后续 *** 作 多个线程同时访问同一个资源,一个线程访问结束之后,别的线程才能访问 | 异步过程调用后,调用者没有得到结果之前,就可以继续执行后续 *** 作 调用结果,通过状态、通知和回调来通知调用者 |
| *** 作 | 必须执行到底才能执行其他 *** 作 | *** 作可以同时执行 |
| 使用锁 | 多个线程使用同一把锁 | 多个线程在执行过程中使用不同的锁 |
JUC中提供了Fork/Join的并行计算框架,用来处理分治的情况。
分治的思想 = 分而治之,把复杂的问题分解成相似的子问题,然后子问题再分子问题,知道问题分到很简单不必再划分为止,然后层层返回问题的结果。
分治分为两个阶段:分解任务 和 合并结果
public class ForkJoinPool extends AbstractExecutorService
把任务分解为一个个小任务直至小任务可以简单的计算返回结果;(Fork分解任务)
ForkJoinPool治理分治任务的线程池,它有多个任务队列(区别于ThreadPoolExecutor线程池),通过ForkJoinPool的invoke、submit、execute提交任务的时候会根据一定规则分配给不同的任务队列(该队列是双端队列)。
ForkJoinPool 有一个机制,当某个工作线程对应消费的任务队列空闲的时候它会去别的忙的任务队列的尾部分担任务过来执行,这种执行又被称之为窃取线程。因为这个性质,所以采用双端队列。
窃取线程如何保证不和被窃取任务的线程冲突。队列的绑定工作线程都从队列头部取任务进行执行,窃取线程会从别的队列的尾部取任务执行。
提交任务方法
| 提交任务方法 | 说明 |
|---|---|
| < T > T invoke(ForkJoinTask< T > task) | 指定给定的任务,完成后返回其结果 同步,有返回结果(会阻塞) |
| Future< ? > submit(Runnable task) | 提交一个可运行的任务执行,返回一个表示该任务的Future 异步,有返回结果(Future<>) |
| void execute(Runnable task) | 在将来的某个时刻执行给定的命令 异步,无返回结果 |
构造方法
ForkJoinPool(int parallelism) —— 使用指定的并行级别创建一个ForkJoinPool,使用所有其他的参数的默认值。
完整的参数介绍如下:
| 参数 | 说明 |
|---|---|
| int parallelism | 并行级别 = 也就是设置最大并发数 默认值:Runtime.availableProcessors() |
| ForkJoinPool.ForkJoinWorkerThreadFactory factory | 创建新线程的工厂 默认值 defaultForkJoinWorkerThreadFactory |
| Thread.UncaughtExceptionHandler Handler | 由于执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序 默认值 null |
| boolean asyncMode | true = 为从未连接的分叉任务简历本地先进先出调度模式;默认值false = 基于本地堆栈的模式 |
| int corePoolSize | 核心线程数 = 保留在线程池中的线程数 ;默认值 = 并行级别数 |
| int maximumPoolSize | 允许的最大线程数;默认256 |
| int minimumRunnable | 未被连接组织的核心线程允许的最小数量;默认值是 1 |
| Predicate< ? super ForkJoinPool > saturate | 未被连接阻止的核心线程允许的最小数量;默认情况下,当一个线程即将被阻止连接或ForkJoinPool.ManagedBlocker,但由于超过maximumPoolSize不能被替换,因此抛出RejectExecutionException |
| long keepAliveTime | 在线程终止之前自上次使用以来经过的时间;默认值 60 |
| unit | keepAliveTime 参数的时间单位 |
双端队列
概念:
限定插入和删除 *** 作在表的两端进行的线性表,两端分别称为端点1和端点2,具有队列和栈性质的数据结构。
普通队列:队列头部删除元素、队列尾部添加元素;
双端队列:队列头部添加元素、队列尾部删除元素。
把每个小任务的结果合并返回得到最终结果。(Join合并结果)
ForkJoinTask,分治任务,等同于Runnable。
抽象类,核心方法是fork和join。fork用来异步执行一个子任务,join会阻塞当前线程等待子任务返回。
ForkJoinTask有两个子类:RecursiveAction 和 RecursiveTask 都是抽象类,通过递归来执行分治任务。
每个子类都有compute抽象方法(任务执行的主要计算量),区别在于RecursiveAction没有返回值、RecursiveTast有返回值。
1 创建ForkJoinPool,用于执行分治任务的线程池,parallelism = 并行级别,并发线程数
ForkJoinPool forkjoinpool = new ForkJoinPool(int parallelism);
2 创建具体 *** 作执行的类,需要继承ForkJoinTask类 或者其子类
public class ForkJoinTaskxhj类 extends RecursiveTask{}
3 创建ForkJoinTaskxhj类的对象
ForkJoinTaskxhj类 obj = new ForkJoinTaskxhj类(参数);
4 通过ForkJoinPool对象的invoke方法提交任务并执行
forkjoinpool.invoke(obj);
案例应用:斐波那契数列
斐波那契数列:1-1-2-3-5-8-13-21-34…
公式:F(1) = 1,F(2) = 1,F(n) = F(n-2) + F(n-1) (n>=3,n为正整数)
package offer2.Test52;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class Test2 {
public static void main(String[] args) {
// 治理分治任务的线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 最大并发数4
// 要计算的斐波那契数列 的 第几个元素
Fibonacci fibonacci = new Fibonacci(20);
// 任务执行开始 的时间
long startTime = System.currentTimeMillis();
// 指定给定的任务,完成返回其结果
// forkJoinPool对象.invoke方法的参数是ForkJoinTask类,则就是它的对象或者其后代对象
// 本例中 fibonacci 是ForkJoinTask类的后代对象。
Integer result = forkJoinPool.invoke(fibonacci);
// 任务执行完毕 的时间
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
// 定义Fibonacci数列方法。继承自RecursiveTask泛型类
// Fibonacci类的定义采用的是成员内部类的形式
static class Fibonacci extends RecursiveTask<Integer> {
// 成员变量
final int n;
// 构造方法
Fibonacci(int n) {
this.n = n;
}
// compute抽象方法重写 任务的主要计算量
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
}
阻塞队列BlockingQueue
概念
- 阻塞队列与普通队列的区别:当队列是空的时候,从队列中获取元素的 *** 作将会阻塞;当队列是满的,往队列里面添加元素的 *** 作会被阻塞。
- 是java.util.concurrent包提供的接口,常用于多线程编程中容纳任务队列。
- 适用阻塞队列的好处:不需要关心什么阻塞线程、什么时候唤醒线程,阻塞队列都帮助我们考虑了。
| 类名 | 说明 |
|---|---|
| ArrayBlockingQueue | 数组结构组成的有界阻塞队列 |
| LinkedBlockingQueue | 链表结构组成的有界阻塞队列 |
| LinkedTransferQueue | 链表结构组成的无界阻塞队列,和SynchronousQueue类似,还含有非阻塞方式 |
| LinkedBlockingDeque | 链表解耦组成的双向阻塞队列 |
| SynchronousQueue | 不存储元素的阻塞队列,即直接提交给线程不保持它们 = 单个元素的队列 |
| PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
| DelayQueue | 使用优先级队列实现的延迟无界阻塞队列 |
| 方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) 阻塞队列慢,再插入元素抛出IllegalStateException:Queue full… | offer(e) 成功返回true,失败返回false | put(e) 阻塞队列满,生产线程持续执行put *** 作,队列会阻塞生产线程直到put数据成功或响应中断退出 | offer(e,time,unit) 阻塞队列满,队列会阻塞生产线程一定时间,超时后生产线程会退出 |
| 移除 | remove() 阻塞队列为空,执行remove *** 作会抛出NoSuchElementException | poll() 成功返回队列元素,队列没有则返回null | take() 阻塞队列为空,消费者线程从队列里take元素,队列会一直阻塞消费者线程直到队列可用 | poll() 当队列为空,队列会阻塞消费者线程一段时间,超过时限后消费者线程退出 |
| 检查 | element() 阻塞队列为空,执行element会抛出NoSuchElementException异常 | peek() 返回队列的第一个元素 | 不可用 | 不可用 |
- 使用LinkedBlockingQueue< E > 实现线程同步。
LinkedBlockingQueue< E > 是一个基于已连接节点的,范围任意的阻塞队列。
该队列按照先进先出排序元素。
队列的头部是在队列中时间最长的元素,队列的尾部是在队列中时间最短的元素,新元素插入到队列的尾部。
获取队列元素的 *** 作只会获取头部元素,如果队列满了或者为空会进入阻塞状态。 - 常用方法:
| 方法名 | 说明 |
|---|---|
| LinkedBlockingQueue() | 创建一个容量为Integer.MAX_VALIE的LinkedBlockingQueue |
| put( E e ) | 在队尾添加一个元素,如果队列满则阻塞当前线程,直到队列有空位 |
| size() | 返回列表中的元素个数 |
| take() | 移除并返回头部元素,如果队列空则阻塞当前线程,直到取到元素为止 |
在JUC包中。
public interface Future< V >
Future模式很好的解决了那些需要返回值的异步调用。
Future表示异步计算的结果。提供方法来检查计算是否完成,等待其完成,并检索计算结果。只有当计算完成之后,才能使用get方法检索结果;如果需要则阻塞,直到准备就绪。
Future模式本质上是代理模式的一种实际应用。
经常使用Future的场景:计算密集场景、处理大数据量、远程方法调用;
当执行一个长时间运行的任务时,使用Future就可以让我们暂时去执行其他任务,等长任务执行完毕后再返回其结果。
| 组成 | 说明 |
|---|---|
| Main | 系统启动,调用Client发出请求 |
| Client | 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData |
| Data | 返回数据的接口 |
| FutureData | Future数据,构造快,虚拟数据,需要装配RealData(订单) |
| RealData | 真实数据,构造慢(午餐) |
图示:
package offer2.Test53.tast1;
// 创建Data
public interface Dataxhj {
// 抽象类
public String getResult();
}
// 创建继承自Dataxhj接口的真实实现类
public class RealDataXhj implements Dataxhj{
// protected受保护的变量;
protected final String result;
// 带参构造方法
public RealDataXhj(String para){
StringBuffer sb = new StringBuffer(para);
// 假设这里很慢,构造一个RealDataXhj不是一个容易的事
result = sb.toString();
}
@Override
public String getResult() {
return result;
}
}
// future模式的核心
public class FutureData implements Dataxhj {
// 创建RealDataXhj对象
protected RealDataXhj realDataXhj = null;
protected boolean isReady = false;
// 设置RealDataXhj对象
public synchronized void setRealDataXhj(RealDataXhj realDataXhj){
if(isReady){
return;
}
this.realDataXhj = realDataXhj;
isReady = true;
// RealDataXhj已经被注入,通知getResult,唤醒线程
notifyAll();
}
// 等待RealDataXhj构造完成
@Override
public synchronized String getResult() {
while(!isReady){
try {
// 等待 直到RealDataXhj被注入
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realDataXhj.result;
// RealDataXhj类中的result成员变量
}
}
// 从Client得到DataXhj
public class Client {
public Dataxhj request(final String queryStr){
// 创建FutureData对象
final FutureData future = new FutureData();
// 创建线程并启动
new Thread(){
@Override
public void run() {
// RealData构建很慢,在单独的线程中进行
RealDataXhj realDataXhj = new RealDataXhj(queryStr);
// setRealData的时候会notify等待这个future上的对阿宁
future.setRealDataXhj(realDataXhj);
}
}.start();
System.out.println(future.getResult());
// FutureData会立即返回,不会等待RealDataXhj被构造完
return future;
}
}
public class Mainxhj {
public static void main(String[] args) {
Client client = new Client();
//这里会立即返回,因为得到的是FutureData而不是RealData
Dataxhj dataxhj = client.request("xhjname");
System.out.println("请求完毕");
try {
//这里可以用一个sleep代替了对其他业务逻辑的处理
//在处理这些业务逻辑的过程中,RealData被创建,从而充分利用了等待时间
Thread.sleep(2000);
} catch (InterruptedException e) {
// e.printStackTrace();
}
//使用真实的数据,如果到这里数据还没有准备好,getResult()会等待数据准备完,再返回
System.out.println("数据 = " + dataxhj.getResult());
}
}
| 问题:2022/5/3 |
|---|
| 输出结果 dataxhj.getResult显示内容为空。 个人理解输出结果是:数据 = xhjname |
| 解决:RealData的构造方法并没有使用传入参数para,new StringBuilder(para) 改为这种形式就好了 |
JDK内部有一个Future接口,类似之前案例中的订单。
常用方法:
| 方法名 | 说明 |
|---|---|
| boolean cannel(boolean manInterruptIfRunning) | 尝试取消执行此任务 |
| V get() | 等待计算完成,然后检索其结果 |
| V get(long timeout,TimeUnit unit) | 如果需要等待最多在给定的时间计算完成,然后索引其结果 |
| boolean isCancelled() | 如果此任务在正常完成之前被取消,返回true |
| boolean isDone() | 如果此任务完成,则返回true |
案例:2022/5/3不是很理解
package offer2.Test53.tast1;
// 创建Data
public interface Dataxhj {
// 抽象类
public String getResult();
}
// 创建继承自Dataxhj接口的真实实现类
public class RealDataXhj implements Dataxhj, Callable {
// protected受保护的变量;
protected final String result;
// 带参构造方法
public RealDataXhj(String para){
StringBuffer sb = new StringBuffer(para);
// 假设这里很慢,构造一个RealDataXhj不是一个容易的事
result = sb.toString();
}
@Override
public String getResult() {
return result;
}
@Override
public Object call() throws Exception {
return result;
}
}
public class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个固定长度的线程池
ExecutorService executor = Executors.newFixedThreadPool(1);
final Future<?> future = executor.submit((Callable) new RealDataXhj("xhj"));
System.out.println("请求完毕,数据准备中");
try {
// 仍旧可以做额外的数据 *** 作,这是使用sleep代替
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("数据 = " + future.get());
}
}
happens-before
happens-before是JMM最核心的概念。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)