Общее·количество·просмотров·страницы

Java Dev Notes - разработка на Java (а также на JavaScript/Python/Flex и др), факты, события из АйТи

четверг, 26 августа 2010 г.

Блокировка чтения/записи. Часть 1.

Read/write Locks

По материалам Read/Write Locks in Java

Условия:

Имеются общие данные, к которым потоки обращаются на чтение и изменение (запись). При этом изменение данных происходит относительно редко. Гораздо чаще происходит их чтение.

Несколько потоков, которые читают данные, не изменяя их, не создают проблем друг другу. Поэтому нет смысла их синхронизировать - ведь они не мешают друг другу. Но если хотя бы один поток начал изменение данных, то все другие потоки (читающие и изменяющие данные) должны подождать, пока этот поток не закончит работу. Т.е. мы можем иметь множественных одновременных читателей данных, но лишь один поток, который меняет данные.

Задача:

Требуется эффективно синхронизировать читающие и изменяющие потоки между собой.

Решение:

Для начала, еще раз подытожим условия блокировки:

Доступ для чтения: если нет потоков, которые изменяют данные, а также нет потоков, которые запросили доступ для изменения данных.

Доступ для записи: если нет потоков, которые читают или изменяют данные.

Если поток хочет прочитать данные, то это позволяется, если ни один поток не изменяет данные и ни один поток не запросил доступ на запись данных. Мы отдаем записи данных более высокий приоритет, чем чтению. Если бы мы так не делали, то могло бы возникнуть "голодание" (starvation) потоков, изменяющих данные (при условии, что запросы на чтение данных происходят чаще, чем запросы на запись данных). Потоки, запрашивающие доступ на запись, должны были бы ждать, пока не отработают потоки, читающие данные. Так как запросов на чтение много, то новые читающие потоки добавлялись бы постоянно, что привело бы в долговременному (а то и вообще бесконечному) простою потоков на запись. Поэтому поток может получить разрешение на чтение, только если ни один поток не изменяет данные и ни один поток не запросил доступ на запись.

Поток, который хочет получить доступ к записи данных, получает этот доступ, если только ни один поток не читает и не пишет данные. Приэтом не имеет значения, сколько поток запросило доступ на запись, так как мы хотим обеспечить равные шансы всем этим потокам.

Поэтому мы можем написать следующие код:

public class ReadWriteLock{
 
private int readers = 0;
private int writers = 0;
private int writeRequests = 0;
 
public synchronized void lockRead() throws InterruptedException{
while(writers > 0 || writeRequests > 0){
wait();
}
readers++;
}
 
public synchronized void unlockRead(){
readers--;
notifyAll();
}
 
public synchronized void lockWrite() throws InterruptedException{
writeRequests++;
 
while(readers > 0 || writers > 0){
wait();
}
writeRequests--;
writers++;
}
 
public synchronized void unlockWrite() {
writers--;
notifyAll();
}
}


Здесь мы имеем два метода для начала блокировки, и два метода для окончания блокировки. Код, который мы реализовали, не подразумевает возвратного блокирования (reentrant locking).

Ниже приведен пример использования блокировки чтения/записи для потоко-безопасного кеша:

public class Cache {
HashMap<Integer,String> map = new HashMap<Integer,String>();
ReadWriteLock lock = new ReadWriteLock();
 
public String get(Integer key) throws CacheException {
try {
lock.lockRead();
return map.get(key);
} catch (InterruptedException ie) {
throw new CacheException(ie);
} finally {
lock.unlockRead();
}
}
 
public void put(Integer key, String value) throws CacheException {
try {
lock.lockWrite();
map.put(key,value);
} catch (InterruptedException ie) {
throw new CacheException(ie);
} finally {
lock.unlockWrite();
}
}
 
public void remove(Integer key) throws CacheException {
try {
lock.lockWrite();
map.remove(key);
} catch (InterruptedException ie) {
throw new CacheException(ie);
} finally {
lock.unlockWrite();
}
}
}

вторник, 24 августа 2010 г.

Многопоточный сервер

Рассмотрим создание многопоточного сервера. Наш сервер будет уметь немногое: для каждого нового соединения будет создаваться отдельный поток, который будет печатать фразу "jdevnotes multithreaded Server runs" и закрывать соединение. Естественно, что сервер будет слушать какой-либо порт - пусть это будет порт 9000.

Как нам завершать работу сервера? Для этого применим следующий прием: стоп-монитор. Стоп-монитор будет слушать отдельный порт (назовем его стоп-портом) - пусть это будет порт 9001. Если произошло соединение с этим портом и на стоп-монитор отправлена любая последовательность байтов, то стоп-монитор дает команду на прекращение работы. Естественно, что стоп-монитор работает в отдельном потоке.

Сервер будет состоять из 4 файлов:
Main.java - содержит метод main,
StopMonitor.java - содержит стоп-монитор,
MultiThreadedServer.java - основной поток сервера, запускает рабочие потоки, которые и пишут/читают данные для каждого соединения,
Worker.java - рабочий поток.

Код нашего метода main будет таков (Main.java):

public class Main {
public static final int PORT_WORK = 9000;
public static final int PORT_STOP = 9001;
 
public static void main(String[] args) {
MultiThreadedServer server = new MultiThreadedServer(PORT_WORK);
new Thread(server).start();
try {
Thread monitor = new StopMonitor(PORT_STOP);
monitor.start();
monitor.join();
System.out.println("Right after join.....");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Stopping Server");
server.stop();
}
}
 


Что здесь происходит: стартуем MultiThreadedServer, затем стартуем стоп-монитор и присоединяем (join) его к текущему потоку. Теперь текущий поток (т.е. код метода main) будет ждать завершения потока стоп-монитора. И строчка

System.out.println("Right after join.....");

выполнится только после того, как поток стоп-монитора завершится. А завершится он только тогда, когда будет соединение на стоп-порт и на стоп-порт будет отправлена какай-нибудь последовательность байтов.

Смотрим код стоп-монитора (StopMonitor.java):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
 
public class StopMonitor extends Thread {
 
private ServerSocket serverSocket;
 
public StopMonitor(int port) {
setDaemon(true);
setName("StopMonitor");
try {
serverSocket = new ServerSocket(port, 1, InetAddress.getByName("127.0.0.1"));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
 
@Override
public void run() {
System.out.println("stop monitor thread listening on: "+ serverSocket.getInetAddress()+":"+serverSocket.getLocalPort());
Socket socket;
try {
socket = serverSocket.accept();
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
reader.readLine();
System.out.println("stop signal received, stopping server");
socket.close();
serverSocket.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
 


Код MultiThreadedServer будет таков (MultiThreadedServer.java):

import java.net.ServerSocket;
import java.net.Socket;
import java.io.IOException;
 
public class MultiThreadedServer implements Runnable{
 
protected int serverPort = 9000;
protected ServerSocket serverSocket = null;
protected boolean isStopped = false;
 
public MultiThreadedServer(int port){
this.serverPort = port;
}
 
@Override
public void run(){
openServerSocket();
while(! isStopped()){
Socket clientSocket = null;
try {
clientSocket = this.serverSocket.accept();
} catch (IOException e) {
if(isStopped()) {
System.out.println("Server Stopped.") ;
return;
}
throw new RuntimeException("Error accepting client connection", e);
}
new Thread(
new Worker(clientSocket)
).start();
}
System.out.println("Server Stopped.") ;
}
 
 
private synchronized boolean isStopped() {
return this.isStopped;
}
 
public synchronized void stop(){
this.isStopped = true;
try {
this.serverSocket.close();
} catch (IOException e) {
throw new RuntimeException("Error closing server", e);
}
}
 
private void openServerSocket() {
System.out.println("Opening server socket...");
try {
this.serverSocket = new ServerSocket(this.serverPort);
} catch (IOException e) {
throw new RuntimeException("Cannot open port " + this.serverPort, e);
}
}
 
}


Код рабочего потока (Worker.java):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
 
 
public class Worker implements Runnable {
 
protected Socket clientSocket = null;
 
public Worker(Socket clientSocket) {
this.clientSocket = clientSocket;
}
 
@Override
public void run() {
try {
InputStream input = clientSocket.getInputStream();
OutputStream output = clientSocket.getOutputStream();
long time = System.currentTimeMillis();
output.write("jdevnotes multithreaded server runs\n".getBytes());
output.close();
input.close();
System.out.println("Request processed: " + time);
} catch (IOException e) {
e.printStackTrace();
}
}
}
 


Запустив наше приложение, подсоединимся телнетом на порт 9000 локалхоста. Получим ответ:


$ telnet localhost 9000
Trying ::1...
Connected to localhost.
Escape character is '^]'.
jdevnotes multithreaded server runs
Connection closed by foreign host.


Теперь опять же телнетом зайдем на порт 9001 и напишем любую фразу ("ааа"):

$ telnet localhost 9001
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
aaa
Connection closed by foreign host.


После этого произойдет закрытие сокета на порту 9001, окончание работы стоп-монитора, переход выполнения на главный поток (код метода main) и завершение программы. На этом все....

Постоянные читатели