Java JUC

Java JUC,第1张

Java JUC 一、JUC 1. 什么是JUC

java.util.concurrent 包下的类:

2. 线程 和 进程
  • 一个进程往往可以包含多个线程,至少包含一个;
  • Java 默认有2个线程,mian、GC;
  • Java 本身不可以开启线程;
// 调用本地方法,底层是C++,Java无法直接 *** 作硬件
private native void start0();
2.1 并发 和 并行

并发编程的本质:多线程 *** 作同一个资源,充分利用CPU的资源;

  • 并发:CPU单核,多个线程,快速交替执行;
  • 并行:CPU多核,多个线程,可以同时执行;
// CPU密集型、IO密集型
System.out.printf("获取CPU核数: %srn", Runtime.getRuntime().availableProcessors());
2.2 线程有几个状态
public enum State {
	// 新生
	NEW,
	// 运行
	RUNNABLE,
	// 阻塞
	BLOCKED,
	// 等待
	WAITING,
	// 超时等待
	TIMED_WAITING,
	// 终止
	TERMINATED;
}
2.3 wait 和 sleep 区别 2.3.1 来自不同的类

wait => Object 类;
sleep => Thread 类;

2.3.2 关于锁的释放

wait 会释放锁;
sleep 不会释放锁;

2.3.3 使用的范围不同

wait 必须在同步代码块中;
sleep 可以在任何地方;

2.3.4 是否要捕获异常

wait 需要捕获异常;
sleep 需要捕获异常;

3. synchronized 和 Lock 锁区别
  1. synchronized 是Java内置的关键字,Lock 是一个Java类;
  2. synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到了锁;
  3. synchronized 会自动释放锁,Lock 必须要手动释放锁;如果不释就死锁;
  4. synchronized 线程1 获得锁后阻塞,线程2一直等待,傻傻的等;
    Lock 锁就不一定会等待下去;
  5. synchronized 可重入锁,不可以中断的,非公平;
    Lock 可重入锁,可以 判断锁,非公平(可以设置公平锁);
  6. synchronized 适合锁少量的代码同步问题;
    Lock 适合锁大量的同步代码;
3.1 synchronized
public class Ticket1 {

    private int count = 20;

    
    public synchronized void sale() {
        if (count > 0) {
            System.out.printf("线程%s, 卖出第 %s张票rn", Thread.currentThread().getName(), count--);
        }
    }
}
public static void main(String[] args) {
    // synchronized
    Ticket1 ticket = new Ticket1();

    new Thread(() -> {
        for (int i = 0; i < 12; i++) ticket.sale();
    }, "A").start();
    new Thread(() -> {
        for (int i = 0; i < 12; i++) ticket.sale();
    }, "B").start();
    new Thread(() -> {
        for (int i = 0; i < 12; i++) ticket.sale();
    }, "C").start();
}
3.2 Lock 锁
public class Ticket2 {

    private int count = 20;
    private Lock lock = new ReentrantLock();

    
    public void sale() {
        // 加锁
        lock.lock();
        try {
            if (count > 0) {
                System.out.printf("线程%s, 卖出第 %s张票rn", Thread.currentThread().getName(), count--);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 解锁
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
	// Lock
  	Ticket2 ticket = new Ticket2();
  	
  	new Thread(() -> {
        for (int i = 0; i < 12; i++) ticket.sale();
    }, "A").start();
    new Thread(() -> {
        for (int i = 0; i < 12; i++) ticket.sale();
    }, "B").start();
    new Thread(() -> {
        for (int i = 0; i < 12; i++) ticket.sale();
    }, "C").start();
}
4. Lock 接口 4.1 可重入锁(常用)

ReentrantLock

4.1.1 非公平锁(默认)

可以插队;

public ReentrantLock() {
    sync = new NonfairSync();
}
4.1.2 公平锁

不可以插队;

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
4.2 读写锁

ReentrantReadWriteLock.WriteLock 写锁(独占锁)一次只能被一个线程占有;
ReentrantReadWriteLock.ReadLock 读锁(共享锁)多个线程可以同时占有;
读-读 可以共存;
读-写 不能共存;
写-写 不能共存;

public class MyCacheLock {

    private volatile Map map = new HashMap<>();
    // 读写锁
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    
    public void put(String key, Object val) {
        // 写锁
        lock.writeLock().lock();
        try {
            System.out.printf("线程: %s, 写入: %s, 开始rn", Thread.currentThread().getName(), key);
            map.put(key, val);
            System.out.printf("线程: %s, 写入: %s, 结束rn", Thread.currentThread().getName(), key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void get(String key) {
        // 读锁
        lock.readLock().lock();
        try {
            System.out.printf("线程: %s, 读取: %s, 开始rn", Thread.currentThread().getName(), key);
            map.get(key);
            System.out.printf("线程: %s, 读取: %s, 结束rn", Thread.currentThread().getName(), key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.readLock().unlock();
        }
    }
}
public static void main(String[] args) {
    // 加读写锁
    MyCacheLock cache = new MyCacheLock();
    
    // 写入
    for (int i = 0; i < 3; i++) {
        String str = String.valueOf(i);
        new Thread(() -> {
            cache.put(str, str);
        }, String.valueOf(i)).start();
    }
    // 读取
    for (int i = 0; i < 3; i++) {
        String str = String.valueOf(i);
        new Thread(() -> {
            cache.get(str);
        }, String.valueOf(i)).start();
    }
}
4.5 自旋锁 4.6 死锁 5. 生产者 和 消费者 问题 5.1 synchronized
public class Ticket1 {

    private int count = 0;

    
    public synchronized void increment() {
        // while 防止虚假唤醒
        while (count != 0) {
            // 等待
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        count++;
        System.out.printf("线程%s, count = %drn", Thread.currentThread().getName(), count);
        //  *** 作完毕,通知其他线程
        this.notifyAll();
    }

    
    public synchronized void decrement() {
        // 防止虚假唤醒
        while (count == 0) {
            // 等待
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        count--;
        System.out.printf("线程%s, count = %drn", Thread.currentThread().getName(), count);
        //  *** 作完毕,通知其他线程
        this.notifyAll();
    }
}
public static void main(String[] args) {
	// synchronized
	 Ticket1 ticket = new Ticket1();
	 
	 new Thread(() -> {
	     for (int i = 0; i < 1000; i++) ticket.increment();
	 }, "A").start();
	 new Thread(() -> {
	     for (int i = 0; i < 1000; i++) ticket.decrement();
	 }, "B").start();
	 new Thread(() -> {
	     for (int i = 0; i < 1000; i++) ticket.increment();
	 }, "C").start();
	 new Thread(() -> {
	     for (int i = 0; i < 1000; i++) ticket.decrement();
	 }, "D").start();
}
5.2 Lock

Condition 精准的通知和唤醒线程;

public class Ticket2 {

    private int count = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    
    public void increment() {
        lock.lock();
        try {
            while (count != 0) {
                // 等待
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            count++;
            System.out.printf("线程%s, count = %drn", Thread.currentThread().getName(), count);
            //  *** 作完毕,通知其他线程
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    
    public void decrement() {
        lock.lock();
        try {
            while (count == 0) {
                // 等待
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            count--;
            System.out.printf("线程%s, count = %drn", Thread.currentThread().getName(), count);
            //  *** 作完毕,通知其他线程
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
    // LOck
    Ticket2 ticket = new Ticket2();
    
    new Thread(() -> {
        for (int i = 0; i < 1000; i++) ticket.increment();
    }, "A").start();
    new Thread(() -> {
        for (int i = 0; i < 1000; i++) ticket.decrement();
    }, "B").start();
    new Thread(() -> {
        for (int i = 0; i < 1000; i++) ticket.increment();
    }, "C").start();
    new Thread(() -> {
        for (int i = 0; i < 1000; i++) ticket.decrement();
    }, "D").start();
}
5.3 Condition
public class Ticket3 {

    private int count = 1;// 1A, 2B, 3C
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();

    public void printA() {
        lock.lock();
        try {
            while (count != 1) {
                // A等待
                conditionA.await();
            }
            System.out.printf("线程%s, printArn", Thread.currentThread().getName());
            // 唤醒B
            count = 2;
            conditionB.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printB() {
        lock.lock();
        try {
            while (count != 2) {
                // B等待
                conditionB.await();
            }
            System.out.printf("线程%s, printBrn", Thread.currentThread().getName());
            // 唤醒C
            count = 3;
            conditionC.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printC() {
        lock.lock();
        try {
            while (count != 3) {
                // C等待
                conditionC.await();
            }
            System.out.printf("线程%s, printCrn", Thread.currentThread().getName());
            // 唤醒A
            count = 1;
            conditionA.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
	// 线程ABC,按顺序执行
    Ticket3 ticket3 = new Ticket3();
    
    new Thread(() -> {
        for (int i = 0; i < 10; i++) ticket3.printA();
    }, "A").start();
    new Thread(() -> {
        for (int i = 0; i < 10; i++) ticket3.printB();
    }, "B").start();
    new Thread(() -> {
        for (int i = 0; i < 10; i++) ticket3.printC();
    }, "C").start();
}
6. 不安全的集合类 6.1 List
public class ListMain {

    
    public static void main(String[] args) {
//        List list = new ArrayList<>();
//        List list = new Vector<>();
//        List list = Collections.synchronizedList(new ArrayList<>());
        
        List list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}
6.2 Set
public class SetMain {

    
    public static void main(String[] args) {
//        Set set = new HashSet<>();
//        Set set = Collections.synchronizedSet(new TreeSet<>());
        Set set = new CopyOnWriteArraySet<>();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(set);
            }, String.valueOf(i)).start();
        }
    }
}
6.2.1 Set 底层是 Map
// 不变得值
private static final Object PRESENT = new Object();

public boolean add(E e) {
	//  Map key是无法重复的
    return map.put(e, PRESENT)==null;
}
6.3 Map
public class MapMain {

    
    public static void main(String[] args) {
        // initialCapacity 初始容量,loadFactor 加载因子
//        Map map = new HashMap<>(16, 0.75F);
//        Map map = Collections.synchronizedMap(new HashMap<>());
        Map map = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
                System.out.println(map);
            }, String.valueOf(i)).start();
        }
    }
}
7. Callable
  1. 可以有返回值;
  2. 可以抛出异常;
  3. 方法不同,run()/call();
public class CallableMain {

    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        FutureTask task = new FutureTask<>(myThread);
        new Thread(task, "A").start();
        // 2. A、B两个线程,只会调用一次,因为结果会被缓存,提高效率
        new Thread(task, "B").start();
        try {
            // 1. get()方法获取结果可能产生阻塞
            Integer integer = (Integer) task.get();
            System.out.printf("call()返回值: %d", integer);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class MyThread implements Callable {

    @Override
    public Integer call() throws Exception {
        System.out.println("call()");
        TimeUnit.SECONDS.sleep(1);
        return 1024;
    }
}
8. 常用的辅助类(重点) 8.1 CountDownLatch
public class CountDownLatchMain {

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.printf("线程: %s Go outrn", Thread.currentThread().getName());
                // countDown()数量-1
                // 数量为0时,唤醒await()方法
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }
        System.out.println("End");
        try {
            // 等待计数器归零,就会被唤醒,然后再向下执行
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Close door");
    }
}
8.2 CyclicBarrier
public class CyclicBarrierMain {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6, () -> {
            // 线程数量达到6,会执行
            System.out.println("、等待结束");
        });
        for (int i = 0; i < 6; i++) {
            int finalI = i;
            new Thread(() -> {
                System.out.printf("线程: %s, 取到: %drn", Thread.currentThread().getName(), finalI);
                try {
                    // 等待线程数量达到6
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}
8.3 Semaphore
public class SemaphoreMain {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    // acquire()获得,如果满的等待释放
                    semaphore.acquire();
                    System.out.printf("线程: %s, 获得rn", Thread.currentThread().getName());
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // release()释放,唤醒等待的线程
                    semaphore.release();
                    System.out.printf("线程: %s, 释放rn", Thread.currentThread().getName());
                }
            }, String.valueOf(i)).start();
        }
    }
}
10. 阻塞队列

写入:如果队列满了,就必须阻塞等待消费;
读取:如果队列空了,就必须阻塞等待生产;
BlockingQueue 阻塞队列
AbstractQueue 非阻塞队列
Deque 双端队列
SynchronousQueue 同步队列

10.1 ArrayBlockingQueue 阻塞队列 方式抛出异常不抛出异常阻塞等待超时退出生产addofferputoffer(timeout)消费removepolltakepoll查看队首elementpeek-- 10.2 SynchronousQueue 同步阻塞队列

不存储元素,put一个元素,就必须take取出来,否则不能再put;

方式阻塞等待生产put消费take查看队首- 11. 线程池

线程池 = 三大方法 + 七个参数 + 四种拒绝策略;

11.1 三大方法

创建线程池;

// 单个线程池,linkedBlockingQueue,队列长度21亿,可能导致OOM
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 固定5个线程池,linkedBlockingQueue
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 缓存线程池(可伸缩线程池)SynchronousQueue
ExecutorService threadPool = Executors.newCachedThreadPool();
11.2 七个参数
  • corePoolSize: 核心线程池大小
  • maximumPoolSize: 最大线程池大小
  • keepAliveTime: 空闲超时释放时间
  • unit: 超时单位
  • workQueue: 阻塞队列
  • threadFactory: 线程工厂,创建线程(一般不变)
  • handler: 拒绝策略
11.3 四种拒绝策略

AbortPolicy: 中止策略,抛出异常

  • CallerRunsPolicy: 调用方执行策略(超出由main线程执行)
  • DiscardOldestPolicy: 丢弃旧的策略
  • DiscardPolicy: 丢弃策略
11.4 最大线程池大小如何定义?
  • CPU密集型,根据CPU核数,保持CPU效率最高;
  • IO密集型,根据程序IO线程数,大于IO线程数(两倍);
// CPU密集型、IO密集型
System.out.printf("获取CPU核数: %srn", Runtime.getRuntime().availableProcessors());
public static void main(String[] args) {
    // CPU核数
    int availableProcessorCount = Runtime.getRuntime().availableProcessors();

    // 1. 创建线程池(2 + 5 + 3 + 3)
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
            availableProcessorCount, // CPU核数
            3,
            TimeUnit.SECONDS, // 3秒空闲释放
            new linkedBlockingDeque<>(3), // 队列长度3
            Executors.defaultThreadFactory(), // 默认线程工厂
            new ThreadPoolExecutor.DiscardPolicy() // 丢弃策略
    );
    // 2. 使用线程池
    try {
        // 最大承载 = maximumPoolSize + Deque
        for (int i = 0; i < 9; i++) {
            int finalI = i;
            threadPool.execute(() -> {
                System.out.printf("线程: %s, %drn", Thread.currentThread().getName(), finalI);
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 3. 关闭线程池
        threadPool.shutdown();
    }
}
12. 异步回调 12.1 无返回值
public static void main(String[] args) {
    // 无返回值
    CompletableFuture future = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("线程: %s, runAsyncrn", Thread.currentThread().getName());
    });
    System.out.println("futureEnd");
    try {
        // 阻塞等待
        System.out.println(future.get());
        System.out.println("End");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
12.2 有返回值
public static void main(String[] args) {
    // 有返回值
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        // 错误
//       int i = 1 / 0;
        System.out.printf("线程: %s, supplyAsyncrn", Thread.currentThread().getName());
        return 1024;
    });
    System.out.println("futureEnd");
    try {
    	// 阻塞等待
        Integer integer = future.whenComplete((t, u) -> {
            System.out.printf("t: %s, u: %srn", t, u);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return -1;
        }).get();
        System.out.println(integer);
        System.out.println("End");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
13 JMM

Java内存模型,不存在的东西,概念约定!

13.1 关于JMM的一些同步的约定
  1. 线程解锁前,必须把共享变量立刻刷回主存;
  2. 线程加锁前,必须读取主存中的最新值到工作内存中;
  3. 加锁和解锁是同一把锁;
13.2 JMM8种 *** 作
  • Java内存模型定义了8种 *** 作来完成主内存与工作内存的读写交互,虚拟机实现保证每一种 *** 作都是原子的,不可再分的;
  • 对于double和long类型的变量来说,load、store、read和write *** 作在某些平台上允许例外;
*** 作描述Lock锁定作用于主内存变量,将变量标志为一条线程所独占Unlock解锁作用于主内存变量,将处于锁定的变量释放出来Read读取作用于主内存变量,它将一个变量的值从主内存传输到线程的工作内存中Load载入作用于工作内存变量,它把从主内存读取的变量值放入工作内存的副本中Use使用作用于工作内存变量,将工作内存变量值传递给执行引擎Assgin赋值作用于工作内存变量,将执行引擎的值传递给工作内存的变量Store存储作用于工作内存变量,它把工作内存变量传递到主内存中Write写入作用于主内存变量,把Store *** 作从工作内存得到的变量值放入主内存变量中 14. volatile
  • 保证可见性
  • 不保证原子性
  • 禁止指令重排(源码 -> 编译器优化的重排 -> 指令并行可能会重排 -> 内存系统会重排 -> 执行)
15. CAS 16. 单例模式 二、扩展

面试的:单例模式、排序算法、生产者和消费者、死锁

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/4828735.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-11-10
下一篇2022-11-10

发表评论

登录后才能评论

评论列表(0条)

    保存