суббота, 7 ноября 2009 г.

ECF: Распределяем объекты между OSGi-фреймворками


Сегодня мы рассмотрим еще одну замечательную возможность, которую предоставляет нам Eclipse Communication Framework - обмен копиями объектов между бандлами, запущенными на разных экземплярах OSGi-фреймворка (т.е. на разных JVM), реализованную в виде SharedObject API. Данный механизм основан на понятии "репликация", суть которого применительно к ECF следующая: в контейнер добавляется экземпляр класса, реализующего интерфейс ISharedObject, который становится доступен всем другим контейнерам, подключенным к тому же серверу, что и донор. Фактически, в каждом клиентском контейнере создаются копии исходного объекта, которые называются репликами.

Прежде всего разберемся непосредственно с распределяемым объектом. В ECF реализован базовый класс BaseSharedObject, который содержит реализацию многих полезных возможностей, таких как идентификация, управление конфигурацией, логгирование, отправка и прием сообщений. Рекомендуется все свои распределяемые объекты наследовать от данного класса.

Наиболее интересными для нас методами являются BaseSharedObject#initialize(), описывающий стратегию репликации главного объекта (т.е. того объекта, который собственно копируется, для него метод BaseSharedObject#isPrimary() вернет true) и стратегию десериализации реплик из словаря свойств, и BaseSharedObject#getReplicaDescription(ID receiver), задающий порядок сериализации реплицируемого объекта в словарь свойств. Словарь свойств - это объект класса, реализующего Map<String, Serializable>. ECF умеет передавать объекты между контейнерами только в таком виде. Чем-то данный механизм напоминает словарь свойств события, которым оперирует EventAdmin. Данный механизм предоставляет программисту возможность наиболее гибко управлять процессом репликации своего объекта (например, определять какие поля не нужно копировать вовсе).

Стоит обратить внимание, что каждое реплицируемое поле класса должно иметь тип, реализующий интерфейс Serializable. Дело в том, что для сериализации полей класса ECF использует стандартные средства Java-платформы.


Давайте рассмотрим пример метода initialize():

    /* (non-Javadoc)

     * @see org.eclipse.ecf.core.sharedobject.BaseSharedObject#initialize()

     */


    protected void initialize() throws SharedObjectInitException

    {

        super.initialize();



        if (isPrimary())

        {

            // If primary, then add an event processor that handles activated

            // event by replicating to all current remote containers

            addEventProcessor(new IEventProcessor()

                {

                    public boolean processEvent(Event event)

                    {

                        if (event instanceof ISharedObjectActivatedEvent)

                        {

                            ISharedObjectActivatedEvent activated = (ISharedObjectActivatedEvent) event;

                            if (activated.getActivatedID().equals(getID()) && isConnected())

                            {

                                MySharedObject.this.replicateToRemoteContainers(null);

                            }

                        }

                        return false;

                    }

                });



            System.out.println("Primary(" + getContext().getLocalContainerID().getName() + ")");

        }

        else

        {

            // This is a replica, so deserialize properties from HashMap to Object

            _name = (String) getConfig().getProperties().get(NAME_PROPERTY);

            _property = (DemoProperty) getConfig().getProperties().get(PROPERTY_PROPERTY);

            System.out.println("Replica(" + getContext().getLocalContainerID().getName() + ")");

        }

    }


Чтобы реплицировать главный экземпляр SharedObject'а, мы регистрируем свой обработчик события ISharedObjectActivatedEvent. Обработчик достаточно прост: мы убеждаемся, что действие происходит именно с нами и вызываем метод replicateToRemoteContainers. Вообще, данный метод принимает массив удаленных контейнеров, которым мы хотим отправить наш объект. В случае, если передается null, - объект отправляется всем контейнерам, с которыми можно установить соединение.

Если же перед нами не главный объект, а его реплика, то мы извлекаем из конфигурации нужные поля и строим из них копию объекта (т.е. производим десериализацию).

Пример метода getReplicaDescription:

    /* (non-Javadoc)

     * @see org.eclipse.ecf.core.sharedobject.BaseSharedObject#getReplicaDescription(org.eclipse.ecf.core.identity.ID)

     */


    protected ReplicaSharedObjectDescription getReplicaDescription(ID receiver)

    {

        System.out.println("getReplicaDescription(receiver = " + receiver + ")");



        // Put primary state into properties and include in replica description

        // This is serialization SharedObject into HashMap

        Map<String, Serializable> properties = new HashMap<String, Serializable>();

        properties.put(NAME_PROPERTY, _name);

        properties.put(PROPERTY_PROPERTY, _property);

        return new ReplicaSharedObjectDescription(this.getClass(), getConfig().getSharedObjectID(),

                getConfig().getHomeContainerID(), properties);

    }

 


В качестве параметра в метод передается ID контейнера-получателя. Таким образом обеспечивается возможность строить сериализованное представление объекта в зависимости от ID получателя. Впрочем, в данном примере мы эту возможность не используем. В целом код метода тривиален: строим словарь параметров, по которому создаем ReplicaSharedObjectDescription.

Теперь научимся всем этим пользоваться. Прежде всего - рассмотрим процедуру добавления в контейнер распределяемого объекта. Общий алгоритм такой:

1. Создаем контейнер, тип которого реализует SharedObject API:

public static final String GENERIC_CLIENT_CONTAINER = "ecf.generic.client";

// ...

_client = ContainerFactory.getDefault().createContainer(GENERIC_CLIENT_CONTAINER);


2. Подключаемся к серверу:

private void connectClient() throws Exception

{

    connectClient(_client, createServerID(), null);

}



private ID createServerID() throws Exception

{

    return createNewID(GENERIC_SERVER_ID);

}



private void connectClient(IContainer containerToConnect, ID connectID, IConnectContext context)

        throws ContainerConnectException

{

    containerToConnect.connect(connectID, context);

}



private ID createNewID(String id)

{

    return IDFactory.getDefault().createStringID(id);

}


3. Добавляем распределяемый объект в контейнер:

private ISharedObjectManager getClientSOManager()

{

    return ((ISharedObjectContainer) _client).getSharedObjectManager();

}



// ...



ISharedObjectManager manager = getClientSOManager();

ID id = manager.addSharedObject(createNewID(OBJECT_ID),

    new MySharedObject(OBJECT_NAME, OBJECT_PROPERTY_ID, OBJECT_PROPERTY_A, OBJECT_PROPERTY_B), null);

System.out.println("Added new SharedObject with ID = " + id.getName());


Все взаимодействие распределяемого объекта и контейнера осуществляется посредством ISharedObjectManager. Данный интерфейс содержит методы для добавления, создания, удаления, подключения, отключения и получения распределяемых объектов. Код интерфейса достаточно хорошо документирован, поэтому останавливаться подробно на нем не будем.

Теперь рассмотрим процедуру получения реплики объекта и работы с нею. Вообще, стоит отметить, что SharedObject API подразумевает асинхронное взаимодействие. Т.е. в контейнере, который будет работать с репликами объектов, регистрируется обработчик события ISharedObjectActivatedEvent. Именно в этом обработчике и должен находиться код, принимающий копию распределяемого объекта. Например, такой:

if (event instanceof ISharedObjectActivatedEvent)

{

    try

    {

        ISharedObjectActivatedEvent ae = (ISharedObjectActivatedEvent) event;

        System.out.println("Took shared object with id = " + ae.getActivatedID().getName()

                + " to " + ae.getLocalContainerID().getName());

        ISharedObjectManager manager = getClientSOManager(index);

        ISharedObject sharedObject = manager.getSharedObject(ae.getActivatedID());

        System.out.println("Shared Object class = " + sharedObject.getClass().getName());



        if (sharedObject instanceof MySharedObject)

        {

            MySharedObject my = (MySharedObject) sharedObject;

            System.out.println("Name = " + my.getName());

            System.out.println("Demo = " + my.getProperty());

        }

    }

    catch (Exception e)

    {

        e.printStackTrace();

    }

}


Здесь мы получаем из контейнера экземпляр распределяемого объекта и выводим в OSGi-консоль значения его свойств.

Еще одной интересной возможностью является обмен сообщениями между объектом и его репликами. Такой обмен можно использовать, например, для синхронизации состояния объектов. В примерах к статье приведен код класса MyMessagingSharedObject, демонстрирующего использование данного механизма. Стоит отметить, что отправка сообщения осуществляется посредством вызова метода BaseSharedObject#sendSharedObjectMsgTo, который принимает в качестве параметров ID контейнера-назначения (если равен null - то сообщение будет отправлено всем контейнерам), и сообщение - объект класса SharedObjectMsg, который можно построить с помощью целого ряда методов SharedObjectMsg#createMsg.

Код отправки сообщения может быть таким:

public void sendMessage(ID targetId, Object message) throws IOException

{

    sendSharedObjectMsgTo(null, SharedObjectMsg.createMsg(this.getClass().getName(),

        "handleMessage", new Object[] {getLocalContainerID(), message}));

}


"handleMessage" - имя метода класса распределяемого объекта, который будет вызван при получении сообщения. Обмен сообщениями, как и обмен объектами, - асинхронный.

ECF позволяет управлять механизмом сериализации/десериализации посылаемых/принимаемых сообщений. Для регистрации своего сериализатора/десериализатора необходимо вызвать метод ISharedObjectContainer#setSharedObjectMessageSerializer. Сделать это можно, например, при создании контейнера:

private void createClient() throws Exception

{

    _client = ContainerFactory.getDefault().createContainer(GENERIC_CLIENT_CONTAINER);

    ((ISharedObjectContainer) _client).setSharedObjectMessageSerializer(new XStreamSOMessageSerializer());

}


В примерах приведен код класса XStreamSOMessageSerializer, который использует библиотеку XStream для отправки сообщений в XML-формате.

Главное - необходимо обеспечить единый механизм сериализации/десериализации на всем пути прохождения сообщения, т.е. и источник, и сервер, и клиент должны зарегистрировать один и тот же сериализатор.

Статья получилась объемная и несколько сумбурная. Если у вас возникли какие-то вопросы - вы всегда можете задать их в комментариях. В следующий раз я постараюсь написать о тех графических средствах, которые предоставляет ECF.

Оставайтесь на связи.

Скачать примеры к статье (исходники и конфигурации запуска. Rar, 375 Кб)

Понравилось сообщение - подпишитесь на блог или читайте меня в twitter

Комментариев нет:

Отправить комментарий

Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!