Общее·количество·просмотров·страницы

Java Dev Notes - разработка на Java (а также на JavaScript/Python/Flex и др), факты, события из АйТи

четверг, 27 марта 2014 г.

Grails - Convention over Configuration

Изучаю популярный фремворк для разработки веб-приложений для JVM-стека - Grails. Вместо того, чтобы писать длинные портянки XML-конфигураций (как в Spring, до тех пор, пока не стали пользоваться аннотациями для конфигов), Grails использует другой подход: есть жестко заданные правила, которые определяют, что где лежит, а также схему именования этих сущностей (контроллеры, вьюшки, сервисы, доменные объекты и т.п.). Для Grails 2.3.7 схема такая (Convention over Configuration):
  • grails-app - родительский каталог для исходников приложения
    • conf - конфиги
    • controllers - контроллеры - буковка "C" в аббревиатуре MVC.
    • domain - доменные объекты. Т.е. здесь лежат энтити, по которым будет создаваться схема БД
    • i18n - поддержка интернационализации (i18n).
    • services - сервисы
    • taglib - библиотеки тегов
    • utils - Grails -специфичные утилиты
    • views - Groovy Server Pages - вьюшки, аналог JSP, буковка "V" в аббревиатуре MVC
  • scripts - Gant-скрипты.
  • src - вспомогательные исходники
    • groovy - исходники на Groovy
    • java - исходники на Java
  • test - юнит-тесты, интеграционные тесты

суббота, 15 марта 2014 г.

Семафоры в Java: реализация простого семафора

Семафор используется для обмена сигналами между потоками, или же для охраны критической секции. Их также можно использовать и вместо локов. Несмотря на то, что в JDK уже реализован семафор (java.util.concurrent.Semaphore), полезно будет самим реализовать этот объект.

Итак, у нашего семафора будет всего лишь два метода: take, release. Соответственно, простейшая реализация будет такой:

public class SimpleSemaphore {
    boolean taken = false;

    public synchronized void take() {
        this.taken = true;
        this.notify();
    }

    public synchronized void release() throws InterruptedException {
        while (!this.taken) wait();
        this.taken = false;
    }
}

Теперь давайте рассмотрим программу, которая использует семафор для обмена сигналами. У нас будет два потока: SignalSender, SignalReceiver, которые будут посылать друг другу сигналы. Вот код:

public class Main {

    public static void main(String[] args) throws InterruptedException {
        SimpleSemaphore semaphore = new SimpleSemaphore();
        new Thread(new SignalSender(semaphore)).start();
        Thread.currentThread().sleep(2000);
        new Thread(new SignalReceiver(semaphore)).start();
    }


    static class SignalSender implements Runnable {
        private final SimpleSemaphore semaphore;

        public SignalSender(SimpleSemaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            System.out.println("[SignalSender] run");
            while (true) {
                try {
                    doSomeWork();
                    semaphore.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

        private void doSomeWork() throws InterruptedException {
            System.out.println("[SignalSender] do some work");
            Thread.sleep(500);
        }
    }

    static class SignalReceiver implements Runnable {
        private final SimpleSemaphore semaphore;

        public SignalReceiver(SimpleSemaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            System.out.println("[SignalReceiver] run");
            while (true) {
                try {
                    semaphore.release();
                    doSomeWork();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

        private void doSomeWork() throws InterruptedException {
            System.out.println("[SignalReceiver] do some work");
            Thread.sleep(700);
        }
    }
}

Посмотрим на вывод консоли:

[SignalSender] run
[SignalSender] do some work
[SignalSender] do some work
[SignalSender] do some work
[SignalSender] do some work
[SignalSender] do some work
[SignalReceiver] run
[SignalReceiver] do some work
[SignalSender] do some work
[SignalReceiver] do some work
[SignalSender] do some work
[SignalReceiver] do some work
[SignalSender] do some work
[SignalSender] do some work
[SignalReceiver] do some work
[SignalSender] do some work
[SignalReceiver] do some work
[SignalSender] do some work
[SignalSender] do some work
[SignalReceiver] do some work
[SignalSender] do some work
[SignalReceiver] do some work
[SignalSender] do some work
[SignalReceiver] do some work
[SignalSender] do some work
[SignalSender] do some work
[SignalReceiver] do some work
Мы видим, как потоки обмениваются сигналами, при этом они ждут друг друга, и начинают работу, только получив соответствующий сигнал от семафора.

пятница, 14 марта 2014 г.

Элементарная реализация BlockingQueue на Java

BlockingQueue (блокирующая очередь) - это очередь, которая блокирует поток, в двух случаях:
  • поток пытается получить элементы из пустой очереди
  • поток пытается положить элементы в полную очередь

Когда поток пытается получить элементы из пустой очереди, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не положит элементы в очередь. Аналогично, когда поток пытается положить элементы в полную очередь, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не возьмет элементы их очереди и таким образом не освободит место в ней. Естественно, введение понятия "полная очередь" подразумевает, что очередь имеет ограниченный размер, который обычно задается в конструкторе.

В Java есть интерфейс BlockingQueue, а также множество реализаций этого интерфейса:

Тем не менее, было бы полезно самим реализовать простейшую блокирующую очередь для понимания ее работы. Причем наша реализация (для начала) должна быть самой простой. Реализации в JDK намного более сложные, чем наша примитивная реализация, которая приведена ниже. Затем можно было бы детально разобрать реализации блокирующих очередей в Java. Впрочем, разбор реализаций в Java мы оставим когда-нибудь на потом, а сейчас займемся собственной простейшей реализацией блокирующей очереди.

public class BlockingQueue {

    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }


    public synchronized void put(Object item) throws InterruptedException  {
        while (this.queue.size() == this.limit) {
            wait();
        }
        if (this.queue.size() == 0) {
            notifyAll();
        }
        this.queue.add(item);
    }


    public synchronized Object take() throws InterruptedException{
        while (this.queue.size() == 0){
            wait();
        }
        if (this.queue.size() == this.limit){
            notifyAll();
        }

        return this.queue.remove(0);
    }
}

Рассмотрим метод put:

public synchronized void put(Object item) throws InterruptedException  {
    while (this.queue.size() == this.limit) {
        wait();
    }
    if (this.queue.size() == 0) {
        notifyAll();
    }
    this.queue.add(item);
}
Если очередь заполнена, то поток ставится в ожидание. Если же очередь пустая, то вызывается метод notifyAll, который пробуждает потоки, которые поставлены в ожидание при получении элементов из очереди. Аналогично работает метод take.

Обратите внимание, что в нашей простейшей реализации методы put, take являются synchronized в отличие от реализаций в JDK. Это сделано из двух соображений:

  • простота
  • желание обойтись только элементарными примитивами синхронизации в JDK, а именно методами wait, notifyAll

Давайте промоделируем работу очереди размеров в 5 элементов. Пусть у нас есть один поток (Producer), который, скажем, каждые 2 миллисекунды создает и кладет новый объект в очередь (пока что не подключаем поток, который забирает элементы из очереди). Очевидно, что через 10 миллисекунд, после того как очередь вся будет заполнена, при попытке положить шестой по счету элемент в очередь, этот поток будет поставлен в ожидание вот этим кусом кода из метода put:

while (this.queue.size() == this.limit) {
    wait();
}
Теперь подключим второй поток, который будет забирать элементы из очереди (Consumer). Он определит, что очередь полная и пошлет notification всем ожидающим потокам. Это произойдет в следующем куске кода метода take:
if (this.queue.size() == this.limit){
    notifyAll();
}
После этого поток заберет первый элемент из списка, и вернет его в методе take, выйдя таким образом из этого (synchronized!!!) метода и освободив монитор, ассоциированный c очередью. Затем монитор захватит поток Producer, пробудившийся от ожидания, и продолжит класть элементы в очередь.

Теперь приведем полный код демо-примера:

import java.util.LinkedList;
import java.util.List;

public class BlockingQueue {

    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }


    public synchronized void put(T item) throws InterruptedException  {
        System.out.println("[BlockingQueue] try to put: " + item );
        while (this.queue.size() == this.limit) {
            System.out.println("[BlockingQueue] queue is full, waiting until space is free");
            wait();
        }
        if (this.queue.size() == 0) {
            System.out.println("[BlockingQueue] queue is empty, notify");
            notifyAll();
        }
        this.queue.add(item);
        System.out.println("[BlockingQueue] put ok: " + item );
    }


    public synchronized T take() throws InterruptedException{
        System.out.println("[BlockingQueue] try to take");
        while (this.queue.size() == 0){
            System.out.println("[BlockingQueue] queue is empty, waiting until smth is put");
            wait();
        }
        if (this.queue.size() == this.limit){
            System.out.println("[BlockingQueue] queue is full, notify");
            notifyAll();
        }

        T item = this.queue.remove(0);
        System.out.println("[BlockingQueue] take ok: " + item );
        return item;
    }
}
import java.util.Random;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new BlockingQueue(5);
        new Thread(new Producer(queue)).start();
        Thread.currentThread().sleep(1000);
        new Thread(new Consumer(queue)).start();
    }

    static class Producer implements Runnable {
        private final BlockingQueue queue;

        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println("[Producer] run");
            while (true) {
                try {
                    queue.put(produce());
                    Thread.currentThread().sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private Integer produce() {
            Integer i = new Random().nextInt(100);
            System.out.println("[Producer] produce: " + i);
            return i;
        }
    }

    static class Consumer implements Runnable {
        private final BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println("[Consumer] run");
            while (true) {
                try {
                    consume();
                    Thread.currentThread().sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private void consume() throws InterruptedException {
            Integer i = queue.take();
            System.out.println("[Consumer] consumed: " + i);
        }
    }
}

Вывод консоли будет такой (первая часть, где Producer кладет первые пять элементов в очередь):
[Producer] run
[Producer] produce: 86
[BlockingQueue] try to put: 86
[BlockingQueue] queue is empty, notify
[BlockingQueue] put ok: 86
[Producer] produce: 73
[BlockingQueue] try to put: 73
[BlockingQueue] put ok: 73
[Producer] produce: 20
[BlockingQueue] try to put: 20
[BlockingQueue] put ok: 20
[Producer] produce: 84
[BlockingQueue] try to put: 84
[BlockingQueue] put ok: 84
[Producer] produce: 73
[BlockingQueue] try to put: 73
[BlockingQueue] put ok: 73
[Producer] produce: 67
[BlockingQueue] try to put: 67
[BlockingQueue] queue is full, waiting until space is free
Затем вступает в работу Consumer:
[Consumer] run
[BlockingQueue] try to take
[BlockingQueue] queue is full, notify
[BlockingQueue] take ok: 86
[Consumer] consumed: 86
[BlockingQueue] put ok: 67
[Producer] produce: 63
[BlockingQueue] try to put: 63
[BlockingQueue] queue is full, waiting until space is free
[BlockingQueue] try to take
[BlockingQueue] queue is full, notify
[BlockingQueue] take ok: 73
[Consumer] consumed: 73
[BlockingQueue] put ok: 63
[Producer] produce: 90
[BlockingQueue] try to put: 90
[BlockingQueue] queue is full, waiting until space is free
[BlockingQueue] try to take
[BlockingQueue] queue is full, notify
[BlockingQueue] take ok: 20
[Consumer] consumed: 20
[BlockingQueue] put ok: 90
[Producer] produce: 57
[BlockingQueue] try to put: 57
[BlockingQueue] queue is full, waiting until space is free
На этом куске вывода хорошо видно, как Consumer и Producer попеременно используют очередь для обмена данными.

пятница, 7 марта 2014 г.

Демо-проект: Apache CXF, Jetty, MySQL

Решил выложить на Github небольшой демо-проект: клиент-серверное приложение, управляющее аккаунтами (счетами).

Начальная задача: сделать сервер, который позволяет читать состояние счета по его ID, а также добавлять/снимать средства со счета. Состояния счетов хранятся с БД. Сервер должен работать в нагруженное среде, поэтому нужно добавить кеширование. Транспортный протокол может быть любой. Я выбрал веб-сервисы, реализация веб-сервисов была сделана с помощью Apache CXF. После запуска сервера WSDL можно посмотреть по адресу: http://localhost:9000/accountService?wsdl

Вторая часть задачки: сделать клиента, который в несколько потоков читает/пишет значения счетов. Кол-во читателей/писателей, а также множество идентификаторов счетов задается параметрами клиента (например, из командной строки).

Третья часть задачки: сервер должен статистику работы скидывать в файл. Статистика должна быть такой: общее кол-во обработанных запросов, а также удельное кол-во запросов в ед. времени (т.е. rps - requests per second). Нужно предусмотреть возможность сброса статсы в ноль. Сброс статсы в ноль я сделал с помощью вызова HTTP-хендлера, который и обрабатывается Jetty. Сброс статсы осуществляется HTTP GET-ом урлы http://localhost:9001/zero. По урлу http://localhost:9001/getStats получаем текущее значение rps.

Несколько моментов про реализацию. Итак, рассмотрим реализацию AccountServiceImpl:

@WebService(endpointInterface = "net.iryndin.clientserverdemo1.commons.api.AccountService",serviceName = "AccountService")
public class AccountServiceImpl implements AccountService {
    private Map cache = new ConcurrentHashMap<>();

    @Override
    public Long getAmount(Integer id) {
        AtomicLong balance = cache.get(id);
        statsHelper.incrQueries();
        if (balance == null) {
            return 0L;
        } else {
            return balance.get();
        }
    }

    @Override
    public void addAmount(final Integer id, Long value) throws Exception {
        AtomicLong balance = cache.get(id);
        if (balance == null) {
            balance = new AtomicLong(0);
            cache.put(id, balance);
        }
        balance.addAndGet(value);
        final AtomicLong finalBalance = balance;
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    accountDao.save(id, finalBalance.get());
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        });
        statsHelper.incrQueries();
    }
}  
Видно, что чтение и запись идет прямиком к кеш, т.е. скорость чтения и записи определяется скоростью работы кеша. Запись в базу запускается в отдельном потоке. Если бы она запускалась из этого же потока, то и производительность записи была бы совсем другой. На самом деле, такое решение, конечно, не всегда приемлемо, но поскольку это тестовый проект, я сделал именно так.

Теперь рассмотрим моменты реализации клиента. См. файл MainClient:

    private static void runReaders(int count) {
        if (count <=0) {
            System.out.println("No readers run");
            return;
        }
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        System.out.println("Run readers: " + count);
        while (count > 0) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    while (running) {
                        accountService.getAmount(idsHelper.getRandomId());
                    }
                }
            });
            count--;
        }
    }

    private static void runWriters(int count) {
        if (count <=0) {
            System.out.println("No writers run");
            return;
        }
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        System.out.println("Run writers: " + count);
        while (count > 0) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    while (running) {
                        boolean positive = new Random().nextBoolean();
                        int balance = new Random().nextInt(100000);
                        balance = positive ? balance : -balance;
                        try {
                            accountService.addAmount(idsHelper.getRandomId(), (long) balance);
                        } catch (Exception e) {
                            //
                        }
                    }
                }
            });
            count--;
        }
    }
Мы запускаем потоки по числу rCount, wCount и крутимся там в бесконечном цикле (в бесконечном, так как переменную running в коде никто не меняет). Все этот создает неплохую нагрузку на сервер.

Последнее, рассмотрим скрипт на Python 3 (measure.py), который запускает клиент с различными значениями параметров rCount, wCount и измеряет производительность (просто считывает данные с урла http://localhost:9001/getStats):

import subprocess
import time
import urllib.request

def main():
    readers = [0, 20, 40, 60, 80, 100]
    writers = [0, 20, 40, 60, 80, 100]

    output = open('rps.txt','w', 1)
    
    for r in readers:
        for w in writers:
            run_client(r,w,output)
            print('Done reader=%d, writer=%d' % (r,w))
            
    output.close()

def run_client(readers_qty, writers_qty, output):        
    if readers_qty==0 and writers_qty==0:
        return
    proc = subprocess.Popen(['java', '-jar', '-Xms2G', '-Xmx8G', 'client.jar', str(readers_qty), str(writers_qty)], shell=False)
    time.sleep(1)
    f = urllib.request.urlopen('http://localhost:9001/zero')
    time.sleep(30)
    f = urllib.request.urlopen('http://localhost:9001/getStats')
    s = f.read()
    rps = int(s)
    print('rps=%d' % rps)
    output.write('(%d,%d): %d\n' % (readers_qty,writers_qty,rps))
    
    proc.terminate()
    time.sleep(5)
     
if __name__ == "__main__":
    main()

В результате, мы получаем следующую табличку (по горизонтали: число wCount, по вертикали: rCount, значения в ячейках: requests-per-second):

rCount/wCount 0 20 40 60 80 100
0 - 12197 12319 12865 12648 13614
20 13324 13240 13049 12657 12499 12858
40 13198 12249 12676 13112 12882 12442
60 12420 12250 12348 12928 12534 13013
80 12531 12934 12304 12428 13133 12978
100 12972 13204 11869 11541 12781 12496
Т.е. сервер держит где-то 12-13К реквестов. Это число полносью определяется производительностью ConcurrentHashMap.

Постоянные читатели