Общее·количество·просмотров·страницы

Java Dev Notes - разработка на Java (а также на JavaScript/Python/Flex и др), факты, события из АйТи

пятница, 14 марта 2014 г.

Элементарная реализация BlockingQueue на Java

BlockingQueue (блокирующая очередь) - это очередь, которая блокирует поток, в двух случаях:
  • поток пытается получить элементы из пустой очереди
  • поток пытается положить элементы в полную очередь

Когда поток пытается получить элементы из пустой очереди, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не положит элементы в очередь. Аналогично, когда поток пытается положить элементы в полную очередь, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не возьмет элементы их очереди и таким образом не освободит место в ней. Естественно, введение понятия "полная очередь" подразумевает, что очередь имеет ограниченный размер, который обычно задается в конструкторе.

В Java есть интерфейс BlockingQueue, а также множество реализаций этого интерфейса:

Тем не менее, было бы полезно самим реализовать простейшую блокирующую очередь для понимания ее работы. Причем наша реализация (для начала) должна быть самой простой. Реализации в JDK намного более сложные, чем наша примитивная реализация, которая приведена ниже. Затем можно было бы детально разобрать реализации блокирующих очередей в Java. Впрочем, разбор реализаций в Java мы оставим когда-нибудь на потом, а сейчас займемся собственной простейшей реализацией блокирующей очереди.

public class BlockingQueue {

    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }


    public synchronized void put(Object item) throws InterruptedException  {
        while (this.queue.size() == this.limit) {
            wait();
        }
        if (this.queue.size() == 0) {
            notifyAll();
        }
        this.queue.add(item);
    }


    public synchronized Object take() throws InterruptedException{
        while (this.queue.size() == 0){
            wait();
        }
        if (this.queue.size() == this.limit){
            notifyAll();
        }

        return this.queue.remove(0);
    }
}

Рассмотрим метод put:

public synchronized void put(Object item) throws InterruptedException  {
    while (this.queue.size() == this.limit) {
        wait();
    }
    if (this.queue.size() == 0) {
        notifyAll();
    }
    this.queue.add(item);
}
Если очередь заполнена, то поток ставится в ожидание. Если же очередь пустая, то вызывается метод notifyAll, который пробуждает потоки, которые поставлены в ожидание при получении элементов из очереди. Аналогично работает метод take.

Обратите внимание, что в нашей простейшей реализации методы put, take являются synchronized в отличие от реализаций в JDK. Это сделано из двух соображений:

  • простота
  • желание обойтись только элементарными примитивами синхронизации в JDK, а именно методами wait, notifyAll

Давайте промоделируем работу очереди размеров в 5 элементов. Пусть у нас есть один поток (Producer), который, скажем, каждые 2 миллисекунды создает и кладет новый объект в очередь (пока что не подключаем поток, который забирает элементы из очереди). Очевидно, что через 10 миллисекунд, после того как очередь вся будет заполнена, при попытке положить шестой по счету элемент в очередь, этот поток будет поставлен в ожидание вот этим кусом кода из метода put:

while (this.queue.size() == this.limit) {
    wait();
}
Теперь подключим второй поток, который будет забирать элементы из очереди (Consumer). Он определит, что очередь полная и пошлет notification всем ожидающим потокам. Это произойдет в следующем куске кода метода take:
if (this.queue.size() == this.limit){
    notifyAll();
}
После этого поток заберет первый элемент из списка, и вернет его в методе take, выйдя таким образом из этого (synchronized!!!) метода и освободив монитор, ассоциированный c очередью. Затем монитор захватит поток Producer, пробудившийся от ожидания, и продолжит класть элементы в очередь.

Теперь приведем полный код демо-примера:

import java.util.LinkedList;
import java.util.List;

public class BlockingQueue {

    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }


    public synchronized void put(T item) throws InterruptedException  {
        System.out.println("[BlockingQueue] try to put: " + item );
        while (this.queue.size() == this.limit) {
            System.out.println("[BlockingQueue] queue is full, waiting until space is free");
            wait();
        }
        if (this.queue.size() == 0) {
            System.out.println("[BlockingQueue] queue is empty, notify");
            notifyAll();
        }
        this.queue.add(item);
        System.out.println("[BlockingQueue] put ok: " + item );
    }


    public synchronized T take() throws InterruptedException{
        System.out.println("[BlockingQueue] try to take");
        while (this.queue.size() == 0){
            System.out.println("[BlockingQueue] queue is empty, waiting until smth is put");
            wait();
        }
        if (this.queue.size() == this.limit){
            System.out.println("[BlockingQueue] queue is full, notify");
            notifyAll();
        }

        T item = this.queue.remove(0);
        System.out.println("[BlockingQueue] take ok: " + item );
        return item;
    }
}
import java.util.Random;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new BlockingQueue(5);
        new Thread(new Producer(queue)).start();
        Thread.currentThread().sleep(1000);
        new Thread(new Consumer(queue)).start();
    }

    static class Producer implements Runnable {
        private final BlockingQueue queue;

        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println("[Producer] run");
            while (true) {
                try {
                    queue.put(produce());
                    Thread.currentThread().sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private Integer produce() {
            Integer i = new Random().nextInt(100);
            System.out.println("[Producer] produce: " + i);
            return i;
        }
    }

    static class Consumer implements Runnable {
        private final BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println("[Consumer] run");
            while (true) {
                try {
                    consume();
                    Thread.currentThread().sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private void consume() throws InterruptedException {
            Integer i = queue.take();
            System.out.println("[Consumer] consumed: " + i);
        }
    }
}

Вывод консоли будет такой (первая часть, где Producer кладет первые пять элементов в очередь):
[Producer] run
[Producer] produce: 86
[BlockingQueue] try to put: 86
[BlockingQueue] queue is empty, notify
[BlockingQueue] put ok: 86
[Producer] produce: 73
[BlockingQueue] try to put: 73
[BlockingQueue] put ok: 73
[Producer] produce: 20
[BlockingQueue] try to put: 20
[BlockingQueue] put ok: 20
[Producer] produce: 84
[BlockingQueue] try to put: 84
[BlockingQueue] put ok: 84
[Producer] produce: 73
[BlockingQueue] try to put: 73
[BlockingQueue] put ok: 73
[Producer] produce: 67
[BlockingQueue] try to put: 67
[BlockingQueue] queue is full, waiting until space is free
Затем вступает в работу Consumer:
[Consumer] run
[BlockingQueue] try to take
[BlockingQueue] queue is full, notify
[BlockingQueue] take ok: 86
[Consumer] consumed: 86
[BlockingQueue] put ok: 67
[Producer] produce: 63
[BlockingQueue] try to put: 63
[BlockingQueue] queue is full, waiting until space is free
[BlockingQueue] try to take
[BlockingQueue] queue is full, notify
[BlockingQueue] take ok: 73
[Consumer] consumed: 73
[BlockingQueue] put ok: 63
[Producer] produce: 90
[BlockingQueue] try to put: 90
[BlockingQueue] queue is full, waiting until space is free
[BlockingQueue] try to take
[BlockingQueue] queue is full, notify
[BlockingQueue] take ok: 20
[Consumer] consumed: 20
[BlockingQueue] put ok: 90
[Producer] produce: 57
[BlockingQueue] try to put: 57
[BlockingQueue] queue is full, waiting until space is free
На этом куске вывода хорошо видно, как Consumer и Producer попеременно используют очередь для обмена данными.

2 комментария:

  1. Спасибо, толково и просто.

    ОтветитьУдалить
  2. Или я ошибаюсь, или здесь маленькая ошибка. В коде указан лимит = 10. А первая нить кладет только 5 значений и потом переходит в состояние ожидания. Видимо тут должен быть лимит = 5 или в выводе должно быть десять значений.

    ОтветитьУдалить

Постоянные читатели