Сегодня я хочу рассмотреть один из наиболее распространенных подходов к построению глобальной архитектуры приложения - подход, основанный на генерации и последующей обработке событий. По большому счету, событийную модель можно использовать и для организации некой части приложения (например - обработки транзакций). Но в данной статье буду говорить о приложении в целом.
Большинство приложений в процессе своей работы постоянно реагируют на те или иные события. Это может быть щелчок пользователем по иконке, получение http-запроса, получение сигнала от датчика, завершение коммита транзакции и т.д. Реакция приложения проявляется в выполнении некоторых действий (отрисовка окна, отправка http-ответа, отправка сигнала исполнительному устройству, запуск BPEL-процесса и т.д.). Так вот, суть в том, что никто не мешает перенести такое поведение на уровень архитектуры приложения, т.е. организовав не только его внешнее поведение в соответствии с событийной моделью, но и внутреннее строение.
Кто работал с WinAPI или с Java Swing, тот должен быть хорошо знаком с такой моделью построения программы. В приложении есть некоторые агенты - события, которые генерируются в ответ на внешнее воздействие. Есть обработчики событий - код, который непосредственно делает что-то полезное. И есть то, что сводит гору и Магомета - диспетчер событий. Именно он вызывает те или иные обработчики в ответ на возникновение тех или иных событий.
Все приложение при этом строится в виде цикла обработки событий (в терминах WinAPI - цикл обработки сообщений, там события называются сообщениями).
Таким образом алгоритм работы приложения, основанного на событийной модели, следующий:
1. Зарегистрировать обработчики событий в диспетчере событий.
2. В ответ на внешнее воздействие сгенерировать событие.
3. Послать событие диспетчеру событий.
4. Диспетчер событий принимает событие и ищет для него обработчик (если тот зарегистрирован).
5. Диспетчер событий вызывает обработчик.
6. Перейти на пункт 2.
Причем вот что интересно: посылать событие диспетчеру и принимать его в нем можно асинхронно. Это позволяет распараллеливать процессы генерации и обработки событий. Для обмена событиями между такими процессами используется очередь событий.
В чем же здесь профит? Если еще раз представить себе систему, построенную на базе событийной модели, то становится понятно следующее:
1. Весь код, который делает что-то полезное, вынесен в независимые друг от друга обработчики событий. Тем самым сильно увеличивается связность (cohesion) кода и уменьшается его связанность (сопряжение, coupling).
2. Разделена логика процесса выполнения программы и управления этим процессом. При этом, код, относящийся к управлению, собран в одном месте, что позволяет модифицировать его независимо от кода, который непосредственно делает что-то полезное. Можно легко сделать так, чтобы каждый обработчик события запускался в новом потоке или наоборот - все обработчики событий работали в одном потоке.
3. Если обработчики одного и того же события независимы друг от друга - их можно очень легко параллелить. Обработчики разных событий можно параллелить почти всегда.
4. Так как код слабосвязан, то он хорошо ложится на модель распределенных вычислений. Т.е. обработчики событий можно разнести по отдельным узлам сети. Это - удобный момент при разработке приложения, которое должно работать на кластере.
5. Приложение легко расширять путем регистрации новых обработчиков существующих событий и/или добавлением новых событий.
А теперь слайды. Давайте бегло рассмотрим небольшой пример - http-сервер на Java. Пусть наш сервер будет принимать запросы от пользователей и генерировать событие в зависимости от типа запроса и расширения запрашиваемого файла. Далее, диспетчер событий будет искать зарегистрированные обработчики и вызывать их.
Цикл ожидания соединения от клиентов и цикл обработки сообщений разнесем на разные потоки. Общаться друг с другом эти потоки будут с помощью очереди сообщений (EventsQueue). В нашем случае ее код почти тривиален, единственная особенность - очередь сообщений реализует паттерн синглтон:
package org.beq.httpd.eventsmanager;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.beq.httpd.events.AbstractEvent;
public class EventsQueue {
private static EventsQueue _instance = null;
private Queue<AbstractEvent> _queue = new ConcurrentLinkedQueue<AbstractEvent>();
private EventsQueue() {}
public static EventsQueue getInstance() {
if (_instance == null)
_instance = new EventsQueue();
return _instance;
}
@SuppressWarnings("unchecked")
public synchronized <T extends AbstractEvent> T fetchEvent() {
return (T) _queue.poll();
}
public synchronized void putEvent(AbstractEvent event) {
_queue.add(event);
System.out.println("event with class: " + event.getClass().getName() + " added");
System.out.println("queue size: " + _queue.size());
}
}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.beq.httpd.events.AbstractEvent;
public class EventsQueue {
private static EventsQueue _instance = null;
private Queue<AbstractEvent> _queue = new ConcurrentLinkedQueue<AbstractEvent>();
private EventsQueue() {}
public static EventsQueue getInstance() {
if (_instance == null)
_instance = new EventsQueue();
return _instance;
}
@SuppressWarnings("unchecked")
public synchronized <T extends AbstractEvent> T fetchEvent() {
return (T) _queue.poll();
}
public synchronized void putEvent(AbstractEvent event) {
_queue.add(event);
System.out.println("event with class: " + event.getClass().getName() + " added");
System.out.println("queue size: " + _queue.size());
}
}
Генерация событий реализована в классе EventsManager. EventsManager ищет в таблице зарегистрированных событий класс события по методу http-запроса и расширению запрашиваемого файла. Затем строит объект найденного класса с помощью reflection и помещает его в очередь событий:
package org.beq.httpd.eventsmanager;
import org.beq.httpd.events.AbstractEvent;
import org.beq.httpd.http.IHTTPRequest;
import org.beq.httpd.http.IHTTPResponse;
import org.beq.httpd.http.Method;
public class EventsManager {
private static final EventsQueue EVENTS_QUEUE = EventsQueue.getInstance();
private EventsTable _table = new EventsTable();
public void generateEventAndPutInQueue(Method method, String extname,
IHTTPRequest req, IHTTPResponse resp) throws EventGenerationException {
Class<?> clazz = _table.getEventClass(method, extname);
try {
Class<?>[] constructorParams = new Class<?>[] {IHTTPRequest.class, IHTTPResponse.class};
Object[] params = new Object[] {req, resp};
AbstractEvent event = (AbstractEvent) clazz.getConstructor(constructorParams).newInstance(params);
EVENTS_QUEUE.putEvent(event);
}
catch (Exception e) {
throw new EventGenerationException("Could not create event by class: " + clazz,
clazz, e);
}
}
}
import org.beq.httpd.events.AbstractEvent;
import org.beq.httpd.http.IHTTPRequest;
import org.beq.httpd.http.IHTTPResponse;
import org.beq.httpd.http.Method;
public class EventsManager {
private static final EventsQueue EVENTS_QUEUE = EventsQueue.getInstance();
private EventsTable _table = new EventsTable();
public void generateEventAndPutInQueue(Method method, String extname,
IHTTPRequest req, IHTTPResponse resp) throws EventGenerationException {
Class<?> clazz = _table.getEventClass(method, extname);
try {
Class<?>[] constructorParams = new Class<?>[] {IHTTPRequest.class, IHTTPResponse.class};
Object[] params = new Object[] {req, resp};
AbstractEvent event = (AbstractEvent) clazz.getConstructor(constructorParams).newInstance(params);
EVENTS_QUEUE.putEvent(event);
}
catch (Exception e) {
throw new EventGenerationException("Could not create event by class: " + clazz,
clazz, e);
}
}
}
Все события наследуются от AbstractEvent и обязательно имеют конструкторы с одинаковой сигнатурой. По большому счету, класс события - бин, инкапсулирующий всю информацию, нужную для его корректной обработки. Код класса AbstractEvent следующий:
package org.beq.httpd.events;
import org.beq.httpd.http.IHTTPRequest;
import org.beq.httpd.http.IHTTPResponse;
public abstract class AbstractEvent {
private IHTTPRequest request;
private IHTTPResponse response;
public AbstractEvent(IHTTPRequest request, IHTTPResponse response) {
this.request = request;
this.response = response;
}
public IHTTPRequest getRequest() {
return request;
}
public IHTTPResponse getResponse() {
return response;
}
}
import org.beq.httpd.http.IHTTPRequest;
import org.beq.httpd.http.IHTTPResponse;
public abstract class AbstractEvent {
private IHTTPRequest request;
private IHTTPResponse response;
public AbstractEvent(IHTTPRequest request, IHTTPResponse response) {
this.request = request;
this.response = response;
}
public IHTTPRequest getRequest() {
return request;
}
public IHTTPResponse getResponse() {
return response;
}
}
Приведу небольшой фрагмент кода, демонстрирующий работу цикла генерации сообщений:
EventsManager manager = new EventsManager();
Server server = new Server(parsePort(args[0]));
while (true)
{
Connection conn = server.accept();
//...
try
{
IHTTPRequest request = ...
IHTTPResponse response = ...
Method method = ...
String extname = ...
manager.generateEventAndPutInQueue(method, extname, request, response);
}
catch (EventGenerationException e) {
// ...
}
}
Server server = new Server(parsePort(args[0]));
while (true)
{
Connection conn = server.accept();
//...
try
{
IHTTPRequest request = ...
IHTTPResponse response = ...
Method method = ...
String extname = ...
manager.generateEventAndPutInQueue(method, extname, request, response);
}
catch (EventGenerationException e) {
// ...
}
}
Перейдем теперь к циклу обработки сообщений. Прежде всего рассмотрим диспетчер событий - EventsDispatcher:
package org.beq.httpd.dispatcher;
import java.util.List;
import org.beq.httpd.events.AbstractEvent;
import org.beq.httpd.eventsmanager.EventsQueue;
import org.beq.httpd.handlers.IHandler;
public class EventsDispatcher implements Runnable {
private static final EventsQueue EVENTS_QUEUE = EventsQueue.getInstance();
private IHandlerRunner runner;
private IHandlersRegistry registry;
private Thread thread;
public EventsDispatcher(IHandlersRegistry registry, IHandlerRunner runner) {
this.runner = runner;
this.registry = registry;
thread = new Thread(this, "DISPATHER");
}
public void start() {
thread.start();
}
public void run() {
while (true) {
AbstractEvent event = EVENTS_QUEUE.fetchEvent();
if (event != null)
runHandlers(registry.getHandlers(event.getClass()), event);
}
}
@SuppressWarnings("unchecked")
private <T extends AbstractEvent> void runHandlers(List<IHandler<? extends AbstractEvent>> handlers, T event) {
for (IHandler<? extends AbstractEvent> handler: handlers)
runner.run((IHandler<T>) handler, event);
}
}
import java.util.List;
import org.beq.httpd.events.AbstractEvent;
import org.beq.httpd.eventsmanager.EventsQueue;
import org.beq.httpd.handlers.IHandler;
public class EventsDispatcher implements Runnable {
private static final EventsQueue EVENTS_QUEUE = EventsQueue.getInstance();
private IHandlerRunner runner;
private IHandlersRegistry registry;
private Thread thread;
public EventsDispatcher(IHandlersRegistry registry, IHandlerRunner runner) {
this.runner = runner;
this.registry = registry;
thread = new Thread(this, "DISPATHER");
}
public void start() {
thread.start();
}
public void run() {
while (true) {
AbstractEvent event = EVENTS_QUEUE.fetchEvent();
if (event != null)
runHandlers(registry.getHandlers(event.getClass()), event);
}
}
@SuppressWarnings("unchecked")
private <T extends AbstractEvent> void runHandlers(List<IHandler<? extends AbstractEvent>> handlers, T event) {
for (IHandler<? extends AbstractEvent> handler: handlers)
runner.run((IHandler<T>) handler, event);
}
}
Особых сложностей здесь нет. Как видим, класс представлят собой отдельный поток, в методе run которого и происходит обработка сообщений. Каждый обработчик запускается с помощью класса, реализующего интерфейс IHandlerRunner. Таким образом процедура запуска абстрагирована, что позволяет легко ее изменять. Код интерфейса IHandlerRunner следующий:
package org.beq.httpd.dispatcher;
import org.beq.httpd.events.AbstractEvent;
import org.beq.httpd.handlers.IHandler;
public interface IHandlerRunner {
public <T extends AbstractEvent> void run(IHandler<T> handler, T event);
}
import org.beq.httpd.events.AbstractEvent;
import org.beq.httpd.handlers.IHandler;
public interface IHandlerRunner {
public <T extends AbstractEvent> void run(IHandler<T> handler, T event);
}
Сам обработчик события представляет собой класс, реализующий интерфейс IHandler:
package org.beq.httpd.handlers;
import org.beq.httpd.events.AbstractEvent;
public interface IHandler<T extends AbstractEvent> {
public void onEvent(T event);
}
import org.beq.httpd.events.AbstractEvent;
public interface IHandler<T extends AbstractEvent> {
public void onEvent(T event);
}
Обработчики событий хранятся в соответствующем реестре. Фактически реестр представляет собой мэп, в котором каждому классу события соответствует список зарегитрированных для него обработчиков. Для удобства использования были написаны отдельные методы для регистрации и поиска обработчика.
Работа с реестром и запуск диспетчера событий могут выглядеть следующим образом:
IHandlersRegistry registry = new HandlersRegistry();
registry.registerHandler(StaticEvent.class, new SimpleStaticHandler());
// ...
EventsDispatcher dispatcher = new EventsDispatcher(registry, new SingleThreadHandlerRunner());
dispatcher.start();
registry.registerHandler(StaticEvent.class, new SimpleStaticHandler());
// ...
EventsDispatcher dispatcher = new EventsDispatcher(registry, new SingleThreadHandlerRunner());
dispatcher.start();
Конечно, можно вынести регистрацию обработчиков (как, собственно, и добавление новых событий) в отдельные сервисы и тем самым сделать приложение расширяемым с помощью, например, бандлов на OSGi-платформе. Но это уже тема для следующего разговора.
Понравилось сообщение - подпишитесь на блог или читайте меня в twitter
Связность объяснили как на английском, а далее используете из той же оперы термин, но уже без перевода. Нарушаете стиль )
ОтветитьУдалить"Тем самым сильно уменьшается связность (cohesion) кода (а coupling соответственно увеличивается)"
Я бы порекомендовал еще и код приложить в атачаменте:)
ОтветитьУдалитьЗачем в классе EventsQueue методы
ОтветитьУдалитьfetchEvent/putEvent объявлены как synchronized?
В данном конкретном примере synchronized не нужен - все равно помещать событие в очередь будет только один поток и извлекать тоже будет только один поток. Но вот на будущее synchronized будет полезен - чтобы не было так: один поток поместил значение, а на консоль вывели отладочную печать для другого потока.
ОтветитьУдалитьТак объект ConcurrentLinkedQueue thread-safe вроде и внешняя синхронизация не нужна.
ОтветитьУдалитьТам в методе putEvent есть еще логгирование.
ОтветитьУдалитьДа, кусок вывода на консоль я упустил.
ОтветитьУдалитьПисал я сервера, работающие на событийной модели -- кроме геморроя и ошибок ничего не получил. Вся проблема в асинхронности: может быть так, что на момент получения события данные уже неактуальны (естественно, транзакция завершилась). Сохранение порядка событий тоже пролематичен: видели в виндах неперерисованные области, а в свинге невалидированные компоненты при изменении размеров? Кроме того вся инфраструктура сервисов превращается в кашу. Правильно инициализировать такую структуру становится практически невозможным: если сервис создается динамически, важен также момент регистрации листенера, иначе он пропустит необходимое сообщение.
ОтветитьУдалитьСобытийная модель работает, когда компоненты-получатели событий изолированы и не возвращают данных. Например система удаленного логгинга подошла бы (пожалуй и все). Но чаще события используются в качестве хаков, чтобы добавить некую функциональность в цепочку выполнения, например post-processing.
>> Если сервис создается динамически, важен также момент регистрации листенера, иначе он пропустит необходимое сообщение.
ОтветитьУдалитьДа, такая проблема действительно есть, в частности, она очень актуальна для OSGi. Решением может быть накапливание событий в БД до их обработки.
>> Событийная модель работает, когда компоненты-получатели событий изолированы и не возвращают данных.
Это очень верная мысль! Именно, когда можно изолировать обработку тех или иных аспектов поведения системы нужно выделять события и их обработчики.
Про возвращаемые данные, правда, не совсем понял. По моему здесь важно кому мы возвращаем данные. Если диспетчеру событий, чтобы он на основании их что-то сделал, то и правда будут проблемы. Если же в поток пользователю (а чем это принципиально отличается от лога?) то особых проблем я пока не вижу.
UPD: неплохим примером систем, для реализации которых подходит EDA, являются различные Instant Messenger'ы и чатики. Как правильно заметил Антон - событийная модель хорошо работает в основном когда нужно обеспечить передачу потока данных в одну сторону. В случае Instant Messenger каждое входящее сообщение - это событие и наша задача показать его пользователю. С исходящими сообщениями можно работать как используя EDA, так и без оной.
ОтветитьУдалить