понедельник, 21 февраля 2011 г.

Распределенные транзакции (XA) с помощью JTA в JavaSE (на примере Spring + Atomikos)


При интеграции приложений в единую информационную систему наиболее остро встает проблема обеспечения целостности и непротиворечивости данных. Суть данной проблемы в следующем: каждое приложение может работать со своим независимым хранилищем данных и при интеграции приложений возможна ситуация, когда данные в одном хранилище обновились, а в другом (например, в результате сбоя по питанию) - нет. Последствия таких ошибок могут быть довольно печальны: в случае сколь-нибудь крупного производственного предприятия сбой при сохранении производственного задания может привести к простою всего предприятия, или наоборот - выпуску лишней дневной нормы продукции.

Для решения данной проблемы используются т.н. распределенные транзакции - транзакции, охватывающие несколько источников данных. В мире Java такие транзакции поддерживаются с помощью Java Transaction API (JSR 907), являющегося частью спецификации JavaEE. Однако, данный факт не обозначает, что работать с JTA можно только с помощью Java EE-сервера приложений. Существуют различные, в том числе и OpenSource, реализации JTA, в частности - Atomikos TransactionsEssentials, которая содержит JDBC/XA и JMS/XA пулы соединений, а также координатор распределенных транзакций. В данной статье мы рассмотрим использование Atomikos TransactionsEssentials для управления распределенными транзакциями, в которых будут участновать JDBC- и JMS-соединения, а также Hibernate. Для объединения компонентов системы будем использовать Spring Framework.

Содержание



  1. Понятие XA-транзакции.

  2. Использование JDBC/XA-пулов и координатора транзакций из Atomikos с помощью Spring Framework.

  3. Подключение JMS/XA (ActiveMQ).

  4. Использование Atomikos в качестве менеджера транзакций для Hibernate.

  5. Заключение.

  6. Ресурсы.



Понятие XA-транзакции


Спецификация на распределенные глобальные - XA - транзакций была разработана вендорским комитетом X/Open Group. XA транзакции управляются с помощью т.н. координатора XA-транзакций и объединяют несколько источников данных - JDBC-пулы соединений и/или JMS. Источники даных, способные участвовать в XA-транзакции называются XA-ресурсами и реализуют интерфейс javax.transaction.xa.XAResource. JTA содержит немного измененную спецификацию XA.

Коммит XA-транзакции считается успешным ТОЛЬКО если успешен коммит всех локальных транзакций XA-ресурсов. В случае, если коммит в каком-либо из локальных ресурсов неуспешен, то происходит откат всей глобальной транзакции. Для реализации такого поведения используется протокол двухфазного коммита (2PC).
Двухфазный коммит имеет два четко разделенных этапа:

Этап подготовки - координатор посылает сообщение участникам распределенной транзакции о подготовке к транзакции (prepare message). Это сообщение также содержит уникальный номер транзакции TID. Когда участники получают это сообщение, они проверяют смогут ли зафиксировать транзакцию и отвечают координатору. При этом транзакция исполняется, но не фиксируется и ее состояние сохраняется на диске.

Этап фиксации - после того, как координатор получил все ответы, он решает зафиксировать или прервать начатую транзакцию в соответствии с правилом глобальной фиксации. Если все участники глобальной транзакции ответили, что могут ее зафиксировать, то он посылает участникам сообщение о фиксации транзакции, в противном случае все участники получают сообщение об откате транзакции.

Например, в PostgreSQL двухфазный коммит реализован так, что транзакции сначала подготавливаются (PREPARE TRANSACTION) и сохраняются на диск. В последствии же они могут быть зафиксированы (COMMIT PREPARED) или отменены (ROLLBACK PREPARED) даже после перезагрузки системы.

Для участия в распределенных транзакциях как СУБД или JMS/MQ-брокер, так и их JDBC- и JMS-драйвера должны поддерживать XA протокол согласно JTA.

Использование JDBC/XA-пулов и координатора транзакций из Atomikos с помощью Spring Framework


В состав библиотеки Atomikos TransactionsEssentials входят XA-совместимые пулы соединений с JDBC- и JMS-, а так же JTA-совместимый координатор транзакций. В данной части статьи мы рассмотрим конфигурирование и использование Atomikos с помощью SpringFramework.

Постановка задачи
Рассмотрим следующую задачу: имеется две информационные системы: Интернет-магазин (ESHOP), содержащий сведения о товарах, их цены и наличие на складе и Система управления отношениями с клиентами (CRM). Необходимо реализовать бизнес-процесс покупки товара по следующему алгоритму:

1. Получить актуальну цену товара (ESHOP).
2. Ввести заказы в систему (CRM).
3. Выставить счет клиенту (CRM).
4. Обновить сведения о наличии товара на складе (ESHOP).

При выполнении любого этапа возможна ошибка (например, упал сервер СУБД или на складе нет необходимого количества товара), поэтому данные операции необходимо выполнять в рамках XA-транзакции.

Для чистоты эсперимента будем считать, что Интернет-магазин использует СУБД PostgreSQL, а CRM - Oracle.

Подготовка СУБД к выполнению XA-транзакций
Прежде всего необходимо подготовить СУБД для обработки XA-транзакций. Для PostgreSQL необходимо в настройках сервера установить значение параметра max_prepared_transactions равным, например, 10. Подробнее можно прочитать в документации на сервер PostgreSQL.

Для Oracle 9.2/10 необходимо дать пользователю, от имени которого будет выполняться транзакция, соответствующие права:

grant select on sys.dba_pending_transactions to user name;
grant select on sys.pending_trans$ to user name;
grant select on sys.dba_2pc_pending to user name;
grant execute on sys.dbms_system to user name;


Разработка приложения
Теперь можно приступить непосредственно к написанию приложения. Сделаем следующее:
1. Определим XA-совместимые источники данных.
2. Настроим менеджер транзакций Spring Framework на использование Atomikos.
3. Настроим декларативное управление транзакциями с помощью аннотаций.
4. Создадим DAO-классы и сервисы для эмуляции ESHOP и CRM.
5. Создадим сервис-интегратор, демонстрирующий использование XA-транзакции.

Будем использовать следующий шаблон XML-файла описания контекста Spring Framework, в который будут добавляться описания бинов:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

      xmlns:aop="http://www.springframework.org/schema/aop"

      xmlns:tx="http://www.springframework.org/schema/tx"

      xsi:schemaLocation="http://www.springframework.org/schema/beans

                          http://www.springframework.org/schema/beans/spring-beans-2.5.xsd

                          http://www.springframework.org/schema/tx

                          http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">


</beans>


В качестве источника данных используется класс com.atomikos.jdbc.AtomikosDataSourceBean, позволяющий создать XA-совместимый пул соединений. Наиболее важными параметрами данного класса являются: xaDataSourceName, который принимает название класса XA-совместимого источника данных, и xaProperties, который принимает коллекцию свойств, описывающих параметры подключения к СУБД.

    <!-- XA Datasource - using Oracle DBMS (using the URL property)-->

    <bean id="dataSource1"

         class="com.atomikos.jdbc.AtomikosDataSourceBean"

         init-method="init"

         destroy-method="close">

        <property name="uniqueResourceName" value="EIS1DBMS"/>

        <property name="xaDataSourceClassName" value="${jdbc.oracle.driverClassName}"/>

        <property name="xaProperties">

            <props>

                <prop key="URL">${jdbc.oracle.url}</prop>

                <prop key="user">${jdbc.oracle.username}</prop>

                <prop key="password">${jdbc.oracle.password}</prop>

            </props>

        </property>

        <property name="poolSize" value="${jdbc.oracle.pool-size}"/>

        <property name="testQuery" value="${jdbc.oracle.testQuery}"/>

    </bean>


В качетсве XA-совместимого источника данных для СУБД Oracle используется класс oracle.jdbc.xa.client.OracleXADataSource.

Для PostgreSQL XA-совместимым источником данных является класс org.postgresql.xa.PGXADataSource, который конфигурируется несколько иначе: вместо параметра URL необходимо явно передать параметры serverName, portNumber, databaseName и т.д.:

    <!-- XA Datasource - using PostgreSQL DBMS (using the serverName, portNumber and other properties -->

    <bean id="dataSource2"

         class="com.atomikos.jdbc.AtomikosDataSourceBean"

         init-method="init"

         destroy-method="close">

        <property name="uniqueResourceName" value="EIS2DBMS"/>

        <property name="xaDataSourceClassName" value="${jdbc.postgres.driverClassName}"/>

        <property name="xaProperties">

            <props>

                <prop key="serverName">${jdbc.postgres.serverName}</prop>

                <prop key="portNumber">${jdbc.postgres.portNumber}</prop>

                <prop key="databaseName">${jdbc.postgres.databaseName}</prop>

                <prop key="user">${jdbc.postgres.username}</prop>

                <prop key="password">${jdbc.postgres.password}</prop>

            </props>

        </property>

        <property name="poolSize" value="${jdbc.postgres.pool-size}"/>

        <property name="testQuery" value="${jdbc.postgres.testQuery}"/>

    </bean>


После определения источников данных нужно сконфигурировать Spring Framework на использование JTA в качестве менеджера транзакций. Для этого существует класс org.springframework.transaction.jta.JtaTransactionManager, принимающий два параметра: transactionManager и userTransaction. Atomikos предоставляет собственные реализации как transactionManager'а, так и userTransaction:

    <!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->

    <bean id="jtaTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"

         init-method="init" destroy-method="close"/>



    <!-- Also use Atomikos UserTransactionImp, needed to configure Spring  -->

    <bean id="jtaUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">

        <property name="transactionTimeout" value="300" />

    </bean>



    <!-- Configure the Spring framework to use JTA transactions from Atomikos -->

    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">

        <property name="transactionManager" ref="jtaTransactionManager"/>

        <property name="userTransaction" ref="jtaUserTransaction"/>

    </bean>


Созданный с помощью Spring Framework менеджер транзакций теперь можно использовать внутри разрабатываемого приложения. Сами транзакции при этом можно декларативно описывать с помощью аннотаций, для чего в контекст Spring необходимо добавить:

<tx:annotation-driven transaction-manager="transactionManager" />


Управление транзакциями с помощью аннотаций подробно рассмотренно в статье: Как подружить Hibernate со Spring и обеспечить управление транзакциями через @ннотации.

В состав Spring Framework входит довольно удобная для программиста поддержка JDBC. Чтобы воспользоваться данной возможностью, необходимо реализовывать свои DAO-классы, как наследники org.springframework.jdbc.core.simple.SimpleJdbcDaoSupport, тем самым получая доступ к org.springframework.jdbc.core.simple.SimpleJdbcTemplate:

package name.samolisov.jta.xa.demo.eshop.jdbc;



import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.List;



import name.samolisov.jta.xa.demo.eshop.IProductDao;

import name.samolisov.jta.xa.demo.eshop.Product;



import org.springframework.jdbc.core.simple.ParameterizedRowMapper;

import org.springframework.jdbc.core.simple.SimpleJdbcDaoSupport;



public class JdbcProductDao extends SimpleJdbcDaoSupport implements IProductDao {



    private static final String PRODUCT_TABLE = "TBL_PRODUCT";



    private static final String GET_ALL_QUERY = "select * from " + PRODUCT_TABLE + " order by price";



    private static final String GET_PRICE_QUERY = "select price from " + PRODUCT_TABLE + " where id = ?";



    private static final String GET_COUNT_QUERY = "select count from  " + PRODUCT_TABLE + " where id = ?";



    private static final String GET_NAME_QUERY = "select name from " + PRODUCT_TABLE + " where id = ?";



    private static final String UPDATE_COUNT_QUERY = "update " + PRODUCT_TABLE + " set count = ? where id = ?";



    private static ProductRowMapper MAPPER = new ProductRowMapper();



    @Override

    public List<Product> getAllProducts() {

        return getSimpleJdbcTemplate().query(GET_ALL_QUERY, MAPPER);

    }



    @Override

    public double getPrice(Long productId) {

        return getSimpleJdbcTemplate().queryForObject(GET_PRICE_QUERY, Double.class, productId);

    }



    @Override

    public int getCount(Long productId) {

        return getSimpleJdbcTemplate().queryForInt(GET_COUNT_QUERY, productId);

    }



    @Override

    public String getProductName(Long productId) {

        return getSimpleJdbcTemplate().queryForObject(GET_NAME_QUERY, String.class, productId);

    }



    @Override

    public void updateCount(Long productId, Integer newCount) {

        getSimpleJdbcTemplate().update(UPDATE_COUNT_QUERY, newCount, productId);

    }



    public static class ProductRowMapper implements ParameterizedRowMapper<Product> {

        @Override

        public Product mapRow(ResultSet rs, int rowNum) throws SQLException {

            Product product = new Product();

            product.setId(rs.getLong("id"));

            product.setCount(rs.getInt("count"));

            product.setName(rs.getString("name"));

            product.setPrice(rs.getDouble("price"));

            return product;

        }

    }

}

 


Код DAO-класса, реализующего взаимодействие с СУБД для CRM приводить нет смысла, чтобы не загромождать статью. Ссылка на исходный код системы приведена в разделе Ресурсы.

При разработке приложений с использованием Spring Framework обычно применяется архитектурный паттерн Многоуровневая архитектура в следующей реализации:

  • Слой инфраструктуры (источники данных, менеджер транзакций и т.д.)
  • Слой доступа к данным (DAO)
  • Слой бизнес-логики (сервисы)
  • Слой интерфейса (например, Spring MVC)

В общем случае бизнес-логика не сводится к манипулированию одним источником данных (одной сущностью, одной таблицей). Даже в одной локальной транзакции как правило выполняется несколько запросов, поэтому и разделяют слой доступа к данным (содержит только единичные запросы, такие как вставить или изменить сущность) и слой бизнес-логики (содержит последовательности операций над сущностями).

Бизнес-логика в случае Интернет-магазина довольно простая: необходимо создать методы для получения наименования и стоимости товара по его идентификатору, а так же для обновления количества товара, имеющегося на складе - метод sell - причем, необходимо проверять имеется ли на складе достаточное количество товара и, если нет, выбрасывать исключение.

package name.samolisov.jta.xa.demo.eshop;



import org.apache.log4j.Logger;

import org.springframework.transaction.annotation.Propagation;

import org.springframework.transaction.annotation.Transactional;



public class EshopService implements IEshopService {



    private static final Logger _log = Logger.getLogger(EshopService.class);



    private IProductDao dao;



    public void setDao(IProductDao dao) {

        this.dao = dao;

    }



    @Override

    public double getPrice(Long productId) {

        double price = dao.getPrice(productId);

        _log.debug("Price for id " + productId + " is " +  price);

        return price;

    }



    @Override

    public String getProductName(Long productId) {

        String name = dao.getProductName(productId);

        _log.debug("Product name for id " + productId + " is " + name);

        return name;

    }



    @Override

    @Transactional(readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)

    public void sell(Long productId, Integer count) throws Exception {

        int hasCount = dao.getCount(productId);

        if (hasCount < count)

            throw new Exception("Could not sell " + count + ". We have only " + hasCount);

        dao.updateCount(productId, hasCount - count);

    }

}

 


Метод sell аннотирован @Transactional, поэтому он будет выполняться в рамках транзакции: при входе в данный метод будет создана транзакция, если ее еще нет (propagation = Propagation.REQUIRED). В случае, если на складе нет необходимого количества товара - будет выброшено исключение и будет выполнент откат транзакции.

Бизнес-логика CRM-системы реализована аналогично.

Объединяются архитектурные уровни с помощью Spring IoC-контейнера: классы доступа к данным и сервисы описываются как бины и инъектируются друг в друга:

    <bean id="productDao" class="name.samolisov.jta.xa.demo.eshop.jdbc.JdbcProductDao">

        <property name="dataSource" ref="dataSource2"/>

    </bean>



    <bean id="eshopService" class="name.samolisov.jta.xa.demo.eshop.EshopService">

        <property name="dao" ref="productDao"/>

    </bean>



    <bean id="crmDao" class="name.samolisov.jta.xa.demo.crm.jdbc.JdbcCrmDao">

        <property name="dataSource" ref="dataSource1"/>

    </bean>



    <bean id="crmService" class="name.samolisov.jta.xa.demo.crm.CrmService">

        <property name="dao" ref="crmDao"/>

    </bean>


На данном этапе мы создали приложение, эмулирующее Интернет-магазин, использующий одну СУБД и CRM-систему, использующую другую СУБД. Теперь необходимо объединить эти системы и выполнить запросы к ним в рамках одной распределенной транзакции. Для этого создадим класс-интегратор

   package name.samolisov.jta.xa.demo.local;



import name.samolisov.jta.xa.demo.crm.ICrmService;

import name.samolisov.jta.xa.demo.eshop.IEshopService;



import org.springframework.transaction.annotation.Transactional;



public class IntegratorService {



    private ICrmService crmService;



    private IEshopService eshopService;



    @Transactional(rollbackFor = Exception.class)

    public void makeTransaction(Long productId, String customerName, int count) throws Exception {

        // ...(ESHOP)

        double price = eshopService.getPrice(productId);

        String productName = eshopService.getProductName(productId);



        // ...(CRM)

        crmService.addOrders(customerName, productName, price, count);



        // ...(CRM)

        crmService.addAccount(customerName, productName, price, count);



        // ...(ESHOP)

        eshopService.sell(productId, count);

    }



    public void setCrmService(ICrmService crmService) {

        this.crmService = crmService;

    }



    public void setEshopService(IEshopService eshopService) {

        this.eshopService = eshopService;

    }

}

 


... и зарегистрируем его в контексте Spring Framework:

    <bean id="integratorService" class="name.samolisov.jta.xa.demo.local.IntegratorService">

        <property name="crmService" ref="crmService"/>

        <property name="eshopService" ref="eshopService"/>

    </bean>


Метод makeTransaction класса IntegratorService аннотирован @Transactional, поэтому данный метод будет выполняться внутри транзакции, которая будет откачена, если при выполнении метода будет выброшено исключение.

Создадим класс Main, в котором будет подниматься контекст Spring Framework и выполняться метод makeTransaction:

package name.samolisov.jta.xa.demo.local;



import org.springframework.context.support.ClassPathXmlApplicationContext;



public class Main {



    public static void main(String[] args) {

        try {

            ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context-jdbc.xml");

            IntegratorService service = (IntegratorService) ctx.getBean("integratorService");



            service.makeTransaction(1L, "Pavel Samolisov", 4);

        }

        catch (Exception e) {

            e.printStackTrace();

        }

    }

}

 


Тестирование приложения
Предположим, что существует база данных в PostgreSQL с таблицей TBL_PRODUCT, содержащей следующие данные:



Предположим, что существует база данных В СУБД Oracle с пустыми таблицами
TBL_ACCOUNT и TBL_ORDER:





Таким образом на складе есть 5 единиц товара под названием BMW X5 (идентификатор - 1). В коде метода main класса Main делается попытка купить 4 единицы товара. После запуска приложения данная попытка должна быть завершена успешно:







Т.е. на складе осталась одна единица товара, а в CRM создан один счет и занесено четыре заказа.

Если сейчас заново запустить приложение, то в CRM будут созданы счет и заказы, однако при обновлении сведений о количестве единиц товара на складе будет брошено исключение (на складе сейчас одна единица товара, а надо продать четыре), поэтому будет выполнен откат распределенной транзакции - данные не сохранятся ни в Oracle, ни в PostgreSQL.

Подключаем JMS/XA (ActiveMQ)


Для обеспечения гарантированой доставки сообщений между удаленными системами используется подсистема Java Message Service (JMS). Существует несколько реализаций данной подсистемы, как коммерческих (Oracle AQ, WebSphere MQ), так и с открытыми исходными кодами (ActiveMQ, OpenJMS). В данном примере будем использовать ActiveMQ - JMS-брокер с открытым исходным кодом, поддерживающий Enterprise-расширения и хранение сообщений как в СУБД, так и в файлах.

Ассинхронная отправка/прием сообщений с помощью JMS подразумевает следующее: существует слушатель, который периодически проверяет состояние JMS-очереди/топика (полинг). Если в очередь записывается сообщение, то данный слушатель считывает его и выполняет обработку. Как запись, так и считывание сообщения может осуществляться в рамках транзакции (понятно, что это - отдельные транзакции для записи и считывания).

При записи: в случае возникновения исключительной ситуации - сообщение не записывается в очередь/топик.

При чтении: в случае возникновения исключительной ситуации - сообщение выталкивается в специальную очередь для необработанных сообщений.

XA-транзакции с участием JMS/JDBC подразумевают, что в случае ошибок будет откачены не только изменения в базе данных, но и не записано/возвращено сообщение JMS-брокеру.

Постановка задачи
Для демонстрации использования JMS в распеделенных транзакциях расширим созданное в предыдущем разделе приложение: обеспечим интеграцию с CRM-системой через JMS-очередь. Для этого необходимо выполнить два действия:

1. Разработать слушатель JMS-очереди, который при получении сообщения будет сохранять заказы и счета в CRM-системе.
2. Разработать механизм отправки сообщений для CRM в JMS-очередь.

И отправка и чтение сообщений будут осуществляться в рамках XA-транзакций.

Для использования JMS-очереди в Spring Framework необходимо определить следующие бины:

Фабрику соединений, поддерживающую XA:

    <!-- Configure the ActiveMQ Queue Factory -->

    <bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">

        <property name="brokerURL" value="${jms.broker.url}"/>

    </bean>


Непосредственно очередь сообщений:

    <!-- Configure the ActiveMQ Queue for accounts -->

    <bean id="crmQueue" class="org.apache.activemq.command.ActiveMQQueue">

        <constructor-arg value="${jms.crm.queue.name}"/>

    </bean>


Atomikos-коннектор для JMS:

    <bean id="connectionFactoryBean"

         class="com.atomikos.jms.AtomikosConnectionFactoryBean"

         init-method="init"

         destroy-method="close">

        <property name="uniqueResourceName" value="QUEUE_BROCKER"/>

        <property name="xaConnectionFactory" ref ="xaFactory"/>

    </bean>


Следует обратить внимание, что коннектор принимает два параметра: уникальный идентификатор - название ресурса и ссылку на фабрику соединений с JMS-брокером.

Отправка сообщений
Для взаимодействия с JMS в состав Spring Framework входит класс org.springframework.jms.core.JmsTemplate. При создании данный класс принимает следующие основные параметры:

- connectionFactory - ссылку на фабрику соединений,
- sessionTransacted - если данный флаг установлен в true, то взаимодействие с JMS будет осуществляться в рамках транзакции.

Обязательно необходимо устанавить значение параметра sessionTransacted в true.

    <bean id="crmJmsTemplate"

         class="org.springframework.jms.core.JmsTemplate">

        <property name="connectionFactory" ref="connectionFactoryBean"/>

        <property name="defaultDestination" ref="crmQueue"/>

        <property name="receiveTimeout" value="3000"/>

        <property name="sessionTransacted" value="true"/>

    </bean>


Сама транзакция при отправке сообщений в JMS-очередь должна создаваться в коде, использующем jmsTemplate. Для демонстрации интеграции создадим сервис JmsIntegratorService, в котором взаимодействие с Интернет-магазином будет осуществляться напрямую, а с CRM - через JMS:

package name.samolisov.jta.xa.demo.jms;



import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.Session;



import name.samolisov.jta.xa.demo.eshop.IEshopService;



import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

import org.springframework.transaction.annotation.Transactional;



public class JmsIntegratorService {



    private JmsTemplate crmJmsTemplate;



    private IEshopService eshopService;



    @Transactional(rollbackFor = Exception.class)

    public void makeTransaction(final Long productId, final String customerName, final int count) throws Exception {

        // ...(ESHOP)

        final double price = eshopService.getPrice(productId);

        final String productName = eshopService.getProductName(productId);



        // ... (CRM)

        crmJmsTemplate.send(new MessageCreator() {

            @Override

            public Message createMessage(Session session) throws JMSException {

                MapMessage message = session.createMapMessage();

                message.setStringProperty("name", customerName);

                message.setStringProperty("productName", productName);

                message.setDoubleProperty("price", price);

                message.setIntProperty("count", count);



                return message;

            }

        });



        // ... (ESHOP)

        eshopService.sell(productId, count);

    }



    public void setCrmJmsTemplate(JmsTemplate crmJmsTemplate) {

        this.crmJmsTemplate = crmJmsTemplate;

    }



    public void setEshopService(IEshopService eshopService) {

        this.eshopService = eshopService;

    }

}


Метод makeTransaction аннотирован @Transactional, соответственно он будет выполняться внутри транзакции, включающей в себя так же и взаимодействие с JMS.

Прием сообщений
Для асинхронного чтения сообщений используется механизм слушателей - классов, реализующих интерфейс javax.jms.MessageListener. Для интеграции с CRM создадим слушатель OnCreateAccountAndOrdersListener, который обрабатывает получаемые сообщения и заносит счета и заказы в систему:

package name.samolisov.jta.xa.demo.crm.jms;



import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.MessageListener;



import name.samolisov.jta.xa.demo.crm.ICrmService;



import org.apache.log4j.Logger;



public class OnCreateAccountAndOrdersListener implements MessageListener {



    private static final Logger _log = Logger.getLogger(OnCreateAccountAndOrdersListener.class);



    private ICrmService service;



    public void setService(ICrmService service) {

        this.service = service;

    }



    @Override

    public void onMessage(Message msg) {

        try {

            MapMessage m = (MapMessage) msg;

            String name = m.getStringProperty("name");

            String productName = m.getStringProperty("productName");

            double price = m.getDoubleProperty("price");

            int count = m.getIntProperty("count");



            _log.debug("Creating an Account with name = " + name + ", product = " + productName

                    + ", price = " + price + ", count = " + count);



            service.addAccount(name, productName, price, count);



            _log.debug("Creating Orders with name = " + name + ", product = " + productName

                    + ", price = " + price + ", count = " + count);



            service.addOrders(name, productName, price, count);

        }

        catch (Exception e) {

            _log.error("Could not handle a message", e);



            // &#1074;&#1099;&#1073;&#1088;&#1086;&#1089;&#1080;&#1084; Runtime Exception, &#1095;&#1090;&#1086;&#1073;&#1099; &#1086;&#1090;&#1082;&#1072;&#1090;&#1080;&#1090;&#1100; &#1090;&#1088;&#1072;&#1085;&#1079;&#1072;&#1082;&#1094;&#1080;&#1102;

            throw new RuntimeException(e.getMessage());

        }

    }

}

 


Важно! При чтении сообщения мы не описываем транзакцию даже декларативно (с помощью аннотации @Transactional), соответственно мы не можем управлять параметрами отката. По-умолчанию, транзакции откатываются только при бросании исключений-наследников от RuntimeException (неуловимых исключений). Данное исключение бросается в блоке catch слушателя.

В Spring Framework для регистрации слушателей применяется концепция контейнеров. Контейнер представляет собой экземпляр класса org.springframework.jms.listener.DefaultMessageListenerContainer. При создании экземпляра данного класса необходимо определить следующие основные параметры:

  • connectionFactory - фабрика соединений с JMS-брокером;
  • destination - ссылка на очередь, с которой будут считываться сообщения;
  • messageListener - ссылка на слушатель сообщений;
  • transactionManager - ссылка на координатор транзакций, зарегистрированный в Spring Framework. Именно данный координатор будет создавать транзакции при получении сообщения и вызове слушателя. Если не указать transactionManager, то при вызове метода onMessage слушателя транзакции создаваться не будут.
  • sessionTransacted - если данный параметр установлен в true, то JMS-очередь будет участвовать в распределенной транзакции при чтении.

    <bean id="onCreateAccountAndOrdersListener" class="name.samolisov.jta.xa.demo.crm.jms.OnCreateAccountAndOrdersListener">

        <property name="service" ref="crmService"/>

    </bean>



    <bean id="crmListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

        <property name="transactionManager" ref="transactionManager" />

        <property name="connectionFactory" ref="connectionFactoryBean" />

        <property name="messageListener" ref="onCreateAccountAndOrdersListener" />

        <property name="destination" ref="crmQueue" />

        <property name="concurrentConsumers" value="1" />

        <property name="receiveTimeout" value="3000" />

        <property name="sessionTransacted" value="true"/>

    </bean>


Необходимо обратить внимание на взаимосвязь параметров transactionManager и sessionTransacted:

а) если transactionManager не установлен и sessionTransacted == false, то при обработке сообщений транзакции создаваться не будут;

б) если transactionManager не установлен и sessionTransacted == true, то будет сгенерировано исключение при создании бина;

в) если transactionManager установлен и sessionTransacted == false, то метод onMessage слушателя будет выполняться в транзакции, но чтение сообщений из очереди не будет производиться в этой транзакции. Т.е., если при выполнении метода onMessage произойдет ошибка, то созданные изменения будут откачены, однако сообщение в JMS не вернется;

г) если transactionManager установлен и sessionTransacted == true, то метод onMessage слушателя будет выполняться в транзакции и чтение сообщения будет выполняться в этой же транзакции. Т.е., если при выполнении метода onMessage произойдет ошибка, то созданные изменения будут откачены и сообщение вернется в JMS.

Тестирование приложения
Для тестирования создадим два класса с методами main - MainIntegrator, выполняющий метод makeTransaction, и MainReceiver, запускающий слушателя.

Предположим, что брокер ActiveMQ установлен, настроен и запущен, исходное состояние баз данных восстановлено.

Запустим MainIntegrator, т.к. исходное состояние баз данных корректно и брокер запущен, то транзакция завершится успешно: сообщение записано в очередь, изменения в базе данных Интернет-магазина выполнены.





Теперь запустим MainReceiver. Т.к. в очереди есть сообщение, то после запуска слушателя оно будет считано и обработано: в таблицы TBL_ACCOUNT и TBL_ORDER будут занесены данные.





Можно поэкспериментировать с эмуляцией ошибок: запустить MainIntegrator еще раз. Т.к., на складе осталась одна единица товара, а производится попытка купить четыре, то будет брoшено исключение и выполнен откат транзакции - сообщение не будет записано в очередь JMS.

Если в код слушателя добавить строчку

throw new Exception();


то при его вызове так же будет выполнен откат транзакции. Само сообщение при этом будет вытолкнуто в очередь ActiveMQ.DLQ:



из которой его можно прочитать и обработать уже другим процессом.

Использование Atomikos в качестве менеджера транзакций для Hibernate


Hibernate является одной из самых популярных реализаций объектно-реляционного отображения (ORM) в мире Java. Данный фреймворк довольно гибко настраивается и может использовать координатор из JTA для управления транзакциями.

Для того, чтобы Hibernate использовал JTA (в нашем случае - Atomikos) в качестве менеджера транзакций необходимо настроить два параметра:

  • hibernate.transaction.factory_class - фабрика транзакций, необходимо указать значение com.atomikos.icatch.jta.hibernate3.AtomikosJTATransactionFactory;
  • hibernate.transaction.manager_lookup_class - класс, отвечающий за взаимодействие с координатором транзакций, необходимо указать значение com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup.

В качестве значения параметра dataSource необходимо использовать источник данных с поддержкой XA.

В остальных аспектах Hibernate конфигурируется как обычно:

<!-- Configure a Hibernate Properties -->

    <bean id="hibernateProperties"

         class="org.springframework.beans.factory.config.PropertiesFactoryBean">

        <property name="properties">

            <props>

                <prop key="hibernate.transaction.factory_class">

                    com.atomikos.icatch.jta.hibernate3.AtomikosJTATransactionFactory

                </prop>

                <prop key="hibernate.transaction.manager_lookup_class">

                    com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup

                </prop>

                <prop key="hibernate.dialect">

                    ${hibernate.dialect}

                </prop>

                <prop key="hibernate.hbm2ddl.auto">

                    ${hibernate.hbm2ddl.auto}

                </prop>

                <prop key="hibernate.show_sql">

                    ${hibernate.show_sql}

                </prop>

                <prop key="hibernate.cache.provider_class">org.hibernate.cache.OSCacheProvider</prop>

                <prop key="hibernate.cache.use_query_cache">true</prop>

                <prop key="hibernate.connection.autocommit">false</prop>

            </props>

        </property>

    </bean>



    <!--

       Configure Hibernate to use the Atomikos JTA and

       datasource for transaction control

   -->

   <bean id="sessionFactory"

        class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean">

      <property name="annotatedClasses">

         <list>

            <value>name.samolisov.jta.xa.demo.crm.Account</value>

            <value>name.samolisov.jta.xa.demo.crm.Order</value>

         </list>

      </property>

      <property name="dataSource" ref="dataSource1"/>

      <property name="hibernateProperties" ref="hibernateProperties"/>

   </bean>



    <!--

      Configure the Spring hibernate template with the session factory from above

   -->

   <bean id="hibernateTemplate"

        class="org.springframework.orm.hibernate3.HibernateTemplate">

      <property name="sessionFactory" ref="sessionFactory"/>

   </bean>



   <bean id="hibernateCrmDao" class="name.samolisov.jta.xa.demo.crm.hibernate.HibernateCrmDao">

      <property name="hibernateTemplate" ref="hibernateTemplate"/>

   </bean>


Благодаря использованию патерна "Инверсия управления" в Spring Framework можно легко определить сервисы, использующие HibernateCrmDao вместо JdbcCrmDao. Код данных сервисов и сервиса интеграции останется неизменным, транзакции так же будут определяться декларативно, в качестве менеджера транзакций будет использоваться координатор, определенный в Spring Framework. Тестирование нового варианта приложения не отличается от варианта, используюшего чистый JDBC.

Заключение


В данной статье мы рассмотрели построение JavaSE приложения, использующего распределенные транзакции, с помощью Spring Framework, Atomikos, JDBC, JMS и Hibernate. Как оказалось, для использования возможностей Java Transaction API сервер приложений не является необходимым компонентом. Spring Framework позволяет легко обеспечить удобное декларативное управление распределенными транзакциями точно так же, как и локальными, при этом в транзакциях могут участвовать не только соединения с базой данных, но и JMS, что позволяет создавать легкие, но надежные интеграционные решения, не требующие использования тяжелых компонентов.

Ресурсы



Понравилось сообщение - подпишитесь на блог

12 комментариев:

Pavel Samolisov комментирует...

Я понимаю, что в статью надо бы добавить картинок, схем транзакций, кодов XA-координаторов и прочих кошерных вещей, но совсем нет времени.

Mikhail Krestjaninoff комментирует...

Отличная статья, большое вам спасибо!

Небольшой вопрос, что бы окончательно уложить всё в голове: распределённая транзакция может работать только в рамках одного приложения? К примеру возьмём описанный выше сценарий ESHOP-JMS-CRM. Я правильно понимаю, что при возникновении исключения в методе OnCreateAccountAndOrdersListener.onMessage откатяться только изменения, вызванные методами service.addAccount и service.addOrders, а метод eshopService.sell(productId, count) в JmsIntegratorService.makeTransaction будет выполнен, так как предшествующий ему код упешно поставил сообщение в очередь?

Pavel Samolisov комментирует...

Да, все правильно. Более того, на момент записи сообщения в очередь слушатель вообще может отсутствовать (асинхронность).

Однако JMS брокеры как правило сохраняют сообщения, отправленные в очереди/топики, поэтому, если сообщение попало в JMS, то в какой-то мере гарантируется его доставка.

Andrew Fink комментирует...

Прекрасная статья!

После длительного периода "теоретической физики" (OSGi, Eclipse труляля) ядреный полезный пищевой концентрат!

Продолжайте в этом же направлении!

PS:
C MQ брокером долго определялись, тестировали: выбрали JBoss Шершня http://www.jboss.org/hornetq

Легкий, элегентный, очень быстрый (the Performance Leader in Enterprise Messaging).

Pavel Samolisov комментирует...

Мы в проекте используем Oracle Fussion Middleware, а для души решил потестить ActiveMQ, однако, спасибо за наводку - посмотрю в сторону шершня.

Ну и вообще спасибо за отзыв.

Timur комментирует...

Я так понимаю с Jdeveloper'ом работаете. Как вам после eclipse эта IDE?

Pavel Samolisov комментирует...

Непривычно, но композиты рисовать - милое дело

sergey комментирует...

полезно. спс!

Nikita Eshkeev комментирует...

Как распределенные транзации будут работать с dblink? неявно оракл берет на себя управление транзакцией, когда встречаются dblink, так кто будет главнее?

Pavel Samolisov комментирует...

Такая конфигурация поддерживается, но с ограничениями. Детали зависят от используемой версии Oracle. Подробнее можно прочесть в разделе XA and Database Links документа.

Serg Kunz комментирует...

А какой уровень изоляции будет использоваться для данной распределенной транзакции? Если по умолчанию в БД, то READ COMMITED, так? Тогда мне кажется, что есть кэйс, при котором транзакция может некорректно выполнится в условиях реальной много конкурентной среды.

Например код в методе EshopService.sell()

int hasCount = dao.getCount(productId);

может быть прочитан двумя транзакциями одновременно, так как будет применена SHARED блокировка для таблицы товаров. А значит обе транзакции будут одновременно думать, что у нас на складе есть 4 BMW в наличии, что не корректно. Нужно ли здесь повышать уровень изоляции транзакции и как?

Поправьте меня, если я ошибаюсь.

Pavel Samolisov комментирует...

Когда писал статью (это же уже больше 4-х лет прошло, вот время летит) об уровнях изоляции не задумывался. Как я понимаю будет использоваться уровень по-умолчанию, причем в каждом подключенном ресурсе свой. Это конечно же надо учитывать. Более того, координатор транзакций после получения всех положительных ответов на вызов prepare() будет ПОСЛЕДОВАТЕЛЬНО выдавать команды commit() всем ресурсам, а это значит, что данные в одном ресурсе обновятся раньше, чем в другом, что может вызвать гонки. Классический пример - паттерн "квитанция": пишем в очередь id сообщения, а данные сохраняем в БД. Коммит. В очередь закоммитилось раньше, чем в БД, а на эту очередь есть подписчик, который прежде чем обрабатывать сообщение "поднимает" данные из БД по этому id. Соответственно подписчик "видит" сообщение, делает запрос в базу и получает пустой result set, т.к. коммит до базы еще не прошел. Сам такое неоднократно встречал.

По поводу вашего комментария. Не вижу разницы между окружением с распределенными транзакциями и с одной базой. Действительно, в случае, если не делать SELECT FOR UPDATE, то каждая одновременно выполняемая транзакция при уровне изоляции READ_COMMITTED увидит одно и то же значение и тут уже которой повезет закоммититься последней. У классической реализации оптимистической блокировки та же проблема - кто гарантирует, что две транзакции изменения объекта не придут одновременно и не "увидят" одно и то же значение поля VERSION при старте своей работы?

В принципе, если говорить о каком-нибудь веб-приложении, где пользователь долго-долго заполняет форму, потом нажимает SUBMIT, на сервере стартует транзакция и данные быстро обновляются, то вероятность того, что два пользователя одновременно нажмут кнопку SUBMIT да еще и для одного и того же объекта чрезвычайно мала, с другой же стороны здесь можно применить и SELECT FOR UPDATE, т.к. время выполнения транзакции гораздо меньше времени ввода данных пользователем. Видел подобную реализацию оптимистической (оптимистической, несмотря на наличие SELECT version FOR UPDATE) в гайдах Но для более конкурентной среды, например для процессинговой системы, нужно думать как бы и рыбку съесть (не потерять обновление) и косточной не подавиться (не проиграть в конкурентности).

Отправить комментарий

Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!