При интеграции приложений в единую информационную систему наиболее остро встает проблема обеспечения целостности и непротиворечивости данных. Суть данной проблемы в следующем: каждое приложение может работать со своим независимым хранилищем данных и при интеграции приложений возможна ситуация, когда данные в одном хранилище обновились, а в другом (например, в результате сбоя по питанию) - нет. Последствия таких ошибок могут быть довольно печальны: в случае сколь-нибудь крупного производственного предприятия сбой при сохранении производственного задания может привести к простою всего предприятия, или наоборот - выпуску лишней дневной нормы продукции.
Для решения данной проблемы используются т.н. распределенные транзакции - транзакции, охватывающие несколько источников данных. В мире Java такие транзакции поддерживаются с помощью Java Transaction API (JSR 907), являющегося частью спецификации JavaEE. Однако, данный факт не обозначает, что работать с JTA можно только с помощью Java EE-сервера приложений. Существуют различные, в том числе и OpenSource, реализации JTA, в частности - Atomikos TransactionsEssentials, которая содержит JDBC/XA и JMS/XA пулы соединений, а также координатор распределенных транзакций. В данной статье мы рассмотрим использование Atomikos TransactionsEssentials для управления распределенными транзакциями, в которых будут участновать JDBC- и JMS-соединения, а также Hibernate. Для объединения компонентов системы будем использовать Spring Framework.
Содержание
- Понятие XA-транзакции.
- Использование JDBC/XA-пулов и координатора транзакций из Atomikos с помощью Spring Framework.
- Подключение JMS/XA (ActiveMQ).
- Использование Atomikos в качестве менеджера транзакций для Hibernate.
- Заключение.
- Ресурсы.
Понятие XA-транзакции
Спецификация на распределенные глобальные - XA - транзакций была разработана вендорским комитетом X/Open Group. XA транзакции управляются с помощью т.н. координатора XA-транзакций и объединяют несколько источников данных - JDBC-пулы соединений и/или JMS. Источники даных, способные участвовать в XA-транзакции называются XA-ресурсами и реализуют интерфейс javax.transaction.xa.XAResource. JTA содержит немного измененную спецификацию XA.
Коммит XA-транзакции считается успешным ТОЛЬКО если успешен коммит всех локальных транзакций XA-ресурсов. В случае, если коммит в каком-либо из локальных ресурсов неуспешен, то происходит откат всей глобальной транзакции. Для реализации такого поведения используется протокол двухфазного коммита (2PC).
Двухфазный коммит имеет два четко разделенных этапа:
Этап подготовки - координатор посылает сообщение участникам распределенной транзакции о подготовке к транзакции (prepare message). Это сообщение также содержит уникальный номер транзакции TID. Когда участники получают это сообщение, они проверяют смогут ли зафиксировать транзакцию и отвечают координатору. При этом транзакция исполняется, но не фиксируется и ее состояние сохраняется на диске.
Этап фиксации - после того, как координатор получил все ответы, он решает зафиксировать или прервать начатую транзакцию в соответствии с правилом глобальной фиксации. Если все участники глобальной транзакции ответили, что могут ее зафиксировать, то он посылает участникам сообщение о фиксации транзакции, в противном случае все участники получают сообщение об откате транзакции.
Например, в PostgreSQL двухфазный коммит реализован так, что транзакции сначала подготавливаются (PREPARE TRANSACTION) и сохраняются на диск. В последствии же они могут быть зафиксированы (COMMIT PREPARED) или отменены (ROLLBACK PREPARED) даже после перезагрузки системы.
Для участия в распределенных транзакциях как СУБД или JMS/MQ-брокер, так и их JDBC- и JMS-драйвера должны поддерживать XA протокол согласно JTA.
Использование JDBC/XA-пулов и координатора транзакций из Atomikos с помощью Spring Framework
В состав библиотеки Atomikos TransactionsEssentials входят XA-совместимые пулы соединений с JDBC- и JMS-, а так же JTA-совместимый координатор транзакций. В данной части статьи мы рассмотрим конфигурирование и использование Atomikos с помощью SpringFramework.
Постановка задачи
Рассмотрим следующую задачу: имеется две информационные системы: Интернет-магазин (ESHOP), содержащий сведения о товарах, их цены и наличие на складе и Система управления отношениями с клиентами (CRM). Необходимо реализовать бизнес-процесс покупки товара по следующему алгоритму:
1. Получить актуальну цену товара (ESHOP).
2. Ввести заказы в систему (CRM).
3. Выставить счет клиенту (CRM).
4. Обновить сведения о наличии товара на складе (ESHOP).
При выполнении любого этапа возможна ошибка (например, упал сервер СУБД или на складе нет необходимого количества товара), поэтому данные операции необходимо выполнять в рамках XA-транзакции.
Для чистоты эсперимента будем считать, что Интернет-магазин использует СУБД PostgreSQL, а CRM - Oracle.
Подготовка СУБД к выполнению XA-транзакций
Прежде всего необходимо подготовить СУБД для обработки XA-транзакций. Для PostgreSQL необходимо в настройках сервера установить значение параметра max_prepared_transactions равным, например, 10. Подробнее можно прочитать в документации на сервер PostgreSQL.
Для Oracle 9.2/10 необходимо дать пользователю, от имени которого будет выполняться транзакция, соответствующие права:
grant select on sys.dba_pending_transactions to user name; grant select on sys.pending_trans$ to user name; grant select on sys.dba_2pc_pending to user name; grant execute on sys.dbms_system to user name;
Разработка приложения
Теперь можно приступить непосредственно к написанию приложения. Сделаем следующее:
1. Определим XA-совместимые источники данных.
2. Настроим менеджер транзакций Spring Framework на использование Atomikos.
3. Настроим декларативное управление транзакциями с помощью аннотаций.
4. Создадим DAO-классы и сервисы для эмуляции ESHOP и CRM.
5. Создадим сервис-интегратор, демонстрирующий использование XA-транзакции.
Будем использовать следующий шаблон XML-файла описания контекста Spring Framework, в который будут добавляться описания бинов:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
</beans>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
</beans>
В качестве источника данных используется класс com.atomikos.jdbc.AtomikosDataSourceBean, позволяющий создать XA-совместимый пул соединений. Наиболее важными параметрами данного класса являются: xaDataSourceName, который принимает название класса XA-совместимого источника данных, и xaProperties, который принимает коллекцию свойств, описывающих параметры подключения к СУБД.
<!-- XA Datasource - using Oracle DBMS (using the URL property)-->
<bean id="dataSource1"
class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init"
destroy-method="close">
<property name="uniqueResourceName" value="EIS1DBMS"/>
<property name="xaDataSourceClassName" value="${jdbc.oracle.driverClassName}"/>
<property name="xaProperties">
<props>
<prop key="URL">${jdbc.oracle.url}</prop>
<prop key="user">${jdbc.oracle.username}</prop>
<prop key="password">${jdbc.oracle.password}</prop>
</props>
</property>
<property name="poolSize" value="${jdbc.oracle.pool-size}"/>
<property name="testQuery" value="${jdbc.oracle.testQuery}"/>
</bean>
<bean id="dataSource1"
class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init"
destroy-method="close">
<property name="uniqueResourceName" value="EIS1DBMS"/>
<property name="xaDataSourceClassName" value="${jdbc.oracle.driverClassName}"/>
<property name="xaProperties">
<props>
<prop key="URL">${jdbc.oracle.url}</prop>
<prop key="user">${jdbc.oracle.username}</prop>
<prop key="password">${jdbc.oracle.password}</prop>
</props>
</property>
<property name="poolSize" value="${jdbc.oracle.pool-size}"/>
<property name="testQuery" value="${jdbc.oracle.testQuery}"/>
</bean>
В качетсве XA-совместимого источника данных для СУБД Oracle используется класс oracle.jdbc.xa.client.OracleXADataSource.
Для PostgreSQL XA-совместимым источником данных является класс org.postgresql.xa.PGXADataSource, который конфигурируется несколько иначе: вместо параметра URL необходимо явно передать параметры serverName, portNumber, databaseName и т.д.:
<!-- XA Datasource - using PostgreSQL DBMS (using the serverName, portNumber and other properties -->
<bean id="dataSource2"
class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init"
destroy-method="close">
<property name="uniqueResourceName" value="EIS2DBMS"/>
<property name="xaDataSourceClassName" value="${jdbc.postgres.driverClassName}"/>
<property name="xaProperties">
<props>
<prop key="serverName">${jdbc.postgres.serverName}</prop>
<prop key="portNumber">${jdbc.postgres.portNumber}</prop>
<prop key="databaseName">${jdbc.postgres.databaseName}</prop>
<prop key="user">${jdbc.postgres.username}</prop>
<prop key="password">${jdbc.postgres.password}</prop>
</props>
</property>
<property name="poolSize" value="${jdbc.postgres.pool-size}"/>
<property name="testQuery" value="${jdbc.postgres.testQuery}"/>
</bean>
<bean id="dataSource2"
class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init"
destroy-method="close">
<property name="uniqueResourceName" value="EIS2DBMS"/>
<property name="xaDataSourceClassName" value="${jdbc.postgres.driverClassName}"/>
<property name="xaProperties">
<props>
<prop key="serverName">${jdbc.postgres.serverName}</prop>
<prop key="portNumber">${jdbc.postgres.portNumber}</prop>
<prop key="databaseName">${jdbc.postgres.databaseName}</prop>
<prop key="user">${jdbc.postgres.username}</prop>
<prop key="password">${jdbc.postgres.password}</prop>
</props>
</property>
<property name="poolSize" value="${jdbc.postgres.pool-size}"/>
<property name="testQuery" value="${jdbc.postgres.testQuery}"/>
</bean>
После определения источников данных нужно сконфигурировать Spring Framework на использование JTA в качестве менеджера транзакций. Для этого существует класс org.springframework.transaction.jta.JtaTransactionManager, принимающий два параметра: transactionManager и userTransaction. Atomikos предоставляет собственные реализации как transactionManager'а, так и userTransaction:
<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="jtaTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close"/>
<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="jtaUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="jtaTransactionManager"/>
<property name="userTransaction" ref="jtaUserTransaction"/>
</bean>
<bean id="jtaTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close"/>
<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="jtaUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="jtaTransactionManager"/>
<property name="userTransaction" ref="jtaUserTransaction"/>
</bean>
Созданный с помощью Spring Framework менеджер транзакций теперь можно использовать внутри разрабатываемого приложения. Сами транзакции при этом можно декларативно описывать с помощью аннотаций, для чего в контекст Spring необходимо добавить:
<tx:annotation-driven transaction-manager="transactionManager" />
Управление транзакциями с помощью аннотаций подробно рассмотренно в статье: Как подружить Hibernate со Spring и обеспечить управление транзакциями через @ннотации.
В состав Spring Framework входит довольно удобная для программиста поддержка JDBC. Чтобы воспользоваться данной возможностью, необходимо реализовывать свои DAO-классы, как наследники org.springframework.jdbc.core.simple.SimpleJdbcDaoSupport, тем самым получая доступ к org.springframework.jdbc.core.simple.SimpleJdbcTemplate:
package name.samolisov.jta.xa.demo.eshop.jdbc;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import name.samolisov.jta.xa.demo.eshop.IProductDao;
import name.samolisov.jta.xa.demo.eshop.Product;
import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
import org.springframework.jdbc.core.simple.SimpleJdbcDaoSupport;
public class JdbcProductDao extends SimpleJdbcDaoSupport implements IProductDao {
private static final String PRODUCT_TABLE = "TBL_PRODUCT";
private static final String GET_ALL_QUERY = "select * from " + PRODUCT_TABLE + " order by price";
private static final String GET_PRICE_QUERY = "select price from " + PRODUCT_TABLE + " where id = ?";
private static final String GET_COUNT_QUERY = "select count from " + PRODUCT_TABLE + " where id = ?";
private static final String GET_NAME_QUERY = "select name from " + PRODUCT_TABLE + " where id = ?";
private static final String UPDATE_COUNT_QUERY = "update " + PRODUCT_TABLE + " set count = ? where id = ?";
private static ProductRowMapper MAPPER = new ProductRowMapper();
@Override
public List<Product> getAllProducts() {
return getSimpleJdbcTemplate().query(GET_ALL_QUERY, MAPPER);
}
@Override
public double getPrice(Long productId) {
return getSimpleJdbcTemplate().queryForObject(GET_PRICE_QUERY, Double.class, productId);
}
@Override
public int getCount(Long productId) {
return getSimpleJdbcTemplate().queryForInt(GET_COUNT_QUERY, productId);
}
@Override
public String getProductName(Long productId) {
return getSimpleJdbcTemplate().queryForObject(GET_NAME_QUERY, String.class, productId);
}
@Override
public void updateCount(Long productId, Integer newCount) {
getSimpleJdbcTemplate().update(UPDATE_COUNT_QUERY, newCount, productId);
}
public static class ProductRowMapper implements ParameterizedRowMapper<Product> {
@Override
public Product mapRow(ResultSet rs, int rowNum) throws SQLException {
Product product = new Product();
product.setId(rs.getLong("id"));
product.setCount(rs.getInt("count"));
product.setName(rs.getString("name"));
product.setPrice(rs.getDouble("price"));
return product;
}
}
}
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import name.samolisov.jta.xa.demo.eshop.IProductDao;
import name.samolisov.jta.xa.demo.eshop.Product;
import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
import org.springframework.jdbc.core.simple.SimpleJdbcDaoSupport;
public class JdbcProductDao extends SimpleJdbcDaoSupport implements IProductDao {
private static final String PRODUCT_TABLE = "TBL_PRODUCT";
private static final String GET_ALL_QUERY = "select * from " + PRODUCT_TABLE + " order by price";
private static final String GET_PRICE_QUERY = "select price from " + PRODUCT_TABLE + " where id = ?";
private static final String GET_COUNT_QUERY = "select count from " + PRODUCT_TABLE + " where id = ?";
private static final String GET_NAME_QUERY = "select name from " + PRODUCT_TABLE + " where id = ?";
private static final String UPDATE_COUNT_QUERY = "update " + PRODUCT_TABLE + " set count = ? where id = ?";
private static ProductRowMapper MAPPER = new ProductRowMapper();
@Override
public List<Product> getAllProducts() {
return getSimpleJdbcTemplate().query(GET_ALL_QUERY, MAPPER);
}
@Override
public double getPrice(Long productId) {
return getSimpleJdbcTemplate().queryForObject(GET_PRICE_QUERY, Double.class, productId);
}
@Override
public int getCount(Long productId) {
return getSimpleJdbcTemplate().queryForInt(GET_COUNT_QUERY, productId);
}
@Override
public String getProductName(Long productId) {
return getSimpleJdbcTemplate().queryForObject(GET_NAME_QUERY, String.class, productId);
}
@Override
public void updateCount(Long productId, Integer newCount) {
getSimpleJdbcTemplate().update(UPDATE_COUNT_QUERY, newCount, productId);
}
public static class ProductRowMapper implements ParameterizedRowMapper<Product> {
@Override
public Product mapRow(ResultSet rs, int rowNum) throws SQLException {
Product product = new Product();
product.setId(rs.getLong("id"));
product.setCount(rs.getInt("count"));
product.setName(rs.getString("name"));
product.setPrice(rs.getDouble("price"));
return product;
}
}
}
Код DAO-класса, реализующего взаимодействие с СУБД для CRM приводить нет смысла, чтобы не загромождать статью. Ссылка на исходный код системы приведена в разделе Ресурсы.
Бизнес-логика в случае Интернет-магазина довольно простая: необходимо создать методы для получения наименования и стоимости товара по его идентификатору, а так же для обновления количества товара, имеющегося на складе - метод sell - причем, необходимо проверять имеется ли на складе достаточное количество товара и, если нет, выбрасывать исключение.
При разработке приложений с использованием Spring Framework обычно применяется архитектурный паттерн Многоуровневая архитектура в следующей реализации:
В общем случае бизнес-логика не сводится к манипулированию одним источником данных (одной сущностью, одной таблицей). Даже в одной локальной транзакции как правило выполняется несколько запросов, поэтому и разделяют слой доступа к данным (содержит только единичные запросы, такие как вставить или изменить сущность) и слой бизнес-логики (содержит последовательности операций над сущностями).
- Слой инфраструктуры (источники данных, менеджер транзакций и т.д.)
- Слой доступа к данным (DAO)
- Слой бизнес-логики (сервисы)
- Слой интерфейса (например, Spring MVC)
package name.samolisov.jta.xa.demo.eshop; import org.apache.log4j.Logger; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; public class EshopService implements IEshopService { private static final Logger _log = Logger.getLogger(EshopService.class); private IProductDao dao; public void setDao(IProductDao dao) { this.dao = dao; } @Override public double getPrice(Long productId) { double price = dao.getPrice(productId); _log.debug("Price for id " + productId + " is " + price); return price; } @Override public String getProductName(Long productId) { String name = dao.getProductName(productId); _log.debug("Product name for id " + productId + " is " + name); return name; } @Override @Transactional(readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Exception.class) public void sell(Long productId, Integer count) throws Exception { int hasCount = dao.getCount(productId); if (hasCount < count) throw new Exception("Could not sell " + count + ". We have only " + hasCount); dao.updateCount(productId, hasCount - count); } }
Метод sell аннотирован @Transactional, поэтому он будет выполняться в рамках транзакции: при входе в данный метод будет создана транзакция, если ее еще нет (propagation = Propagation.REQUIRED). В случае, если на складе нет необходимого количества товара - будет выброшено исключение и будет выполнент откат транзакции. Бизнес-логика CRM-системы реализована аналогично. Объединяются архитектурные уровни с помощью Spring IoC-контейнера: классы доступа к данным и сервисы описываются как бины и инъектируются друг в друга: <bean id="productDao" class="name.samolisov.jta.xa.demo.eshop.jdbc.JdbcProductDao"> <property name="dataSource" ref="dataSource2"/> </bean> <bean id="eshopService" class="name.samolisov.jta.xa.demo.eshop.EshopService"> <property name="dao" ref="productDao"/> </bean> <bean id="crmDao" class="name.samolisov.jta.xa.demo.crm.jdbc.JdbcCrmDao"> <property name="dataSource" ref="dataSource1"/> </bean> <bean id="crmService" class="name.samolisov.jta.xa.demo.crm.CrmService"> <property name="dao" ref="crmDao"/> </bean>
На данном этапе мы создали приложение, эмулирующее Интернет-магазин, использующий одну СУБД и CRM-систему, использующую другую СУБД. Теперь необходимо объединить эти системы и выполнить запросы к ним в рамках одной распределенной транзакции. Для этого создадим класс-интегратор package name.samolisov.jta.xa.demo.local; import name.samolisov.jta.xa.demo.crm.ICrmService; import name.samolisov.jta.xa.demo.eshop.IEshopService; import org.springframework.transaction.annotation.Transactional; public class IntegratorService { private ICrmService crmService; private IEshopService eshopService; @Transactional(rollbackFor = Exception.class) public void makeTransaction(Long productId, String customerName, int count) throws Exception { // ...(ESHOP) double price = eshopService.getPrice(productId); String productName = eshopService.getProductName(productId); // ...(CRM) crmService.addOrders(customerName, productName, price, count); // ...(CRM) crmService.addAccount(customerName, productName, price, count); // ...(ESHOP) eshopService.sell(productId, count); } public void setCrmService(ICrmService crmService) { this.crmService = crmService; } public void setEshopService(IEshopService eshopService) { this.eshopService = eshopService; } }
... и зарегистрируем его в контексте Spring Framework: <bean id="integratorService" class="name.samolisov.jta.xa.demo.local.IntegratorService"> <property name="crmService" ref="crmService"/> <property name="eshopService" ref="eshopService"/> </bean>
Метод makeTransaction класса IntegratorService аннотирован @Transactional, поэтому данный метод будет выполняться внутри транзакции, которая будет откачена, если при выполнении метода будет выброшено исключение. Создадим класс Main, в котором будет подниматься контекст Spring Framework и выполняться метод makeTransaction: package name.samolisov.jta.xa.demo.local; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { try { ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context-jdbc.xml"); IntegratorService service = (IntegratorService) ctx.getBean("integratorService"); service.makeTransaction(1L, "Pavel Samolisov", 4); } catch (Exception e) { e.printStackTrace(); } } }
Тестирование приложения Предположим, что существует база данных в PostgreSQL с таблицей TBL_PRODUCT, содержащей следующие данные: Предположим, что существует база данных В СУБД Oracle с пустыми таблицами TBL_ACCOUNT и TBL_ORDER: Таким образом на складе есть 5 единиц товара под названием BMW X5 (идентификатор - 1). В коде метода main класса Main делается попытка купить 4 единицы товара. После запуска приложения данная попытка должна быть завершена успешно: Т.е. на складе осталась одна единица товара, а в CRM создан один счет и занесено четыре заказа. Если сейчас заново запустить приложение, то в CRM будут созданы счет и заказы, однако при обновлении сведений о количестве единиц товара на складе будет брошено исключение (на складе сейчас одна единица товара, а надо продать четыре), поэтому будет выполнен откат распределенной транзакции - данные не сохранятся ни в Oracle, ни в PostgreSQL. Подключаем JMS/XA (ActiveMQ)
Для обеспечения гарантированой доставки сообщений между удаленными системами используется подсистема Java Message Service (JMS). Существует несколько реализаций данной подсистемы, как коммерческих (Oracle AQ, WebSphere MQ), так и с открытыми исходными кодами (ActiveMQ, OpenJMS). В данном примере будем использовать ActiveMQ - JMS-брокер с открытым исходным кодом, поддерживающий Enterprise-расширения и хранение сообщений как в СУБД, так и в файлах. Ассинхронная отправка/прием сообщений с помощью JMS подразумевает следующее: существует слушатель, который периодически проверяет состояние JMS-очереди/топика (полинг). Если в очередь записывается сообщение, то данный слушатель считывает его и выполняет обработку. Как запись, так и считывание сообщения может осуществляться в рамках транзакции (понятно, что это - отдельные транзакции для записи и считывания). При записи: в случае возникновения исключительной ситуации - сообщение не записывается в очередь/топик. При чтении: в случае возникновения исключительной ситуации - сообщение выталкивается в специальную очередь для необработанных сообщений. XA-транзакции с участием JMS/JDBC подразумевают, что в случае ошибок будет откачены не только изменения в базе данных, но и не записано/возвращено сообщение JMS-брокеру. Постановка задачи Для демонстрации использования JMS в распеделенных транзакциях расширим созданное в предыдущем разделе приложение: обеспечим интеграцию с CRM-системой через JMS-очередь. Для этого необходимо выполнить два действия: 1. Разработать слушатель JMS-очереди, который при получении сообщения будет сохранять заказы и счета в CRM-системе. 2. Разработать механизм отправки сообщений для CRM в JMS-очередь. И отправка и чтение сообщений будут осуществляться в рамках XA-транзакций. Для использования JMS-очереди в Spring Framework необходимо определить следующие бины: Фабрику соединений, поддерживающую XA: <!-- Configure the ActiveMQ Queue Factory --> <bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"> <property name="brokerURL" value="${jms.broker.url}"/> </bean>
Непосредственно очередь сообщений: <!-- Configure the ActiveMQ Queue for accounts --> <bean id="crmQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="${jms.crm.queue.name}"/> </bean>
Atomikos-коннектор для JMS: <bean id="connectionFactoryBean" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close"> <property name="uniqueResourceName" value="QUEUE_BROCKER"/> <property name="xaConnectionFactory" ref ="xaFactory"/> </bean>
Следует обратить внимание, что коннектор принимает два параметра: уникальный идентификатор - название ресурса и ссылку на фабрику соединений с JMS-брокером. Отправка сообщений Для взаимодействия с JMS в состав Spring Framework входит класс org.springframework.jms.core.JmsTemplate. При создании данный класс принимает следующие основные параметры: - connectionFactory - ссылку на фабрику соединений, - sessionTransacted - если данный флаг установлен в true, то взаимодействие с JMS будет осуществляться в рамках транзакции. Обязательно необходимо устанавить значение параметра sessionTransacted в true. <bean id="crmJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryBean"/> <property name="defaultDestination" ref="crmQueue"/> <property name="receiveTimeout" value="3000"/> <property name="sessionTransacted" value="true"/> </bean>
Сама транзакция при отправке сообщений в JMS-очередь должна создаваться в коде, использующем jmsTemplate. Для демонстрации интеграции создадим сервис JmsIntegratorService, в котором взаимодействие с Интернет-магазином будет осуществляться напрямую, а с CRM - через JMS: package name.samolisov.jta.xa.demo.jms; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.Session; import name.samolisov.jta.xa.demo.eshop.IEshopService; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.transaction.annotation.Transactional; public class JmsIntegratorService { private JmsTemplate crmJmsTemplate; private IEshopService eshopService; @Transactional(rollbackFor = Exception.class) public void makeTransaction(final Long productId, final String customerName, final int count) throws Exception { // ...(ESHOP) final double price = eshopService.getPrice(productId); final String productName = eshopService.getProductName(productId); // ... (CRM) crmJmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setStringProperty("name", customerName); message.setStringProperty("productName", productName); message.setDoubleProperty("price", price); message.setIntProperty("count", count); return message; } }); // ... (ESHOP) eshopService.sell(productId, count); } public void setCrmJmsTemplate(JmsTemplate crmJmsTemplate) { this.crmJmsTemplate = crmJmsTemplate; } public void setEshopService(IEshopService eshopService) { this.eshopService = eshopService; } }
Метод makeTransaction аннотирован @Transactional, соответственно он будет выполняться внутри транзакции, включающей в себя так же и взаимодействие с JMS. Прием сообщений Для асинхронного чтения сообщений используется механизм слушателей - классов, реализующих интерфейс javax.jms.MessageListener. Для интеграции с CRM создадим слушатель OnCreateAccountAndOrdersListener, который обрабатывает получаемые сообщения и заносит счета и заказы в систему: package name.samolisov.jta.xa.demo.crm.jms; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import name.samolisov.jta.xa.demo.crm.ICrmService; import org.apache.log4j.Logger; public class OnCreateAccountAndOrdersListener implements MessageListener { private static final Logger _log = Logger.getLogger(OnCreateAccountAndOrdersListener.class); private ICrmService service; public void setService(ICrmService service) { this.service = service; } @Override public void onMessage(Message msg) { try { MapMessage m = (MapMessage) msg; String name = m.getStringProperty("name"); String productName = m.getStringProperty("productName"); double price = m.getDoubleProperty("price"); int count = m.getIntProperty("count"); _log.debug("Creating an Account with name = " + name + ", product = " + productName + ", price = " + price + ", count = " + count); service.addAccount(name, productName, price, count); _log.debug("Creating Orders with name = " + name + ", product = " + productName + ", price = " + price + ", count = " + count); service.addOrders(name, productName, price, count); } catch (Exception e) { _log.error("Could not handle a message", e); // выбросим Runtime Exception, чтобы откатить транзакцию throw new RuntimeException(e.getMessage()); } } }
Важно! При чтении сообщения мы не описываем транзакцию даже декларативно (с помощью аннотации @Transactional), соответственно мы не можем управлять параметрами отката. По-умолчанию, транзакции откатываются только при бросании исключений-наследников от RuntimeException (неуловимых исключений). Данное исключение бросается в блоке catch слушателя. В Spring Framework для регистрации слушателей применяется концепция контейнеров. Контейнер представляет собой экземпляр класса org.springframework.jms.listener.DefaultMessageListenerContainer. При создании экземпляра данного класса необходимо определить следующие основные параметры: - connectionFactory - фабрика соединений с JMS-брокером;
- destination - ссылка на очередь, с которой будут считываться сообщения;
- messageListener - ссылка на слушатель сообщений;
- transactionManager - ссылка на координатор транзакций, зарегистрированный в Spring Framework. Именно данный координатор будет создавать транзакции при получении сообщения и вызове слушателя. Если не указать transactionManager, то при вызове метода onMessage слушателя транзакции создаваться не будут.
- sessionTransacted - если данный параметр установлен в true, то JMS-очередь будет участвовать в распределенной транзакции при чтении.
<bean id="onCreateAccountAndOrdersListener" class="name.samolisov.jta.xa.demo.crm.jms.OnCreateAccountAndOrdersListener"> <property name="service" ref="crmService"/> </bean> <bean id="crmListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="transactionManager" ref="transactionManager" /> <property name="connectionFactory" ref="connectionFactoryBean" /> <property name="messageListener" ref="onCreateAccountAndOrdersListener" /> <property name="destination" ref="crmQueue" /> <property name="concurrentConsumers" value="1" /> <property name="receiveTimeout" value="3000" /> <property name="sessionTransacted" value="true"/> </bean>
Необходимо обратить внимание на взаимосвязь параметров transactionManager и sessionTransacted: а) если transactionManager не установлен и sessionTransacted == false, то при обработке сообщений транзакции создаваться не будут; б) если transactionManager не установлен и sessionTransacted == true, то будет сгенерировано исключение при создании бина; в) если transactionManager установлен и sessionTransacted == false, то метод onMessage слушателя будет выполняться в транзакции, но чтение сообщений из очереди не будет производиться в этой транзакции. Т.е., если при выполнении метода onMessage произойдет ошибка, то созданные изменения будут откачены, однако сообщение в JMS не вернется; г) если transactionManager установлен и sessionTransacted == true, то метод onMessage слушателя будет выполняться в транзакции и чтение сообщения будет выполняться в этой же транзакции. Т.е., если при выполнении метода onMessage произойдет ошибка, то созданные изменения будут откачены и сообщение вернется в JMS. Тестирование приложения Для тестирования создадим два класса с методами main - MainIntegrator, выполняющий метод makeTransaction, и MainReceiver, запускающий слушателя. Предположим, что брокер ActiveMQ установлен, настроен и запущен, исходное состояние баз данных восстановлено. Запустим MainIntegrator, т.к. исходное состояние баз данных корректно и брокер запущен, то транзакция завершится успешно: сообщение записано в очередь, изменения в базе данных Интернет-магазина выполнены. Теперь запустим MainReceiver. Т.к. в очереди есть сообщение, то после запуска слушателя оно будет считано и обработано: в таблицы TBL_ACCOUNT и TBL_ORDER будут занесены данные. Можно поэкспериментировать с эмуляцией ошибок: запустить MainIntegrator еще раз. Т.к., на складе осталась одна единица товара, а производится попытка купить четыре, то будет брoшено исключение и выполнен откат транзакции - сообщение не будет записано в очередь JMS. Если в код слушателя добавить строчку то при его вызове так же будет выполнен откат транзакции. Само сообщение при этом будет вытолкнуто в очередь ActiveMQ.DLQ: из которой его можно прочитать и обработать уже другим процессом. Использование Atomikos в качестве менеджера транзакций для Hibernate
Hibernate является одной из самых популярных реализаций объектно-реляционного отображения (ORM) в мире Java. Данный фреймворк довольно гибко настраивается и может использовать координатор из JTA для управления транзакциями. Для того, чтобы Hibernate использовал JTA (в нашем случае - Atomikos) в качестве менеджера транзакций необходимо настроить два параметра:- hibernate.transaction.factory_class - фабрика транзакций, необходимо указать значение com.atomikos.icatch.jta.hibernate3.AtomikosJTATransactionFactory;
- hibernate.transaction.manager_lookup_class - класс, отвечающий за взаимодействие с координатором транзакций, необходимо указать значение com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup.
<!-- Configure a Hibernate Properties --> <bean id="hibernateProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="properties"> <props> <prop key="hibernate.transaction.factory_class"> com.atomikos.icatch.jta.hibernate3.AtomikosJTATransactionFactory </prop> <prop key="hibernate.transaction.manager_lookup_class"> com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup </prop> <prop key="hibernate.dialect"> ${hibernate.dialect} </prop> <prop key="hibernate.hbm2ddl.auto"> ${hibernate.hbm2ddl.auto} </prop> <prop key="hibernate.show_sql"> ${hibernate.show_sql} </prop> <prop key="hibernate.cache.provider_class">org.hibernate.cache.OSCacheProvider</prop> <prop key="hibernate.cache.use_query_cache">true</prop> <prop key="hibernate.connection.autocommit">false</prop> </props> </property> </bean> <!-- Configure Hibernate to use the Atomikos JTA and datasource for transaction control --> <bean id="sessionFactory" class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean"> <property name="annotatedClasses"> <list> <value>name.samolisov.jta.xa.demo.crm.Account</value> <value>name.samolisov.jta.xa.demo.crm.Order</value> </list> </property> <property name="dataSource" ref="dataSource1"/> <property name="hibernateProperties" ref="hibernateProperties"/> </bean> <!-- Configure the Spring hibernate template with the session factory from above --> <bean id="hibernateTemplate" class="org.springframework.orm.hibernate3.HibernateTemplate"> <property name="sessionFactory" ref="sessionFactory"/> </bean> <bean id="hibernateCrmDao" class="name.samolisov.jta.xa.demo.crm.hibernate.HibernateCrmDao"> <property name="hibernateTemplate" ref="hibernateTemplate"/> </bean>
Благодаря использованию патерна "Инверсия управления" в Spring Framework можно легко определить сервисы, использующие HibernateCrmDao вместо JdbcCrmDao. Код данных сервисов и сервиса интеграции останется неизменным, транзакции так же будут определяться декларативно, в качестве менеджера транзакций будет использоваться координатор, определенный в Spring Framework. Тестирование нового варианта приложения не отличается от варианта, используюшего чистый JDBC. Заключение
В данной статье мы рассмотрели построение JavaSE приложения, использующего распределенные транзакции, с помощью Spring Framework, Atomikos, JDBC, JMS и Hibernate. Как оказалось, для использования возможностей Java Transaction API сервер приложений не является необходимым компонентом. Spring Framework позволяет легко обеспечить удобное декларативное управление распределенными транзакциями точно так же, как и локальными, при этом в транзакциях могут участвовать не только соединения с базой данных, но и JMS, что позволяет создавать легкие, но надежные интеграционные решения, не требующие использования тяжелых компонентов.Ресурсы
- Two-phase commit protocol;
- XA and NonXA datasource;
- J2EE Without the Application Server;
- Транзакционные стратегии;
- Освоение JCA-транзакций;
- Исходный код к статье (Maven-проект, GitHub).
13 комментариев:
Я понимаю, что в статью надо бы добавить картинок, схем транзакций, кодов XA-координаторов и прочих кошерных вещей, но совсем нет времени.
Отличная статья, большое вам спасибо!
Небольшой вопрос, что бы окончательно уложить всё в голове: распределённая транзакция может работать только в рамках одного приложения? К примеру возьмём описанный выше сценарий ESHOP-JMS-CRM. Я правильно понимаю, что при возникновении исключения в методе OnCreateAccountAndOrdersListener.onMessage откатяться только изменения, вызванные методами service.addAccount и service.addOrders, а метод eshopService.sell(productId, count) в JmsIntegratorService.makeTransaction будет выполнен, так как предшествующий ему код упешно поставил сообщение в очередь?
Да, все правильно. Более того, на момент записи сообщения в очередь слушатель вообще может отсутствовать (асинхронность).
Однако JMS брокеры как правило сохраняют сообщения, отправленные в очереди/топики, поэтому, если сообщение попало в JMS, то в какой-то мере гарантируется его доставка.
Прекрасная статья!
После длительного периода "теоретической физики" (OSGi, Eclipse труляля) ядреный полезный пищевой концентрат!
Продолжайте в этом же направлении!
PS:
C MQ брокером долго определялись, тестировали: выбрали JBoss Шершня http://www.jboss.org/hornetq
Легкий, элегентный, очень быстрый (the Performance Leader in Enterprise Messaging).
Мы в проекте используем Oracle Fussion Middleware, а для души решил потестить ActiveMQ, однако, спасибо за наводку - посмотрю в сторону шершня.
Ну и вообще спасибо за отзыв.
Я так понимаю с Jdeveloper'ом работаете. Как вам после eclipse эта IDE?
Непривычно, но композиты рисовать - милое дело
полезно. спс!
Как распределенные транзации будут работать с dblink? неявно оракл берет на себя управление транзакцией, когда встречаются dblink, так кто будет главнее?
Такая конфигурация поддерживается, но с ограничениями. Детали зависят от используемой версии Oracle. Подробнее можно прочесть в разделе XA and Database Links документа.
А какой уровень изоляции будет использоваться для данной распределенной транзакции? Если по умолчанию в БД, то READ COMMITED, так? Тогда мне кажется, что есть кэйс, при котором транзакция может некорректно выполнится в условиях реальной много конкурентной среды.
Например код в методе EshopService.sell()
int hasCount = dao.getCount(productId);
может быть прочитан двумя транзакциями одновременно, так как будет применена SHARED блокировка для таблицы товаров. А значит обе транзакции будут одновременно думать, что у нас на складе есть 4 BMW в наличии, что не корректно. Нужно ли здесь повышать уровень изоляции транзакции и как?
Поправьте меня, если я ошибаюсь.
Когда писал статью (это же уже больше 4-х лет прошло, вот время летит) об уровнях изоляции не задумывался. Как я понимаю будет использоваться уровень по-умолчанию, причем в каждом подключенном ресурсе свой. Это конечно же надо учитывать. Более того, координатор транзакций после получения всех положительных ответов на вызов prepare() будет ПОСЛЕДОВАТЕЛЬНО выдавать команды commit() всем ресурсам, а это значит, что данные в одном ресурсе обновятся раньше, чем в другом, что может вызвать гонки. Классический пример - паттерн "квитанция": пишем в очередь id сообщения, а данные сохраняем в БД. Коммит. В очередь закоммитилось раньше, чем в БД, а на эту очередь есть подписчик, который прежде чем обрабатывать сообщение "поднимает" данные из БД по этому id. Соответственно подписчик "видит" сообщение, делает запрос в базу и получает пустой result set, т.к. коммит до базы еще не прошел. Сам такое неоднократно встречал.
По поводу вашего комментария. Не вижу разницы между окружением с распределенными транзакциями и с одной базой. Действительно, в случае, если не делать SELECT FOR UPDATE, то каждая одновременно выполняемая транзакция при уровне изоляции READ_COMMITTED увидит одно и то же значение и тут уже которой повезет закоммититься последней. У классической реализации оптимистической блокировки та же проблема - кто гарантирует, что две транзакции изменения объекта не придут одновременно и не "увидят" одно и то же значение поля VERSION при старте своей работы?
В принципе, если говорить о каком-нибудь веб-приложении, где пользователь долго-долго заполняет форму, потом нажимает SUBMIT, на сервере стартует транзакция и данные быстро обновляются, то вероятность того, что два пользователя одновременно нажмут кнопку SUBMIT да еще и для одного и того же объекта чрезвычайно мала, с другой же стороны здесь можно применить и SELECT FOR UPDATE, т.к. время выполнения транзакции гораздо меньше времени ввода данных пользователем. Видел подобную реализацию оптимистической (оптимистической, несмотря на наличие SELECT version FOR UPDATE) в гайдах Но для более конкурентной среды, например для процессинговой системы, нужно думать как бы и рыбку съесть (не потерять обновление) и косточной не подавиться (не проиграть в конкурентности).
В связи с гигантским паразитным трафиком на статью из Германии, URL был изменен.
Отправить комментарий
Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!