- grails-app - родительский каталог для исходников приложения
- conf - конфиги
- controllers - контроллеры - буковка "C" в аббревиатуре MVC.
- domain - доменные объекты. Т.е. здесь лежат энтити, по которым будет создаваться схема БД
- i18n - поддержка интернационализации (i18n).
- services - сервисы
- taglib - библиотеки тегов
- utils - Grails -специфичные утилиты
- views - Groovy Server Pages - вьюшки, аналог JSP, буковка "V" в аббревиатуре MVC
- scripts - Gant-скрипты.
- src - вспомогательные исходники
- groovy - исходники на Groovy
- java - исходники на Java
- test - юнит-тесты, интеграционные тесты
Общее·количество·просмотров·страницы
Java Dev Notes - разработка на Java (а также на JavaScript/Python/Flex и др), факты, события из АйТи
четверг, 27 марта 2014 г.
Grails - Convention over Configuration
суббота, 15 марта 2014 г.
Семафоры в Java: реализация простого семафора
Семафор используется для обмена сигналами между потоками, или же для охраны критической секции. Их также можно использовать и вместо локов. Несмотря на то, что в JDK уже реализован семафор (java.util.concurrent.Semaphore), полезно будет самим реализовать этот объект.
Итак, у нашего семафора будет всего лишь два метода: take, release. Соответственно, простейшая реализация будет такой:
public class SimpleSemaphore {
boolean taken = false;
public synchronized void take() {
this.taken = true;
this.notify();
}
public synchronized void release() throws InterruptedException {
while (!this.taken) wait();
this.taken = false;
}
}
Теперь давайте рассмотрим программу, которая использует семафор для обмена сигналами. У нас будет два потока: SignalSender, SignalReceiver, которые будут посылать друг другу сигналы. Вот код:
public class Main {
public static void main(String[] args) throws InterruptedException {
SimpleSemaphore semaphore = new SimpleSemaphore();
new Thread(new SignalSender(semaphore)).start();
Thread.currentThread().sleep(2000);
new Thread(new SignalReceiver(semaphore)).start();
}
static class SignalSender implements Runnable {
private final SimpleSemaphore semaphore;
public SignalSender(SimpleSemaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
System.out.println("[SignalSender] run");
while (true) {
try {
doSomeWork();
semaphore.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void doSomeWork() throws InterruptedException {
System.out.println("[SignalSender] do some work");
Thread.sleep(500);
}
}
static class SignalReceiver implements Runnable {
private final SimpleSemaphore semaphore;
public SignalReceiver(SimpleSemaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
System.out.println("[SignalReceiver] run");
while (true) {
try {
semaphore.release();
doSomeWork();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void doSomeWork() throws InterruptedException {
System.out.println("[SignalReceiver] do some work");
Thread.sleep(700);
}
}
}
Посмотрим на вывод консоли:
[SignalSender] run [SignalSender] do some work [SignalSender] do some work [SignalSender] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] run [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalReceiver] do some work [SignalSender] do some work [SignalSender] do some work [SignalReceiver] do some workМы видим, как потоки обмениваются сигналами, при этом они ждут друг друга, и начинают работу, только получив соответствующий сигнал от семафора.
пятница, 14 марта 2014 г.
Элементарная реализация BlockingQueue на Java
- поток пытается получить элементы из пустой очереди
- поток пытается положить элементы в полную очередь
Когда поток пытается получить элементы из пустой очереди, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не положит элементы в очередь. Аналогично, когда поток пытается положить элементы в полную очередь, он ставится в ожидание до тех пор, пока какой-нибудь другой поток не возьмет элементы их очереди и таким образом не освободит место в ней. Естественно, введение понятия "полная очередь" подразумевает, что очередь имеет ограниченный размер, который обычно задается в конструкторе.
В Java есть интерфейс BlockingQueue, а также множество реализаций этого интерфейса:
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingDeque
- LinkedBlockingQueue
- LinkedTransferQueue
- PriorityBlockingQueue
- SynchronousQueue
Тем не менее, было бы полезно самим реализовать простейшую блокирующую очередь для понимания ее работы. Причем наша реализация (для начала) должна быть самой простой. Реализации в JDK намного более сложные, чем наша примитивная реализация, которая приведена ниже. Затем можно было бы детально разобрать реализации блокирующих очередей в Java. Впрочем, разбор реализаций в Java мы оставим когда-нибудь на потом, а сейчас займемся собственной простейшей реализацией блокирующей очереди.
public class BlockingQueue {
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public synchronized void put(Object item) throws InterruptedException {
while (this.queue.size() == this.limit) {
wait();
}
if (this.queue.size() == 0) {
notifyAll();
}
this.queue.add(item);
}
public synchronized Object take() throws InterruptedException{
while (this.queue.size() == 0){
wait();
}
if (this.queue.size() == this.limit){
notifyAll();
}
return this.queue.remove(0);
}
}
Рассмотрим метод put:
public synchronized void put(Object item) throws InterruptedException {
while (this.queue.size() == this.limit) {
wait();
}
if (this.queue.size() == 0) {
notifyAll();
}
this.queue.add(item);
}
Если очередь заполнена, то поток ставится в ожидание. Если же очередь пустая, то вызывается метод notifyAll, который пробуждает потоки,
которые поставлены в ожидание при получении элементов из очереди. Аналогично работает метод take.
Обратите внимание, что в нашей простейшей реализации методы put, take являются synchronized в отличие от реализаций в JDK. Это сделано из двух соображений:
- простота
- желание обойтись только элементарными примитивами синхронизации в JDK, а именно методами wait, notifyAll
Давайте промоделируем работу очереди размеров в 5 элементов. Пусть у нас есть один поток (Producer), который, скажем, каждые 2 миллисекунды создает и кладет новый объект в очередь (пока что не подключаем поток, который забирает элементы из очереди). Очевидно, что через 10 миллисекунд, после того как очередь вся будет заполнена, при попытке положить шестой по счету элемент в очередь, этот поток будет поставлен в ожидание вот этим кусом кода из метода put:
while (this.queue.size() == this.limit) {
wait();
}
Теперь подключим второй поток, который будет забирать элементы из очереди (Consumer).
Он определит, что очередь полная и пошлет notification всем ожидающим потокам. Это произойдет в следующем куске кода метода take:
if (this.queue.size() == this.limit){
notifyAll();
}
После этого поток заберет первый элемент из списка, и вернет его в методе take,
выйдя таким образом из этого (synchronized!!!) метода и освободив монитор, ассоциированный c очередью.
Затем монитор захватит поток Producer, пробудившийся от ожидания, и продолжит класть элементы в очередь.
Теперь приведем полный код демо-примера:
import java.util.LinkedList; import java.util.List; public class BlockingQueue{ private List queue = new LinkedList (); private int limit = 10; public BlockingQueue(int limit){ this.limit = limit; } public synchronized void put(T item) throws InterruptedException { System.out.println("[BlockingQueue] try to put: " + item ); while (this.queue.size() == this.limit) { System.out.println("[BlockingQueue] queue is full, waiting until space is free"); wait(); } if (this.queue.size() == 0) { System.out.println("[BlockingQueue] queue is empty, notify"); notifyAll(); } this.queue.add(item); System.out.println("[BlockingQueue] put ok: " + item ); } public synchronized T take() throws InterruptedException{ System.out.println("[BlockingQueue] try to take"); while (this.queue.size() == 0){ System.out.println("[BlockingQueue] queue is empty, waiting until smth is put"); wait(); } if (this.queue.size() == this.limit){ System.out.println("[BlockingQueue] queue is full, notify"); notifyAll(); } T item = this.queue.remove(0); System.out.println("[BlockingQueue] take ok: " + item ); return item; } }
import java.util.Random;
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new BlockingQueue(5);
new Thread(new Producer(queue)).start();
Thread.currentThread().sleep(1000);
new Thread(new Consumer(queue)).start();
}
static class Producer implements Runnable {
private final BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("[Producer] run");
while (true) {
try {
queue.put(produce());
Thread.currentThread().sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private Integer produce() {
Integer i = new Random().nextInt(100);
System.out.println("[Producer] produce: " + i);
return i;
}
}
static class Consumer implements Runnable {
private final BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("[Consumer] run");
while (true) {
try {
consume();
Thread.currentThread().sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void consume() throws InterruptedException {
Integer i = queue.take();
System.out.println("[Consumer] consumed: " + i);
}
}
}
Вывод консоли будет такой (первая часть, где Producer кладет первые пять элементов в очередь):
[Producer] run [Producer] produce: 86 [BlockingQueue] try to put: 86 [BlockingQueue] queue is empty, notify [BlockingQueue] put ok: 86 [Producer] produce: 73 [BlockingQueue] try to put: 73 [BlockingQueue] put ok: 73 [Producer] produce: 20 [BlockingQueue] try to put: 20 [BlockingQueue] put ok: 20 [Producer] produce: 84 [BlockingQueue] try to put: 84 [BlockingQueue] put ok: 84 [Producer] produce: 73 [BlockingQueue] try to put: 73 [BlockingQueue] put ok: 73 [Producer] produce: 67 [BlockingQueue] try to put: 67 [BlockingQueue] queue is full, waiting until space is freeЗатем вступает в работу Consumer:
[Consumer] run [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 86 [Consumer] consumed: 86 [BlockingQueue] put ok: 67 [Producer] produce: 63 [BlockingQueue] try to put: 63 [BlockingQueue] queue is full, waiting until space is free [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 73 [Consumer] consumed: 73 [BlockingQueue] put ok: 63 [Producer] produce: 90 [BlockingQueue] try to put: 90 [BlockingQueue] queue is full, waiting until space is free [BlockingQueue] try to take [BlockingQueue] queue is full, notify [BlockingQueue] take ok: 20 [Consumer] consumed: 20 [BlockingQueue] put ok: 90 [Producer] produce: 57 [BlockingQueue] try to put: 57 [BlockingQueue] queue is full, waiting until space is freeНа этом куске вывода хорошо видно, как Consumer и Producer попеременно используют очередь для обмена данными.
пятница, 7 марта 2014 г.
Демо-проект: Apache CXF, Jetty, MySQL
Начальная задача: сделать сервер, который позволяет читать состояние счета по его ID, а также добавлять/снимать средства со счета. Состояния счетов хранятся с БД. Сервер должен работать в нагруженное среде, поэтому нужно добавить кеширование. Транспортный протокол может быть любой. Я выбрал веб-сервисы, реализация веб-сервисов была сделана с помощью Apache CXF. После запуска сервера WSDL можно посмотреть по адресу: http://localhost:9000/accountService?wsdl
Вторая часть задачки: сделать клиента, который в несколько потоков читает/пишет значения счетов. Кол-во читателей/писателей, а также множество идентификаторов счетов задается параметрами клиента (например, из командной строки).
Третья часть задачки: сервер должен статистику работы скидывать в файл. Статистика должна быть такой: общее кол-во обработанных запросов, а также удельное кол-во запросов в ед. времени (т.е. rps - requests per second). Нужно предусмотреть возможность сброса статсы в ноль. Сброс статсы в ноль я сделал с помощью вызова HTTP-хендлера, который и обрабатывается Jetty. Сброс статсы осуществляется HTTP GET-ом урлы http://localhost:9001/zero. По урлу http://localhost:9001/getStats получаем текущее значение rps.
Несколько моментов про реализацию. Итак, рассмотрим реализацию AccountServiceImpl:
@WebService(endpointInterface = "net.iryndin.clientserverdemo1.commons.api.AccountService",serviceName = "AccountService")
public class AccountServiceImpl implements AccountService {
private Map cache = new ConcurrentHashMap<>();
@Override
public Long getAmount(Integer id) {
AtomicLong balance = cache.get(id);
statsHelper.incrQueries();
if (balance == null) {
return 0L;
} else {
return balance.get();
}
}
@Override
public void addAmount(final Integer id, Long value) throws Exception {
AtomicLong balance = cache.get(id);
if (balance == null) {
balance = new AtomicLong(0);
cache.put(id, balance);
}
balance.addAndGet(value);
final AtomicLong finalBalance = balance;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
accountDao.save(id, finalBalance.get());
} catch (SQLException e) {
e.printStackTrace();
}
}
});
statsHelper.incrQueries();
}
}
Видно, что чтение и запись идет прямиком к кеш, т.е. скорость чтения и записи определяется скоростью работы кеша. Запись в базу запускается в отдельном потоке. Если бы она запускалась из этого же потока, то и производительность записи была бы совсем другой. На самом деле, такое решение, конечно, не всегда приемлемо, но поскольку это тестовый проект, я сделал именно так.
Теперь рассмотрим моменты реализации клиента. См. файл MainClient:
private static void runReaders(int count) {
if (count <=0) {
System.out.println("No readers run");
return;
}
ExecutorService executorService = Executors.newFixedThreadPool(count);
System.out.println("Run readers: " + count);
while (count > 0) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (running) {
accountService.getAmount(idsHelper.getRandomId());
}
}
});
count--;
}
}
private static void runWriters(int count) {
if (count <=0) {
System.out.println("No writers run");
return;
}
ExecutorService executorService = Executors.newFixedThreadPool(count);
System.out.println("Run writers: " + count);
while (count > 0) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (running) {
boolean positive = new Random().nextBoolean();
int balance = new Random().nextInt(100000);
balance = positive ? balance : -balance;
try {
accountService.addAmount(idsHelper.getRandomId(), (long) balance);
} catch (Exception e) {
//
}
}
}
});
count--;
}
}
Мы запускаем потоки по числу rCount, wCount и крутимся там в бесконечном цикле (в бесконечном, так как переменную running в коде никто не меняет). Все этот создает неплохую нагрузку на сервер.
Последнее, рассмотрим скрипт на Python 3 (measure.py), который запускает клиент с различными значениями параметров rCount, wCount и измеряет производительность (просто считывает данные с урла http://localhost:9001/getStats):
import subprocess
import time
import urllib.request
def main():
readers = [0, 20, 40, 60, 80, 100]
writers = [0, 20, 40, 60, 80, 100]
output = open('rps.txt','w', 1)
for r in readers:
for w in writers:
run_client(r,w,output)
print('Done reader=%d, writer=%d' % (r,w))
output.close()
def run_client(readers_qty, writers_qty, output):
if readers_qty==0 and writers_qty==0:
return
proc = subprocess.Popen(['java', '-jar', '-Xms2G', '-Xmx8G', 'client.jar', str(readers_qty), str(writers_qty)], shell=False)
time.sleep(1)
f = urllib.request.urlopen('http://localhost:9001/zero')
time.sleep(30)
f = urllib.request.urlopen('http://localhost:9001/getStats')
s = f.read()
rps = int(s)
print('rps=%d' % rps)
output.write('(%d,%d): %d\n' % (readers_qty,writers_qty,rps))
proc.terminate()
time.sleep(5)
if __name__ == "__main__":
main()
В результате, мы получаем следующую табличку (по горизонтали: число wCount, по вертикали: rCount, значения в ячейках: requests-per-second):
| rCount/wCount | 0 | 20 | 40 | 60 | 80 | 100 |
|---|---|---|---|---|---|---|
| 0 | - | 12197 | 12319 | 12865 | 12648 | 13614 |
| 20 | 13324 | 13240 | 13049 | 12657 | 12499 | 12858 |
| 40 | 13198 | 12249 | 12676 | 13112 | 12882 | 12442 |
| 60 | 12420 | 12250 | 12348 | 12928 | 12534 | 13013 |
| 80 | 12531 | 12934 | 12304 | 12428 | 13133 | 12978 |
| 100 | 12972 | 13204 | 11869 | 11541 | 12781 | 12496 |