- поток пытается получить элементы из пустой очереди
- поток пытается положить элементы в полную очередь
Когда поток пытается получить элементы из пустой очереди, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не положит элементы в очередь. Аналогично, когда поток пытается положить элементы в полную очередь, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не возьмет элементы их очереди и таким образом не освободит место в ней. Естественно, введение понятия "полная очередь" подразумевает, что очередь имеет ограниченный размер, который обычно задается в конструкторе.
В Java есть интерфейс BlockingQueue, а также множество реализаций этого интерфейса:
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingDeque
- LinkedBlockingQueue
- LinkedTransferQueue
- PriorityBlockingQueue
- SynchronousQueue
Тем не менее, было бы полезно самим реализовать простейшую блокирующую очередь для понимания ее работы. Причем наша реализация (для начала) должна быть самой простой. Реализации в JDK намного более сложные, чем наша примитивная реализация, которая приведена ниже. Затем можно было бы детально разобрать реализации блокирующих очередей в Java. Впрочем, разбор реализаций в Java мы оставим когда-нибудь на потом, а сейчас займемся собственной простейшей реализацией блокирующей очереди.
public class BlockingQueue { private List queue = new LinkedList(); private int limit = 10; public BlockingQueue(int limit){ this.limit = limit; } public synchronized void put(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { wait(); } if (this.queue.size() == 0) { notifyAll(); } this.queue.add(item); } public synchronized Object take() throws InterruptedException{ while (this.queue.size() == 0){ wait(); } if (this.queue.size() == this.limit){ notifyAll(); } return this.queue.remove(0); } }
Рассмотрим метод put:
public synchronized void put(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { wait(); } if (this.queue.size() == 0) { notifyAll(); } this.queue.add(item); }Если очередь заполнена, то поток ставится в ожидание. Если же очередь пустая, то вызывается метод notifyAll, который пробуждает потоки, которые поставлены в ожидание при получении элементов из очереди. Аналогично работает метод take.
Обратите внимание, что в нашей простейшей реализации методы put, take являются synchronized в отличие от реализаций в JDK. Это сделано из двух соображений:
- простота
- желание обойтись только элементарными примитивами синхронизации в JDK, а именно методами wait, notifyAll
Давайте промоделируем работу очереди размеров в 5 элементов. Пусть у нас есть один поток (Producer), который, скажем, каждые 2 миллисекунды создает и кладет новый объект в очередь (пока что не подключаем поток, который забирает элементы из очереди). Очевидно, что через 10 миллисекунд, после того как очередь вся будет заполнена, при попытке положить шестой по счету элемент в очередь, этот поток будет поставлен в ожидание вот этим кусом кода из метода put:
while (this.queue.size() == this.limit) { wait(); }Теперь подключим второй поток, который будет забирать элементы из очереди (Consumer). Он определит, что очередь полная и пошлет notification всем ожидающим потокам. Это произойдет в следующем куске кода метода take:
if (this.queue.size() == this.limit){ notifyAll(); }После этого поток заберет первый элемент из списка, и вернет его в методе take, выйдя таким образом из этого (synchronized!!!) метода и освободив монитор, ассоциированный c очередью. Затем монитор захватит поток Producer, пробудившийся от ожидания, и продолжит класть элементы в очередь.
Теперь приведем полный код демо-примера:
import java.util.LinkedList; import java.util.List; public class BlockingQueue{ private List queue = new LinkedList (); private int limit = 10; public BlockingQueue(int limit){ this.limit = limit; } public synchronized void put(T item) throws InterruptedException { System.out.println("[BlockingQueue] try to put: " + item ); while (this.queue.size() == this.limit) { System.out.println("[BlockingQueue] queue is full, waiting until space is free"); wait(); } if (this.queue.size() == 0) { System.out.println("[BlockingQueue] queue is empty, notify"); notifyAll(); } this.queue.add(item); System.out.println("[BlockingQueue] put ok: " + item ); } public synchronized T take() throws InterruptedException{ System.out.println("[BlockingQueue] try to take"); while (this.queue.size() == 0){ System.out.println("[BlockingQueue] queue is empty, waiting until smth is put"); wait(); } if (this.queue.size() == this.limit){ System.out.println("[BlockingQueue] queue is full, notify"); notifyAll(); } T item = this.queue.remove(0); System.out.println("[BlockingQueue] take ok: " + item ); return item; } }
import java.util.Random; public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueueВывод консоли будет такой (первая часть, где Producer кладет первые пять элементов в очередь):queue = new BlockingQueue (5); new Thread(new Producer(queue)).start(); Thread.currentThread().sleep(1000); new Thread(new Consumer(queue)).start(); } static class Producer implements Runnable { private final BlockingQueue queue; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { System.out.println("[Producer] run"); while (true) { try { queue.put(produce()); Thread.currentThread().sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } private Integer produce() { Integer i = new Random().nextInt(100); System.out.println("[Producer] produce: " + i); return i; } } static class Consumer implements Runnable { private final BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { System.out.println("[Consumer] run"); while (true) { try { consume(); Thread.currentThread().sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } } private void consume() throws InterruptedException { Integer i = queue.take(); System.out.println("[Consumer] consumed: " + i); } } }
[Producer] run [Producer] produce: 86 [BlockingQueue] try to put: 86 [BlockingQueue] queue is empty, notify [BlockingQueue] put ok: 86 [Producer] produce: 73 [BlockingQueue] try to put: 73 [BlockingQueue] put ok: 73 [Producer] produce: 20 [BlockingQueue] try to put: 20 [BlockingQueue] put ok: 20 [Producer] produce: 84 [BlockingQueue] try to put: 84 [BlockingQueue] put ok: 84 [Producer] produce: 73 [BlockingQueue] try to put: 73 [BlockingQueue] put ok: 73 [Producer] produce: 67 [BlockingQueue] try to put: 67 [BlockingQueue] queue is full, waiting until space is freeЗатем вступает в работу Consumer:
[Consumer] run [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 86 [Consumer] consumed: 86 [BlockingQueue] put ok: 67 [Producer] produce: 63 [BlockingQueue] try to put: 63 [BlockingQueue] queue is full, waiting until space is free [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 73 [Consumer] consumed: 73 [BlockingQueue] put ok: 63 [Producer] produce: 90 [BlockingQueue] try to put: 90 [BlockingQueue] queue is full, waiting until space is free [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 20 [Consumer] consumed: 20 [BlockingQueue] put ok: 90 [Producer] produce: 57 [BlockingQueue] try to put: 57 [BlockingQueue] queue is full, waiting until space is freeНа этом куске вывода хорошо видно, как Consumer и Producer попеременно используют очередь для обмена данными.
Спасибо, толково и просто.
ОтветитьУдалитьИли я ошибаюсь, или здесь маленькая ошибка. В коде указан лимит = 10. А первая нить кладет только 5 значений и потом переходит в состояние ожидания. Видимо тут должен быть лимит = 5 или в выводе должно быть десять значений.
ОтветитьУдалить