суббота, 14 ноября 2009 г.

ECF: Распределенная обработка событий


Продолжаем разговор. Нынешняя заметка будет посвящена проблеме организации распределенной обработки событий с помощью механизмов, которые предоставляет Eclipse Communication Framework.

Подробно про обработку событий в OSGi-среде я уже писал. Повторяться не буду, сконцентрируюсь только на особенностях, позволяющих сделать этот механизм распределенным, т.е. генерироваться события будут в бандле, запущенном на одной системе, а обрабатываться - в бандле, запущенном, соответственно, на другой.

Вся обработка событий в OSGi базируется на интерфейсе EventAdmin. ECF предлагает свою реализацию этого интерфейса - класс DistributedEventAdmin, который расположен в бандле org.eclipse.ecf.remoteservice.eventadmin. К сожалению, данный бандл не входит в поставку ECF 3.0/3.1, поэтому его придется взять на CVS-сервере: pserver://dev.eclipse.org//cvsroot/rt в каталоге org.eclipse.ecf/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin.

Работает этот механизм так: DistributedEventAdmin представляет собой SharedObject. Как мы уже говорили, такие объекты копируются между ECF-контейнерами и могут обмениваться сообщениями. Соответственно, все вызовы sendEvent и postEvent - это отправка сообщений между репликами объекта DistributedEventAdmin (синхронная и асинхронная, соответственно).


ECF обладает интересной особенностью: если мы зарегистрируем в разных контейнерах SharedObject'ы под одним и тем же ID, то они будут считаться репликами и между ними будет возможен обмен сообщениями. Это важно, потому что DistributedEventAdmin не умеет автоматически создавать свои реплики (не переопределяет метод initialize() из класса BaseSharedObject), поэтому требуется его регистрировать и в контейнере-источнике событий и в контейнере-приемнике, причем под одним и тем же именем.

Итак, в бандле-источнике событий нужно сделать следующие:

1. Создать серверный контейнер или создать клиентский и подключиться к используемому серверу:

public static final String GENERIC_SERVER_ID = "ecftcp://localhost:4280/mygroup";



public static final String GENERIC_SERVER_CONTAINER = "ecf.generic.server";



// ...

container = getContainerManager().getContainerFactory().createContainer(GENERIC_SERVER_CONTAINER,

        getID(GENERIC_SERVER_ID));



// ...

public static ID getID(String id)

{

    return IDFactory.getDefault().createStringID(id);

}


2. Создать экземляр класса DistributedEventAdmin, расшарить его и стартовать:

DistributedEventAdmin eventAdmin = new DistributedEventAdmin(context);

ISharedObjectContainer so = (ISharedObjectContainer) container.getAdapter(ISharedObjectContainer.class);

so.getSharedObjectManager().addSharedObject(getID(name), eventAdmin, null);



eventAdmin.start();

 


3. Зарегистрировать eventAdmin как сервис. Если мы хотим, чтобы он мог отправлять события с любыми топиками, то так:

context.registerService(EventAdmin.class.getName(), eventAdmin, null);


Если же мы хотим ограничить топики событий, то так:

Properties props = new Properties();

props.put(EventConstants.EVENT_TOPIC, topics);

context.registerService(EventAdmin.class.getName(), eventAdmin, props);

 


Все, после этого можно генерировать события. Они будут отправляться на все реплики DistributedEventAdmin.

Теперь разберемся с обработкой событий. Здесь все несколько проще. Нам нужно:

1. Подключиться к серверу (это может быть контейнер, через который генерируют события или сервер, к которому он подключен).

2. Создать экземляр класса DistributedEventAdmin, расшарить его и стартовать.

3. Зарегистрировать обработчик событий:

protected Dictionary<String, Object> getHandlerServiceProperties(String... topics)

{

    Dictionary<String, Object> result = new Hashtable<String, Object>();

    result.put(EventConstants.EVENT_TOPIC, topics);

    return result;

}



// ...

eventHandlerRegistration = bundleContext.registerService(EventHandler.class.getName(),

        new DemoEventHandler(), getHandlerServiceProperties("name/samolisov/ecf/*"));

 


В принципе, никто не мешает написать своего наследника DistributedEventAdmin, который бы умел автоматически реплицироваться на все контейнеры, подключенные к контейнеру-источнику событий. В случае, если не ожидается подключения новых контейнеров, т.е. если репликация осуществляется только один раз - это вполне оправданное решение. Но это я оставляю вам как упражнение :)

Естественно, что контейнеры не обязательно должны быть типа ecf.generic.client и ecf.generic.server. Это могут быть любые контейнеры, реализующие SharedObject API, например XMPP, ActiveMQ, Weblogic и т.д.

В примерах к статье находятся так же простейший генератор событий, представляющий собой отдельный поток, в котором раз в 5 секунд вызывается метод postEvent, и простейший обработчик, выводящий информацию о полученном событии в OSGi-консоль.

Понравилось сообщение - подпишитесь на блог или читайте меня в twitter

4 комментария:

  1. Какова будет производительность данной схемы? Сколько тысяч событий в секунду она сможет пропускать через себя?

    ОтветитьУдалить
  2. Сходу не скажу, надо пробовать.

    ОтветитьУдалить
  3. К сожалению полный архив исходников не сохранился.

    ОтветитьУдалить

Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!