Сейчас работаю над проектом интеграции вновь внедряемой централизованной бизнес-системы с другими системами, установленными в филиалах заказчика. При этом основная задача заключается в том, чтобы синхронизировать данные в филиалах и центре с помощью сервисной шины предприятия (ESB), реализованной на платформе Oracle Service Bus. Мы смогли обеспечить передачу с гарантированной доставкой боле 5 000 000 сообщений в сутки. В данной статье хочу рассказать о том, как это было сделано.
Нам необходимо передавать изменения данных из одной системы (системы-источника) в другую - систему-приемник. Данные должны передаваться сразу, а не с помощью пакетной обработки. Соответственно, нужно каким-то образом захватывать события об изменении данных и публиковать их для сервисной шины. Хорошо зарекомендовал себя следующий паттерн: при изменении данных пользователем с помощью триггера в БД системы-источника формируется событие, которое записывается в специальную таблицу. У события есть идентификатор, признак типа - какая именно сущность была создана или изменена, тип операции: создание, изменение или удаление, статус: новое, на обработке, обработанное успешно или обработанное с ошибкой. По идентификатору события мы с помощью соответствующего типу сущности представлению можем получить актуальные на момент обработки события данные. Сервисная шина последовательным опросом таблицы событий (по-другому это называется "полинг") считывает порцию измененных данных, затем для каждого считанного события обращается к соответствующему представлению и получает данные. Данные трансформируются в канонический формат шины, при этом на одну считанную запись может быть сформировано от нуля до пяти сообщений. Сообщения записываются в JMS-очередь, а записям в таблице событий проставляется статус "на обработке". Другой компонент адаптера - прокси-сервис, использующий транспорт JMS, считывает сообщения в каноническом формате из очереди, преобразует их в формат системы-приемника и вызывает веб-сервис, предоставляемый данной системой. Адаптер ждет окончания работы сервиса, после чего получает статус обработки сообщения и записывает его в специальную очередь ответов. Статус считывается шиной из очереди ответов и обновляется в таблице событий системы-источника.
На диаграмме UML данный процесс будет выглядеть следующим образом:
Стоит заметить, что с одной стороны события считываются из базы порциями, по 120 событий за один раз. Таким образом 5 000 000 событий в сутки это 41 600 транзакций. С другой стороны каждая запись раскладывается на от нуля до пяти сообщений, в среднем примем два. Каждое такое сообщение должно быть передано в систему-приемник, получается 10 000 000 транзакций. После этого для каждого события необходимо обновить статус - 5 000 000 транзакций. При этом все транзакции распределенные, т.к. в них участвует по меньшей мере два ресурса: база - очередь или очередь - очередь. Итого получается, что производительность системы должна быть не меньше 180 транзакций в секунду. Это конечно не то, что пафосно называют словом HighLoad, но уже возможно всего в одном - двух порядках от него.
Сразу после подключения системы-источника интеграция толком не заработала. Скорость передачи данных составила около 2-3 событий в секунду, т.е. примерно 200 000 в сутки. Нас такая производительность не устраивала. Плюс к тому же сразу началось забивание источников данных на сервере приложений WebLogic, которое проявляется в том, что пул соединений переходит в состояние Overloaded, а в логах появляются записи о том, что невозможно выделить соединение. К тому же, т.к. мы используем XA-транзакции, то у нас начали срабатывать таймауты транзакций, т.е. наши транзакции считывания длились более трех минут.
Анализ показал, что проблема в работе интеграционного слоя системы-источника. Во-первых, за время с момента включения интеграционного слоя в таблице событий накопилось несколько миллионов записей. Во-вторых, некоторые представления, возвращающие данные, работали очень медленно. Сложность заключалась в том, что на каких-то данных они работали нормально - запись возвращалась за 500 мс, а на каких-то - очень медленно и запись не возвращалась за отведенное для транзакции время.
Первую проблему мы решили следующим образом. Был определен еще один статус для событий: "готов к обработке". События переводятся из статуса "Новое" в статус "Готов к обработке" небольшими порциями - по 50 000 событий. Шина считывает записи не в статусе "Новая", а в статусе "Готов к обработке".
Вторая проблема решилась оптимизацией запросов. Оптимизация запросов к базе данных - это отдельная инженерная дисциплина, в которой есть свои практики и интересные решения. Не буду в данной статье подробно останавливаться на рассмотрении тонкостей выполненной оптимизации, скажу лишь, что после обновления представлений выборка данных по одному событию стала занимать 300 - 400 мс.
Оптимизация числа считывателей
На момент запуска интеграции у нас был один физический сервер, на котором было развернуто два экземпляра сервера приложений WebLogic. Соответственно получается, что у нас было два считывателя из базы данных. Если у нас выборка одной записи занимает 300 мс, плюс расходы на полинг таблицы событий, плюс расходы на трансформацию, плюс расходы на управление транзакциями и запись в JMS, то получается, что порцию из 120 записей мы считываем и записываем в JMS-очередь за 40-60 секунд. Если даже считать, что после записи в очередь события обрабатываются мгновенно, то итоговая производительность шины будет составлять примерно 2 * 120 / 60 - 4 сообщения в секунду или 340 000 сообщений в сутки. Данной производительности недостаточно, т.к. система-источник выгружает в сутки в разы больше записей. Соответственно необходимо реализовать параллельный процесс считывания записей из БД.
У DB Adapter'а Oracle Service Bus есть настройка activationInstances, которая отвечает за то, сколько экземпляров адаптера будет создано. Соответственно, чтобы распараллелить процесс полинга, необходимо в настройках прокси-сервиса, построенного по адаптеру, выставить данное значение. Опытным путем было подобрано значение 8. Таким образом производительность считывания будет составлять примерно 8 * 2 * 120/60 - 32 сообщения в секунду или 2 700 000 сообщений в сутки.
Оптимизация RouterRuntimeCache
Кэш скомпилированных прокси-сервисов в Oracle Service Bus состоит из двух частей - статической, т.е. не собираемой сборщиком мусора, и динамической - соответственно, собираемой. По-умолчанию размер статической части кэша равен 100 прокси-сервисам. У нас на шине помимо интеграции с рассматриваемой системой-источником, развернуто еще порядка 50 адаптеров, соответственно кэш в 100 сервисов нас не устраивает. Изменить поведение по-умолчанию можно с помощью настройки
-Dcom.bea.wli.sb.pipeline.RouterRuntimeCache.size=3000
Добавить данную настройку можно в определение переменной окружения EXTRA_JAVA_PROPERTIES в файле setDomainEnv.sh/cmd.
Оптимизация JVM
После проведения всех оптимизаций на графике работы сборщика мусора стало видно, что по сути сборщик мусора работает лишь в небольшом диапазоне буквально в несколько сотен мегабайт. Т.е. куча наполняется, очищается полной сборкой мусора на 800 - 1000 Мб и постепенно снова наполняется. Чтобы сделать запуски сборки мусора более редкими было принято решение увеличить размер кучи, а чтобы мусор в большой куче собирался быстро - включить параллельную сборку в Old Gen. В итоге сервера шины были запущены со следующими настройками:
-Xms16384m -Xmx16384m -Xmn6144m -XX:+AggressiveOpts -Xnoclassgc -XX:ReservedCodeCacheSize=128m -XX:+ParallelRefProcEnabled -XX:+DisableExplicitGC -XX:+UseParallelOldGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:servers/osb_server1/logs/gc.log
С такими настройками полная сборка мусора занимает 1.5 - 2 секунды и выполняется раз в 6 - 8 часов.
Передача данных в систему-приемник осуществляется после их считывания из JMS-очереди. Считывание осуществляется в несколько потоков. Количество потоков задается в настройках WorkManager'а. Для каждого прокси-сервиса, осуществляющего считывание из JMS настраивается свой WorkManager.
Число потоков было выставлено равным 24-м. Таким образом каждый экземпляр сервера приложений передавал сообщения в 24 потока или суммарно в систему-приемник сообщения передавались в 48 потоков. Интеграционный слой системы-приемника представляет собой кластер из 12-ти экземпляров сервера приложений WebLogic, развернутый на двух физических серверах. Особенностью масштабирования системы-приемника является то, что она может работать только на 32-х битной JVM, соответственно размер кучи для одного экземпляра сервера приложений ограничен двумя гигабайтами. Именно поэтому требуется так много экземпляров сервера приложений.
После запуска интеграции обнаружилось, что база данных системы-приемника сильно нагружает дисковую подсистему. После создания необходимых индексов и оптимизации запросов интеграционный слой заработал с требуемой производительностью.
Оптимизация настроек сервисной шины и интегрируемых систем позволила добиться требуемой производительности. Однако вскоре выяснилось, что узким местом стала дисковая подсистема сервера, на котором развернута шина. Анализ показал, что диски загружены на 90%, что довольно рискованно. Было принято решение добавить в кластер шины еще один физический сервер, а на существующем уменьшить нагрузку. Таким образом был создан домен Oracle Service Bus с четырьмя экземплярами серверов приложений под JMS, четырьмя серверами OSB и административным сервером. По два экземпляра серверов JMS и серверов OSB были размещены на одном физическом сервере и по два - на другом. Управление серверами обеспечивается с помощью Node Manager'ов - по одному для каждой машины.
В настройках каждого экземпляра OSB-сервера были сделаны следующие изменения. Во-первых, уменьшено число считывателей из БД (значение параметра activationInstances) до четырех. Таким образом всего на кластере развернуто 16 считывателей. Во-вторых, число потоков передачи данных в систему-приемник уменьшено до 16-ти. Таким образом всего данные в систему-приемник передаются в 64 потока.
Стоит отметить, что используемая архитектура интеграции - считывание сообщений с помощью DB Adapter'а - снижает горизонтальную масштабуемость системы, т.к. в системе-источнике могут быть наложены ограничения на число одновременных подключений интеграционного пользователя к БД.
Сейчас шина развернута на двух серверах со следующими характеристиками:
- Процессор: Intel(R) Xeon(R) E5649 2.53GHz, 12 ядер физически, 24 благодаря Hyper-Threading;
- ОЗУ: 128 Гб.
На основании нашей истории можно сделать некоторые выводы. Во-первых, стоит сказать, что процесс настройки и масштабирования интеграционного решения является итерационным: мы настраиваем систему в одном месте, после этого обнаруживается новая проблема, решаем данную проблему и до тех пор пока система не станет работать стабильно с требуемой производительностью. Во-вторых, узкими местами зачастую оказываются сначала интегрируемые системы, а только затем сама шина. Только после того, как обеспечена быстрая и надежная работа слоя интеграции подключенных систем можно разбираться с шиной. Другое дело, что до тех пор, пока шина не заработает в полную силу, проблемы в подключенных системах будут невидны, поэтому процесс настройки и является итерационным. В-третьих, нельзя не отметить отличную масштабируемость Oracle Service Bus, по сути нам удалось в два раза повысить производительность системы, просто разнеся ее по двум физическим машинам.
Буду рад, если описанная история поможет вам в запуске ваших интеграционных решений. Вопросы как обычно можно задавать в комментариях.
Понравилось сообщение - подпишитесь на блог и Twitter
Постановка задачи
Нам необходимо передавать изменения данных из одной системы (системы-источника) в другую - систему-приемник. Данные должны передаваться сразу, а не с помощью пакетной обработки. Соответственно, нужно каким-то образом захватывать события об изменении данных и публиковать их для сервисной шины. Хорошо зарекомендовал себя следующий паттерн: при изменении данных пользователем с помощью триггера в БД системы-источника формируется событие, которое записывается в специальную таблицу. У события есть идентификатор, признак типа - какая именно сущность была создана или изменена, тип операции: создание, изменение или удаление, статус: новое, на обработке, обработанное успешно или обработанное с ошибкой. По идентификатору события мы с помощью соответствующего типу сущности представлению можем получить актуальные на момент обработки события данные. Сервисная шина последовательным опросом таблицы событий (по-другому это называется "полинг") считывает порцию измененных данных, затем для каждого считанного события обращается к соответствующему представлению и получает данные. Данные трансформируются в канонический формат шины, при этом на одну считанную запись может быть сформировано от нуля до пяти сообщений. Сообщения записываются в JMS-очередь, а записям в таблице событий проставляется статус "на обработке". Другой компонент адаптера - прокси-сервис, использующий транспорт JMS, считывает сообщения в каноническом формате из очереди, преобразует их в формат системы-приемника и вызывает веб-сервис, предоставляемый данной системой. Адаптер ждет окончания работы сервиса, после чего получает статус обработки сообщения и записывает его в специальную очередь ответов. Статус считывается шиной из очереди ответов и обновляется в таблице событий системы-источника.
На диаграмме UML данный процесс будет выглядеть следующим образом:
Стоит заметить, что с одной стороны события считываются из базы порциями, по 120 событий за один раз. Таким образом 5 000 000 событий в сутки это 41 600 транзакций. С другой стороны каждая запись раскладывается на от нуля до пяти сообщений, в среднем примем два. Каждое такое сообщение должно быть передано в систему-приемник, получается 10 000 000 транзакций. После этого для каждого события необходимо обновить статус - 5 000 000 транзакций. При этом все транзакции распределенные, т.к. в них участвует по меньшей мере два ресурса: база - очередь или очередь - очередь. Итого получается, что производительность системы должна быть не меньше 180 транзакций в секунду. Это конечно не то, что пафосно называют словом HighLoad, но уже возможно всего в одном - двух порядках от него.
Начало пути
Сразу после подключения системы-источника интеграция толком не заработала. Скорость передачи данных составила около 2-3 событий в секунду, т.е. примерно 200 000 в сутки. Нас такая производительность не устраивала. Плюс к тому же сразу началось забивание источников данных на сервере приложений WebLogic, которое проявляется в том, что пул соединений переходит в состояние Overloaded, а в логах появляются записи о том, что невозможно выделить соединение. К тому же, т.к. мы используем XA-транзакции, то у нас начали срабатывать таймауты транзакций, т.е. наши транзакции считывания длились более трех минут.
Анализ показал, что проблема в работе интеграционного слоя системы-источника. Во-первых, за время с момента включения интеграционного слоя в таблице событий накопилось несколько миллионов записей. Во-вторых, некоторые представления, возвращающие данные, работали очень медленно. Сложность заключалась в том, что на каких-то данных они работали нормально - запись возвращалась за 500 мс, а на каких-то - очень медленно и запись не возвращалась за отведенное для транзакции время.
Первую проблему мы решили следующим образом. Был определен еще один статус для событий: "готов к обработке". События переводятся из статуса "Новое" в статус "Готов к обработке" небольшими порциями - по 50 000 событий. Шина считывает записи не в статусе "Новая", а в статусе "Готов к обработке".
Вторая проблема решилась оптимизацией запросов. Оптимизация запросов к базе данных - это отдельная инженерная дисциплина, в которой есть свои практики и интересные решения. Не буду в данной статье подробно останавливаться на рассмотрении тонкостей выполненной оптимизации, скажу лишь, что после обновления представлений выборка данных по одному событию стала занимать 300 - 400 мс.
Настройка сервисной шины
Оптимизация числа считывателей
На момент запуска интеграции у нас был один физический сервер, на котором было развернуто два экземпляра сервера приложений WebLogic. Соответственно получается, что у нас было два считывателя из базы данных. Если у нас выборка одной записи занимает 300 мс, плюс расходы на полинг таблицы событий, плюс расходы на трансформацию, плюс расходы на управление транзакциями и запись в JMS, то получается, что порцию из 120 записей мы считываем и записываем в JMS-очередь за 40-60 секунд. Если даже считать, что после записи в очередь события обрабатываются мгновенно, то итоговая производительность шины будет составлять примерно 2 * 120 / 60 - 4 сообщения в секунду или 340 000 сообщений в сутки. Данной производительности недостаточно, т.к. система-источник выгружает в сутки в разы больше записей. Соответственно необходимо реализовать параллельный процесс считывания записей из БД.
У DB Adapter'а Oracle Service Bus есть настройка activationInstances, которая отвечает за то, сколько экземпляров адаптера будет создано. Соответственно, чтобы распараллелить процесс полинга, необходимо в настройках прокси-сервиса, построенного по адаптеру, выставить данное значение. Опытным путем было подобрано значение 8. Таким образом производительность считывания будет составлять примерно 8 * 2 * 120/60 - 32 сообщения в секунду или 2 700 000 сообщений в сутки.
Оптимизация RouterRuntimeCache
Кэш скомпилированных прокси-сервисов в Oracle Service Bus состоит из двух частей - статической, т.е. не собираемой сборщиком мусора, и динамической - соответственно, собираемой. По-умолчанию размер статической части кэша равен 100 прокси-сервисам. У нас на шине помимо интеграции с рассматриваемой системой-источником, развернуто еще порядка 50 адаптеров, соответственно кэш в 100 сервисов нас не устраивает. Изменить поведение по-умолчанию можно с помощью настройки
-Dcom.bea.wli.sb.pipeline.RouterRuntimeCache.size=3000
Добавить данную настройку можно в определение переменной окружения EXTRA_JAVA_PROPERTIES в файле setDomainEnv.sh/cmd.
Оптимизация JVM
После проведения всех оптимизаций на графике работы сборщика мусора стало видно, что по сути сборщик мусора работает лишь в небольшом диапазоне буквально в несколько сотен мегабайт. Т.е. куча наполняется, очищается полной сборкой мусора на 800 - 1000 Мб и постепенно снова наполняется. Чтобы сделать запуски сборки мусора более редкими было принято решение увеличить размер кучи, а чтобы мусор в большой куче собирался быстро - включить параллельную сборку в Old Gen. В итоге сервера шины были запущены со следующими настройками:
-Xms16384m -Xmx16384m -Xmn6144m -XX:+AggressiveOpts -Xnoclassgc -XX:ReservedCodeCacheSize=128m -XX:+ParallelRefProcEnabled -XX:+DisableExplicitGC -XX:+UseParallelOldGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:servers/osb_server1/logs/gc.log
С такими настройками полная сборка мусора занимает 1.5 - 2 секунды и выполняется раз в 6 - 8 часов.
Оптимизация передачи данных в систему-приемник
Передача данных в систему-приемник осуществляется после их считывания из JMS-очереди. Считывание осуществляется в несколько потоков. Количество потоков задается в настройках WorkManager'а. Для каждого прокси-сервиса, осуществляющего считывание из JMS настраивается свой WorkManager.
Число потоков было выставлено равным 24-м. Таким образом каждый экземпляр сервера приложений передавал сообщения в 24 потока или суммарно в систему-приемник сообщения передавались в 48 потоков. Интеграционный слой системы-приемника представляет собой кластер из 12-ти экземпляров сервера приложений WebLogic, развернутый на двух физических серверах. Особенностью масштабирования системы-приемника является то, что она может работать только на 32-х битной JVM, соответственно размер кучи для одного экземпляра сервера приложений ограничен двумя гигабайтами. Именно поэтому требуется так много экземпляров сервера приложений.
После запуска интеграции обнаружилось, что база данных системы-приемника сильно нагружает дисковую подсистему. После создания необходимых индексов и оптимизации запросов интеграционный слой заработал с требуемой производительностью.
Масштабирование сервисной шины
Оптимизация настроек сервисной шины и интегрируемых систем позволила добиться требуемой производительности. Однако вскоре выяснилось, что узким местом стала дисковая подсистема сервера, на котором развернута шина. Анализ показал, что диски загружены на 90%, что довольно рискованно. Было принято решение добавить в кластер шины еще один физический сервер, а на существующем уменьшить нагрузку. Таким образом был создан домен Oracle Service Bus с четырьмя экземплярами серверов приложений под JMS, четырьмя серверами OSB и административным сервером. По два экземпляра серверов JMS и серверов OSB были размещены на одном физическом сервере и по два - на другом. Управление серверами обеспечивается с помощью Node Manager'ов - по одному для каждой машины.
В настройках каждого экземпляра OSB-сервера были сделаны следующие изменения. Во-первых, уменьшено число считывателей из БД (значение параметра activationInstances) до четырех. Таким образом всего на кластере развернуто 16 считывателей. Во-вторых, число потоков передачи данных в систему-приемник уменьшено до 16-ти. Таким образом всего данные в систему-приемник передаются в 64 потока.
Стоит отметить, что используемая архитектура интеграции - считывание сообщений с помощью DB Adapter'а - снижает горизонтальную масштабуемость системы, т.к. в системе-источнике могут быть наложены ограничения на число одновременных подключений интеграционного пользователя к БД.
Аппаратная конфигурация
Сейчас шина развернута на двух серверах со следующими характеристиками:
- Процессор: Intel(R) Xeon(R) E5649 2.53GHz, 12 ядер физически, 24 благодаря Hyper-Threading;
- ОЗУ: 128 Гб.
Выводы
На основании нашей истории можно сделать некоторые выводы. Во-первых, стоит сказать, что процесс настройки и масштабирования интеграционного решения является итерационным: мы настраиваем систему в одном месте, после этого обнаруживается новая проблема, решаем данную проблему и до тех пор пока система не станет работать стабильно с требуемой производительностью. Во-вторых, узкими местами зачастую оказываются сначала интегрируемые системы, а только затем сама шина. Только после того, как обеспечена быстрая и надежная работа слоя интеграции подключенных систем можно разбираться с шиной. Другое дело, что до тех пор, пока шина не заработает в полную силу, проблемы в подключенных системах будут невидны, поэтому процесс настройки и является итерационным. В-третьих, нельзя не отметить отличную масштабируемость Oracle Service Bus, по сути нам удалось в два раза повысить производительность системы, просто разнеся ее по двум физическим машинам.
Буду рад, если описанная история поможет вам в запуске ваших интеграционных решений. Вопросы как обычно можно задавать в комментариях.
Понравилось сообщение - подпишитесь на блог и Twitter
Добрый день, Павел.
ОтветитьУдалитьЧто собой представляют канонические типы данных в проекте?
Т.е., описывают ли они сущности доменной области проекта или Вами специфицированы некие общие типы для ESB без привязки к предметной области для обеспечения выполнения технических задач, к примеру, что-то типа конверта входящего сообщения для передачи по JMS?
Здравствуйте, в нашем случае это именно обобщенные сущности предметной области. Например "абонент", "услуга абонента", "элемент справочника домов" и т.д.
ОтветитьУдалитьЕсли каноническая модель разработана специально для проекта, то каковы преимущества её использования в реальных условиях по сравнению с прямой трансформацией входящих сообщений в исходящие? Если я правильно понял, то спецификации веб-сервисов, клиенты которых реализуются на шине, предоставляются со стороны системы-получателя. Команда шины не управляет их изменениями. Типы канонической модели связаны с типами сообщений запросов. Изменения в этих типах могут привести к необходимости не только обновления клиентов веб-сервисов, но и типов модели. Ну, и влияние двойного преобразования на производительность опять же :)
ОтветитьУдалитьУ нас реализована не только интеграция вида N -> 1, т.е. когда разнообразные системы-источники передают все данные в одну систему-приемник, но и интеграция между центральной системой и периферией, в таком случае сервисы выставляет уже шина. К тому же сейчас начинаем разрабатывать интеграцию систем на периферии друг с другом и внедрять новые центральные системы. В общем у нас постепенно выращивается настоящая сервисная шина, обеспечивающая интеграцию "многие-ко-многим". К тому же наша каноническая модель данных не дублирует один в один модель одной из систем, хотя во многом на нее и опирается. Это добавляет гибкость в разработке.
ОтветитьУдалитьЕсли узким местом в нашей интеграции станет трансформация, то это будет обозначать, что производительность систем возросла многократно и они стали генерировать слишком много трафика. Это будет круто, но увы, реалии показывают, что основная нагрузка приходится все же на считывание и запись сообщений.
Не смогла понять из изложенной в посте схемы - а что случается при рассинхронизации данных, если, например, БД одной системы восстановили из бэкапа некоторой данности?
ОтветитьУдалитьОперативные данные, такие как баланс пользователя мы подгружаем синхронно с помощью веб-сервисов. В таких случаях проблем не будет. Если из бэкапа восстановили источник, то там операторам придется срочно вносить изменения и к нам польется поток данных. Впрочем у нас разработана защита от дублей. Система-приемник же резервируется, таких падений, чтобы было нужно восстановление из бэкапа еще не было, но в принципе и данный вопрос можно решить домиграцией данных.
ОтветитьУдалить