суббота, 17 апреля 2010 г.

Параллельный Reduce: математические основы и пример реализации на Java


В данной статье рассмотрены понятия свертки и ссылочного гомоморфизма, их взаимосвязь, приведен пример построения многопоточного Reduce и сравнение его скорости работы с однопоточным вариантом на примере анализа данных с сервера РТС. В дальнейшем планирую развить данную работу и реализовать MapReduce с помощью R-OSGi.



Введение


Предложенная учеными из лаборатории компании Google концепция параллельных вычислений MapReduce [2] имеет важное прикладное и научное значение, что демонстрируется, в частности, ее успешным применением для построения поисковой системы Google. Во-первых, данная концепция может считаться универсальной: существует большое количество задач в области генерации и анализа данных, извлечения информации, машинного обучения и сортировки, успешно решаемых при помощи данной модели [2, 3]. Во-вторых, применение концепции MapReduce позволяет сделать прозрачными для разработчиков программного обеспечения такие механизмы, как параллелизация, динамическое распределение нагрузки по узлам кластера и обеспечение устойчивости к ошибкам передачи данных и выходу из строя оборудования [2]. В-третьих, т.к. MapReduce позволяет сделать прозрачным распараллеливание программы и распределенную обработку данных, то от программиста не требуется знаний и опыта, специфичного для разработки многопоточных и распределенных приложений [2].

Еще одним преимуществом концепции MapReduce является масштабируемость. Если в научных учреждениях проводятся эксперименты по использованию MapReduce на десятках машин [3], то у компании Google есть кластер, состоящий из тысяч машин [2].

Однако, чтобы эффективно использовать данную концепцию, необходимо понимать накладываемые ею ограничения и область ее применимости. Для этого необходимо знать математические понятия, лежащие в ее основе и их взаимосвязь между собой.

В данной статье мы рассмотрим понятия списочного гомоморфизма и свертки, разберем их взаимосвязь и вытекающие из этого свойства, которые можно применять для оптимизации алгоритмов. Затем, на примере приложения, обрабатывающего котировки акций, продемонстрируем построение работающей в несколько потоков операции Reduce и произведем сравнение времени исполнения данного варианта с однопоточным.

Операция свертки и списочный гомоморфизм


Рассмотрим два алгоритма. Первый - алгоритм, который вычисляет значение следующего выражения:

a1 # (a2 # (... (an # e)...)) (1)
где ai - i-й элемент списка a,
n - длина списка a,
e - некоторое начальное значение. Данное значение может быть единицей операции #, но это не обязательно.

В случае, если для списка не определена операция обращения к элементу по индексу, то реализовать такой алгоритм можно только рекурсивно.

Второй алгоритм вычисляет значение следующего выражения:

((...(e # a1)...) # an - 1) # an (2)

Первый алгоритм называется правой сверткой, а второй, соответственно, - левой сверткой. Из выражений (1) и (2) видно, что свертка имеет два параметра - e, являющийся значением свертки на нулевом шаге алгоритма и # - бинарная операция, выполняемая над элементами структуры и результатом свертки на предыдущем шаге алгоритма. Будем обозначать первый алгоритм как foldr # e, а второй - как foldl # e.

Применительно к программированию операция свертки позволяет отделить процедуру обхода сколь угодно сложной структуры данных от процедуры выполнения некоторой операции над элементами этой структуры. Такое разделение помогает снизить сложность как конкретной программы, так и процесса программирования в целом, потому что процедуру обхода (т.е. реализацию конкретной функции foldr или foldl) можно написать и отладить один раз, а затем только использовать, параметризируя конкретными значениями # и e.

Например, следующим образом можно вычислить длину и произведение элементов списка

len = foldr (+ 1) 0, где + 1 - функция аргументов x и y, прибавляющая единицу к y. Если foldr заменить на foldl, то результат операции будет тем же самым.

prod = foldr (*) 1 = foldl (*) 1

Найти максимальный элемент списка можно так:

max = foldr max -inf = foldl max -inf,
где -inf - минус бесконечность,
max a b - функция, возвращающая a, если a > b и b в противном случае.

Теперь рассмотрим понятие списочного гомоморфизма, которое лежит в основе концепции MapReduce.

Определение 1

Списочным гомоморфизмом называется функция h от результата конкатенации конечных списков, для которой существует бинарный оператор #, такой, что для всех списков x и y выполняется равенство

h (x++y) = h (x) # h (y) (3)
где ++ - операция конкатенации списков [4].

Примерами списочных гомоморфизмов являются следующие функции:

    Функция идентификации id, т.е. функция, которая возвращает свой аргумент.
    Функция map (определяется как map f), которая применяет функцию f к каждому элементу списка.
    Функция concat, которая преобразует список списков в длинный список, состоящий из элементов исходных списков.
    Функция head, которая возвращает первый элемент ("голову") списка.
    Функция length, которая возвращает длину списка.
    Функции sum, min, max и т.д., которые возвращают сумму элементов списка, минимальный и максимальный элементы [4].

Однако, существуют функции, которые не являются списочными гомоморфизмами, например, функция lsp, которая возвращает первую с начала списка наиболее длинную отсортированную последовательность [4] (например, для списка 1 2 3 1 2 3 4 5 она вернет 1 2 3 4 5).

Важно: из определения гомоморфизма следует, что операция # должна быть ассоциативной, поскольку операция ++ является ассоциативной. Операция h [] требует наличия единицы для операции # на области определения функции h, поскольку [] является единицей для операции ++ (здесь и далее [] обозначает пустой список). Если операция # не имеет единицы, то значение выражения h [] не определено, например значение выражения head [] не определено [4].

Обозначать гомоморфизм будем следующим образом: hom (#) f e, где # ассоциативная операция, f - функция, являющаяся суперпозицией функции h и функции построения списка из одного элемента (обозначается так: [.]). Т.е. f (a) = h ([a]) (a - атомарный элемент). e - единица операции #. Например,

sum = hom (+) id 0
length = hom (+) one 0, где one - функция, возвращающая единицу для всех элеменов списка [4].

Существует три теоремы о гомоморфизме, которые очень важны для понимания концепции MapReduce и области ее применимости. Первая теорема, фактически, вводит определение MapReduce, а вторая и третья показывают связь между гомоморфизмом и свертками. Рассмотрим эти теоремы подробнее, но для этого нам потребуется ввести следующие определения:

Определение 2

Функция, которая может быть представлена в форме hom (#) id e для некоторой операции # называется редукцией [4].

Определение 3

Для заданной функции f, функция hom (++) ([.]^f) [] записывается как map f и называется отображением [4]. В определении символ [.] обозначает функцию создания списка из одного элемента, а символ ^ обозначает суперпозицию (композицию) функций. Для атомарного элемента a функция [.]^f вернет [f (a)].

Теорема 1 (Первая теорема о гомоморфизме)

Каждый гомоморфизм может быть записан как композиция редукции и отображения:

hom (#) f e = hom (#) id e ^ map f

Верно и наоборот: каждая такая композиция является гомоморфизмом.

Теорема 2 (Вторая теорема о гомоморфизме)

Каждый гомоморфизм можно представить в виде и левой, и правой свертки. Это верно, потому что операция # ассоциативна.

hom (#) f e = foldr (@) e, где a @ s = f (a) # s
             = foldl (%) e, где r % a = r # f (a)

Две вышеизложенные теоремы отвечают на вопрос "Как вычислить гомоморфизм". Однако, перед тем как вычислить гомоморфизм, его необходимо распознать и/или построить. Ответ на вопрос "какая функция является гомоморфной" дает третья теорема о гомоморфизме.

Теорема 3 (Третья теорема о гомоморфизме)

Если функция h представима в виде и правой, и левой свертки с разными операциями @ и %, но с одинаковым начальным элементом свертки e, то функция h является гомоморфизмом.

Доказательство теоремы 3 приведено в работах [1] и [4]. В работе [4] приведен пример использования определения гомоморфизма для составления алгоритма сортировки "слиянием" (со сложностью O (n log n)) из алгоритма сортировки "вставкой" со сложностью (O (n2).

Частным случаем применения третьей теоремы о гомоморфизме является левая и правая свертки с одинаковой операцией (#). Для того, чтобы левая и правая свертки давали одинаковый результат, необходимо, чтобы выполнялось несколько условий:

1. Операция # должна получать и возвращать аргументы одного и того же типа.
2. Операция # должна быть ассоциативной.
3. Начальное значение e должно коммутировать с функцией #, т.е. e # x = x # e. В общем случае это возможно, если операция # коммутативна, однако коммутативность операции # не является необходимым условием [5].

Прикладное значение третьей теоремы о гомоморфизме следующее: если необходимо выполнить гомоморфную функцию h над списком x, то можно разделить список x на подсписки u и w, вычислить значение функции h от каждого подсписка параллельно и объединить эти значения с помощью операции #. При этом, т.к. операция от подсписка тоже является гомоморфизмом, то ее можно представить как суперпозицию функций отображения и редукции.

Пример: обработка котировок акций


Рассмотрим пример применения концепции MapReduce для решения следующей задачи: необходимо подсчитать суммарный объем торгов по акциям крупнейших металлургических компаний, осуществляющихся в период с 01.01.2009 по 01.01.2010 на бирже РТС. Биржа РТС предоставляет историю торгов в виде CSV-файла, в котором приведены данные за каждый операционный день.

Задача сводится к следующей последовательности операций, которую необходимо осуществить:

1. Получить с сайта РТС данные о торгах в виде массива строк.
2. Разобрать каждую строку, выделив из нее объем торгов в конкретный день.
3. Просуммировать все значения объема торгов.

Запрограммируем получение и обход массива строк в виде левой свертки, которая будет принимать бинарную функцию, с помощью которой непосредственно осуществляется свертка, начальное значение и функцию map, которую необходимо применить к каждой строке с данными, чтобы извлечь из нее интересующее нас значение в нужном формате.

Сигнатура соответствующего метода следующая:

public R reduce(List<String> issuers, R nil, IBinaryFunction<R, E, R> reduce, IFunction<String, E> map);


Здесь R - тип результата свертки, E - тип элементов по которым осуществляется свертка, issuers - коллекция кодов эмитентов, nil - начальное значение свертки, operator - операция свертки (соответствует операции # в формулах (1) и (2)), map - операция извлечения данных из полученной строки.

Данная операция является очень общей, т.к. позволяет использовать разные типы для данных, по которым осуществляется свертка, и для результата этой свертки.

Код метода, выполняющего свертку в один поток следующий:

    public R reduce(List<String> issuers, R nil, IBinaryFunction<R, E, R> operator, IFunction<String, E> map)

    {

        R result = nil;



        List<String> data = getAllData(issuers);



        for (String line : data)

        {

            result = operator.apply(result, map.apply(line));

        }



        return result;

    }


Данный код является реализацией описанной выше последовательности действий. Метод getAllData скачивает по сети данные для всех эмитентов, затем в цикле осуществляется обход коллекции строк и для каждой вычисляется функция operator. Нетрудно заметить, что данный код реализует левую свертку.

Если передать в метод в качестве параметра operator класс-псевдофункцию (т.к. язык Java не поддерживает функции как объекты первого рода), реализующий операцию сложения, а в качестве параметра map - псевдофункцию, извлекающую из строки объем торгов, и начальное значение - нуль, то наша задача будет решена.

Теперь посмотрим, можно ли вычислить суммарный объем торгов с помощью правой свертки с тем же самым начальным значением - нуль. Исходя из вышеописанных требований к операциям, имеем следующее:

1. Операция сложения (+) оперирует над числами (например в формате с плавающей точкой) и возвращает число.

2. Операция сложения является ассоциативной (конечно же в случае машинной арифметики эта ассоциативность ограничена разрядной сеткой машины, однако для решения данной задачи такое ограничение не является существенным).

3. Начальное значение свертки - 0 коммутирует с операцией сложения: x + 0 = 0 + x = x.

Таким образом можно сделать вывод: суммарный объем торгов можно вычислить с помощью операции правой свертки с начальным значением нуль. Соответственно, по третьей теореме о гомоморфизме, вычисление суммарного объема торгов есть гомоморфизм и его можно вычислять параллельно, объединяя результаты с помощью общей операции правой и левой сверток, в данном случае это - операция сложения.

Данный вывод позволяет нам переписать метод reduce таким образом, чтобы он работал в несколько потоков. Очевидно, что наиболее медленной операцией является получение данных о котировках по сети. Получение данных в несколько потоков позволит максимально использовать пропускную способность канала. Алгоритм в данном случае будет следующим:

1. Для каждого эмитента запустить поток, получающий его котировки по сети и вычисляющий объем торгов для эмитента.

2. Дождаться выполнения всех потоков.

3. Сложить результаты, полученные в каждом потоке.

Сигнатура метода reduce поменяется, потому что теперь подразумевается, что тип результата свертки будет совпадать с типом элементов, по которым она осуществляется:

public R reduce(List<String> issuers, R nil, IBinaryFunction<R, R, R> operator, IFunction<String, R> map);


Код многопоточного метода reduce состоит из двух участков: код управления потоками и код самого потока. Код управления потоками следующий:

    public R reduce(List<String> issuers, R nil, IBinaryFunction<R, R, R> operator, IFunction<String, R> map)

    {

        List<FoldIssuerThread<R>> threads = new ArrayList<FoldIssuerThread<R>>();



        R result = nil;



        for (String issuer : issuers)

            threads.add(new FoldIssuerThread<R>(issuer, nil, operator, map));



        for (FoldIssuerThread<R> thread : threads)

            thread.start();



        for (FoldIssuerThread<R> thread : threads)

            thread.join();



        for (FoldIssuerThread<R> thread : threads)

            result = operator.apply(thread.getResult(), result);



        return result;

    }


Данный код имеет регулярную структуру, что позволяет непосредственно выделить этапы создания потоков, их запуска, ожидания исполнения и подсчета итогового результата.

Непосредственно операция свертки осуществляется в классе, инкапсулирующем поток. Код данного класса выглядит так:

    public static class FoldIssuerThread<R> implements Runnable

    {

        private static final String URL_FMT = "http://www.rts.ru/ru/archive/agrsecuritycsvresults.html?rtype=1&ss=1&issue={0}&day1=20090101&day2=20100101";



        private IBinaryFunction<R, R, R> reduce;



        private IFunction<String, R> map;



        private String issuer;



        private R result;



        private Thread thread;



        public FoldIssuerThread(String issuer, R nil, IBinaryFunction<R, R, R> reduce, IFunction<String, R> map)

        {

            this.reduce = reduce;

            this.map = map;

            this.issuer = issuer;

            this.result = nil;

            thread = new Thread(this);

        }



        public void start()

        {

            thread.start();

        }



        public void join()

        {

            try

            {

                thread.join();

            }

            catch (InterruptedException e)

            {

                e.printStackTrace();

            }

        }



        private String getUrl(String issuer)

        {

            return MessageFormat.format(URL_FMT, issuer);

        }



        private List<String> getData(String issuer) throws IOException

        {

            URL url = new URL(getUrl(issuer));

            URLConnection connection = url.openConnection();

            BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));



            List<String> list = new ArrayList<String>();

            String line;

            while ((line = in.readLine()) != null)

            {

                list.add(line);

            }



            in.close();



            return list;

        }



        public void run()

        {

            try

            {

                List<String> data = getData(issuer);

                for (String line : data)

                    result = reduce.apply(result, map.apply(line));

            }

            catch (IOException e)

            {

                e.printStackTrace();

            }

        }



        public R getResult()

        {

            return result;

        }

    }


В конструктор класса передаются параметры, необходимые для осуществления свертки: код эмитента issuer, начальное значение свертки nil, бинарная псевдо-функция operator и псевдо-функция map. После запуска процесса в методе run происходит получение по сети данных об изменении котировок, которые затем обходятся в цикле и с помощью функций map и operator вычисляется значение свертки. Значение сохраняется в переменной result, которая затем будет возвращена главному потоку.

Стоит отметить, что изменение переменной result ненужно синхронизировать, т.к. данная переменная является локальной для потока. Т.е., несмотря на то, что переменная определена как поле класса, каждый поток будет ссылаться на свой экземпляр переменной. Это связано с особенностями передачи параметров в платформе Java.

Сравнение времени работы однопоточного и многопоточного вариантов reduce


Сравнение производилось на IBM/PC-совместимом компьютере, обладающем следующими параметрами:

- Процессор: Intell Core 2 Duo, 2200 МГц
- Объем ОЗУ: 2048 МБ
- Жесткий диск: 128 ГБ, 5200 об/с
- Сеть: SkyLink ADU 300A, 2,4 МБ/с
- Операционная система: Gentoo Linux 2010.0, ядро 2.6.31
- Java-машина: Sun JDK 1.6.0 u17

Время исполнения программы снималось с помощью утилиты time, все данные в таблицах - значения параметра real в секундах. Замеры снимались для котировок акций следующих компаний: "MAGN", "NLMK", "SGMA", "AMEZ", "VSMZ", "KMTZ", "TAMZ", "CHMK".

Однопоточный Reduce:






Среднее
6,0995,8649,70210,2616,3514,5914,9134,5124,6654,2956,125


Многопоточный Reduce. На каждого эмитента создавался свой поток, соответственно для восьми эмитентов система работала в восемь потоков:






Среднее
2.0332.1832.0292.3272.3202.5201.8851.9792.1832.1372.186

Из приведенных значений видно, что многопоточный Reduce осуществляется быстрее в среднем в 2.8 раза. Данное значение зависит от пропускной способности сети - параметра, являющегося ограничивающим фактором.

Выводы


Рассмотрев понятие свертки и списочного гомоморфизма, а также взаимосвязь между данными понятиями, можно сделать следующие выводы.

1. Если функция является списочным гомоморфизмом, то ее можно вычислить параллельно для каждого подсписка и объединить результаты вычислений с помощью некоторой операции. Функция является списочным гомоморфизмом, если ее можно вычислить как с помощью правой, так и с помощью левой свертки, которые могут определяться через разную операцию, но одинаковые начальные элементы (третья теорема о гомоморфизме).

2. Если операция свертки получает и возвращает данные с одинаковыми типами и является ассоциативной, а единица данной операции с ней коммутирует, то результаты вычисления левой и правой сверток для данной операции и единицы равны. Соответственно, по данной операции можно построить гомоморфизм, который затем можно вычислить с помощью функций отображения (map) и редукции (reduce).

3. Построен вариант параллельной операции Reduce, предназначенной для подсчета общего объема торгов по акциям нескольких предприятий. Значения для обработки передаются по сети. Данный вариант на восьми потоках работает в среднем в 2.8 раза быстрее, чем однопоточный.

Библиография


1. S. Gorlatch. Extracting and implementing list homomorphisms in parallel program development
Science of Computer Programming, Volume 33, Issue 1, Pages 1-27

2. J. Dean, S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters.
In Proceedings of the USENIX Symposium on Operating SystemsDesign & Implementation (OSDI), pp. 137-147. 2004

3. J. Urbani, S. Kotoulas, E. Oren, F. van Harmelen. Scalable Distributed Reasoning using Map Reduce. Proceedings of the International Semantic Web Conference (2009) Volume: 5823, Publisher: Springer, Pages: 293-309

4. J. Gibbons. The Third Homomorphism Theorem. J. Functional Programming 1 (1): 1 - 000, May 1995.

5. Е. Кирпичев. Свертки и их друзья

Понравилось сообщение - подпишитесь на блог или читайте меня в twitter

13 комментариев:

  1. Спасибо за статью. (прочёл пока только теоретическую часть) Отлично помогла формализовать то, что знаю. Не понятен был термин "суперпозиция функций", так как раньше сталкиволся только с термином "композиции функий", и кажется уместна была бы сносочка.

    ОтветитьУдалить
  2. Спасибо за отзыв. По поводу суперпозиции и композиции функций, т.к. это - термины-синонимы, то просто указал на это в скобочках.

    ОтветитьУдалить
  3. А где статью планируешь публиковать? В зависимости от этого можно высказать комментарии :)

    ОтветитьУдалить
  4. Я бы хотел сначала получить комментарии, а потом подумать на тему "где". Может нигде ))

    ОтветитьУдалить
  5. Ну первое что хотелось бы отметить это общее впечатление. После прочтения у меня не покидало ощущение что я читал доклад а не статью. Всё же это немного разные жанры, согласись. Ну и второе. Статья, как я понял, описывает больше технологию и уже потом подбирает под неё задачу. Скажу сразу что я не ITшник( я больше САУшник) и не знаю всех традиций в IT, но мне такой подход как-то не привычен. Как-то привычнее структура проблема->как решать->выводы.

    PS все выше изложенное не сколько критика, сколько мои мысли возникшие после прочтения
    PPS вопрос-провокация :). Павел, почему я тебя не видел на конференции аспирантов?

    ОтветитьУдалить
  6. @Yanchick, спасибо за конструктивное мнение.

    Да, согласен. У меня нет опыта написания научных статей+ я хотел объединить блогпост и статью и наверное зря. Все же надо отделять котлет от мух. Например, из статьи код я уберу.

    Статья действительно описывает технологию и демонстрирует ее на примере решения задачи. В данном случае на первое место выходит технология, что довольно характерно для мира IT.

    ОтветитьУдалить
  7. Про конференцию: меня там банально не было :) Какую-то работу к тому моменту подготовить не успел, а просто сходить послушать было некогда.

    ОтветитьУдалить
  8. >> в 2.8 раза быстрее
    понятно, что тут решающий фактор - количество ядер в процессоре.
    На одном ядре результаты были бы примерно одинаковыми. А самый быстрый результат - на >= 8 ядрах.
    Думаю стоит об этом сказать в статье.

    ОтветитьУдалить
  9. Скорее всего вместо

    len = foldr (+ 1) 0, где + 1 - функция аргументов x и y, прибавляющая единицу к x.

    лучше написать

    len = foldr (+ 1) 0, где + 1 - функция аргументов x и y, прибавляющая единицу к y.

    как как обычно (и в Haskell, и в Nemerle) foldr принимает на вход функцию, в который первый аргумент - элемент, а второй аккумулятор.

    Если цель - показать взаимосвязь алгебры и программирования, то мне кажеться, что нужно добавить больше алгебры) например, сказать, что множество списков - это группа, а списки из одного элемента - её образующие) и полностью перевести теоремы о гомоморфизме с языка алгебраического (http://ru.wikipedia.org/wiki/Теоремы_об_изоморфизме) на язык программистов (http://ru.wikipedia.org/wiki/MapReduce).

    Если это грамотно сделать, то становиться понятно, почему процедура map может содержать фильтрацию и выдавать на выход меньше элементов, чем ей было передано на вход. Согласно примеру в статье это не так, но статья в вики про map reduce это допускает.

    ОтветитьУдалить
  10. @ankstoo, нет, здесь решающий фактор пропускная способность сети. Сегодня попробую на более быстром интернете, нежели SkyLink

    @Денис, спасибо за совет. Про len я понял, остальное посмотрю.

    ОтветитьУдалить
  11. для java FJTask будет работать в 30 раз быстрее Thread`ов.
    http://software.intel.com/ru-ru/articles/writing-parallel-programs-a-multi-language-tutorial-introduction/
    https://docs.google.com/viewer?url=http://gee.cs.oswego.edu/dl/papers/fj.pdf

    ОтветитьУдалить
  12. >> функция идентификации

    По-русски это называется тождественная функция

    ОтветитьУдалить

Любой Ваш комментарий важен для меня, однако, помните, что действует предмодерация. Давайте уважать друг друга!