Продолжаем разговор. Нынешняя заметка будет посвящена проблеме организации распределенной обработки событий с помощью механизмов, которые предоставляет Eclipse Communication Framework.
Подробно про обработку событий в OSGi-среде я уже писал. Повторяться не буду, сконцентрируюсь только на особенностях, позволяющих сделать этот механизм распределенным, т.е. генерироваться события будут в бандле, запущенном на одной системе, а обрабатываться - в бандле, запущенном, соответственно, на другой.
Вся обработка событий в OSGi базируется на интерфейсе EventAdmin. ECF предлагает свою реализацию этого интерфейса - класс DistributedEventAdmin, который расположен в бандле org.eclipse.ecf.remoteservice.eventadmin. К сожалению, данный бандл не входит в поставку ECF 3.0/3.1, поэтому его придется взять на CVS-сервере: pserver://dev.eclipse.org//cvsroot/rt в каталоге org.eclipse.ecf/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin.
Работает этот механизм так: DistributedEventAdmin представляет собой SharedObject. Как мы уже говорили, такие объекты копируются между ECF-контейнерами и могут обмениваться сообщениями. Соответственно, все вызовы sendEvent и postEvent - это отправка сообщений между репликами объекта DistributedEventAdmin (синхронная и асинхронная, соответственно).
ECF обладает интересной особенностью: если мы зарегистрируем в разных контейнерах SharedObject'ы под одним и тем же ID, то они будут считаться репликами и между ними будет возможен обмен сообщениями. Это важно, потому что DistributedEventAdmin не умеет автоматически создавать свои реплики (не переопределяет метод initialize() из класса BaseSharedObject), поэтому требуется его регистрировать и в контейнере-источнике событий и в контейнере-приемнике, причем под одним и тем же именем.
Итак, в бандле-источнике событий нужно сделать следующие:
1. Создать серверный контейнер или создать клиентский и подключиться к используемому серверу:
public static final String GENERIC_SERVER_ID = "ecftcp://localhost:4280/mygroup";
public static final String GENERIC_SERVER_CONTAINER = "ecf.generic.server";
// ...
container = getContainerManager().getContainerFactory().createContainer(GENERIC_SERVER_CONTAINER,
getID(GENERIC_SERVER_ID));
// ...
public static ID getID(String id)
{
return IDFactory.getDefault().createStringID(id);
}
public static final String GENERIC_SERVER_CONTAINER = "ecf.generic.server";
// ...
container = getContainerManager().getContainerFactory().createContainer(GENERIC_SERVER_CONTAINER,
getID(GENERIC_SERVER_ID));
// ...
public static ID getID(String id)
{
return IDFactory.getDefault().createStringID(id);
}
2. Создать экземляр класса DistributedEventAdmin, расшарить его и стартовать:
DistributedEventAdmin eventAdmin = new DistributedEventAdmin(context);
ISharedObjectContainer so = (ISharedObjectContainer) container.getAdapter(ISharedObjectContainer.class);
so.getSharedObjectManager().addSharedObject(getID(name), eventAdmin, null);
eventAdmin.start();
ISharedObjectContainer so = (ISharedObjectContainer) container.getAdapter(ISharedObjectContainer.class);
so.getSharedObjectManager().addSharedObject(getID(name), eventAdmin, null);
eventAdmin.start();
3. Зарегистрировать eventAdmin как сервис. Если мы хотим, чтобы он мог отправлять события с любыми топиками, то так:
context.registerService(EventAdmin.class.getName(), eventAdmin, null);
Если же мы хотим ограничить топики событий, то так:
Properties props = new Properties();
props.put(EventConstants.EVENT_TOPIC, topics);
context.registerService(EventAdmin.class.getName(), eventAdmin, props);
props.put(EventConstants.EVENT_TOPIC, topics);
context.registerService(EventAdmin.class.getName(), eventAdmin, props);
Все, после этого можно генерировать события. Они будут отправляться на все реплики DistributedEventAdmin.
Теперь разберемся с обработкой событий. Здесь все несколько проще. Нам нужно:
1. Подключиться к серверу (это может быть контейнер, через который генерируют события или сервер, к которому он подключен).
2. Создать экземляр класса DistributedEventAdmin, расшарить его и стартовать.
3. Зарегистрировать обработчик событий:
protected Dictionary<String, Object> getHandlerServiceProperties(String... topics)
{
Dictionary<String, Object> result = new Hashtable<String, Object>();
result.put(EventConstants.EVENT_TOPIC, topics);
return result;
}
// ...
eventHandlerRegistration = bundleContext.registerService(EventHandler.class.getName(),
new DemoEventHandler(), getHandlerServiceProperties("name/samolisov/ecf/*"));
{
Dictionary<String, Object> result = new Hashtable<String, Object>();
result.put(EventConstants.EVENT_TOPIC, topics);
return result;
}
// ...
eventHandlerRegistration = bundleContext.registerService(EventHandler.class.getName(),
new DemoEventHandler(), getHandlerServiceProperties("name/samolisov/ecf/*"));
В принципе, никто не мешает написать своего наследника DistributedEventAdmin, который бы умел автоматически реплицироваться на все контейнеры, подключенные к контейнеру-источнику событий. В случае, если не ожидается подключения новых контейнеров, т.е. если репликация осуществляется только один раз - это вполне оправданное решение. Но это я оставляю вам как упражнение :)
Естественно, что контейнеры не обязательно должны быть типа ecf.generic.client и ecf.generic.server. Это могут быть любые контейнеры, реализующие SharedObject API, например XMPP, ActiveMQ, Weblogic и т.д.
В примерах к статье находятся так же простейший генератор событий, представляющий собой отдельный поток, в котором раз в 5 секунд вызывается метод postEvent, и простейший обработчик, выводящий информацию о полученном событии в OSGi-консоль.
Понравилось сообщение - подпишитесь на блог или читайте меня в twitter
4 комментария:
Какова будет производительность данной схемы? Сколько тысяч событий в секунду она сможет пропускать через себя?
Сходу не скажу, надо пробовать.
где исходники взять можно ?
К сожалению полный архив исходников не сохранился.
Отправить комментарий
Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!