- grails-app - родительский каталог для исходников приложения
- conf - конфиги
- controllers - контроллеры - буковка "C" в аббревиатуре MVC.
- domain - доменные объекты. Т.е. здесь лежат энтити, по которым будет создаваться схема БД
- i18n - поддержка интернационализации (i18n).
- services - сервисы
- taglib - библиотеки тегов
- utils - Grails -специфичные утилиты
- views - Groovy Server Pages - вьюшки, аналог JSP, буковка "V" в аббревиатуре MVC
- scripts - Gant-скрипты.
- src - вспомогательные исходники
- groovy - исходники на Groovy
- java - исходники на Java
- test - юнит-тесты, интеграционные тесты
Общее·количество·просмотров·страницы
Java Dev Notes - разработка на Java (а также на JavaScript/Python/Flex и др), факты, события из АйТи
четверг, 27 марта 2014 г.
Grails - Convention over Configuration
суббота, 15 марта 2014 г.
Семафоры в Java: реализация простого семафора
Семафор используется для обмена сигналами между потоками, или же для охраны критической секции. Их также можно использовать и вместо локов. Несмотря на то, что в JDK уже реализован семафор (java.util.concurrent.Semaphore), полезно будет самим реализовать этот объект.
Итак, у нашего семафора будет всего лишь два метода: take, release. Соответственно, простейшая реализация будет такой:
public class SimpleSemaphore { boolean taken = false; public synchronized void take() { this.taken = true; this.notify(); } public synchronized void release() throws InterruptedException { while (!this.taken) wait(); this.taken = false; } }
Теперь давайте рассмотрим программу, которая использует семафор для обмена сигналами. У нас будет два потока: SignalSender, SignalReceiver, которые будут посылать друг другу сигналы. Вот код:
public class Main { public static void main(String[] args) throws InterruptedException { SimpleSemaphore semaphore = new SimpleSemaphore(); new Thread(new SignalSender(semaphore)).start(); Thread.currentThread().sleep(2000); new Thread(new SignalReceiver(semaphore)).start(); } static class SignalSender implements Runnable { private final SimpleSemaphore semaphore; public SignalSender(SimpleSemaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { System.out.println("[SignalSender] run"); while (true) { try { doSomeWork(); semaphore.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void doSomeWork() throws InterruptedException { System.out.println("[SignalSender] do some work"); Thread.sleep(500); } } static class SignalReceiver implements Runnable { private final SimpleSemaphore semaphore; public SignalReceiver(SimpleSemaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { System.out.println("[SignalReceiver] run"); while (true) { try { semaphore.release(); doSomeWork(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void doSomeWork() throws InterruptedException { System.out.println("[SignalReceiver] do some work"); Thread.sleep(700); } } }
Посмотрим на вывод консоли:
[SignalSender] run [SignalSender] do some work [SignalSender] do some work [SignalSender] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] run [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] do some workМы видим, как потоки обмениваются сигналами, при этом они ждут друг друга, и начинают работу, только получив соответствующий сигнал от семафора.
пятница, 14 марта 2014 г.
Элементарная реализация BlockingQueue на Java
- поток пытается получить элементы из пустой очереди
- поток пытается положить элементы в полную очередь
Когда поток пытается получить элементы из пустой очереди, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не положит элементы в очередь. Аналогично, когда поток пытается положить элементы в полную очередь, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не возьмет элементы их очереди и таким образом не освободит место в ней. Естественно, введение понятия "полная очередь" подразумевает, что очередь имеет ограниченный размер, который обычно задается в конструкторе.
В Java есть интерфейс BlockingQueue, а также множество реализаций этого интерфейса:
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingDeque
- LinkedBlockingQueue
- LinkedTransferQueue
- PriorityBlockingQueue
- SynchronousQueue
Тем не менее, было бы полезно самим реализовать простейшую блокирующую очередь для понимания ее работы. Причем наша реализация (для начала) должна быть самой простой. Реализации в JDK намного более сложные, чем наша примитивная реализация, которая приведена ниже. Затем можно было бы детально разобрать реализации блокирующих очередей в Java. Впрочем, разбор реализаций в Java мы оставим когда-нибудь на потом, а сейчас займемся собственной простейшей реализацией блокирующей очереди.
public class BlockingQueue { private List queue = new LinkedList(); private int limit = 10; public BlockingQueue(int limit){ this.limit = limit; } public synchronized void put(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { wait(); } if (this.queue.size() == 0) { notifyAll(); } this.queue.add(item); } public synchronized Object take() throws InterruptedException{ while (this.queue.size() == 0){ wait(); } if (this.queue.size() == this.limit){ notifyAll(); } return this.queue.remove(0); } }
Рассмотрим метод put:
public synchronized void put(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { wait(); } if (this.queue.size() == 0) { notifyAll(); } this.queue.add(item); }Если очередь заполнена, то поток ставится в ожидание. Если же очередь пустая, то вызывается метод notifyAll, который пробуждает потоки, которые поставлены в ожидание при получении элементов из очереди. Аналогично работает метод take.
Обратите внимание, что в нашей простейшей реализации методы put, take являются synchronized в отличие от реализаций в JDK. Это сделано из двух соображений:
- простота
- желание обойтись только элементарными примитивами синхронизации в JDK, а именно методами wait, notifyAll
Давайте промоделируем работу очереди размеров в 5 элементов. Пусть у нас есть один поток (Producer), который, скажем, каждые 2 миллисекунды создает и кладет новый объект в очередь (пока что не подключаем поток, который забирает элементы из очереди). Очевидно, что через 10 миллисекунд, после того как очередь вся будет заполнена, при попытке положить шестой по счету элемент в очередь, этот поток будет поставлен в ожидание вот этим кусом кода из метода put:
while (this.queue.size() == this.limit) { wait(); }Теперь подключим второй поток, который будет забирать элементы из очереди (Consumer). Он определит, что очередь полная и пошлет notification всем ожидающим потокам. Это произойдет в следующем куске кода метода take:
if (this.queue.size() == this.limit){ notifyAll(); }После этого поток заберет первый элемент из списка, и вернет его в методе take, выйдя таким образом из этого (synchronized!!!) метода и освободив монитор, ассоциированный c очередью. Затем монитор захватит поток Producer, пробудившийся от ожидания, и продолжит класть элементы в очередь.
Теперь приведем полный код демо-примера:
import java.util.LinkedList; import java.util.List; public class BlockingQueue{ private List queue = new LinkedList (); private int limit = 10; public BlockingQueue(int limit){ this.limit = limit; } public synchronized void put(T item) throws InterruptedException { System.out.println("[BlockingQueue] try to put: " + item ); while (this.queue.size() == this.limit) { System.out.println("[BlockingQueue] queue is full, waiting until space is free"); wait(); } if (this.queue.size() == 0) { System.out.println("[BlockingQueue] queue is empty, notify"); notifyAll(); } this.queue.add(item); System.out.println("[BlockingQueue] put ok: " + item ); } public synchronized T take() throws InterruptedException{ System.out.println("[BlockingQueue] try to take"); while (this.queue.size() == 0){ System.out.println("[BlockingQueue] queue is empty, waiting until smth is put"); wait(); } if (this.queue.size() == this.limit){ System.out.println("[BlockingQueue] queue is full, notify"); notifyAll(); } T item = this.queue.remove(0); System.out.println("[BlockingQueue] take ok: " + item ); return item; } }
import java.util.Random; public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueueВывод консоли будет такой (первая часть, где Producer кладет первые пять элементов в очередь):queue = new BlockingQueue (5); new Thread(new Producer(queue)).start(); Thread.currentThread().sleep(1000); new Thread(new Consumer(queue)).start(); } static class Producer implements Runnable { private final BlockingQueue queue; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { System.out.println("[Producer] run"); while (true) { try { queue.put(produce()); Thread.currentThread().sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } private Integer produce() { Integer i = new Random().nextInt(100); System.out.println("[Producer] produce: " + i); return i; } } static class Consumer implements Runnable { private final BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { System.out.println("[Consumer] run"); while (true) { try { consume(); Thread.currentThread().sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } } private void consume() throws InterruptedException { Integer i = queue.take(); System.out.println("[Consumer] consumed: " + i); } } }
[Producer] run [Producer] produce: 86 [BlockingQueue] try to put: 86 [BlockingQueue] queue is empty, notify [BlockingQueue] put ok: 86 [Producer] produce: 73 [BlockingQueue] try to put: 73 [BlockingQueue] put ok: 73 [Producer] produce: 20 [BlockingQueue] try to put: 20 [BlockingQueue] put ok: 20 [Producer] produce: 84 [BlockingQueue] try to put: 84 [BlockingQueue] put ok: 84 [Producer] produce: 73 [BlockingQueue] try to put: 73 [BlockingQueue] put ok: 73 [Producer] produce: 67 [BlockingQueue] try to put: 67 [BlockingQueue] queue is full, waiting until space is freeЗатем вступает в работу Consumer:
[Consumer] run [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 86 [Consumer] consumed: 86 [BlockingQueue] put ok: 67 [Producer] produce: 63 [BlockingQueue] try to put: 63 [BlockingQueue] queue is full, waiting until space is free [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 73 [Consumer] consumed: 73 [BlockingQueue] put ok: 63 [Producer] produce: 90 [BlockingQueue] try to put: 90 [BlockingQueue] queue is full, waiting until space is free [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 20 [Consumer] consumed: 20 [BlockingQueue] put ok: 90 [Producer] produce: 57 [BlockingQueue] try to put: 57 [BlockingQueue] queue is full, waiting until space is freeНа этом куске вывода хорошо видно, как Consumer и Producer попеременно используют очередь для обмена данными.
пятница, 7 марта 2014 г.
Демо-проект: Apache CXF, Jetty, MySQL
Начальная задача: сделать сервер, который позволяет читать состояние счета по его ID, а также добавлять/снимать средства со счета. Состояния счетов хранятся с БД. Сервер должен работать в нагруженное среде, поэтому нужно добавить кеширование. Транспортный протокол может быть любой. Я выбрал веб-сервисы, реализация веб-сервисов была сделана с помощью Apache CXF. После запуска сервера WSDL можно посмотреть по адресу: http://localhost:9000/accountService?wsdl
Вторая часть задачки: сделать клиента, который в несколько потоков читает/пишет значения счетов. Кол-во читателей/писателей, а также множество идентификаторов счетов задается параметрами клиента (например, из командной строки).
Третья часть задачки: сервер должен статистику работы скидывать в файл. Статистика должна быть такой: общее кол-во обработанных запросов, а также удельное кол-во запросов в ед. времени (т.е. rps - requests per second). Нужно предусмотреть возможность сброса статсы в ноль. Сброс статсы в ноль я сделал с помощью вызова HTTP-хендлера, который и обрабатывается Jetty. Сброс статсы осуществляется HTTP GET-ом урлы http://localhost:9001/zero. По урлу http://localhost:9001/getStats получаем текущее значение rps.
Несколько моментов про реализацию. Итак, рассмотрим реализацию AccountServiceImpl:
@WebService(endpointInterface = "net.iryndin.clientserverdemo1.commons.api.AccountService",serviceName = "AccountService") public class AccountServiceImpl implements AccountService { private MapВидно, что чтение и запись идет прямиком к кеш, т.е. скорость чтения и записи определяется скоростью работы кеша. Запись в базу запускается в отдельном потоке. Если бы она запускалась из этого же потока, то и производительность записи была бы совсем другой. На самом деле, такое решение, конечно, не всегда приемлемо, но поскольку это тестовый проект, я сделал именно так.cache = new ConcurrentHashMap<>(); @Override public Long getAmount(Integer id) { AtomicLong balance = cache.get(id); statsHelper.incrQueries(); if (balance == null) { return 0L; } else { return balance.get(); } } @Override public void addAmount(final Integer id, Long value) throws Exception { AtomicLong balance = cache.get(id); if (balance == null) { balance = new AtomicLong(0); cache.put(id, balance); } balance.addAndGet(value); final AtomicLong finalBalance = balance; executorService.submit(new Runnable() { @Override public void run() { try { accountDao.save(id, finalBalance.get()); } catch (SQLException e) { e.printStackTrace(); } } }); statsHelper.incrQueries(); } }
Теперь рассмотрим моменты реализации клиента. См. файл MainClient:
private static void runReaders(int count) { if (count <=0) { System.out.println("No readers run"); return; } ExecutorService executorService = Executors.newFixedThreadPool(count); System.out.println("Run readers: " + count); while (count > 0) { executorService.submit(new Runnable() { @Override public void run() { while (running) { accountService.getAmount(idsHelper.getRandomId()); } } }); count--; } } private static void runWriters(int count) { if (count <=0) { System.out.println("No writers run"); return; } ExecutorService executorService = Executors.newFixedThreadPool(count); System.out.println("Run writers: " + count); while (count > 0) { executorService.submit(new Runnable() { @Override public void run() { while (running) { boolean positive = new Random().nextBoolean(); int balance = new Random().nextInt(100000); balance = positive ? balance : -balance; try { accountService.addAmount(idsHelper.getRandomId(), (long) balance); } catch (Exception e) { // } } } }); count--; } }Мы запускаем потоки по числу rCount, wCount и крутимся там в бесконечном цикле (в бесконечном, так как переменную running в коде никто не меняет). Все этот создает неплохую нагрузку на сервер.
Последнее, рассмотрим скрипт на Python 3 (measure.py), который запускает клиент с различными значениями параметров rCount, wCount и измеряет производительность (просто считывает данные с урла http://localhost:9001/getStats):
import subprocess import time import urllib.request def main(): readers = [0, 20, 40, 60, 80, 100] writers = [0, 20, 40, 60, 80, 100] output = open('rps.txt','w', 1) for r in readers: for w in writers: run_client(r,w,output) print('Done reader=%d, writer=%d' % (r,w)) output.close() def run_client(readers_qty, writers_qty, output): if readers_qty==0 and writers_qty==0: return proc = subprocess.Popen(['java', '-jar', '-Xms2G', '-Xmx8G', 'client.jar', str(readers_qty), str(writers_qty)], shell=False) time.sleep(1) f = urllib.request.urlopen('http://localhost:9001/zero') time.sleep(30) f = urllib.request.urlopen('http://localhost:9001/getStats') s = f.read() rps = int(s) print('rps=%d' % rps) output.write('(%d,%d): %d\n' % (readers_qty,writers_qty,rps)) proc.terminate() time.sleep(5) if __name__ == "__main__": main()
В результате, мы получаем следующую табличку (по горизонтали: число wCount, по вертикали: rCount, значения в ячейках: requests-per-second):
rCount/wCount | 0 | 20 | 40 | 60 | 80 | 100 |
---|---|---|---|---|---|---|
0 | - | 12197 | 12319 | 12865 | 12648 | 13614 |
20 | 13324 | 13240 | 13049 | 12657 | 12499 | 12858 |
40 | 13198 | 12249 | 12676 | 13112 | 12882 | 12442 |
60 | 12420 | 12250 | 12348 | 12928 | 12534 | 13013 |
80 | 12531 | 12934 | 12304 | 12428 | 13133 | 12978 |
100 | 12972 | 13204 | 11869 | 11541 | 12781 | 12496 |