
一、Thinking in Java
引入:顺序任务与并行任务并行一定快吗? *** 作系统级别的并发抢占式与协作式Java的线程机制
任务定义通过Executor定义任务
newCachedThreadPool与 newFixedThreadPoolCallable 线程优先级Thread.yield()后台线程、守护线程线程工厂:MyThreadFactoryjoin()线程异常的捕获 共享受限资源与线程安全问题原子性与volatile线程的中止
线程的四种状态:进入阻塞状态:线程的中断检查中断 线程的协作线程死锁的四个条件CountDownLatchCyclcBarrier:一组并行任务并行执行。
一、Thinking in Java本人在学习Thinking in Java一书中,初识多线程,学习过程中总结了一些有用的知识,在此做个笔记。
引入:顺序任务与并行任务我们编程问题大部分的问题都可以通过顺序问题解决。但是,对于某些问题,如果采用并行执行,将会大大提高处理速度。什么情况下使用并行编程呢?如,程序的中的某一片段被阻塞了(通常是由于IO阻塞或远程请求阻塞)。考虑阻塞的时候,是不是可以让程序继续执行?哪些部分可以执行?这就是并行执行。
并行一定快吗?然而任务的并行未必一定会提高程序处理速度。在单处理器上,单处理器只有单个CPU,任务并行执行过程中,需要切换任务,并保存当前任务的执行状态,这部分花费的时间就是上下文的切换。那么总结起来就是说:如果任务没有阻塞,并且机器是单处理的使用并发就没有优势可言。
*** 作系统级别的并发 *** 作系统使用的并发处理问题是最理想也是最安全的,因为 *** 作系统上的进行是运行在自己的地址空间中的,任务之间也不会相互干涉,同时 *** 作系统也会周期性的将CPU从一个进程切换到另一个进程。
某一些编程语言被设计为可以并发将任务彼此隔离,这些语言被称为函数型语言,每个函数的调用都不会干涉其他函数,Erlang就是这样的语言,同时它也包含了针对任务之间彼此通信的安全机制。
Java是一种传统的顺序型语言,在顺序型语言的基础上提供了对线程的支持。在Java中线程表示单一进程中的任务。
抢占式:表示调度机会周期性的中断线程,将上下文切换到另外一个线程,从而每个线程都能分得时间片去执行它的任务。被动的礼让
协作式:每个任务会自动地放弃对对处理器的控制,这要求程序员有意识地在每个任务中插入某种让步语句。一种主动的礼让。
多线程编程的本质是:将一整段程序划分为多个分离的,能够独立运行的任务。然后将每个任务有单独的执行线程来完成。一个线程就是在进程中的一个的一个单一的顺序控制流。因此,单个进程可以拥有多个线程,每个线程都好像有自己的CPU一样,但其实是调度器周期性将CPU的执行时间分配给每个线程。
所以,现在编程中关键的任务是如何将总的需求划分成单独的小的任务。
// 定义程序中的片段任务
public class LiftOff implement Runable{
protected int countDown = 10;
public void run(){
while(countDown--> 0){
System.out.print(countDown);
Thread.yield();
}
}
}
// 将任务交给线程,这里的run()不是由单独的线程驱动的,而是由main()所属的线程直接调用。
public class MainThread{
public static void main(String[] args){
new LiftOff().run();
}
}
//通过Thread驱动任务
public class BasicThreads{
public static void main(String[] args){
Thread t = new Thread(new LiftOff());
// 通过start来驱动任务
t.start();
}
}
通过Executor定义任务
Java SE5的java.util.concurrent包中的执行器(Executor)能够管理Thread对象
Executor执行器在客户端和任务执行之间提供了一个中间层ExecutorService ,这个中间层采用execute()执行任务。
ExecutorService通过静态方法来获取。
源代码:
//源代码中Executor执行器还是通过ThreadPoolExecutor来创建线程的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue(),
threadFactory);
}
newCachedThreadPool与 newFixedThreadPool
//通过Executor类
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
//shutdown:关闭ExecutorService对象。防止新任务提交给这个ExecutorService
}
}
Callable
对于网络请求有需要一定的返回结果的任务,是实现Callable泛型接口,泛型为返回类型。将接口提交给Executor的submit()。
submit()方法返回一个Future对象。通过Future对象的isDone()判断当前任务是否完成。get()方法在结果返回之前将会阻塞。
public class TaskWithResult implements Callable{ private int id ; public TaskWithResult(int id) { this.id = id; } @Override public String call() throws Exception { return "result of TaskWithResult " + id; } }
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
results.add(exec.submit(new TaskWithResult(i)));
}
for (Future fs : results) {
try {
System.out.println("isDone: " + fs.isDone());
System.out.println(fs.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
exec.shutdown();
}
}
}
}
线程优先级
线程的优先级:线程的优先级将传递给调度器,调度器将会倾向于让优先级最高得线程先执行。JDK有10个优先级,但是它与 *** 作系统得优先级不能很好的映射。
Thread.yield()调用Thread.yield()得线程给调度器一个暗示:我的工作已经做的差不多了,可以让给别的线程使用CPU了。这只是一个暗示,该暗示调度器未必一定会采纳。
yield():与sleep一样方法不释放当前对象的锁。
守护线程:也称为后台线程。程序运行过程中在后台提供得一个通用服务。如果所有的非后台进程结束后,程序也就终止了。
在线程启动之前调用setDaemon()方法将当前线程设置为后台线程。并且后台线程的所有子线程都为后台线程。
后台进程的不会执行try catch finally中的finally块,就会中止其run()。
public class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
线程工厂:MyThreadFactory
给线程定义自定义的名字,线上跟踪问题日志容易跟踪。
public class MyThreadFactory implements ThreadFactory {
private static final AtomicInteger threadNumber = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String prefix;
private final boolean daemoThread;
private final ThreadGroup threadGroup;
public MyThreadFactory() {
this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
}
public MyThreadFactory(String prefix) {
this(prefix, false);
}
public MyThreadFactory(String prefix, boolean daemo) {
this.prefix = StringUtils.isNotEmpty(prefix) ? prefix + "-thread-" : "";
daemoThread = daemo;
SecurityManager s = System.getSecurityManager();
threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
@Override
public Thread newThread(Runnable runnable) {
String name = prefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(threadGroup, runnable, name, 0);
ret.setDaemon(daemoThread);
System.out.println("形成了一个线程 : " + mThreadNum);
return ret;
}
}
// 开启了十个线程执行任务
ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(resultRow.size() * 2), new MyThreadFactory("invoicemanage"));
join()
a.join()是线程插队的意思。在主线程执行过程中,调用a.join();表示在a线程执行完后,主线程才会继续向下执行。
public class ThreadJoin {
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
testJoin();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
testJoin();
}
}, "t2");
t1.start();
t2.start();
try {
//t1.join();
//t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程结束");
}
public static void testJoin() {
System.out.println(Thread.currentThread().getName() + "插队");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "结束");
}
}
可以看到在主线程结束后,t1、t2还没结束
将t1.join();与t2.join();解除注释,输出结果如下:
异常是不能跨线程传播的。一旦异常逃出了任务的run()方法,传播到控制台,就不太好获取了。
public class ExceptionThread implements Runnable {
@Override
public void run() {
throw new RuntimeException();
}
public static void main(String[] args) {
ExecutorService executorService
= Executors.newCachedThreadPool();
executorService.execute(new ExceptionThread());
}
}
怎么样捕获线程中的异常呢?我可以通过Executor产生线程的方式。即ThreadFactory。通过给ThreadFactory的setUncaughtExceptionHandler();设置捕获异常的自定义接口。这样每个创建的Thread都会附着一个Thread.UncaughtExceptionHandler。
class ExceptionThread2 implements Runnable {
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by" + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(" caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + r);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
}
}
共享受限资源与线程安全问题
线程安全问题:是指其他线程读取了未提交的数据,即脏读!
如果单个线程的任务之间不存在共享资源,那么不会出现线程安全的问题。
对于存在共享资源竞争的并发问题,都是采用序列化访问共享资源的,即同一时间不能有两个线程修改同一资源。为了实现这一方案,Java可以通过synchronized关键字修饰方法或静态代码块完成。
如果有超过一个线程在处理某一个共享变量,必须用synchronized同步所有的访问或修改方法。
一个任务可以多次获得对象的锁,如果一个方法在同一个对象上调用了第二个加锁方法,锁的计数就会+1.只有当锁的计数为零的时候,其他线程才可以继续使用此资源。
安全的线程 *** 作:
1、复制与返回值是原子性的,没有发生中断的可能,不存在中间态。但是为了保证不同线程之间的可视性,还是要把变量声明为volatile
2、
不安全的线程 *** 作:
1、数值的递增
1、Java不是原子性的递增 *** 作,涉及到增和写。volatile告诉编译器不要执行任何读取和写入的优化。
2、如果一个域被声明为volatile,那么只要这个域中的产生了写的 *** 作,那么所有的读 *** 作都能看到这个修改。即便用了本地缓存也会如此,violate域会立即刷新到主存,而读取 *** 作就发生在主内存中。
3、在32位机器上,对于long和double类型的共享变量读取和写入时两个分离的32位 *** 作,这就产生了读取和写入 *** 作的中间发生了上下文切换,从而可能被不同的任务看到错误的结果。应该添加volatile,提供更强的原子性。
synchronized与volatile总结点:
synchronized添加在方法或代码片段上,volatile添加在域上,两者在代码中是否必须添加有一定的影像
如果一个域是有单个线程 *** 作,那么不需要设置volatile,方法也不需要synchronized。
如果一个域是能够有多个线程 *** 作(读取与写入),那么就需要将对该域进行读取和写入的方法添加synchronized。在读取方法上添加是防止脏读,在写入的方法上添加是为了防止多个线程同时对其写入,写入的值相互覆盖。如果一个域有多个线程
下面这个例子:
EvenGenerator中的generator是一个共享资源,该资源中的值currentEvenValue,isCancel是判断程序是否结束的标志,是个布尔类型,因此其赋值是原子性的。理论上,当val为奇数时表示程序执行失败程序结束。但是我的代码中是两个currentEvenValue连续自增,即以偶数的等差数列递增,不会在函数外读取到奇数。下面看执行结果:
package chapter21.c21p3;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class EvenChecker implements Runnable {
private IntGenerator generator;
private final int id;
public EvenChecker(IntGenerator generator, int id) {
this.generator = generator;
this.id = id;
}
@Override
public void run() {
while (!generator.isCanceled()) {
int val = generator.next();
if (val % 2 != 0) {
System.out.println(Thread.currentThread().getName() + val + " not even!,generator.isCanceled():" + generator.isCanceled());
generator.cancel();
System.out.println(Thread.currentThread().getName() + val + " generator.isCanceled():" + generator.isCanceled()+ " 应该中止了!");
}
}
}
public static void test(IntGenerator gp, int count) {
System.out.println("Press Control-C to Exit");
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < count; i++) {
exec.execute(new EvenChecker(gp, i));
}
exec.shutdown();
}
public static void test(IntGenerator gp) {
test(gp, 10);
}
}
public class EvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
@Override
public int next() {
++currentEvenValue;
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new EvenGenerator());
}
}
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
public void cancel() {
canceled = true;
}
public boolean isCanceled() {
return canceled;
}
}
执行结果:
理论上到当输出not even!表示程序错误,却输出了多次,才中止。说明存在线程安全问题。怎么解呢
错误之处在于:
1、当前为多线程环境,currentEvenValue修改的方法所有线程都能够同时访问,造成脏读
2、currentEvenValue的自增方法不是原子性的,当前线程新增后,其他线程可能读取了另外一个线程自增一后值。
修正方案一:
将新增因为多个线程都能进行同时调用next()方法,如果在方法上添加synchronized关键字,那么同一时刻只有一个线程修改共享变量,也就消除了潜在的竞争条件。
public class SynchronizedEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
@Override
public synchronized int next() {
++currentEvenValue;
//Thread.yield();
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenGenerator());
}
}
结果表明程序执行成功:
修正方案二:
采用lock锁对共享资源进行加锁。对程序的自增是只有一个线程可以获取到锁。
public class MuteEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();
@Override
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield();
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EvenChecker.test(new MuteEvenGenerator());
}
}
修正方案三:
采用原子类AtomicInteger、AtomicLong、AtomicReference特殊的原子类,增加是原子性的
public class IntGeneratorAtomic extends IntGenerator {
private AtomicInteger atomicInteger = new AtomicInteger(0);
private Lock lock = new ReentrantLock();
@Override
public int next() {
return atomicInteger.addAndGet(2);
}
public static void main(String[] args) {
EvenChecker.test(new IntGeneratorAtomic ());
}
}
错误的修改方案四:
public class IntGeneratorAtomicTwo extends IntGenerator {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public int next() {
atomicInteger.addAndGet(1);
return atomicInteger.addAndGet(1);
}
public static void main(String[] args) {
EvenChecker.test(new IntGeneratorAtomicTwo());
}
}
线程的中止
线程的四种状态:
新建:线程被创建时,会短暂的处于该状态。此时,已经分配了必须的线程资源,此刻线程已具备获取CPU时间的资格,之后线程的状态就会变为可运行状态或 阻塞状态。
就绪:在这种状态下,只要调度器把时间片段分配给线程,线程就可以运行。
阻塞:线程能够运行,但有某个条件阻止了它的运行。
死亡:处于死亡的线程将不再是可调度的。
1、调用sleep,使任务进入休眠状态。
2、调用wait()使线程挂起。直到得到notify或notifyAll,或者sign()或signAll()。
3、获取synchronized锁失败。
Thread()类包含的interrupt()方法可以中断被阻塞的线程。
Executor()也封装了对线程的中断 *** 作。通过调用shutDownNow(),它将发送一个interrupt()调用给它所启动的所有线程。
Future对象上可以调用cancel()来取消当前任务。
对于IO线程的线程可以关闭其上的资源,以关闭线程
Thread.interrupted()可以检查线程是否已经中断
线程的协作多个线程之间可能相互协作,完成某项任务。
wait():挂起当前任务。
notify():唤醒持有该锁的某一个线程。在多生产多消费的模式下,会导致死锁,一般用notifyAll。
notifyAll():唤醒持有该锁的所有线程。
sleep():线程睡眠不释放当前对象的锁.
1、互斥条件。任务使用的资源至少一个是不能共享的
2、至少有一个任务必须持有一个资源且正在等待获取当前正在被别的任务所持有的资源。
3、资源不能被任务抢占:任务不能从其他任务抢占资源
4、循环等待:一一个任务等待其他任务所持有的资源,形成一个循环。
可以向CountDownLatch设置一个初始值,任何线程调用wait()方法将会阻塞。直到CountDownLatch的值为零。该初始值可以比线程数量多。
可以通过调用countDown()方法使计数值减一。。
CountDownLatch被设计为只触发一次,计数值不能重置。
package chapter21.c21p7;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i < 10; i++) {
exec.execute(new WaitingTask(latch));
}
for (int i = 0; i < SIZE; i++) {
exec.execute(new TaskPortion(latch));
}
System.out.println("Latched all tasks");
exec.shutdown();
}
}
CyclcBarrier:一组并行任务并行执行。
其构造函数:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
是一个可以重复使用的对象。向CyclicBarrier提供一个栅栏动作,是一个Runnable。当数值到达零的时候自动执行,这是他与CountDownLatch 的一个区别。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)