вторник, 20 августа 2013 г.

Как мы обеспечили передачу с гарантированной доставкой 5 000 000 сообщений в сутки с помощью Oracle Service Bus

Сейчас работаю над проектом интеграции вновь внедряемой централизованной бизнес-системы с другими системами, установленными в филиалах заказчика. При этом основная задача заключается в том, чтобы синхронизировать данные в филиалах и центре с помощью сервисной шины предприятия (ESB), реализованной на платформе Oracle Service Bus. Мы смогли обеспечить передачу с гарантированной доставкой боле 5 000 000 сообщений в сутки. В данной статье хочу рассказать о том, как это было сделано.


Постановка задачи


Нам необходимо передавать изменения данных из одной системы (системы-источника) в другую - систему-приемник. Данные должны передаваться сразу, а не с помощью пакетной обработки. Соответственно, нужно каким-то образом захватывать события об изменении данных и публиковать их для сервисной шины. Хорошо зарекомендовал себя следующий паттерн: при изменении данных пользователем с помощью триггера в БД системы-источника формируется событие, которое записывается в специальную таблицу. У события есть идентификатор, признак типа - какая именно сущность была создана или изменена, тип операции: создание, изменение или удаление, статус: новое, на обработке, обработанное успешно или обработанное с ошибкой. По идентификатору события мы с помощью соответствующего типу сущности представлению можем получить актуальные на момент обработки события данные. Сервисная шина последовательным опросом таблицы событий (по-другому это называется "полинг") считывает порцию измененных данных, затем для каждого считанного события обращается к соответствующему представлению и получает данные. Данные трансформируются в канонический формат шины, при этом на одну считанную запись может быть сформировано от нуля до пяти сообщений. Сообщения записываются в JMS-очередь, а записям в таблице событий проставляется статус "на обработке". Другой компонент адаптера - прокси-сервис, использующий транспорт JMS, считывает сообщения в каноническом формате из очереди, преобразует их в формат системы-приемника и вызывает веб-сервис, предоставляемый данной системой. Адаптер ждет окончания работы сервиса, после чего получает статус обработки сообщения и записывает его в специальную очередь ответов. Статус считывается шиной из очереди ответов и обновляется в таблице событий системы-источника.

На диаграмме UML данный процесс будет выглядеть следующим образом:


Стоит заметить, что с одной стороны события считываются из базы порциями, по 120 событий за один раз. Таким образом 5 000 000 событий в сутки это 41 600 транзакций. С другой стороны каждая запись раскладывается на от нуля до пяти сообщений, в среднем примем два. Каждое такое сообщение должно быть передано в систему-приемник, получается 10 000 000 транзакций. После этого для каждого события необходимо обновить статус - 5 000 000 транзакций. При этом все транзакции распределенные, т.к. в них участвует по меньшей мере два ресурса: база - очередь или очередь - очередь. Итого получается, что производительность системы должна быть не меньше 180 транзакций в секунду. Это конечно не то, что пафосно называют словом HighLoad, но уже возможно всего в одном - двух порядках от него.


Начало пути


Сразу после подключения системы-источника интеграция толком не заработала. Скорость передачи данных составила около 2-3 событий в секунду, т.е. примерно 200 000 в сутки. Нас такая производительность не устраивала. Плюс к тому же сразу началось забивание источников данных на сервере приложений WebLogic, которое проявляется в том, что пул соединений переходит в состояние Overloaded, а в логах появляются записи о том, что невозможно выделить соединение. К тому же, т.к. мы используем XA-транзакции, то у нас начали срабатывать таймауты транзакций, т.е. наши транзакции считывания длились более трех минут.

Анализ показал, что проблема в работе интеграционного слоя системы-источника. Во-первых, за время с момента включения интеграционного слоя в таблице событий накопилось несколько миллионов записей. Во-вторых, некоторые представления, возвращающие данные, работали очень медленно. Сложность заключалась в том, что на каких-то данных они работали нормально - запись возвращалась за 500 мс, а на каких-то - очень медленно и запись не возвращалась за отведенное для транзакции время.

Первую проблему мы решили следующим образом. Был определен еще один статус для событий: "готов к обработке". События переводятся из статуса "Новое" в статус "Готов к обработке" небольшими порциями - по 50 000 событий. Шина считывает записи не в статусе "Новая", а в статусе "Готов к обработке".

Вторая проблема решилась оптимизацией запросов. Оптимизация запросов к базе данных - это отдельная инженерная дисциплина, в которой есть свои практики и интересные решения. Не буду в данной статье подробно останавливаться на рассмотрении тонкостей выполненной оптимизации, скажу лишь, что после обновления представлений выборка данных по одному событию стала занимать 300 - 400 мс.

Настройка сервисной шины


Оптимизация числа считывателей

На момент запуска интеграции у нас был один физический сервер, на котором было развернуто два экземпляра сервера приложений WebLogic. Соответственно получается, что у нас было два считывателя из базы данных. Если у нас выборка одной записи занимает 300 мс, плюс расходы на полинг таблицы событий, плюс расходы на трансформацию, плюс расходы на управление транзакциями и запись в JMS, то получается, что порцию из 120 записей мы считываем и записываем в JMS-очередь за 40-60 секунд. Если даже считать, что после записи в очередь события обрабатываются мгновенно, то итоговая производительность шины будет составлять примерно 2 * 120 / 60 - 4 сообщения в секунду или 340 000 сообщений в сутки. Данной производительности недостаточно, т.к. система-источник выгружает в сутки в разы больше записей. Соответственно необходимо реализовать параллельный процесс считывания записей из БД.

У DB Adapter'а Oracle Service Bus есть настройка activationInstances, которая отвечает за то, сколько экземпляров адаптера будет создано. Соответственно, чтобы распараллелить процесс полинга, необходимо в настройках прокси-сервиса, построенного по адаптеру, выставить данное значение. Опытным путем было подобрано значение 8. Таким образом производительность считывания будет составлять примерно 8 * 2 * 120/60 - 32 сообщения в секунду или 2 700 000 сообщений в сутки.


Оптимизация RouterRuntimeCache

Кэш скомпилированных прокси-сервисов в Oracle Service Bus состоит из двух частей - статической, т.е. не собираемой сборщиком мусора, и динамической - соответственно, собираемой. По-умолчанию размер статической части кэша равен 100 прокси-сервисам. У нас на шине помимо интеграции с рассматриваемой системой-источником, развернуто еще порядка 50 адаптеров, соответственно кэш в 100 сервисов нас не устраивает. Изменить поведение по-умолчанию можно с помощью настройки

-Dcom.bea.wli.sb.pipeline.RouterRuntimeCache.size=3000

Добавить данную настройку можно в определение переменной окружения EXTRA_JAVA_PROPERTIES в файле setDomainEnv.sh/cmd.

Оптимизация JVM

После проведения всех оптимизаций на графике работы сборщика мусора стало видно, что по сути сборщик мусора работает лишь в небольшом диапазоне буквально в несколько сотен мегабайт. Т.е. куча наполняется, очищается полной сборкой мусора на 800 - 1000 Мб и постепенно снова наполняется. Чтобы сделать запуски сборки мусора более редкими было принято решение увеличить размер кучи, а чтобы мусор в большой куче собирался быстро - включить параллельную сборку в Old Gen. В итоге сервера шины были запущены со следующими настройками:


-Xms16384m -Xmx16384m -Xmn6144m -XX:+AggressiveOpts -Xnoclassgc -XX:ReservedCodeCacheSize=128m -XX:+ParallelRefProcEnabled -XX:+DisableExplicitGC -XX:+UseParallelOldGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:servers/osb_server1/logs/gc.log


С такими настройками полная сборка мусора занимает 1.5 - 2 секунды и выполняется раз в 6 - 8 часов.

Оптимизация передачи данных в систему-приемник


Передача данных в систему-приемник осуществляется после их считывания из JMS-очереди. Считывание осуществляется в несколько потоков. Количество потоков задается в настройках WorkManager'а. Для каждого прокси-сервиса, осуществляющего считывание из JMS настраивается свой WorkManager.


Число потоков было выставлено равным 24-м. Таким образом каждый экземпляр сервера приложений передавал сообщения в 24 потока или суммарно в систему-приемник сообщения передавались в 48 потоков. Интеграционный слой системы-приемника представляет собой кластер из 12-ти экземпляров сервера приложений WebLogic, развернутый на двух физических серверах. Особенностью масштабирования системы-приемника является то, что она может работать только на 32-х битной JVM, соответственно размер кучи для одного экземпляра сервера приложений ограничен двумя гигабайтами. Именно поэтому требуется так много экземпляров сервера приложений.

После запуска интеграции обнаружилось, что база данных системы-приемника сильно нагружает дисковую подсистему. После создания необходимых индексов и оптимизации запросов интеграционный слой заработал с требуемой производительностью.

Масштабирование сервисной шины


Оптимизация настроек сервисной шины и интегрируемых систем позволила добиться требуемой производительности. Однако вскоре выяснилось, что узким местом стала дисковая подсистема сервера, на котором развернута шина. Анализ показал, что диски загружены на 90%, что довольно рискованно. Было принято решение добавить в кластер шины еще один физический сервер, а на существующем уменьшить нагрузку. Таким образом был создан домен Oracle Service Bus с четырьмя экземплярами серверов приложений под JMS, четырьмя серверами OSB и административным сервером. По два экземпляра серверов JMS и серверов OSB были размещены на одном физическом сервере и по два - на другом. Управление серверами обеспечивается с помощью Node Manager'ов - по одному для каждой машины.


В настройках каждого экземпляра OSB-сервера были сделаны следующие изменения. Во-первых, уменьшено число считывателей из БД (значение параметра activationInstances) до четырех. Таким образом всего на кластере развернуто 16 считывателей. Во-вторых, число потоков передачи данных в систему-приемник уменьшено до 16-ти. Таким образом всего данные в систему-приемник передаются в 64 потока.

Стоит отметить, что используемая архитектура интеграции - считывание сообщений с помощью DB Adapter'а - снижает горизонтальную масштабуемость системы, т.к. в системе-источнике могут быть наложены ограничения на число одновременных подключений интеграционного пользователя к БД.

Аппаратная конфигурация


Сейчас шина развернута на двух серверах со следующими характеристиками:

- Процессор: Intel(R) Xeon(R) E5649 2.53GHz, 12 ядер физически, 24 благодаря Hyper-Threading;
- ОЗУ: 128 Гб.

Выводы


На основании нашей истории можно сделать некоторые выводы. Во-первых, стоит сказать, что процесс настройки и масштабирования интеграционного решения является итерационным: мы настраиваем систему в одном месте, после этого обнаруживается новая проблема, решаем данную проблему и до тех пор пока система не станет работать стабильно с требуемой производительностью. Во-вторых, узкими местами зачастую оказываются сначала интегрируемые системы, а только затем сама шина. Только после того, как обеспечена быстрая и надежная работа слоя интеграции подключенных систем можно разбираться с шиной. Другое дело, что до тех пор, пока шина не заработает в полную силу, проблемы в подключенных системах будут невидны, поэтому процесс настройки и является итерационным. В-третьих, нельзя не отметить отличную масштабируемость Oracle Service Bus, по сути нам удалось в два раза повысить производительность системы, просто разнеся ее по двум физическим машинам.

Буду рад, если описанная история поможет вам в запуске ваших интеграционных решений. Вопросы как обычно можно задавать в комментариях.

Понравилось сообщение - подпишитесь на блог и Twitter

6 комментариев:

Unknown комментирует...

Добрый день, Павел.

Что собой представляют канонические типы данных в проекте?
Т.е., описывают ли они сущности доменной области проекта или Вами специфицированы некие общие типы для ESB без привязки к предметной области для обеспечения выполнения технических задач, к примеру, что-то типа конверта входящего сообщения для передачи по JMS?

Unknown комментирует...

Здравствуйте, в нашем случае это именно обобщенные сущности предметной области. Например "абонент", "услуга абонента", "элемент справочника домов" и т.д.

Unknown комментирует...

Если каноническая модель разработана специально для проекта, то каковы преимущества её использования в реальных условиях по сравнению с прямой трансформацией входящих сообщений в исходящие? Если я правильно понял, то спецификации веб-сервисов, клиенты которых реализуются на шине, предоставляются со стороны системы-получателя. Команда шины не управляет их изменениями. Типы канонической модели связаны с типами сообщений запросов. Изменения в этих типах могут привести к необходимости не только обновления клиентов веб-сервисов, но и типов модели. Ну, и влияние двойного преобразования на производительность опять же :)

Unknown комментирует...

У нас реализована не только интеграция вида N -> 1, т.е. когда разнообразные системы-источники передают все данные в одну систему-приемник, но и интеграция между центральной системой и периферией, в таком случае сервисы выставляет уже шина. К тому же сейчас начинаем разрабатывать интеграцию систем на периферии друг с другом и внедрять новые центральные системы. В общем у нас постепенно выращивается настоящая сервисная шина, обеспечивающая интеграцию "многие-ко-многим". К тому же наша каноническая модель данных не дублирует один в один модель одной из систем, хотя во многом на нее и опирается. Это добавляет гибкость в разработке.

Если узким местом в нашей интеграции станет трансформация, то это будет обозначать, что производительность систем возросла многократно и они стали генерировать слишком много трафика. Это будет круто, но увы, реалии показывают, что основная нагрузка приходится все же на считывание и запись сообщений.

Leeho комментирует...

Не смогла понять из изложенной в посте схемы - а что случается при рассинхронизации данных, если, например, БД одной системы восстановили из бэкапа некоторой данности?

Unknown комментирует...

Оперативные данные, такие как баланс пользователя мы подгружаем синхронно с помощью веб-сервисов. В таких случаях проблем не будет. Если из бэкапа восстановили источник, то там операторам придется срочно вносить изменения и к нам польется поток данных. Впрочем у нас разработана защита от дублей. Система-приемник же резервируется, таких падений, чтобы было нужно восстановление из бэкапа еще не было, но в принципе и данный вопрос можно решить домиграцией данных.

Отправить комментарий

Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!