Одним из наиболее применяемых паттернов интеграции информационных систем является паттерн "Фильтры и трубы". При построении интеграционного решения на Oracle SOA Suite в качестве "фильтров" выступают композиты, а в качестве "труб" удобно использовать JMS-очереди.
Зачастую необходимо реализовать не просто считывание сообщения из "трубы", а считывание и обработку сообщения в рамках одной транзакции, при этом при откате данной транзакции сообщение должно возвращаться в очередь, из которой было считано. В данной заметке мы рассмотрим как добиться такого поведения, а так же какие существуют ограничения, накладываемые Oracle SOA Suite'ом на реализацию транзакционного чтения из JMS.
Создание демонстрационного проекта
Для демонстрации создадим проект JMSTransactionDemo. Вся разработка композитов для Oracle SOA Suite будет производиться в интегрированной среде разработки JDeveloper.
В создаваемом проекте создадим пустой композит SOA Suite.
В поле созданного пустого композита необходимо перетащить JMSAdapter с палитры компонентов. После этого запустится мастер конфигурирования JMS-адаптера. На первом шаге данного мастера необходимо указать название адаптера. Т.к. мы через данный адаптер будет считывать сообщения из очереди, то назовем его InJMSAdapter.
Выберем в качестве используемой системы обмена сообщениями Oracle WebLogic JMS.
Укажем подключение к серверу приложений, выбрав его из списка. Если у вас нет подключений к серверам приложений, то необходимо создать такое подключение, нажав на кнопку с изображением большого зеленого плюса.
Укажем, что определим операцию и схему сообщения, считываемого из JMS-очереди, с помощью мастера.
Выберем операцию. JMS-адаптер позволяет выполнять следующие операции: считывание сообщения (Consume Message), запись сообщения (Produce Message) и синхронный обмен в режиме "запрос-ответ" (Request/Reply). Мы собираемся считывать сообщения из очереди, поэтому выбираем операцию Consume Message.
Теперь можно выбрать очередь, с которой будет осуществляться считывание сообщений, указать тип считываемого сообщения (в нашем случае - TextMessage) и JNDI-наименование пула подключений к JMS-очереди. Можно использовать стандартный пул подключений к WebLogic JMS, предоставленный Oracle SOA Suite - eis/wls/Queue.
Так как мы указали, что будем считывать текстовое сообщение, то нам необходимо определить схему данного сообщения. Для цели нашего демонстрационного примера схема некритична, главное - чтобы была возможность отправлять в очередь сообщения выбранного типа для тестирования нашего создаваемого композита.
На последнем шаге мастера конфигурирования JMS-адаптера нас проинформируют о том, что определение адаптера завершено и что после нажатия кнопки Finish будут созданы файлы с его описанием.
Сконфигурированный адаптер отобразится в окне редактирования композита. Так как мы выбрали операцию Consume Message, то данный адаптер отобразится в левой части композита, т.е. он представляет собой сервис, через который можно создавать новые экземпляры композита.
Теперь необходимо создать BPEL-процесс, в котором будет реализована логика обработки считываемых сообщений. Для создания BPEL процесса достаточно перетащить иконку BPEL Process из палитры компонентов в поле редактирования композита. После перетаскивания запустится диалог создания нового BPEL-процесса. Необходимо указать название процесса, версию спецификации BPEL (1.1 или 2.0) и выбрать шаблон процесса. Создаваемый в демонстрационных целях процесс не будет делать ничего полезного, поэтому остановимся на самом простом варианте - т.н. One Way BPEL Process, т.е. на процессе, который только принимает сообщение, но ничего не возвращает по результатам своей работы.
В окне редактирования композита созданный BPEL процесс необходимо соединить проводом с JMS-адаптером. После этого в BPEL-процессе будет доступна партнерская ссылка InJMSAdapter, которую нужно использовать для настройки активности Receive.
Если щелкнуть два раза по активности Receive, то появится диалог настройки данной активности. В диалоге настройки необходимо выбрать партнерскую ссылку (Partner Link) InJMSAdapter. В поле Operation автоматически будет подставлена единственная операция, определенная по данной партнерской ссылке, - Consume_Message. Необходимо создать переменную, в которую будет записываться считанное из InJMSAdapter сообщение. Для этого необходимо нажать кнопку с изображением большого зеленого плюса, расположенную напротив поля Variable.
После настройки активности Receive нам необходимо определить логику демонстрационного процесса. Т.к. цель нашей демонстрации - показать выталкивание сообщения обратно в очередь при возникновении ошибок в BPEL-процессе, то логика данного процесса должна быть предельно простой: сгенерировать ошибку с помощью активности Throw.
Активность Throw по-умолчанию не настроена. Если по активности щелкнуть два раза, то появится соответствующий диалог настройки.
Обязательными для заполнения являются поля группы Fault QName: Namespace URI и Local Part. Заполняются данные поля с помощью специального диалога, доступного по кнопке с изображением лупы.
Существует три группы ошибок:
- System Faults - системные ошибки;
- Project WSDL Fault - ошибки, определенные в WSDL файлах проекта, например, исключительные ситуации, определенные в методах веб-сервисов;
- Partner Links - ошибки, определенные в файлах определения партнерских ссылок.
Сейчас в демонстрационном проекте доступны только системные ошибки, информации об ошибках из других групп просто нет. Поэтому выберем для активности Throw системную ошибку, например rollback.
Информация о системной ошибке rollback будет скопирована в определение активности Throw.
Настройка транзакций в BPEL-процессе
Теперь у нас создан демонстрационный композит, считывающий сообщение из JMS-очереди и содержащий BPEL-процесс обработки данного сообщения. Однако, если мы сейчас запустим данный композит, то наблюдать транзакционное считывание не сможем - сообщения не будут выталкиваться обратно в JMS-очередь.
Чтобы JMS сообщения обрабатывались в рамках транзакции, композит необходимо настроить.
Прежде всего следует указать тип распространения транзакции в BPEL-процессе. Делается это установкой свойства bpel.config.transaction компонента BPELProcess. Возможны значения: required - если на момент запуска процесса транзакция уже создана, то процесс исполняется в рамках данной транзакции, если транзакция не создана, то она создается, и requiresNew - всегда создается новая траназакция, если же транзакция на момент запуска процесса существует, то она приостанавливается. Значением данного свойства по-умолчанию является required.
Затем необходимо указать синхронный режим обработки однонаправленных сообщений, т.е. сообщений, считываемых из JMS-очереди. Делается это выставлением значения свойства bpel.config.oneWayDeliveryPolicy компонента BPELProcess в значение sync. Данное значение указывает на то, что обрабатываться сообщение должно в том же потоке, в котором считывается. По-умолчанию, после считывания сообщения из очереди создается новый поток, в котором запускается BPEL-процесс, обработки сообщения. Так как транзакция привязывается к потоку, то считывание сообщения и его обработка будут производиться в разных транзакциях. После установки значения данного свойства в sync процессы считывания и обработки сообщения будут выполняться в рамках одной транзакции. Стоит отметить, что значение данного свойства не влияет на чтение сообщений из других очередей, если таковое присутствует в запущеном экземпляре BPEL-процесса. Все такие операции выполняются асинхронно с учетом корреляционного идентификатора.
Непосредственно для управления параметрами повторной обработки вытолкнутого в очередь сообщения служит набор свойств компонента JMS-адаптер:
- jca.retry.count - количество попыток повторного чтения сообщения;
- jca.retry.interval - интервал времени между попытками чтения, в секундах;
- jca.retry.backoff - коэффициент увеличения интервала между попытками чтения. Если начальное значение интервала (jca.retry.interval) равно одной секунде, а значение данного параметра, например, двум, то между попытками чтения будут следующие временные промежутки: 1, 2, 4, 8, 16, ..., в секундах.
- jca.retry.maxInterval - максимальное значение временного промежутка между попытками повторного чтения сообщений
Значения всех свойств прописываются с помощью тега property в xml-описании композита. Полное описание композита демонстрационного примера, в котором выставлены все значения описанных свойств, следующее:
Тестирование работы демонстрационного композита
Протестируем работу композита. Для этого развернем его на сервере SOA Suite. В Enterprise Manager откроем вкладку с информацией о композите JMSTransactionDemo и убедимся, что запущеных экземпляров данного композита нет.
В консоли управления WebLogic откроем вкладку Monitoring очереди, из которой осуществляется считывание. Убедимся, что сейчас в очереди нет непрочитанных сообщений.
Поместим сообщение в данную очередь (для этого можно написать простую программку на Java, либо создать отдельный композит, настроенный на работу с данной очередью). Сообщение будет считываться из очереди и периодически возвращаться в нее. Если обновлять вкладку Monitoring консоли управления WebLogic, то можно видеть, что несмотря на то, что сообщение считывается композитом JMSTransactionDemo оно отображается как непрочитанное.
В конце-концов сообщение будет зафиксировано как прочитанное. После этого можно открыть вкладку Instances в окне информации о композите JMSTransactionDemo в Enterprise Manager. Видно, что было запущено и отработало пять экземпляров композита. В настройках данного композита мы выставили значение параметра jca.retry.count равным четырем. Таким образом один экземпляр соответствует начальной попытке чтения сообщения, а другие четыре - повторным попыткам.
В окне трассировки демонстрационного композита видно, что было создано пять экземпляров BPEL-процесса DemoBPELProcess, при этом каждый экземпляр завершился неуспешно.
Ограничения, накладываемые на транзакционное чтение из очереди
В Oracle SOA Suite имеется ряд ограничений, накладываемых на транзакционное чтение из JMS-очереди. Рассмотрим их подробнее.
1. Транзакционное чтение работает только при условии работы JMS-адаптера в режиме XA-транзакций. Данный режим должен быть выставлен как в фабрике соединений, так и в зарегистрированном в адаптере пуле подключений.
2. При возникновении исключительной ситуации bindingFault выталкивания сообщения обратно в очередь не происходит. При этом, например, транзакции в базах данных откатываются нормально. Вероятно, это - ошибка Oracle, однако в версии 11.1.1.5 она не исправлена.
3. Иногда возникают странности, особенно при первоначальной настройке JMS-адаптера и пулов соединений. Странности заключаются в следующем: даже с правильными настройками сообщение не выталкивается обратно в очередь. Помогает перезапуск сервера SOA Suite.
При написании заметки использовалась официальная документация Oracle:
- Transaction and Fault Propagation Semantics in BPEL Processes;
- Using Fault Handling in a BPEL Process.
Понравилось сообщение - подпишитесь на блог
Этот комментарий был удален автором.
ОтветитьУдалить