понедельник, 21 февраля 2011 г.

Распределенные транзакции (XA) с помощью JTA в JavaSE (на примере Spring + Atomikos)


При интеграции приложений в единую информационную систему наиболее остро встает проблема обеспечения целостности и непротиворечивости данных. Суть данной проблемы в следующем: каждое приложение может работать со своим независимым хранилищем данных и при интеграции приложений возможна ситуация, когда данные в одном хранилище обновились, а в другом (например, в результате сбоя по питанию) - нет. Последствия таких ошибок могут быть довольно печальны: в случае сколь-нибудь крупного производственного предприятия сбой при сохранении производственного задания может привести к простою всего предприятия, или наоборот - выпуску лишней дневной нормы продукции.

Для решения данной проблемы используются т.н. распределенные транзакции - транзакции, охватывающие несколько источников данных. В мире Java такие транзакции поддерживаются с помощью Java Transaction API (JSR 907), являющегося частью спецификации JavaEE. Однако, данный факт не обозначает, что работать с JTA можно только с помощью Java EE-сервера приложений. Существуют различные, в том числе и OpenSource, реализации JTA, в частности - Atomikos TransactionsEssentials, которая содержит JDBC/XA и JMS/XA пулы соединений, а также координатор распределенных транзакций. В данной статье мы рассмотрим использование Atomikos TransactionsEssentials для управления распределенными транзакциями, в которых будут участновать JDBC- и JMS-соединения, а также Hibernate. Для объединения компонентов системы будем использовать Spring Framework.

Содержание


  1. Понятие XA-транзакции.
  2. Использование JDBC/XA-пулов и координатора транзакций из Atomikos с помощью Spring Framework.
  3. Подключение JMS/XA (ActiveMQ).
  4. Использование Atomikos в качестве менеджера транзакций для Hibernate.
  5. Заключение.
  6. Ресурсы.


Понятие XA-транзакции


Спецификация на распределенные глобальные - XA - транзакций была разработана вендорским комитетом X/Open Group. XA транзакции управляются с помощью т.н. координатора XA-транзакций и объединяют несколько источников данных - JDBC-пулы соединений и/или JMS. Источники даных, способные участвовать в XA-транзакции называются XA-ресурсами и реализуют интерфейс javax.transaction.xa.XAResource. JTA содержит немного измененную спецификацию XA.

Коммит XA-транзакции считается успешным ТОЛЬКО если успешен коммит всех локальных транзакций XA-ресурсов. В случае, если коммит в каком-либо из локальных ресурсов неуспешен, то происходит откат всей глобальной транзакции. Для реализации такого поведения используется протокол двухфазного коммита (2PC).
Двухфазный коммит имеет два четко разделенных этапа:

Этап подготовки - координатор посылает сообщение участникам распределенной транзакции о подготовке к транзакции (prepare message). Это сообщение также содержит уникальный номер транзакции TID. Когда участники получают это сообщение, они проверяют смогут ли зафиксировать транзакцию и отвечают координатору. При этом транзакция исполняется, но не фиксируется и ее состояние сохраняется на диске.

Этап фиксации - после того, как координатор получил все ответы, он решает зафиксировать или прервать начатую транзакцию в соответствии с правилом глобальной фиксации. Если все участники глобальной транзакции ответили, что могут ее зафиксировать, то он посылает участникам сообщение о фиксации транзакции, в противном случае все участники получают сообщение об откате транзакции.

Например, в PostgreSQL двухфазный коммит реализован так, что транзакции сначала подготавливаются (PREPARE TRANSACTION) и сохраняются на диск. В последствии же они могут быть зафиксированы (COMMIT PREPARED) или отменены (ROLLBACK PREPARED) даже после перезагрузки системы.

Для участия в распределенных транзакциях как СУБД или JMS/MQ-брокер, так и их JDBC- и JMS-драйвера должны поддерживать XA протокол согласно JTA.

Использование JDBC/XA-пулов и координатора транзакций из Atomikos с помощью Spring Framework


В состав библиотеки Atomikos TransactionsEssentials входят XA-совместимые пулы соединений с JDBC- и JMS-, а так же JTA-совместимый координатор транзакций. В данной части статьи мы рассмотрим конфигурирование и использование Atomikos с помощью SpringFramework.

Постановка задачи
Рассмотрим следующую задачу: имеется две информационные системы: Интернет-магазин (ESHOP), содержащий сведения о товарах, их цены и наличие на складе и Система управления отношениями с клиентами (CRM). Необходимо реализовать бизнес-процесс покупки товара по следующему алгоритму:

1. Получить актуальну цену товара (ESHOP).
2. Ввести заказы в систему (CRM).
3. Выставить счет клиенту (CRM).
4. Обновить сведения о наличии товара на складе (ESHOP).

При выполнении любого этапа возможна ошибка (например, упал сервер СУБД или на складе нет необходимого количества товара), поэтому данные операции необходимо выполнять в рамках XA-транзакции.

Для чистоты эсперимента будем считать, что Интернет-магазин использует СУБД PostgreSQL, а CRM - Oracle.

Подготовка СУБД к выполнению XA-транзакций
Прежде всего необходимо подготовить СУБД для обработки XA-транзакций. Для PostgreSQL необходимо в настройках сервера установить значение параметра max_prepared_transactions равным, например, 10. Подробнее можно прочитать в документации на сервер PostgreSQL.

Для Oracle 9.2/10 необходимо дать пользователю, от имени которого будет выполняться транзакция, соответствующие права:
grant select on sys.dba_pending_transactions to user name;
grant select on sys.pending_trans$ to user name;
grant select on sys.dba_2pc_pending to user name;
grant execute on sys.dbms_system to user name;


Разработка приложения
Теперь можно приступить непосредственно к написанию приложения. Сделаем следующее:
1. Определим XA-совместимые источники данных.
2. Настроим менеджер транзакций Spring Framework на использование Atomikos.
3. Настроим декларативное управление транзакциями с помощью аннотаций.
4. Создадим DAO-классы и сервисы для эмуляции ESHOP и CRM.
5. Создадим сервис-интегратор, демонстрирующий использование XA-транзакции.

Будем использовать следующий шаблон XML-файла описания контекста Spring Framework, в который будут добавляться описания бинов:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

      xmlns:aop="http://www.springframework.org/schema/aop"

      xmlns:tx="http://www.springframework.org/schema/tx"

      xsi:schemaLocation="http://www.springframework.org/schema/beans

                          http://www.springframework.org/schema/beans/spring-beans-2.5.xsd

                          http://www.springframework.org/schema/tx

                          http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">


</beans>


В качестве источника данных используется класс com.atomikos.jdbc.AtomikosDataSourceBean, позволяющий создать XA-совместимый пул соединений. Наиболее важными параметрами данного класса являются: xaDataSourceName, который принимает название класса XA-совместимого источника данных, и xaProperties, который принимает коллекцию свойств, описывающих параметры подключения к СУБД.

    <!-- XA Datasource - using Oracle DBMS (using the URL property)-->

    <bean id="dataSource1"

         class="com.atomikos.jdbc.AtomikosDataSourceBean"

         init-method="init"

         destroy-method="close">

        <property name="uniqueResourceName" value="EIS1DBMS"/>

        <property name="xaDataSourceClassName" value="${jdbc.oracle.driverClassName}"/>

        <property name="xaProperties">

            <props>

                <prop key="URL">${jdbc.oracle.url}</prop>

                <prop key="user">${jdbc.oracle.username}</prop>

                <prop key="password">${jdbc.oracle.password}</prop>

            </props>

        </property>

        <property name="poolSize" value="${jdbc.oracle.pool-size}"/>

        <property name="testQuery" value="${jdbc.oracle.testQuery}"/>

    </bean>


В качетсве XA-совместимого источника данных для СУБД Oracle используется класс oracle.jdbc.xa.client.OracleXADataSource.

Для PostgreSQL XA-совместимым источником данных является класс org.postgresql.xa.PGXADataSource, который конфигурируется несколько иначе: вместо параметра URL необходимо явно передать параметры serverName, portNumber, databaseName и т.д.:

    <!-- XA Datasource - using PostgreSQL DBMS (using the serverName, portNumber and other properties -->

    <bean id="dataSource2"

         class="com.atomikos.jdbc.AtomikosDataSourceBean"

         init-method="init"

         destroy-method="close">

        <property name="uniqueResourceName" value="EIS2DBMS"/>

        <property name="xaDataSourceClassName" value="${jdbc.postgres.driverClassName}"/>

        <property name="xaProperties">

            <props>

                <prop key="serverName">${jdbc.postgres.serverName}</prop>

                <prop key="portNumber">${jdbc.postgres.portNumber}</prop>

                <prop key="databaseName">${jdbc.postgres.databaseName}</prop>

                <prop key="user">${jdbc.postgres.username}</prop>

                <prop key="password">${jdbc.postgres.password}</prop>

            </props>

        </property>

        <property name="poolSize" value="${jdbc.postgres.pool-size}"/>

        <property name="testQuery" value="${jdbc.postgres.testQuery}"/>

    </bean>


После определения источников данных нужно сконфигурировать Spring Framework на использование JTA в качестве менеджера транзакций. Для этого существует класс org.springframework.transaction.jta.JtaTransactionManager, принимающий два параметра: transactionManager и userTransaction. Atomikos предоставляет собственные реализации как transactionManager'а, так и userTransaction:

    <!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->

    <bean id="jtaTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"

         init-method="init" destroy-method="close"/>



    <!-- Also use Atomikos UserTransactionImp, needed to configure Spring  -->

    <bean id="jtaUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">

        <property name="transactionTimeout" value="300" />

    </bean>



    <!-- Configure the Spring framework to use JTA transactions from Atomikos -->

    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">

        <property name="transactionManager" ref="jtaTransactionManager"/>

        <property name="userTransaction" ref="jtaUserTransaction"/>

    </bean>


Созданный с помощью Spring Framework менеджер транзакций теперь можно использовать внутри разрабатываемого приложения. Сами транзакции при этом можно декларативно описывать с помощью аннотаций, для чего в контекст Spring необходимо добавить:

<tx:annotation-driven transaction-manager="transactionManager" />


Управление транзакциями с помощью аннотаций подробно рассмотренно в статье: Как подружить Hibernate со Spring и обеспечить управление транзакциями через @ннотации.

В состав Spring Framework входит довольно удобная для программиста поддержка JDBC. Чтобы воспользоваться данной возможностью, необходимо реализовывать свои DAO-классы, как наследники org.springframework.jdbc.core.simple.SimpleJdbcDaoSupport, тем самым получая доступ к org.springframework.jdbc.core.simple.SimpleJdbcTemplate:

package name.samolisov.jta.xa.demo.eshop.jdbc;



import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.List;



import name.samolisov.jta.xa.demo.eshop.IProductDao;

import name.samolisov.jta.xa.demo.eshop.Product;



import org.springframework.jdbc.core.simple.ParameterizedRowMapper;

import org.springframework.jdbc.core.simple.SimpleJdbcDaoSupport;



public class JdbcProductDao extends SimpleJdbcDaoSupport implements IProductDao {



    private static final String PRODUCT_TABLE = "TBL_PRODUCT";



    private static final String GET_ALL_QUERY = "select * from " + PRODUCT_TABLE + " order by price";



    private static final String GET_PRICE_QUERY = "select price from " + PRODUCT_TABLE + " where id = ?";



    private static final String GET_COUNT_QUERY = "select count from  " + PRODUCT_TABLE + " where id = ?";



    private static final String GET_NAME_QUERY = "select name from " + PRODUCT_TABLE + " where id = ?";



    private static final String UPDATE_COUNT_QUERY = "update " + PRODUCT_TABLE + " set count = ? where id = ?";



    private static ProductRowMapper MAPPER = new ProductRowMapper();



    @Override

    public List<Product> getAllProducts() {

        return getSimpleJdbcTemplate().query(GET_ALL_QUERY, MAPPER);

    }



    @Override

    public double getPrice(Long productId) {

        return getSimpleJdbcTemplate().queryForObject(GET_PRICE_QUERY, Double.class, productId);

    }



    @Override

    public int getCount(Long productId) {

        return getSimpleJdbcTemplate().queryForInt(GET_COUNT_QUERY, productId);

    }



    @Override

    public String getProductName(Long productId) {

        return getSimpleJdbcTemplate().queryForObject(GET_NAME_QUERY, String.class, productId);

    }



    @Override

    public void updateCount(Long productId, Integer newCount) {

        getSimpleJdbcTemplate().update(UPDATE_COUNT_QUERY, newCount, productId);

    }



    public static class ProductRowMapper implements ParameterizedRowMapper<Product> {

        @Override

        public Product mapRow(ResultSet rs, int rowNum) throws SQLException {

            Product product = new Product();

            product.setId(rs.getLong("id"));

            product.setCount(rs.getInt("count"));

            product.setName(rs.getString("name"));

            product.setPrice(rs.getDouble("price"));

            return product;

        }

    }

}

 


Код DAO-класса, реализующего взаимодействие с СУБД для CRM приводить нет смысла, чтобы не загромождать статью. Ссылка на исходный код системы приведена в разделе Ресурсы.

При разработке приложений с использованием Spring Framework обычно применяется архитектурный паттерн Многоуровневая архитектура в следующей реализации:
  • Слой инфраструктуры (источники данных, менеджер транзакций и т.д.)
  • Слой доступа к данным (DAO)
  • Слой бизнес-логики (сервисы)
  • Слой интерфейса (например, Spring MVC)
В общем случае бизнес-логика не сводится к манипулированию одним источником данных (одной сущностью, одной таблицей). Даже в одной локальной транзакции как правило выполняется несколько запросов, поэтому и разделяют слой доступа к данным (содержит только единичные запросы, такие как вставить или изменить сущность) и слой бизнес-логики (содержит последовательности операций над сущностями).
Бизнес-логика в случае Интернет-магазина довольно простая: необходимо создать методы для получения наименования и стоимости товара по его идентификатору, а так же для обновления количества товара, имеющегося на складе - метод sell - причем, необходимо проверять имеется ли на складе достаточное количество товара и, если нет, выбрасывать исключение.
package name.samolisov.jta.xa.demo.eshop; import org.apache.log4j.Logger; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; public class EshopService implements IEshopService {     private static final Logger _log = Logger.getLogger(EshopService.class);     private IProductDao dao;     public void setDao(IProductDao dao) {         this.dao = dao;     }     @Override     public double getPrice(Long productId) {         double price = dao.getPrice(productId);         _log.debug("Price for id " + productId + " is " +  price);         return price;     }     @Override     public String getProductName(Long productId) {         String name = dao.getProductName(productId);         _log.debug("Product name for id " + productId + " is " + name);         return name;     }     @Override     @Transactional(readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)     public void sell(Long productId, Integer count) throws Exception {         int hasCount = dao.getCount(productId);         if (hasCount < count)             throw new Exception("Could not sell " + count + ". We have only " + hasCount);         dao.updateCount(productId, hasCount - count);     } }  
Метод sell аннотирован @Transactional, поэтому он будет выполняться в рамках транзакции: при входе в данный метод будет создана транзакция, если ее еще нет (propagation = Propagation.REQUIRED). В случае, если на складе нет необходимого количества товара - будет выброшено исключение и будет выполнент откат транзакции. Бизнес-логика CRM-системы реализована аналогично. Объединяются архитектурные уровни с помощью Spring IoC-контейнера: классы доступа к данным и сервисы описываются как бины и инъектируются друг в друга:
    <bean id="productDao" class="name.samolisov.jta.xa.demo.eshop.jdbc.JdbcProductDao">         <property name="dataSource" ref="dataSource2"/>     </bean>     <bean id="eshopService" class="name.samolisov.jta.xa.demo.eshop.EshopService">         <property name="dao" ref="productDao"/>     </bean>     <bean id="crmDao" class="name.samolisov.jta.xa.demo.crm.jdbc.JdbcCrmDao">         <property name="dataSource" ref="dataSource1"/>     </bean>     <bean id="crmService" class="name.samolisov.jta.xa.demo.crm.CrmService">         <property name="dao" ref="crmDao"/>     </bean>
На данном этапе мы создали приложение, эмулирующее Интернет-магазин, использующий одну СУБД и CRM-систему, использующую другую СУБД. Теперь необходимо объединить эти системы и выполнить запросы к ним в рамках одной распределенной транзакции. Для этого создадим класс-интегратор
   package name.samolisov.jta.xa.demo.local; import name.samolisov.jta.xa.demo.crm.ICrmService; import name.samolisov.jta.xa.demo.eshop.IEshopService; import org.springframework.transaction.annotation.Transactional; public class IntegratorService {     private ICrmService crmService;     private IEshopService eshopService;     @Transactional(rollbackFor = Exception.class)     public void makeTransaction(Long productId, String customerName, int count) throws Exception {         // ...(ESHOP)         double price = eshopService.getPrice(productId);         String productName = eshopService.getProductName(productId);         // ...(CRM)         crmService.addOrders(customerName, productName, price, count);         // ...(CRM)         crmService.addAccount(customerName, productName, price, count);         // ...(ESHOP)         eshopService.sell(productId, count);     }     public void setCrmService(ICrmService crmService) {         this.crmService = crmService;     }     public void setEshopService(IEshopService eshopService) {         this.eshopService = eshopService;     } }  
... и зарегистрируем его в контексте Spring Framework:
    <bean id="integratorService" class="name.samolisov.jta.xa.demo.local.IntegratorService">         <property name="crmService" ref="crmService"/>         <property name="eshopService" ref="eshopService"/>     </bean>
Метод makeTransaction класса IntegratorService аннотирован @Transactional, поэтому данный метод будет выполняться внутри транзакции, которая будет откачена, если при выполнении метода будет выброшено исключение. Создадим класс Main, в котором будет подниматься контекст Spring Framework и выполняться метод makeTransaction:
package name.samolisov.jta.xa.demo.local; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main {     public static void main(String[] args) {         try {             ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context-jdbc.xml");             IntegratorService service = (IntegratorService) ctx.getBean("integratorService");             service.makeTransaction(1L, "Pavel Samolisov", 4);         }         catch (Exception e) {             e.printStackTrace();         }     } }  
Тестирование приложения Предположим, что существует база данных в PostgreSQL с таблицей TBL_PRODUCT, содержащей следующие данные: Предположим, что существует база данных В СУБД Oracle с пустыми таблицами TBL_ACCOUNT и TBL_ORDER: Таким образом на складе есть 5 единиц товара под названием BMW X5 (идентификатор - 1). В коде метода main класса Main делается попытка купить 4 единицы товара. После запуска приложения данная попытка должна быть завершена успешно: Т.е. на складе осталась одна единица товара, а в CRM создан один счет и занесено четыре заказа. Если сейчас заново запустить приложение, то в CRM будут созданы счет и заказы, однако при обновлении сведений о количестве единиц товара на складе будет брошено исключение (на складе сейчас одна единица товара, а надо продать четыре), поэтому будет выполнен откат распределенной транзакции - данные не сохранятся ни в Oracle, ни в PostgreSQL.

Подключаем JMS/XA (ActiveMQ)

Для обеспечения гарантированой доставки сообщений между удаленными системами используется подсистема Java Message Service (JMS). Существует несколько реализаций данной подсистемы, как коммерческих (Oracle AQ, WebSphere MQ), так и с открытыми исходными кодами (ActiveMQ, OpenJMS). В данном примере будем использовать ActiveMQ - JMS-брокер с открытым исходным кодом, поддерживающий Enterprise-расширения и хранение сообщений как в СУБД, так и в файлах. Ассинхронная отправка/прием сообщений с помощью JMS подразумевает следующее: существует слушатель, который периодически проверяет состояние JMS-очереди/топика (полинг). Если в очередь записывается сообщение, то данный слушатель считывает его и выполняет обработку. Как запись, так и считывание сообщения может осуществляться в рамках транзакции (понятно, что это - отдельные транзакции для записи и считывания). При записи: в случае возникновения исключительной ситуации - сообщение не записывается в очередь/топик. При чтении: в случае возникновения исключительной ситуации - сообщение выталкивается в специальную очередь для необработанных сообщений. XA-транзакции с участием JMS/JDBC подразумевают, что в случае ошибок будет откачены не только изменения в базе данных, но и не записано/возвращено сообщение JMS-брокеру. Постановка задачи Для демонстрации использования JMS в распеделенных транзакциях расширим созданное в предыдущем разделе приложение: обеспечим интеграцию с CRM-системой через JMS-очередь. Для этого необходимо выполнить два действия: 1. Разработать слушатель JMS-очереди, который при получении сообщения будет сохранять заказы и счета в CRM-системе. 2. Разработать механизм отправки сообщений для CRM в JMS-очередь. И отправка и чтение сообщений будут осуществляться в рамках XA-транзакций. Для использования JMS-очереди в Spring Framework необходимо определить следующие бины: Фабрику соединений, поддерживающую XA:
    <!-- Configure the ActiveMQ Queue Factory -->     <bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">         <property name="brokerURL" value="${jms.broker.url}"/>     </bean>
Непосредственно очередь сообщений:
    <!-- Configure the ActiveMQ Queue for accounts -->     <bean id="crmQueue" class="org.apache.activemq.command.ActiveMQQueue">         <constructor-arg value="${jms.crm.queue.name}"/>     </bean>
Atomikos-коннектор для JMS:
    <bean id="connectionFactoryBean"          class="com.atomikos.jms.AtomikosConnectionFactoryBean"          init-method="init"          destroy-method="close">         <property name="uniqueResourceName" value="QUEUE_BROCKER"/>         <property name="xaConnectionFactory" ref ="xaFactory"/>     </bean>
Следует обратить внимание, что коннектор принимает два параметра: уникальный идентификатор - название ресурса и ссылку на фабрику соединений с JMS-брокером. Отправка сообщений Для взаимодействия с JMS в состав Spring Framework входит класс org.springframework.jms.core.JmsTemplate. При создании данный класс принимает следующие основные параметры: - connectionFactory - ссылку на фабрику соединений, - sessionTransacted - если данный флаг установлен в true, то взаимодействие с JMS будет осуществляться в рамках транзакции. Обязательно необходимо устанавить значение параметра sessionTransacted в true.
    <bean id="crmJmsTemplate"          class="org.springframework.jms.core.JmsTemplate">         <property name="connectionFactory" ref="connectionFactoryBean"/>         <property name="defaultDestination" ref="crmQueue"/>         <property name="receiveTimeout" value="3000"/>         <property name="sessionTransacted" value="true"/>     </bean>
Сама транзакция при отправке сообщений в JMS-очередь должна создаваться в коде, использующем jmsTemplate. Для демонстрации интеграции создадим сервис JmsIntegratorService, в котором взаимодействие с Интернет-магазином будет осуществляться напрямую, а с CRM - через JMS:
package name.samolisov.jta.xa.demo.jms; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.Session; import name.samolisov.jta.xa.demo.eshop.IEshopService; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.transaction.annotation.Transactional; public class JmsIntegratorService {     private JmsTemplate crmJmsTemplate;     private IEshopService eshopService;     @Transactional(rollbackFor = Exception.class)     public void makeTransaction(final Long productId, final String customerName, final int count) throws Exception {         // ...(ESHOP)         final double price = eshopService.getPrice(productId);         final String productName = eshopService.getProductName(productId);         // ... (CRM)         crmJmsTemplate.send(new MessageCreator() {             @Override             public Message createMessage(Session session) throws JMSException {                 MapMessage message = session.createMapMessage();                 message.setStringProperty("name", customerName);                 message.setStringProperty("productName", productName);                 message.setDoubleProperty("price", price);                 message.setIntProperty("count", count);                 return message;             }         });         // ... (ESHOP)         eshopService.sell(productId, count);     }     public void setCrmJmsTemplate(JmsTemplate crmJmsTemplate) {         this.crmJmsTemplate = crmJmsTemplate;     }     public void setEshopService(IEshopService eshopService) {         this.eshopService = eshopService;     } }
Метод makeTransaction аннотирован @Transactional, соответственно он будет выполняться внутри транзакции, включающей в себя так же и взаимодействие с JMS. Прием сообщений Для асинхронного чтения сообщений используется механизм слушателей - классов, реализующих интерфейс javax.jms.MessageListener. Для интеграции с CRM создадим слушатель OnCreateAccountAndOrdersListener, который обрабатывает получаемые сообщения и заносит счета и заказы в систему:
package name.samolisov.jta.xa.demo.crm.jms; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import name.samolisov.jta.xa.demo.crm.ICrmService; import org.apache.log4j.Logger; public class OnCreateAccountAndOrdersListener implements MessageListener {     private static final Logger _log = Logger.getLogger(OnCreateAccountAndOrdersListener.class);     private ICrmService service;     public void setService(ICrmService service) {         this.service = service;     }     @Override     public void onMessage(Message msg) {         try {             MapMessage m = (MapMessage) msg;             String name = m.getStringProperty("name");             String productName = m.getStringProperty("productName");             double price = m.getDoubleProperty("price");             int count = m.getIntProperty("count");             _log.debug("Creating an Account with name = " + name + ", product = " + productName                     + ", price = " + price + ", count = " + count);             service.addAccount(name, productName, price, count);             _log.debug("Creating Orders with name = " + name + ", product = " + productName                     + ", price = " + price + ", count = " + count);             service.addOrders(name, productName, price, count);         }         catch (Exception e) {             _log.error("Could not handle a message", e);             // &#1074;&#1099;&#1073;&#1088;&#1086;&#1089;&#1080;&#1084; Runtime Exception, &#1095;&#1090;&#1086;&#1073;&#1099; &#1086;&#1090;&#1082;&#1072;&#1090;&#1080;&#1090;&#1100; &#1090;&#1088;&#1072;&#1085;&#1079;&#1072;&#1082;&#1094;&#1080;&#1102;             throw new RuntimeException(e.getMessage());         }     } }  
Важно! При чтении сообщения мы не описываем транзакцию даже декларативно (с помощью аннотации @Transactional), соответственно мы не можем управлять параметрами отката. По-умолчанию, транзакции откатываются только при бросании исключений-наследников от RuntimeException (неуловимых исключений). Данное исключение бросается в блоке catch слушателя. В Spring Framework для регистрации слушателей применяется концепция контейнеров. Контейнер представляет собой экземпляр класса org.springframework.jms.listener.DefaultMessageListenerContainer. При создании экземпляра данного класса необходимо определить следующие основные параметры:
  • connectionFactory - фабрика соединений с JMS-брокером;
  • destination - ссылка на очередь, с которой будут считываться сообщения;
  • messageListener - ссылка на слушатель сообщений;
  • transactionManager - ссылка на координатор транзакций, зарегистрированный в Spring Framework. Именно данный координатор будет создавать транзакции при получении сообщения и вызове слушателя. Если не указать transactionManager, то при вызове метода onMessage слушателя транзакции создаваться не будут.
  • sessionTransacted - если данный параметр установлен в true, то JMS-очередь будет участвовать в распределенной транзакции при чтении.
    <bean id="onCreateAccountAndOrdersListener" class="name.samolisov.jta.xa.demo.crm.jms.OnCreateAccountAndOrdersListener">         <property name="service" ref="crmService"/>     </bean>     <bean id="crmListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">         <property name="transactionManager" ref="transactionManager" />         <property name="connectionFactory" ref="connectionFactoryBean" />         <property name="messageListener" ref="onCreateAccountAndOrdersListener" />         <property name="destination" ref="crmQueue" />         <property name="concurrentConsumers" value="1" />         <property name="receiveTimeout" value="3000" />         <property name="sessionTransacted" value="true"/>     </bean>
Необходимо обратить внимание на взаимосвязь параметров transactionManager и sessionTransacted: а) если transactionManager не установлен и sessionTransacted == false, то при обработке сообщений транзакции создаваться не будут; б) если transactionManager не установлен и sessionTransacted == true, то будет сгенерировано исключение при создании бина; в) если transactionManager установлен и sessionTransacted == false, то метод onMessage слушателя будет выполняться в транзакции, но чтение сообщений из очереди не будет производиться в этой транзакции. Т.е., если при выполнении метода onMessage произойдет ошибка, то созданные изменения будут откачены, однако сообщение в JMS не вернется; г) если transactionManager установлен и sessionTransacted == true, то метод onMessage слушателя будет выполняться в транзакции и чтение сообщения будет выполняться в этой же транзакции. Т.е., если при выполнении метода onMessage произойдет ошибка, то созданные изменения будут откачены и сообщение вернется в JMS. Тестирование приложения Для тестирования создадим два класса с методами main - MainIntegrator, выполняющий метод makeTransaction, и MainReceiver, запускающий слушателя. Предположим, что брокер ActiveMQ установлен, настроен и запущен, исходное состояние баз данных восстановлено. Запустим MainIntegrator, т.к. исходное состояние баз данных корректно и брокер запущен, то транзакция завершится успешно: сообщение записано в очередь, изменения в базе данных Интернет-магазина выполнены. Теперь запустим MainReceiver. Т.к. в очереди есть сообщение, то после запуска слушателя оно будет считано и обработано: в таблицы TBL_ACCOUNT и TBL_ORDER будут занесены данные. Можно поэкспериментировать с эмуляцией ошибок: запустить MainIntegrator еще раз. Т.к., на складе осталась одна единица товара, а производится попытка купить четыре, то будет брoшено исключение и выполнен откат транзакции - сообщение не будет записано в очередь JMS. Если в код слушателя добавить строчку
throw new Exception();
то при его вызове так же будет выполнен откат транзакции. Само сообщение при этом будет вытолкнуто в очередь ActiveMQ.DLQ: из которой его можно прочитать и обработать уже другим процессом.

Использование Atomikos в качестве менеджера транзакций для Hibernate

Hibernate является одной из самых популярных реализаций объектно-реляционного отображения (ORM) в мире Java. Данный фреймворк довольно гибко настраивается и может использовать координатор из JTA для управления транзакциями. Для того, чтобы Hibernate использовал JTA (в нашем случае - Atomikos) в качестве менеджера транзакций необходимо настроить два параметра:
  • hibernate.transaction.factory_class - фабрика транзакций, необходимо указать значение com.atomikos.icatch.jta.hibernate3.AtomikosJTATransactionFactory;
  • hibernate.transaction.manager_lookup_class - класс, отвечающий за взаимодействие с координатором транзакций, необходимо указать значение com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup.
В качестве значения параметра dataSource необходимо использовать источник данных с поддержкой XA. В остальных аспектах Hibernate конфигурируется как обычно:
<!-- Configure a Hibernate Properties -->     <bean id="hibernateProperties"          class="org.springframework.beans.factory.config.PropertiesFactoryBean">         <property name="properties">             <props>                 <prop key="hibernate.transaction.factory_class">                     com.atomikos.icatch.jta.hibernate3.AtomikosJTATransactionFactory                 </prop>                 <prop key="hibernate.transaction.manager_lookup_class">                     com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup                 </prop>                 <prop key="hibernate.dialect">                     ${hibernate.dialect}                 </prop>                 <prop key="hibernate.hbm2ddl.auto">                     ${hibernate.hbm2ddl.auto}                 </prop>                 <prop key="hibernate.show_sql">                     ${hibernate.show_sql}                 </prop>                 <prop key="hibernate.cache.provider_class">org.hibernate.cache.OSCacheProvider</prop>                 <prop key="hibernate.cache.use_query_cache">true</prop>                 <prop key="hibernate.connection.autocommit">false</prop>             </props>         </property>     </bean>     <!--        Configure Hibernate to use the Atomikos JTA and        datasource for transaction control    -->    <bean id="sessionFactory"         class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean">       <property name="annotatedClasses">          <list>             <value>name.samolisov.jta.xa.demo.crm.Account</value>             <value>name.samolisov.jta.xa.demo.crm.Order</value>          </list>       </property>       <property name="dataSource" ref="dataSource1"/>       <property name="hibernateProperties" ref="hibernateProperties"/>    </bean>     <!--       Configure the Spring hibernate template with the session factory from above    -->    <bean id="hibernateTemplate"         class="org.springframework.orm.hibernate3.HibernateTemplate">       <property name="sessionFactory" ref="sessionFactory"/>    </bean>    <bean id="hibernateCrmDao" class="name.samolisov.jta.xa.demo.crm.hibernate.HibernateCrmDao">       <property name="hibernateTemplate" ref="hibernateTemplate"/>    </bean>
Благодаря использованию патерна "Инверсия управления" в Spring Framework можно легко определить сервисы, использующие HibernateCrmDao вместо JdbcCrmDao. Код данных сервисов и сервиса интеграции останется неизменным, транзакции так же будут определяться декларативно, в качестве менеджера транзакций будет использоваться координатор, определенный в Spring Framework. Тестирование нового варианта приложения не отличается от варианта, используюшего чистый JDBC.

Заключение

В данной статье мы рассмотрели построение JavaSE приложения, использующего распределенные транзакции, с помощью Spring Framework, Atomikos, JDBC, JMS и Hibernate. Как оказалось, для использования возможностей Java Transaction API сервер приложений не является необходимым компонентом. Spring Framework позволяет легко обеспечить удобное декларативное управление распределенными транзакциями точно так же, как и локальными, при этом в транзакциях могут участвовать не только соединения с базой данных, но и JMS, что позволяет создавать легкие, но надежные интеграционные решения, не требующие использования тяжелых компонентов.

Ресурсы

Понравилось сообщение - подпишитесь на блог

13 комментариев:

Pavel Samolisov комментирует...

Я понимаю, что в статью надо бы добавить картинок, схем транзакций, кодов XA-координаторов и прочих кошерных вещей, но совсем нет времени.

Mikhail Krestjaninoff комментирует...

Отличная статья, большое вам спасибо!

Небольшой вопрос, что бы окончательно уложить всё в голове: распределённая транзакция может работать только в рамках одного приложения? К примеру возьмём описанный выше сценарий ESHOP-JMS-CRM. Я правильно понимаю, что при возникновении исключения в методе OnCreateAccountAndOrdersListener.onMessage откатяться только изменения, вызванные методами service.addAccount и service.addOrders, а метод eshopService.sell(productId, count) в JmsIntegratorService.makeTransaction будет выполнен, так как предшествующий ему код упешно поставил сообщение в очередь?

Pavel Samolisov комментирует...

Да, все правильно. Более того, на момент записи сообщения в очередь слушатель вообще может отсутствовать (асинхронность).

Однако JMS брокеры как правило сохраняют сообщения, отправленные в очереди/топики, поэтому, если сообщение попало в JMS, то в какой-то мере гарантируется его доставка.

Andrew Fink комментирует...

Прекрасная статья!

После длительного периода "теоретической физики" (OSGi, Eclipse труляля) ядреный полезный пищевой концентрат!

Продолжайте в этом же направлении!

PS:
C MQ брокером долго определялись, тестировали: выбрали JBoss Шершня http://www.jboss.org/hornetq

Легкий, элегентный, очень быстрый (the Performance Leader in Enterprise Messaging).

Pavel Samolisov комментирует...

Мы в проекте используем Oracle Fussion Middleware, а для души решил потестить ActiveMQ, однако, спасибо за наводку - посмотрю в сторону шершня.

Ну и вообще спасибо за отзыв.

Timur комментирует...

Я так понимаю с Jdeveloper'ом работаете. Как вам после eclipse эта IDE?

Pavel Samolisov комментирует...

Непривычно, но композиты рисовать - милое дело

sergey комментирует...

полезно. спс!

Nikita Eshkeev комментирует...

Как распределенные транзации будут работать с dblink? неявно оракл берет на себя управление транзакцией, когда встречаются dblink, так кто будет главнее?

Pavel Samolisov комментирует...

Такая конфигурация поддерживается, но с ограничениями. Детали зависят от используемой версии Oracle. Подробнее можно прочесть в разделе XA and Database Links документа.

Serg Kunz комментирует...

А какой уровень изоляции будет использоваться для данной распределенной транзакции? Если по умолчанию в БД, то READ COMMITED, так? Тогда мне кажется, что есть кэйс, при котором транзакция может некорректно выполнится в условиях реальной много конкурентной среды.

Например код в методе EshopService.sell()

int hasCount = dao.getCount(productId);

может быть прочитан двумя транзакциями одновременно, так как будет применена SHARED блокировка для таблицы товаров. А значит обе транзакции будут одновременно думать, что у нас на складе есть 4 BMW в наличии, что не корректно. Нужно ли здесь повышать уровень изоляции транзакции и как?

Поправьте меня, если я ошибаюсь.

Pavel Samolisov комментирует...

Когда писал статью (это же уже больше 4-х лет прошло, вот время летит) об уровнях изоляции не задумывался. Как я понимаю будет использоваться уровень по-умолчанию, причем в каждом подключенном ресурсе свой. Это конечно же надо учитывать. Более того, координатор транзакций после получения всех положительных ответов на вызов prepare() будет ПОСЛЕДОВАТЕЛЬНО выдавать команды commit() всем ресурсам, а это значит, что данные в одном ресурсе обновятся раньше, чем в другом, что может вызвать гонки. Классический пример - паттерн "квитанция": пишем в очередь id сообщения, а данные сохраняем в БД. Коммит. В очередь закоммитилось раньше, чем в БД, а на эту очередь есть подписчик, который прежде чем обрабатывать сообщение "поднимает" данные из БД по этому id. Соответственно подписчик "видит" сообщение, делает запрос в базу и получает пустой result set, т.к. коммит до базы еще не прошел. Сам такое неоднократно встречал.

По поводу вашего комментария. Не вижу разницы между окружением с распределенными транзакциями и с одной базой. Действительно, в случае, если не делать SELECT FOR UPDATE, то каждая одновременно выполняемая транзакция при уровне изоляции READ_COMMITTED увидит одно и то же значение и тут уже которой повезет закоммититься последней. У классической реализации оптимистической блокировки та же проблема - кто гарантирует, что две транзакции изменения объекта не придут одновременно и не "увидят" одно и то же значение поля VERSION при старте своей работы?

В принципе, если говорить о каком-нибудь веб-приложении, где пользователь долго-долго заполняет форму, потом нажимает SUBMIT, на сервере стартует транзакция и данные быстро обновляются, то вероятность того, что два пользователя одновременно нажмут кнопку SUBMIT да еще и для одного и того же объекта чрезвычайно мала, с другой же стороны здесь можно применить и SELECT FOR UPDATE, т.к. время выполнения транзакции гораздо меньше времени ввода данных пользователем. Видел подобную реализацию оптимистической (оптимистической, несмотря на наличие SELECT version FOR UPDATE) в гайдах Но для более конкурентной среды, например для процессинговой системы, нужно думать как бы и рыбку съесть (не потерять обновление) и косточной не подавиться (не проиграть в конкурентности).

Pavel Samolysov комментирует...

В связи с гигантским паразитным трафиком на статью из Германии, URL был изменен.

Отправить комментарий

Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!