简介
BlockingQueue是java.util.concurrent中的类。通过包名也知道它与并发有关。在学习并发的过程中,我相信大家都听过生产者消费者模型。BlockingQueue可以帮助我们来实现这样一个模型。从数据结构的角度来说,BlockingQueue是一个队列;从并发的角度来说,BlockingQueue带有阻塞的功能。
当队列为空时,若此时有线程来获取队列中的数据,那么这个线程将会被阻塞,直到其他线程往队列中插入数据。当队列满时,若此时有线程继续往队列插入数据,该线程也会被阻塞,直到有其他线程取走了数据。BlockingQueue的模型如下
BlockingQueue的类结构图如下
操作
BlockingQueue提供许多操作,具体如下表所示
Type | Throws exception | Special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e,time,unit) |
Remove | remove() | poll() | take() | poll(time,unit) |
Examine | element() | peek() |
怎么理解上面的图呢?横轴来看,表示的是操作影响
- Throws exception:表示该操作可能会抛出异常,例如队列为空时,继续删除数据
- Special value:表示该操作会返回特殊的值,例如队列为空时,返回null
- Blocks:表示该操作会引起阻塞
- Times out:表示该操作可以指定超时时间,例如插入数据
阻塞和超时有啥区别呢。对于阻塞操作来说,线程会一直阻塞直到某些操作唤醒线程,而超时的话,线程可以被操作唤醒,也可以在一段时间后自动被唤醒。
纵轴来看,表示的是操作类型,这里重点看下Remove和Examine的区别,
- 相同点:两者都会返回队头的数据
- 不同点:remove会删除数据,而Examine不会删除数据
示例
首先建立一个BlockingQueue的测试类,并且定义一个长度为3的队列
public class BlockingQueueTestTest {
// 定义队列长度3
private BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
// 定义预期异常
@Rule
public ExpectedException thrown = ExpectedException.none();
}
1. add与offer
@Test
public void testAdd(){
Assert.assertTrue(blockingQueue.add(1));
Assert.assertTrue(blockingQueue.add(2));
Assert.assertTrue(blockingQueue.add(3));
thrown.expect(IllegalStateException.class);
blockingQueue.add(4);
}
@Test
public void testOffer(){
Assert.assertTrue(blockingQueue.offer(1));
Assert.assertTrue(blockingQueue.offer(2));
Assert.assertTrue(blockingQueue.offer(3));
Assert.assertFalse(blockingQueue.offer(4));
}
可以看到,当队列满时,继续add会抛出IllegalStateException的异常;而继续offer时会返回false
2. put与offer
先来测试put方法
@Test
public void testPut() throws InterruptedException {
for (int i = 0; i < 4; i++){
Thread thread = new Thread(() -> {
try {
int value = ThreadLocalRandom.current().nextInt(100);
blockingQueue.put(value);
System.out.println(String.format("put %d into queue,result:%s", value,blockingQueue.put(value,1,TimeUnit.SECONDS)));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 设置线程名称,以便jstack查看
thread.setName("BlockingQueue-put-" + i);
thread.start();
}
// 防止主函数退出
Thread.sleep(600 * 1000);
}
运行程序后,我们通过jps命令找到pid,在通过jstack命令来看下线程的情况。可以看到当队列满时,若继续put数据到队列,会导致线程阻塞。
再来看下offer(e,time,unit)方法
@Test
public void testOfferTimeout() throws InterruptedException {
for (int i = 0; i < 4; i++){
Thread thread = new Thread(() -> {
try {
int value = ThreadLocalRandom.current().nextInt(100);
System.out.println(String.format("put %d into queue,result:%s", value,blockingQueue.offer(value,1,TimeUnit.SECONDS)));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.setName("BlockingQueue-offer-" + i);
thread.start();
}
// 防止主函数退出
Thread.sleep(600 * 1000);
}
我们设置offer的超时时间为1s,当程序运行超过1s后,使用jstack命令查看线程状态时,发现只要main函数处于阻塞状态(因为在sleep),而offer线程已经全部执行完毕了。
remove与poll
@Test
public void testRemove(){
thrown.expect(NoSuchElementException.class);
blockingQueue.remove();
}
@Test
public void testPoll(){
Assert.assertNull(blockingQueue.poll());
}
element,peek,remove
@Test
public void testElementAndPeekAndRemove(){
blockingQueue.add(1);
// element操作只返回数据并不删除数据
Assert.assertEquals(1, (int) blockingQueue.element());
Assert.assertEquals(1, blockingQueue.size());
// remove删除并返回数据
Assert.assertEquals(1,(int) blockingQueue.remove());
Assert.assertEquals(0,blockingQueue.size());
// 当队列为空时,element会抛出异常
thrown.expect(NoSuchElementException.class);
blockingQueue.element();
// 当队列为空时,peek返回null
Assert.assertNull(blockingQueue.peek());
}