BlockingQueue介绍

简介

BlockingQueue是java.util.concurrent中的类。通过包名也知道它与并发有关。在学习并发的过程中,我相信大家都听过生产者消费者模型。BlockingQueue可以帮助我们来实现这样一个模型。从数据结构的角度来说,BlockingQueue是一个队列;从并发的角度来说,BlockingQueue带有阻塞的功能。

当队列为空时,若此时有线程来获取队列中的数据,那么这个线程将会被阻塞,直到其他线程往队列中插入数据。当队列满时,若此时有线程继续往队列插入数据,该线程也会被阻塞,直到有其他线程取走了数据。BlockingQueue的模型如下

BlockingQueue的类结构图如下

操作

BlockingQueue提供许多操作,具体如下表所示

Type Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e,time,unit)
Remove remove() poll() take() poll(time,unit)
Examine element() peek()

怎么理解上面的图呢?横轴来看,表示的是操作影响

  1. Throws exception:表示该操作可能会抛出异常,例如队列为空时,继续删除数据
  2. Special value:表示该操作会返回特殊的值,例如队列为空时,返回null
  3. Blocks:表示该操作会引起阻塞
  4. Times out:表示该操作可以指定超时时间,例如插入数据

阻塞和超时有啥区别呢。对于阻塞操作来说,线程会一直阻塞直到某些操作唤醒线程,而超时的话,线程可以被操作唤醒,也可以在一段时间后自动被唤醒。

纵轴来看,表示的是操作类型,这里重点看下Remove和Examine的区别,

  • 相同点:两者都会返回队头的数据
  • 不同点:remove会删除数据,而Examine不会删除数据

示例

首先建立一个BlockingQueue的测试类,并且定义一个长度为3的队列

public class BlockingQueueTestTest { 

    // 定义队列长度3
    private BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);

    // 定义预期异常
    @Rule
    public ExpectedException thrown = ExpectedException.none();
}

1. add与offer

@Test
public void testAdd(){
    Assert.assertTrue(blockingQueue.add(1));
    Assert.assertTrue(blockingQueue.add(2));
    Assert.assertTrue(blockingQueue.add(3));
    thrown.expect(IllegalStateException.class);
    blockingQueue.add(4);
}

@Test
public void testOffer(){
    Assert.assertTrue(blockingQueue.offer(1));
    Assert.assertTrue(blockingQueue.offer(2));
    Assert.assertTrue(blockingQueue.offer(3));
    Assert.assertFalse(blockingQueue.offer(4));
}

可以看到,当队列满时,继续add会抛出IllegalStateException的异常;而继续offer时会返回false

2. put与offer

先来测试put方法

@Test
public void testPut() throws InterruptedException {
    for (int i = 0; i < 4; i++){
        Thread thread = new Thread(() -> {
            try {
                int value = ThreadLocalRandom.current().nextInt(100);
                blockingQueue.put(value);
                System.out.println(String.format("put %d into queue,result:%s", value,blockingQueue.put(value,1,TimeUnit.SECONDS)));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 设置线程名称,以便jstack查看
        thread.setName("BlockingQueue-put-" + i);
        thread.start();
    }
    // 防止主函数退出
    Thread.sleep(600 * 1000);
}

运行程序后,我们通过jps命令找到pid,在通过jstack命令来看下线程的情况。可以看到当队列满时,若继续put数据到队列,会导致线程阻塞。

再来看下offer(e,time,unit)方法

@Test
public void testOfferTimeout() throws InterruptedException {
    for (int i = 0; i < 4; i++){
        Thread thread = new Thread(() -> {
            try {
                int value = ThreadLocalRandom.current().nextInt(100);
                System.out.println(String.format("put %d into queue,result:%s", value,blockingQueue.offer(value,1,TimeUnit.SECONDS)));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.setName("BlockingQueue-offer-" + i);
        thread.start();
    }
    // 防止主函数退出
    Thread.sleep(600 * 1000);
}

我们设置offer的超时时间为1s,当程序运行超过1s后,使用jstack命令查看线程状态时,发现只要main函数处于阻塞状态(因为在sleep),而offer线程已经全部执行完毕了。

remove与poll

@Test
public void testRemove(){
    thrown.expect(NoSuchElementException.class);
    blockingQueue.remove();
}

@Test
public void testPoll(){
    Assert.assertNull(blockingQueue.poll());
}

element,peek,remove

@Test
public void testElementAndPeekAndRemove(){
    blockingQueue.add(1);
    // element操作只返回数据并不删除数据
    Assert.assertEquals(1, (int) blockingQueue.element());
    Assert.assertEquals(1, blockingQueue.size());

    // remove删除并返回数据
    Assert.assertEquals(1,(int) blockingQueue.remove());
    Assert.assertEquals(0,blockingQueue.size());

    // 当队列为空时,element会抛出异常
    thrown.expect(NoSuchElementException.class);
    blockingQueue.element();
    // 当队列为空时,peek返回null
    Assert.assertNull(blockingQueue.peek());
}

Reprint please specify: wbl BlockingQueue介绍

Current
BlockingQueue介绍 BlockingQueue介绍
简介BlockingQueue是java.util.concurrent中的类。通过包名也知道它与并发有关。在学习并发的过程中,我相信大家都听过生产者消费者模型。BlockingQueue可以帮助我们来实现这样一个模型。从数据结构的角度来说
2020-05-10
Next
Spring Boot JPA--JpaRepository实现原理 Spring Boot JPA--JpaRepository实现原理
在SpringBoot中,我们只要定义好Entity,配置好DataSource,最后实现JpaRepository接口就可以对数据进行CRUD操作。 @Repository public interface UserDao extends
2020-04-13