欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

互联网技术10——queue队列

程序员文章站 2022-07-11 11:53:20
...

  在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue为代表的阻塞队列。两种都是继承了Queue接口。

层次图(hierarchy):(idea中 层次图快捷键:选中后(或在类中空白处)) ctrl+h )

互联网技术10——queue队列

1.concurrentLinkedQueue:

  concurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常concurrentLinkedQueue性能要好于BlockedQueue。它是一个基于链接节点的*线程安全队列。该队列元素遵循先进先出的原则。头是最先加入的,尾是最后加入的,该队列不允许有null元素。

 

  ConcurrentLinkedQueue重要方法:add()、offer()、poll()、peek()

1.  add()和offer()都是加入元素的方法(在concurrentLinkedQueue中,这两个方法没有任何区别,之所以有两个相同的方法,是因为这两个方法都继承自Queue,其他场景可能不一样)。

2.   pool()和peek()都是取头元素,区别在于前者会删除元素,后者不会

验证ConcurrentLinkedQueue是线程安全的:

  

package com.company;

import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Created by BaiTianShi on 2018/8/18.
 */
public class QueueTest {

    public static void main(String[] args) {
         ConcurrentLinkedQueue<String> qu = new ConcurrentLinkedQueue<>();
        qu.add("a");
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                if(!qu.isEmpty()){
                    System.out.println("进入t1线程");
                    System.out.println("t1获取的元素"+qu.poll());
                }
            }
        },"t1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                if(!qu.isEmpty()){
                    System.out.println("进入t2线程");
                    System.out.println("t2获取的元素"+qu.poll());
                }
            }
        },"t2");
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                if(!qu.isEmpty()){
                    System.out.println("进入t3线程");
                    System.out.println("t3获取的元素"+qu.poll());
                }
            }
        },"t3");
        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                if(!qu.isEmpty()){
                    System.out.println("进入t4线程");
                    System.out.println("t4获取的元素"+qu.poll());
                }
            }
        },"t4");
        Thread t5 = new Thread(new Runnable() {
            @Override
            public void run() {
                if(!qu.isEmpty()){
                    System.out.println("进入t5线程");
                    System.out.println("t5获取的元素"+qu.poll());
                }
            }
        },"t5");

        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();


    }
}

运行结果:

进入t1线程
进入t3线程
进入t5线程
进入t2线程
t5获取的元素null
t3获取的元素null
t1获取的元素a
进入t4线程
t2获取的元素null
t4获取的元素null

 

  可见看到,只有一个能拿到非空的值。其他线程拿不到a。在判断队列是否为空时,不要使用size,因为size要遍历一遍集合,使用isEmpty效率比较高,size()方法见下图。

互联网技术10——queue队列

 

ArrayBlockQueue

  基于数组的阻塞队列实现,在ArrayBlockQueue内部维护里一个定长的数组,以便缓存队列中的数据对象,其内部没有实现读写分离,所以生产和消费不能完全并行。长度是需要定义的,并且可以指定先进先出或先进后出。它是有界队列,以内创建时必须指定长度。

  方法:

   添加元素 add()、put()、offer() 三个方法虽然都是添加元素,但是作用却不同。

    1. put()方法,添加元素时,如果空间不够,将会一直等待,直到有元素被取出。

package com.company;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Created by BaiTianShi on 2018/8/19.
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<String>  st = new ArrayBlockingQueue<String>(5);
        try {
           for(int i=0;i<7;i++){
               st.put(String.valueOf(i));
               System.out.println("添加了第"+i+"个");
           }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果,可以看见,添加到第6个时,线程一直在等待。

互联网技术10——queue队列

   2. add()方法,添加元素时,如果空间不够,这抛出抛出illegalStateException异常,如果有可用空间,则添加成功时返回true。

package com.company;

        import java.util.concurrent.ArrayBlockingQueue;

/**
 * Created by BaiTianShi on 2018/8/19.
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<String>  st = new ArrayBlockingQueue<String>(5);
            for(int i=0;i<7;i++){
                st.add(String.valueOf(i));
                System.out.println("添加了第"+(i+1)+"个");
            }
      
    }
}

运行结果

互联网技术10——queue队列

 3. offer()方法,添加元素时,有空位置则添加,并且返回ture,没有则返回fase。当向有界队列添加元素的时候,此方法优于add。

    package com.company;
    import java.util.concurrent.ArrayBlockingQueue;

    /**
     * Created by BaiTianShi on 2018/8/19.
     */
    public class ArrayBlockingQueueTest {
        public static void main(String[] args) {
            ArrayBlockingQueue<String>  st = new ArrayBlockingQueue<String>(5);
                for(int i=0;i<7;i++){

                    System.out.println("添第"+(i+1)+"个时返回"+st.offer(String.valueOf(i)));
                }

        }
    }

运行结果:

互联网技术10——queue队列

 

LinkedBlockingQueue  

   通过linkedBlockQueue认识drainTo(List,n); 从队列中取出头部n个元素放到list中;

    package com.company;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;

    /**
     * Created by BaiTianShi on 2018/8/19.
     */
    public class ArrayBlockingQueueTest {
        public static void main(String[] args) {
            LinkedBlockingQueue<String>  st = new LinkedBlockingQueue<String>(5);
            for(int i=0;i<4;i++){
                st.offer(String.valueOf(i));
            }
            List<String> li = new ArrayList<>();
            st.drainTo(li,2);
            for(String l : li){
                System.out.println("取出的"+l);
            }
            for(int j=0;j<5; j++){
                System.out.println("队列中剩余"+String.valueOf(st.poll()));
            }

        }
    }

运行结果:可见取出0和1放到了list当中,

取出的0
取出的1
队列中剩余2
队列中剩余3
队列中剩余null
队列中剩余null
队列中剩余null

同时我们注意到,在这里我们虽然循环了十次,但只放进去0123这四个元素,因为这里指定了队列长度,这样它就是一个有界队列

互联网技术10——queue队列

当我们把这个长度去掉时

package com.company;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by BaiTianShi on 2018/8/19.
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> st = new LinkedBlockingQueue<String>();
        for(int i=0;i<10;i++){
            st.offer(String.valueOf(i));
        }
        System.out.println(st.size());



    }
}

运行结果:可见,虽然我们没指定长度,但是循环10次每次都放进去了。改成100甚至1000都可以,这里不做过多演示。

互联网技术10——queue队列

SynchronouseQueue

  下面看个例子,这个例子,貌似SynchronousQueue可以添加元素,如下所示。但是其实SynchronousQueue依然是没有存储元素的,这里之所以没有报错,是因为我们先启动了一个线程t1要消费SynchronousQueue这个队列中的元素,线程t2要向SynchronousQueue队列添加一个元素,这时候会发生什么呢?这时候,线程t2并不会真的把元素添加到队列中,而是直接将要添加的元素交给线程t1了。也就是说,SynchronousQueue队列还是不会真正存储元素的。

    package com.company;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;

    /**
     * Created by BaiTianShi on 2018/8/19.
     */
    public class ArrayBlockingQueueTest {
        public static void main(String[] args) {
            final SynchronousQueue<String> st = new SynchronousQueue<String>();

            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(st.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t1.start();
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        st.put("0");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t2.start();
        }
    }

运行结果

互联网技术10——queue队列

 肯定有些人会有疑问,既然SynchronousQueue不能装任何元素的话,那么要它有何用?还有就是有界队列和*队列的应用场景是什么呢?如下图所示。

 

互联网技术10——queue队列

 

PriorityBlockingQueue

基于优先级的阻塞队列(优先级判断通过构造函数传入的Compatot对象来决定,也就是说传入队列的对象必须实现ComParable接口)。JDK中PriorityBlockingQueue内部控制线程同步的锁采用的是公平锁,它是一个*队列。

代码: task,实现了Comparable的compar方法

package com.company;

/**
 * Created by BaiTianShi on 2018/8/20.
 */
public class Task implements Comparable<Task> {
    private Integer id;
    private String name;


    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public int compareTo(Task Task) {

        return this.id > Task.id ? 1 : (this.id < Task.id ? -1 : 0);
    }
}

 

main方法:

    package com.company;
    import java.util.Iterator;
    import java.util.concurrent.PriorityBlockingQueue;

    /**
     * Created by BaiTianShi on 2018/8/19.
     */
    public class ArrayBlockingQueueTest {
        public static void main(String[] args) throws InterruptedException {
            final PriorityBlockingQueue<Task> st = new PriorityBlockingQueue<Task>();


            Task ta4 = new Task();
            ta4.setId(4);
            ta4.setName("t4");

            Task ta1 = new Task();
            ta1.setId(1);
            ta4.setName("t1");

            Task ta3 = new Task();
            ta3.setId(3);
            ta4.setName("t3");

            Task ta2 = new Task();
            ta2.setId(2);
            ta4.setName("t2");
            st.add(ta3);
            st.add(ta2);
            st.add(ta4);
            st.add(ta1);


            for(Iterator<Task> iterator = st.iterator(); iterator.hasNext();){
                System.out.println(iterator.next().getId());
            }
            Thread.sleep(3000);
            System.out.println(st.poll().getId());
            System.out.println(st.poll().getId());
            System.out.println(st.poll().getId());
            System.out.println(st.poll().getId());

        }
    }

可见,通过iterator遍历时是无效的,但是在取的时候是有序的。这样做的目的是为了避免每次放入元素时都要重新计算排序, 第一次取得时候进行排序,取走一个元素对顺序无任何影响。

 

DelayQueue 

  带有延迟时间的队列,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素,DelayQueue中的元素必须实现Delay接口。Delay中的元素必须实现Delay接口,Delay是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接处理的关闭等等。

这里使用DelayQueue模拟一下网吧上网的上机和下机过程,

网民实体类,实现Delayed接口

package com.company;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Created by BaiTianShi on 2018/8/20.
 */
public class WangMin implements Delayed {

    private Integer id;

    private String name;

    private long endTime;

    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;

    public WangMin(Integer id, String name, long endTime) {
        this.id = id;
        this.name = name;
        this.endTime = endTime;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getEndTime() {
        return endTime;
    }

    public void setEndTime(long endTime) {
        this.endTime = endTime;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed de) {
        WangMin w = (WangMin)de;
        return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
    }
}

模拟网吧开始营业及网面来上网的过程代码:

package com.company;

import java.util.concurrent.DelayQueue;

/**
 * Created by BaiTianShi on 2018/8/20.
 */
public class WangBa implements Runnable {
    //延时队列
    private DelayQueue<WangMin> queue = new DelayQueue();
    //是否营业
    private boolean open = true;

    public WangBa(boolean open) {
        this.open = open;
    }

    //上机方法
    //第三个参数是下机时间,上网时长加上当前时间就是下机时间
    public void playStart(String name, Integer id, Integer money){
        WangMin peo = new WangMin(id,name,1000*money+System.currentTimeMillis());
        System.out.println("身份证"+id +"的用户"+name+"已上机");
        queue.add(peo);
    }

    public void playEnd(WangMin wangMin){
        System.out.println("身份证"+wangMin.getId() +"的用户"+wangMin.getName()+"上机时间到,请您下机");
    }



    @Override
    public void run() {
        try {
            while(open){
                WangMin wan = queue.take();
                playEnd(wan);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //开始营业
        final WangBa wangBa = new WangBa(true);
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                wangBa.run();
            }
        });
        t1.start();

        wangBa.playStart("uzi",1,1);
        wangBa.playStart("goGoing",2,3);
        wangBa.playStart("farker",3,5);
        wangBa.playStart("clearLove",4,2);
    }
}

       运行结果:

身份证1的用户uzi已上机
身份证2的用户goGoing已上机
身份证3的用户farker已上机
身份证4的用户clearLove已上机
身份证1的用户uzi上机时间到,请您下机
身份证4的用户clearLove上机时间到,请您下机
身份证2的用户goGoing上机时间到,请您下机
身份证3的用户farker上机时间到,请您下机

从运行状态上看,虽然全部下机了,但是线程仍未结束,根据实际情况,可通过设置while(open)中的open将轮询线程结束,例如使用volatitle修饰open

互联网技术10——queue队列

 

 

相关标签: 多线程