Минимизация накладных расходов с параллельными функциями в R
Я попытался сообщить об ошибке, с которой я столкнулся в mclapply, относительно того, что большие возвращаемые значения недопустимы.
Очевидно ошибка была исправлена в версиях разработки, но меня больше интересует комментарий, который сделал ответчик:
размер сериализованных объектов был ограничен 2 ГБ, например, mclapply может возвращать разветвленные процессы, и в этом примере делается попытка 16 ГБ. Это было отменено (для 64-битных сборок) в R-devel, но такое использование является очень необычным и довольно неэффективным (пример требует около 150 ГБ из-за всех копий, вовлеченных в (не) сериализацию)
Если использование mclapply для параллельных вычислений с большими данными неэффективно, то как лучше это сделать? Моя потребность делать подобные вещи только увеличивается, и я определенно сталкиваюсь с узкими местами везде. Учебники, которые я видел, были довольно простыми введениями о том, как использовать функции, но не обязательно, как эффективно использовать функции в управлении компромиссами. В документации есть небольшая реклама по этому компромиссу:
mc.preschedule: если установлено значение "ИСТИНА", то вычисление сначала делится на (самое большее) количество заданий, если есть ядра, а затем задания запускаются, причем каждое задание может охватывать более одного значения. Если установлено значение "ЛОЖЬ", то одно задание разветвляется для каждого значения "Х". Первое лучше для коротких вычислений или большого числа значений в "X", второе лучше для заданий, которые имеют высокую дисперсию времени завершения и не слишком много значений "X" по сравнению с "mc.cores"
а также
По умолчанию ("mc.preschedule = TRUE") вход "X" делится на столько частей, сколько имеется ядер (в настоящее время значения распределяются по ядрам последовательно, т.е. первое значение для ядра 1, второе для ядра 2, ... (ядро + 1)-ое значение для ядра 1 и т. д.), а затем один процесс разветвляется на каждое ядро, и результаты собираются.
Без предварительного планирования отдельное задание разветвляется для каждого значения "X". Чтобы убедиться, что одновременно выполняется не более mc.cores, после того, как это число было разветвлено, главный процесс ожидает завершения дочернего процесса до следующей разветвления.
Надежный сравнительный анализ этих вещей занимает много времени, поскольку некоторые проблемы проявляются только в масштабе, и тогда трудно понять, что происходит. Поэтому было бы полезно лучше понять поведение функций.
редактировать:
У меня нет конкретного примера, потому что я много использую mclapply и хотел лучше узнать, как думать о влиянии на производительность. И хотя запись на диск обошла бы ошибку, я не думаю, что это поможет в отношении (де) сериализации, которая должна произойти, которая также должна была бы пройти через дисковый ввод-вывод.
Один рабочий процесс будет следующим: взять большую разреженную матрицу M
и записать его на диск кусками (скажем, M1-M100
) потому что сама М не вписывается в память.
Теперь скажем, для каждого пользователя i
в I
имеются Ci
столбцы в M
что я хочу сложить и агрегировать на уровне пользователя. С меньшими данными это было бы относительно тривиально:
m = matrix(runif(25), ncol=5)
df = data.frame(I=sample(1:6, 20, replace=T), C=sample(1:5, 20, replace=T))
somefun = function(m) rowSums(m)
res = sapply(sort(unique(df$I)), function(i) somefun(m[,df[df$I == i,]$C]))
Но с большими данными мой подход состоял в том, чтобы разделить data.frame пользователя / столбцов на разные data.frames в зависимости от того, какая матрица M1-M100
столбец будет в, делать параллельный цикл над этими data.frames, читать в связанной матрице, а затем циклически перебирать пользователей, извлекать столбцы и применять мою функцию, а затем брать список вывода, повторять цикл снова и повторно объединять,
Это не идеально, если у меня есть функция, которая не может быть сгруппирована таким образом (на данный момент это не проблема), но я, очевидно, перебираю слишком много данных с этим подходом.
2 ответа
Я надеюсь, что мой ответ не слишком поздно, но я думаю, что ваш пример может быть обработан с помощью общей памяти / файлов через bigmemory
пакет.
Давайте создадим данные
library(bigmemory)
library(parallel)
#your large file-backed matrix (all values initialized to 0)
#it can hold more than your RAM as it is written to a file
m=filebacked.big.matrix(nrow=5,
ncol=5,
type="double",
descriptorfile="file_backed_matrix.desc",
backingfile="file_backed_matrix",
backingpath="~")
#be careful how to fill the large matrix with data
set.seed(1234)
m[]=c(matrix(runif(25), ncol=5))
#print the data to the console
m[]
#your user-col mapping
#I have added a unique idx that will be used below
df = data.frame(unique_idx=1:20,
I=sample(1:6, 20, replace=T),
C=sample(1:5, 20, replace=T))
#the file-backed matrix that will hold the results
resm=filebacked.big.matrix(nrow=nrow(df),
ncol=2,
type="double",init = NA_real_,
descriptorfile="res_matrix.desc",
backingfile="res_backed_matrix",
backingpath="~")
#the first column of resm will hold the unique idx of df
resm[,1]=df$unique_idx
resm[]
Теперь перейдем к функции, которую вы хотите выполнить. Вы написали rowSums
но исходя из вашего текста вы имели в виду colSums
, Я изменил это соответственно.
somefun = function(x) {
#attach the file-backed big.matrix
#it makes the matrix "known" to the R process (no copying involved!)
#input
tmp=attach.big.matrix("~/file_backed_matrix.desc")
#output
tmp_out=attach.big.matrix("~/res_matrix.desc")
#store the output in the file-backed matrix resm
tmp_out[x$unique_idx,2]=c(colSums(tmp[,x$C,drop=FALSE]))
#return a little more than the colSum result
list(pid=Sys.getpid(),
user=x$I[1],
col_idx=x$C)
}
Делаем параллельный расчет на всех ядрах
#perform colSums using different threads
res=mclapply(split(df,df$I),somefun,mc.cores = detectCores())
Результаты проверки
#processes IDs
unname(sapply(res,function(x) x$pid))
#28231 28232 28233 28234 28231 28232
#users
unname(sapply(res,function(x) x$user))
#1 2 3 4 5 6
#column indexes
identical(sort(unname(unlist(sapply(res,function(x) x$col_idx)))),sort(df$C))
#[1] TRUE
#check result of colSums
identical(lapply(split(df,df$I),function(x) resm[x$unique_idx,2]),
lapply(split(df,df$I),function(x) colSums(m[,x$C,drop=FALSE])))
#[1] TRUE
Редактировать: я обратился к вашему комментарию в моем редактировании. Сохранение результатов в выходной матрице на основе файла resm
работает как положено.
Чтобы ограничить накладные расходы для умеренно большого N, почти всегда лучше использовать mc.preschedule = TRUE
(т.е. разделить работу на столько частей, сколько есть ядер).
Похоже, ваш главный компромисс между использованием памяти и процессором. То есть, вы можете распараллеливать только до тех пор, пока текущие процессы не исчерпают вашу оперативную память. Следует учитывать, что разные работники могут читать один и тот же объект в сеансе R без дублирования. Таким образом, только объекты, измененные / созданные в параллельном вызове функции, имеют добавленную память для каждого ядра.
Если вы используете максимум памяти, я бы предложил разделить все ваши вычисления на несколько подзадач и обойти их последовательно (например, с ошибкой), вызвать в этом цикле mclapply, чтобы распараллелить каждую подзадачу, и, возможно, сохранить выходные данные подзадача к диску, чтобы не хранить все это в памяти.