从线程通信到阻塞队列

从线程通信到阻塞队列,第1张

什么是线程通信

对同一资源的 *** 作有不同种类的线程。直白点说就是:共享资源+多线程。
现实生活中的例子:生产者生产物品,消费者购买物品,生产者和消费者分别为两个线程,同时对物品进行 *** 作,一般情况下互不影响。如果物品没了,消费者就无法进行购买,需要"告诉"生产者生产,生产者生产物品,达到一点的数量,"告诉"消费者可以购买了,这种线程间的相互调度,就是线程间的通信。

实现线程间的通信 1.等待唤醒机制:wait/notify

等待唤醒机制底层维护了线程队列,避免了多线程同时自旋造成的cpu资源浪费,有点空间换时间的意思。当一个生产者无法再生产物品时,就让他在队列中休眠(阻塞),此时的生产者线程会释放cpu资源,等待消费者抢到cpu的执行权购买物品,再由消费者唤醒生产者进继续行生产。
举个栗子:
生产者去店里看了一下货架上还有货,就不管它了,去睡觉去了,等到货卖完了之后,会有消费者过来叫醒它,让它补货。
java实现wait/notify:

public class wait_notify_Queue<T> {

    //容器,用于生产者和消费者对物品进行 *** 作
    private final LinkedList<T> queue = new LinkedList<>();

    //生产 *** 作
    public synchronized void put(T resource) throws InterruptedException {
        while (queue.size()>=1){
            //店里有货时,不需要生产,就去睡觉了
            System.out.println("生产者:有货,不生产");
            this.wait();
        }
        //没货就生产
        System.out.println("生产者:生产物品:"+resource);
        queue.addFirst(resource);
        this.notify();//叫醒消费者
    }

    //购买 *** 作
    public synchronized void take() throws InterruptedException {
        while (queue.size()<=0){
            //店里没有货,等待生产者生产
            System.out.println("消费者:没有货,啷个买嘛");
            this.wait();
        }
        //有货进行购买
        System.out.println("消费者:购买物品");
        queue.removeLast();
        //叫醒生产者
        this.notify();
    }
}

测试

public class test01 {

    public static void main(String[] args) {
        //创建一个线程队列
        wait_notify_Queue<String> queue = new wait_notify_Queue<>();

        //生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.put("物品:"+i+"生产");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        //消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

结果

用synchronized保证了原子性,wait和notify实现了等待唤醒机制。
这样会有一个问题,我们不妨再加一个生产者看一下

整个程序的线程都阻塞了。
原因:在synchronized机制下,所有的等待线程都在同一个队列中,而notify是随机唤醒线程,这就有可能会发生,生产者一号生产完物品,唤醒生产者二号后去睡觉了,这时候生产者二号看有货,也去睡觉了,也就造成了全部的线程都睡觉去了,程序就卡住了。
解决的方法是改用notifyAll,把所有的线程都唤醒,然后所有的线程一起参加执行权的争夺,这样每一个线程在进入阻塞之前,会唤醒所有的线程!

等待唤醒机制2:condition

wati/notify版本的缺点是随机唤醒线程,可能会发生己方唤醒己方,导致线程全部阻塞的问题;wait/notifyAll版本解决了全部阻塞的问题,但对于唤醒的对象不明确,会造成全部线程抢占执行权(实际上我们只需要唤醒敌方线程即可)。
可是使用ReentrantLock中的condition 代替synchronized的wati/notify:

public class ConditionQueue<T> {

    //商店,用于消费者和生产者 对物品的 *** 作
 private final   LinkedList<T> queue =  new LinkedList<>();

   //显示锁(相对地,synchronized锁被称为隐式锁)
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition producerCondition = lock.newCondition();
    private final Condition consumerCondition = lock.newCondition();

   //生产者
    public  void put(T resource){
        lock.lock();//加锁
        try {
            while (queue.size()>=1){
                //店里有货时,不需要生产,就去睡觉了
                System.out.println("生产者:有货,不生产");
                //阻塞生产者
                producerCondition.await();
            }
            //没货就生产
            System.out.println("生产者:生产物品:"+resource);
            queue.addFirst(resource);
            //生产完毕,唤醒消费者
            consumerCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//解锁
        }
    }

    //消费者
    public void take(){
        lock.lock();//加锁
        try {
            while (queue.size()<=0){
                //店里没有货,等待生产者生产
                System.out.println("消费者:没有货,啷个买嘛");
                //消费者阻塞
                consumerCondition.await();
            }
            //有货进行购买
            System.out.println("消费者:购买物品");
            queue.removeLast();
            //唤醒生产者
            producerCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
Condition

如何理解Condition?
可以认为lock.newCondition()创建了一个队列,调用producerCondition.wati()方法,把生产者放入到了生产者的阻塞队列中,当消费者调用producerCondition.signal()方法,会从生产者队列中唤醒一个生产者线程。
也就是说每一个condition都会创建一个等待队列,可以分别存储生产者和消费者线程,从而实现精确唤醒。

阻塞队列

上面所进行的线程间的通信,会发现都采用了阻塞队列实现的,我们先构造一个queue队列,然后生产者和消费者直接 *** 作queue,至于是否阻塞,有queue内部判断。

BlockingQueue


Queue和List其实很像,也是集合的一个分支罢了:
我们听到阻塞队列(ArrayBlockingQueue)觉得很神秘,听起来比Arryalist厉害,可以看看上图的继承体系,如果ArrayBlockingQueue没有实现BlockingQueue接口,也就是一个普通的队列.

那么阻塞队列是怎么实现的呢?
以ArrayBlockingQueue为例,上图继承体系中,queue和blockingqueue都是接口,不可能实现阻塞的,而AbstractQueue里面几乎什么都没有写

那就只能是ArrayBlockingQueue本身实现了阻塞的功能了

可以看到,ArrayBlockingQueue定义了三个成员变量,是不是有点熟悉,就是用ReentrantLock中的Condition来进行阻塞的

满了就await(),没满就enqueue()。
有了awati()方法,可是没有看到singal()方法,可以猜想到,应该是在enqueue()方法中:

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/904935.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-05-15
下一篇2022-05-15

发表评论

登录后才能评论

评论列表(0条)

    保存