data.table и параллельные вычисления
После этого поста: многоядерный и data.table в R, мне было интересно, есть ли способ использовать все ядра при использовании data.table, обычно выполнение расчетов по группам можно распараллелить. Кажется, что plyr
позволяет такие операции по замыслу.
2 ответа
Первое, что нужно проверить, это то, что data.table
FAQ 3.1 пункт 2 утонул в:
Одно выделение памяти делается только для самой большой группы, затем эта память используется повторно для других групп. Существует очень мало мусора для сбора.
Это одна из причин, по которой группировка data.table происходит быстро. Но этот подход не поддается параллелизации. Распараллеливание означает копирование данных в другие потоки вместо затрат времени. Но я понимаю, что data.table
группировка обычно быстрее, чем plyr
с .parallel
в любом случае. Это зависит от времени вычисления задачи для каждой группы, и может ли это время вычисления быть легко уменьшено или нет. Перемещение данных часто доминирует (при сравнении 1 или 3 прогонов задач с большими данными).
Чаще, на самом деле, это какая-то гоча, которая кусает в j
выражение [.data.table
, Например, недавно мы увидели низкую производительность data.table
группировка, но виновник оказался min(POSIXct)
( Агрегирование в R более 80K уникальных идентификаторов). Избежание этого гота привело к ускорению в 50 раз.
Итак, мантра это: Rprof
, Rprof
, Rprof
,
Кроме того, пункт 1 из того же FAQ может быть значительным:
Только этот столбец сгруппирован, остальные 19 игнорируются, потому что data.table проверяет выражение j и понимает, что не использует другие столбцы.
Так, data.table
на самом деле не следует парадигме "разделяй-применяй-комбинируй". Это работает по-другому. split-apply-comb поддается параллелизации, но на самом деле не масштабируется до больших данных.
Также см. Сноску 3 в виньетке data.table:
Интересно, сколько людей внедряют параллельные методы в код, который использует векторное сканирование?
Это пытается сказать: "Конечно, параллель значительно быстрее, но сколько времени на самом деле нужно с эффективным алгоритмом?".
НО если вы профилировали (используя Rprof
), и задача для каждой группы действительно требует больших вычислительных ресурсов, тогда могут помочь 3 сообщения в datatable-help, включая слово "многоядерный":
многоядерные посты на datatable-help
Конечно, есть много задач, где распараллеливание было бы хорошо в data.table, и есть способ сделать это. Но это еще не было сделано, так как обычно другие факторы кусают, поэтому это был низкий приоритет. Если вы можете опубликовать воспроизводимые фиктивные данные с эталонами и результатами Rprof, это поможет повысить приоритет.
Я провел несколько тестов в соответствии с предыдущей мантрой @matt Dowle о Rprof, Rprof, Rprof.
Я обнаружил, что решение о распараллеливании зависит от контекста; но, вероятно, имеет значение. В зависимости от тестовых операций (например, foo
ниже, который можно настроить) и количество используемых ядер (я пробую и 8 и 24), я получаю разные результаты.
Ниже результаты:
- используя 8 ядер, я вижу улучшение на 21% в этом примере для распараллеливания
- используя 24 ядра, я вижу улучшение на 14%.
Я также смотрю на некоторые реальные (не разделяемые) данные / операции, которые показывают большие (33%
или же 25%
, два разных теста) улучшение паралелизации с 24 ядрами. Редактировать май 2018 г. Новый набор примеров из реальных примеров демонстрирует улучшение, достигающее 85% от параллельных операций с 1000 группами.
R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods
[8] base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2 data.table_1.10.4
R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5 data.table_1.10.4
Пример ниже:
library(data.table)
library(stringi)
library(microbenchmark)
set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)
my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)
foo <- function(dt) {
dt2 <- dt ## needed for .SD lock
nr <- nrow(dt2)
idx <- sample.int(nr, 1, replace=FALSE)
dt2[idx,][, `:=` (
new_var1= V1 / V2,
new_var2= V4 * V3 / V10,
new_var3= sum(V12),
new_var4= ifelse(V10 > 0, V11 / V13, 1),
new_var5= ifelse(V9 < 0, V8 / V18, 1)
)]
return(dt2[idx,])
}
split_df <- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
foo2 <- function(dt) {
dt2 <- split_df(dt, "grps")
require(parallel)
cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
clusterExport(cl, varlist= "foo")
clusterExport(cl, varlist= "dt2", envir = environment())
clusterEvalQ(cl, library("data.table"))
dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}
print(parallel::detectCores()) # 8
microbenchmark(
serial= dt[,foo(.SD), by= "grps"],
parallel= foo2(dt),
times= 10L
)
Unit: seconds
expr min lq mean median uq max neval cld
serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387 10 b
parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257 10 a
print(parallel::detectCores()) # 24
Unit: seconds
expr min lq mean median uq max neval cld
serial 9.014247 9.804112 12.17843 13.17508 13.56914 14.13133 10 a
parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353 10 a
Профилирование:
Мы можем использовать этот ответ, чтобы дать более прямой ответ на оригинальный комментарий @matt dowle к профилированию.
В результате мы видим, что большая часть вычислительного времени обрабатывается base
и не data.table
, data.table
Сами операции, как и ожидалось, исключительно быстрые. Хотя некоторые могут утверждать, что это свидетельствует о том, что нет необходимости в параллелизме внутри data.table
Я утверждаю, что этот рабочий процесс / набор операций не является нетипичным. То есть я сильно подозреваю, что большинство крупных data.table
агрегация включает в себя значительное количество data.table
код; и что это связано с интерактивным использованием против разработки / производственного использования. Поэтому я прихожу к выводу, что параллелизм был бы полезен data.table
для больших скоплений.
library(profr)
prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
simplify = FALSE)
pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}
sort(sapply(fun_timing, sum)) # no large outliers
fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
ret <- data.table(fun= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
ret <- data.table(pkg= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
fun_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "fun"][
order(avg_time, decreasing = TRUE),][1:10,]
pkg_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "pkg"][
order(avg_time, decreasing = TRUE),]
Результаты:
fun total_time avg_time avg_pct
1: base::[ 670.362 6.70362 0.2694
2: NA::[.data.table 667.350 6.67350 0.2682
3: .GlobalEnv::foo 335.784 3.35784 0.1349
4: base::[[ 163.044 1.63044 0.0655
5: base::[[.data.frame 133.790 1.33790 0.0537
6: base::%in% 120.512 1.20512 0.0484
7: base::sys.call 86.846 0.86846 0.0348
8: NA::replace_dot_alias 27.824 0.27824 0.0112
9: base::which 23.536 0.23536 0.0095
10: base::sapply 22.080 0.22080 0.0089
pkg total_time avg_time avg_pct
1: base 1397.770 13.97770 0.7938
2: .GlobalEnv 335.784 3.35784 0.1908
3: data.table 27.262 0.27262 0.0155
перекрестно размещен в github/data.table
Да (хотя, возможно, это того не стоит, на что указывает @Alex W).
Ниже приведен простой шаблон для этого. Для простоты изложения я использую пример, в котором это не стоит (используя mean
функция), но это показывает, из шаблона.
Пример:
Предположим, вы хотите вычислить среднее значение Petal.Length по видам в наборе данных радужной оболочки.
Вы можете сделать это довольно напрямую, используя data.table как:
as.data.table(iris)[by=Species,,.(MPL=mean(Petal.Length))]
Species MPL
1: setosa 1.462
2: versicolor 4.260
3: virginica 5.552
Но если mean
вместо этого было достаточно длительное и дорогое вычисление (возможно, как определено профилированием, хотя иногда это просто "очевидно"), вы можете использовать parallel::mclapply
, Поскольку сведение к минимуму взаимодействия со всеми подпроцессами, порождаемые mclapply, могут значительно сократить общие вычисления, вместо передачи выборок из data.table в каждый подпроцесс, вы хотите передать только индексы выбора. Далее, сначала отсортировав таблицу data.table, вы можете передать только диапазон (max и min) этих индексов. Как это:
> o.dt<-as.data.table(iris)[order(Species)] # note: iris happens already to be ordered
> i.dt<-o.dt[,by=Species,.(irange=.(range(.I)))]
> i.dt
Species irange
1: setosa 1,50
2: versicolor 51,100
3: virginica 101,150
> result<-mclapply(seq(nrow(i.dt)),function(r) o.dt[do.call(seq,as.list(i.dt[r,irange][[1]])),.(MPL=mean(Petal.Length))])
> result
[[1]]
MPL
1: 1.462
[[2]]
MPL
1: 4.26
[[3]]
MPL
1: 5.552
> result.dt<-cbind(i.dt,rbindlist(result))[,-2]
> result.dt
Species MPL
1: setosa 1.462
2: versicolor 4.260
3: virginica 5.552
Рассматривая шаблон:
- Заказать вход.
- Вычислить диапазон индекса для каждой группы.
- Определить аноним
function
чтобы извлечь строки, содержащие членов группы, и выполнить необходимые вычисления (в данном случае, среднее). - Примените функцию к каждой группе, используя mclapply для индексов строк диапазонов индексов.
- использование
rbindlist
чтобы получить результаты в виде data.table,cbind
это к входу, и отбросьте это индексные столбцы (если Вы не должны держать их вокруг по некоторой другой причине).
Заметки:
- Финал
rbindlist
как правило, дорого и может быть пропущено в зависимости от вашего приложения).
Сделать:
- убедите команду data.table, что этот шаблон достаточно общий и достаточно полезный, чтобы его могли вызывать дополнительные параметры индексации data.table. Представьте, что передача mc=TRUE вызовет этот шаблон и поддержит дополнительные параллельные опции в...
iris.dt[by=Species,,.(MPL=mean(Petal.Length)), mc=TRUE, mc.preschedule=FALSE, mc.set.seed=TRUE,...]