morfizm (morfizm) wrote,
morfizm
morfizm

Category:

Map-reduce

Я хотел написать большой красивый пост с картинками, объясняющий map reduce на классическом примере сортировки (обобщённый quicksort).
Нет времени рисовать картинки, расскажу кратко на словах, вдруг кому-нибудь пригодится. Например, Металлер меня спрашивал.

Для начала, quicksort: берём массив A размером N, выбираем число x, назовём его барьером. Делим массив на две части так, чтобы слева всё было меньше x, справа больше. Пусть L это правый конец левой части, а R это левый конец правой. Получаем два куска: A[1:L] и A[R:N]. Сортируем их in place рекурсивно.

В чём тут главный прикол?
Приколов несколько:

*) Этот барьер, число x, его надо угадать. Можно угадать удачно, а можно неудачно. Удачное угадывание это такое, которое разделит массив примерно пополам. Тогда суммарная сложность будет O(N log N), т.к. получится бинарное дерево вызовов, в котором log N уровней. Неудачное угадывание делит на тривиальный кусок и "всё остальное". Сложность будет O(N^2) - вместо дерева мысленно рисуем перевёрнутый треугольник. В качестве x можно брать совершенно любое число, не обязательно один из элементов массива. Но, ничего заранее не зная про массив, очевидно, брать из массива выгоднее - как минимум мы попадаем в диапазон значений, а не вне его. Можно тупо брать A[1]. Это не очень хорошо, т.к. вариант когда "массив уже отсортирован" встречается часто, и это будет "плохой случай". Можно брать A[N/2] - для среднего случая это так же хорошо (или так же плохо) как A[1], но для "уже отсортированного массива" будет хотя бы N log N, а не квадрат. На самом деле случаев, когда quicksort вырождается в квадратичный алгоритм асимптотически мало, и можно не заморачиваться оптимизациями, но существуют известные усложнения quicksort'а, выбирающие барьер "более умно", чтобы гарантировать N log N в любом случае.

*) После того, как мы отсортировали in place обе части, в отличие от алгоритма слияния, ничего сливать не надо. Благодаря принципу разделения, которое мы сделали вначале (каждый элемент левого массива меньше каждого элемента правого), весь массив целиком уже будет в точности отсортированным.

*) Quicksort очень быстрый, потому что он обладает замечательной локальностью. На нижних уровнях рекурсии сортируется лишь небольшой сегмент, это отлично для кэша процессора, и именно это делает quicksort более быстрым, чем многие другие O(N log N) сортировки, не смотря на неоптимальность количества сравнений.

Что можно делать, когда массив очень большой, огромный? Как применить похожую идею, в современных терминах работы с big data?

Для начала, определимся, что такое "очень большой". "Очень большой" = не влезает в оперативную память одного компьютера. Лезть на диск заведомо очень медленно, мы хотим сортировать всё в памяти. Компов много, в принципе, сколько хочешь. Суммарно всё в память влезет. Просто у каждого отдельного памяти лишь ограниченное количество. Допустим, доступно лишь 8 GB и, соответственно, влезает 1 млрд элементов типа int64.

Далее. Как нам даётся исходный массив и куда мы пишем результат?
Представим себе массив как абстракцию. Это такой сервис, у которого можно попросить элемент по номеру (прочесть A[i]) и записать по номеру (присвоить A[i] = v). Это уже достаточно к реальности приближение, потому что такие storage сервисы всегда есть. Скорость работы с индивидуальными ячейками не зависит от размеров массива, потому что они sharded. Пусть там хоть триллион газиллионов ячеек. Главный вопрос - что мы с ними будем делать. Если мы будем триллион газиллионов зачитывать последовательно на одном компьютере, вроде for i in range(1, триллион газиллионов): читать A[i] ... , то будет бесконечно медленно. Читать надо одновременно с разных компов. Что-то вычислять надо одновременно на разных компах. Писать результат нужно тоже одновременно с разных компов. Тогда на каждом компе будет for i in range (1, 1 млрд), что выполнимо за разумное время, и за это же время закончат все остальные компы. Вот, собственно, и всё волшебство с big data - придумать, как сделать так, чтобы работа заканчивалась за разумное время на огромных объёмах данных. Ничего сверъестественного. Чтение всего массива последовательно на одном компьютере считается заведомо неразумным :)

Применение принципов quicksort рассмотрим на конкретном примере. Допустим, данных 80 GB. Т.е. в 10 раз больше, чем память.
Берём 20 компьютеров. 10 из них называем mapper-ами, остальные 10 называем reducer'ами. Вот вам и map-reduce.
У каждого маппера будет конфигурация задачи - он знает свой порядковый номер, и он будет работать лишь с отдельным куском массива, соответствующим своему номеру. Первый маппер с первыми 8 GB (1 млрд элементов), второй с вторыми и т.п. Т.е. разбиваем исходный массив из 10 млрд элементов на интервалы 0 - 1 млрд, 1 млрд - 2 млрд, ..., 9 млрд - 10 млрд.

Мапперы могут зачитать весь массив в память параллельно, поэтому время на чтение всего массива будет равно времени на чтение одного куска.

Далее. В случае quicksort'а мы выбирали барьер, который, как мы надеялись, делит диапазон значений массива по возможности поровну на две части. Мы хотим сделать то же самое, но разделить на десять частей, как можно более равных. В массиве есть определённый диапазон значений, скажем, min и max, и мы вполне могли бы поделить пропорционально. У такого подхода есть ряд проблемы:
- Даже в случае случайных данных будет достаточно большой разброс, придётся резервировать заметно больше памяти, чем 1/10, или использовать больше компьютеров. Дорого.
- Реальные данные обычно не совсем случайны, распределение там не всегда равномерное.
- Чтобы достоверно посчитать min и max, надо зачитать весь массив. Делать это на одном компьютере заведомо неразумно. Т.е. придётся заранее запускать ещё 10 компьютеров просто чтобы посчитать min и max. Дороговато.
Вместо этого обычно используется sampling. Лезем в массив в случайные места, пока не наберём, скажем, 1% элементов. Это, в нашем случае, 100 млн. Сортируем их в памяти, делим получившийся массив на 10 равных частей и смотрим, какие значения на границах - составляем таблицу этих значений, для разбития на 10 частей у нас будет 9 значений-барьеров. Это всё можно сделать на одном компьютере довольно быстро, и эффективность будет намного выше, чем если делать через min, max.

Добавляем таблицу барьеров в конфигурацию мапперов.
Пишем код, чтобы мапперы, зачитывая свой кусок массива, для каждого элемента определяли по барьерам, в какой сегмент его следует направить, и отправляли его соответствующему reducer'у. В памяти хранить вообще ничего не надо, кроме, разве что, буферов, чтобы не по одному значению всё пересылать.
Каждый из редьюсеров будет получать потоки значений со всех мапперов, но эти значения будут только из его сегмента, из его диапазона.
Когда редьюсер всё собрал, он сортирует это в памяти, и записывает отсортированный кусок обратно в большой массив. Редьюсерам нужно только в конце сбора данных договориться о том, сколько элементов собрано другими редьюсерами, просто чтобы посчитать индекс, начиная с которого можно писать свой отсортированный кусок обратно в большой массив (для редьюсера номер два этот индекс совсем не обязательно будет равен 1 млрд, т.к. редьюсер номер один мог собрать меньше значений - так уж сложилось, что барьеры через sampling вышли не идеальными). Это сравнительно легко имплементировать.

Вот, собственно, и всё. Когда все редьюсеры закончат работу, массив будет отсортирован.
Добро пожаловать в элитный клуб понимающих map-reduce ;)

Все остальные применения map-reduce либо аналогичны, либо проще. Обычно мапперы зачитывают исходные данные и для каждого элемента делают какое-то действие (как-то их перемешивая отправляют одному или нескольким редьюсерам). Редьюсеры принимают данные от одного или нескольких мапперов, делают какие-то вычисления и пишут ответ. Из комбинаций мапперов и редьюсеров можно делать цепочки длиной больше, чем 2 шага. Главное стараться делать так, чтобы задачи для каждого отдельного маппера или редьюсера были не слишком долгими - желательно пропорциональны по сложности объёму его памяти, а не параметрам исходной задачи или размерам входных данных. Тогда всё будет работать быстро, а от размеров исходных данных будет зависеть только стоимость в машинных часах (удвоился объём данных - удвоим количество мапперов и редьюсеров, и всё путём). Названия "маппер" или "редьюсер" обычно соответствуют разветвлениям или сборам потоков данных на диаграмме: мапперы обычно читают из одного источника и пишут в один или несколько, редьюсеры наоборот: собирают из одного или нескольких, а пишут только в один.
Tags: 1, software development
Subscribe
  • Post a new comment

    Error

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

    When you submit the form an invisible reCAPTCHA check will be performed.
    You must follow the Privacy Policy and Google Terms of use.
  • 13 comments