?

Log in

No account? Create an account
   Journal    Friends    Archive    Profile    Memories
 

Map-reduce - morfizm


Jun. 6th, 2014 03:48 am Map-reduce

Я хотел написать большой красивый пост с картинками, объясняющий map reduce на классическом примере сортировки (обобщённый quicksort).
Нет времени рисовать картинки, расскажу кратко на словах, вдруг кому-нибудь пригодится. Например, Металлер меня спрашивал.

Для начала, quicksort: берём массив A размером N, выбираем число x, назовём его барьером. Делим массив на две части так, чтобы слева всё было меньше x, справа больше. Пусть L это правый конец левой части, а R это левый конец правой. Получаем два куска: A[1:L] и A[R:N]. Сортируем их in place рекурсивно.

В чём тут главный прикол?
Приколов несколько:

*) Этот барьер, число x, его надо угадать. Можно угадать удачно, а можно неудачно. Удачное угадывание это такое, которое разделит массив примерно пополам. Тогда суммарная сложность будет O(N log N), т.к. получится бинарное дерево вызовов, в котором log N уровней. Неудачное угадывание делит на тривиальный кусок и "всё остальное". Сложность будет O(N^2) - вместо дерева мысленно рисуем перевёрнутый треугольник. В качестве x можно брать совершенно любое число, не обязательно один из элементов массива. Но, ничего заранее не зная про массив, очевидно, брать из массива выгоднее - как минимум мы попадаем в диапазон значений, а не вне его. Можно тупо брать A[1]. Это не очень хорошо, т.к. вариант когда "массив уже отсортирован" встречается часто, и это будет "плохой случай". Можно брать A[N/2] - для среднего случая это так же хорошо (или так же плохо) как A[1], но для "уже отсортированного массива" будет хотя бы N log N, а не квадрат. На самом деле случаев, когда quicksort вырождается в квадратичный алгоритм асимптотически мало, и можно не заморачиваться оптимизациями, но существуют известные усложнения quicksort'а, выбирающие барьер "более умно", чтобы гарантировать N log N в любом случае.

*) После того, как мы отсортировали in place обе части, в отличие от алгоритма слияния, ничего сливать не надо. Благодаря принципу разделения, которое мы сделали вначале (каждый элемент левого массива меньше каждого элемента правого), весь массив целиком уже будет в точности отсортированным.

*) Quicksort очень быстрый, потому что он обладает замечательной локальностью. На нижних уровнях рекурсии сортируется лишь небольшой сегмент, это отлично для кэша процессора, и именно это делает quicksort более быстрым, чем многие другие O(N log N) сортировки, не смотря на неоптимальность количества сравнений.

Что можно делать, когда массив очень большой, огромный? Как применить похожую идею, в современных терминах работы с big data?

Для начала, определимся, что такое "очень большой". "Очень большой" = не влезает в оперативную память одного компьютера. Лезть на диск заведомо очень медленно, мы хотим сортировать всё в памяти. Компов много, в принципе, сколько хочешь. Суммарно всё в память влезет. Просто у каждого отдельного памяти лишь ограниченное количество. Допустим, доступно лишь 8 GB и, соответственно, влезает 1 млрд элементов типа int64.

Далее. Как нам даётся исходный массив и куда мы пишем результат?
Представим себе массив как абстракцию. Это такой сервис, у которого можно попросить элемент по номеру (прочесть A[i]) и записать по номеру (присвоить A[i] = v). Это уже достаточно к реальности приближение, потому что такие storage сервисы всегда есть. Скорость работы с индивидуальными ячейками не зависит от размеров массива, потому что они sharded. Пусть там хоть триллион газиллионов ячеек. Главный вопрос - что мы с ними будем делать. Если мы будем триллион газиллионов зачитывать последовательно на одном компьютере, вроде for i in range(1, триллион газиллионов): читать A[i] ... , то будет бесконечно медленно. Читать надо одновременно с разных компов. Что-то вычислять надо одновременно на разных компах. Писать результат нужно тоже одновременно с разных компов. Тогда на каждом компе будет for i in range (1, 1 млрд), что выполнимо за разумное время, и за это же время закончат все остальные компы. Вот, собственно, и всё волшебство с big data - придумать, как сделать так, чтобы работа заканчивалась за разумное время на огромных объёмах данных. Ничего сверъестественного. Чтение всего массива последовательно на одном компьютере считается заведомо неразумным :)

Применение принципов quicksort рассмотрим на конкретном примере. Допустим, данных 80 GB. Т.е. в 10 раз больше, чем память.
Берём 20 компьютеров. 10 из них называем mapper-ами, остальные 10 называем reducer'ами. Вот вам и map-reduce.
У каждого маппера будет конфигурация задачи - он знает свой порядковый номер, и он будет работать лишь с отдельным куском массива, соответствующим своему номеру. Первый маппер с первыми 8 GB (1 млрд элементов), второй с вторыми и т.п. Т.е. разбиваем исходный массив из 10 млрд элементов на интервалы 0 - 1 млрд, 1 млрд - 2 млрд, ..., 9 млрд - 10 млрд.

Мапперы могут зачитать весь массив в память параллельно, поэтому время на чтение всего массива будет равно времени на чтение одного куска.

Далее. В случае quicksort'а мы выбирали барьер, который, как мы надеялись, делит диапазон значений массива по возможности поровну на две части. Мы хотим сделать то же самое, но разделить на десять частей, как можно более равных. В массиве есть определённый диапазон значений, скажем, min и max, и мы вполне могли бы поделить пропорционально. У такого подхода есть ряд проблемы:
- Даже в случае случайных данных будет достаточно большой разброс, придётся резервировать заметно больше памяти, чем 1/10, или использовать больше компьютеров. Дорого.
- Реальные данные обычно не совсем случайны, распределение там не всегда равномерное.
- Чтобы достоверно посчитать min и max, надо зачитать весь массив. Делать это на одном компьютере заведомо неразумно. Т.е. придётся заранее запускать ещё 10 компьютеров просто чтобы посчитать min и max. Дороговато.
Вместо этого обычно используется sampling. Лезем в массив в случайные места, пока не наберём, скажем, 1% элементов. Это, в нашем случае, 100 млн. Сортируем их в памяти, делим получившийся массив на 10 равных частей и смотрим, какие значения на границах - составляем таблицу этих значений, для разбития на 10 частей у нас будет 9 значений-барьеров. Это всё можно сделать на одном компьютере довольно быстро, и эффективность будет намного выше, чем если делать через min, max.

Добавляем таблицу барьеров в конфигурацию мапперов.
Пишем код, чтобы мапперы, зачитывая свой кусок массива, для каждого элемента определяли по барьерам, в какой сегмент его следует направить, и отправляли его соответствующему reducer'у. В памяти хранить вообще ничего не надо, кроме, разве что, буферов, чтобы не по одному значению всё пересылать.
Каждый из редьюсеров будет получать потоки значений со всех мапперов, но эти значения будут только из его сегмента, из его диапазона.
Когда редьюсер всё собрал, он сортирует это в памяти, и записывает отсортированный кусок обратно в большой массив. Редьюсерам нужно только в конце сбора данных договориться о том, сколько элементов собрано другими редьюсерами, просто чтобы посчитать индекс, начиная с которого можно писать свой отсортированный кусок обратно в большой массив (для редьюсера номер два этот индекс совсем не обязательно будет равен 1 млрд, т.к. редьюсер номер один мог собрать меньше значений - так уж сложилось, что барьеры через sampling вышли не идеальными). Это сравнительно легко имплементировать.

Вот, собственно, и всё. Когда все редьюсеры закончат работу, массив будет отсортирован.
Добро пожаловать в элитный клуб понимающих map-reduce ;)

Все остальные применения map-reduce либо аналогичны, либо проще. Обычно мапперы зачитывают исходные данные и для каждого элемента делают какое-то действие (как-то их перемешивая отправляют одному или нескольким редьюсерам). Редьюсеры принимают данные от одного или нескольких мапперов, делают какие-то вычисления и пишут ответ. Из комбинаций мапперов и редьюсеров можно делать цепочки длиной больше, чем 2 шага. Главное стараться делать так, чтобы задачи для каждого отдельного маппера или редьюсера были не слишком долгими - желательно пропорциональны по сложности объёму его памяти, а не параметрам исходной задачи или размерам входных данных. Тогда всё будет работать быстро, а от размеров исходных данных будет зависеть только стоимость в машинных часах (удвоился объём данных - удвоим количество мапперов и редьюсеров, и всё путём). Названия "маппер" или "редьюсер" обычно соответствуют разветвлениям или сборам потоков данных на диаграмме: мапперы обычно читают из одного источника и пишут в один или несколько, редьюсеры наоборот: собирают из одного или нескольких, а пишут только в один.

13 comments - Leave a commentPrevious Entry Share Next Entry

Comments:

From:me_milady
Date:June 6th, 2014 10:59 am (UTC)
(Link)
Вау, я теперь тоже в клубе понимающих. : P
From:morfizm
Date:June 6th, 2014 11:13 am (UTC)
(Link)
Поздравляю! :P
P.S. Я там чуть-чуть обновил/добавил :)
From:rezkiy
Date:June 6th, 2014 05:07 pm (UTC)
(Link)
Хорошо объяснил кстати. Мне тоже недавно кто-то объяснял, но у тебя более что ли сложный пример, более интересная картинка получается.
From:morfizm
Date:June 6th, 2014 05:56 pm (UTC)
(Link)
Ага. Казалось бы, банально сортировка, но ничего сложнее из мап-редьюса я не видел. Это именно тот пример, на котором я сам в своё время понял мап-редьюс. Именно так работает периодическое строительство поискового индекса на моём предыдущем рабочем месте (замени действие на редьюсерах с "сортировки" на "индексирование" и вот тебе оно) :) Раскидывание по мап-редьюсу нужно, потому что sharding индекса идёт не по тем же doc.ids, по которым документы хранятся в storage, а по каким-то атрибутам из этих документов, причём которые мутабельны.
From:archaicos
Date:June 7th, 2014 01:18 am (UTC)
(Link)
Пример с quicksort хороший, но объяснение не производит явную параллель, опуская важную деталь имплементации, как я уже говорил объяснителю. Классический quicksort после выбора центрального значения/элемента, делящего данный (под)массив на две части, производит перемещение элементов (под)массива так чтобы действительно по одну сторону от центрального значения/элемента оказались не бóльшие его элементы, а по другую — не меньшие. Без этого шага quicksort не работает.

В объяснении фигурирует эквивалентный шаг (выбор сортировщика следующего уровня, которому нужно посылать числа в зависимости от их попадания между барьерами, полученными семплингом).

Было бы очень хорошо явно указать на соответствие обоих друг другу.

И вообще, для тупых лучше больше нагдядных картинок! :)
From:morfizm
Date:June 9th, 2014 07:03 am (UTC)
(Link)
Ну, я же сказал, что хотел с картинками, но лениво. Хочешь - сделай картинки и будет общая статья, отдадим копирайт в public domain :)

Перемещение элементов это важный момент, но я его опустил, понадеявшись, что читатель это сам поймёт. Наверное, стоило для "перехода" сказать, что quicksort не обязательно делать in-place, а можно переход на следующий уровень делать, "отправляя" элементы в одну или в другую кучку.
From:_m_e_
Date:June 7th, 2014 06:28 am (UTC)
(Link)
Главное в объяснении - не давать клиентам времени задуматься о деталях того, как написать эффективный shuffle N x M где оба числа порядка десятков тысяч или больше :)
From:morfizm
Date:June 9th, 2014 07:01 am (UTC)
(Link)
О! Расскажи про эффективный shuffle :)
(Я нахожусь на уровне "это всё фреймворк делает behind the scenes")
From:_m_e_
Date:June 9th, 2014 07:10 am (UTC)
(Link)
Я сам не знаю, у нас shuffle свой :) И мы с ним постоянно боремся, уже 4-ая версия (это не считая .5-ые).

Для начала, первая дилемма как писать в сотню тысяч файлов одновременно - хорошие буфера держать в памяти накладно, а писать по малу - тоже плохо.
From:morfizm
Date:June 9th, 2014 07:29 am (UTC)
(Link)
Да, хорошая задачка. Но меня даже не тянет о ней думать с нуля, т.к. я почти уверен, что не изобрету более хороший велосипед. Я бы сначала ознакомился с существующими решениями :)
From:_m_e_
Date:June 9th, 2014 07:31 am (UTC)
(Link)
надо перенести разговор во внутреннюю почту :)
From:morfizm
Date:June 9th, 2014 07:34 am (UTC)
(Link)
Давай :) Я на днях напишу тебе там.
From:livejournal
Date:December 31st, 2014 10:32 am (UTC)

Map/Reduce вместо JOIN

(Link)
User ermouth referenced to your post from Map/Reduce вместо JOIN saying: [...] Полгода назад morfizm написал хороший пост [...]