data.table и параллельные вычисления

После этого поста: многоядерный и data.table в R, мне было интересно, есть ли способ использовать все ядра при использовании data.table, обычно выполнение расчетов по группам можно распараллелить. Кажется, что plyr позволяет такие операции по замыслу.

2 ответа

Решение

Первое, что нужно проверить, это то, что data.table FAQ 3.1 пункт 2 утонул в:

Одно выделение памяти делается только для самой большой группы, затем эта память используется повторно для других групп. Существует очень мало мусора для сбора.

Это одна из причин, по которой группировка data.table происходит быстро. Но этот подход не поддается параллелизации. Распараллеливание означает копирование данных в другие потоки вместо затрат времени. Но я понимаю, что data.table группировка обычно быстрее, чем plyr с .parallel в любом случае. Это зависит от времени вычисления задачи для каждой группы, и может ли это время вычисления быть легко уменьшено или нет. Перемещение данных часто доминирует (при сравнении 1 или 3 прогонов задач с большими данными).

Чаще, на самом деле, это какая-то гоча, которая кусает в j выражение [.data.table, Например, недавно мы увидели низкую производительность data.table группировка, но виновник оказался min(POSIXct) ( Агрегирование в R более 80K уникальных идентификаторов). Избежание этого гота привело к ускорению в 50 раз.

Итак, мантра это: Rprof , Rprof , Rprof ,

Кроме того, пункт 1 из того же FAQ может быть значительным:

Только этот столбец сгруппирован, остальные 19 игнорируются, потому что data.table проверяет выражение j и понимает, что не использует другие столбцы.

Так, data.table на самом деле не следует парадигме "разделяй-применяй-комбинируй". Это работает по-другому. split-apply-comb поддается параллелизации, но на самом деле не масштабируется до больших данных.

Также см. Сноску 3 в виньетке data.table:

Интересно, сколько людей внедряют параллельные методы в код, который использует векторное сканирование?

Это пытается сказать: "Конечно, параллель значительно быстрее, но сколько времени на самом деле нужно с эффективным алгоритмом?".

НО если вы профилировали (используя Rprof), и задача для каждой группы действительно требует больших вычислительных ресурсов, тогда могут помочь 3 сообщения в datatable-help, включая слово "многоядерный":

многоядерные посты на datatable-help

Конечно, есть много задач, где распараллеливание было бы хорошо в data.table, и есть способ сделать это. Но это еще не было сделано, так как обычно другие факторы кусают, поэтому это был низкий приоритет. Если вы можете опубликовать воспроизводимые фиктивные данные с эталонами и результатами Rprof, это поможет повысить приоритет.

Я провел несколько тестов в соответствии с предыдущей мантрой @matt Dowle о Rprof, Rprof, Rprof.

Я обнаружил, что решение о распараллеливании зависит от контекста; но, вероятно, имеет значение. В зависимости от тестовых операций (например, foo ниже, который можно настроить) и количество используемых ядер (я пробую и 8 и 24), я получаю разные результаты.

Ниже результаты:

  1. используя 8 ядер, я вижу улучшение на 21% в этом примере для распараллеливания
  2. используя 24 ядра, я вижу улучшение на 14%.

Я также смотрю на некоторые реальные (не разделяемые) данные / операции, которые показывают большие (33% или же 25%, два разных теста) улучшение паралелизации с 24 ядрами. Редактировать май 2018 г. Новый набор примеров из реальных примеров демонстрирует улучшение, достигающее 85% от параллельных операций с 1000 группами.

R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods
[8] base

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2          data.table_1.10.4

R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5          data.table_1.10.4     

Пример ниже:

library(data.table)
library(stringi)
library(microbenchmark)

set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)

my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)

foo <- function(dt) {
  dt2 <- dt ## needed for .SD lock
  nr <- nrow(dt2)

  idx <- sample.int(nr, 1, replace=FALSE)

  dt2[idx,][, `:=` (
    new_var1= V1 / V2,
    new_var2= V4 * V3 / V10,
    new_var3= sum(V12),
    new_var4= ifelse(V10 > 0, V11 / V13, 1),
    new_var5= ifelse(V9 < 0, V8 / V18, 1)
  )]


  return(dt2[idx,])
}

split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}

foo2 <- function(dt) {
  dt2 <- split_df(dt, "grps")

  require(parallel)
  cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
  clusterExport(cl, varlist= "foo")
  clusterExport(cl, varlist= "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))

  dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)

  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}

print(parallel::detectCores()) # 8

microbenchmark(
  serial= dt[,foo(.SD), by= "grps"],
  parallel= foo2(dt),
  times= 10L
)

Unit: seconds
     expr      min       lq     mean   median       uq      max neval cld
   serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387    10   b
 parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257    10  a 

print(parallel::detectCores()) # 24

Unit: seconds
     expr       min        lq     mean   median       uq      max neval cld
   serial  9.014247  9.804112 12.17843 13.17508 13.56914 14.13133    10   a
 parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353    10   a

Профилирование:

Мы можем использовать этот ответ, чтобы дать более прямой ответ на оригинальный комментарий @matt dowle к профилированию.

В результате мы видим, что большая часть вычислительного времени обрабатывается base и не data.table, data.table Сами операции, как и ожидалось, исключительно быстрые. Хотя некоторые могут утверждать, что это свидетельствует о том, что нет необходимости в параллелизме внутри data.table Я утверждаю, что этот рабочий процесс / набор операций не является нетипичным. То есть я сильно подозреваю, что большинство крупных data.table агрегация включает в себя значительное количество data.table код; и что это связано с интерактивным использованием против разработки / производственного использования. Поэтому я прихожу к выводу, что параллелизм был бы полезен data.table для больших скоплений.

library(profr)

prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
                       simplify = FALSE)

pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
  fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
  pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}

sort(sapply(fun_timing, sum)) #  no large outliers

fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
  ret <- data.table(fun= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
  ret <- data.table(pkg= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

fun_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "fun"][
  order(avg_time, decreasing = TRUE),][1:10,]

pkg_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "pkg"][
  order(avg_time, decreasing = TRUE),]

Результаты:

                      fun total_time avg_time avg_pct
 1:               base::[    670.362  6.70362  0.2694
 2:      NA::[.data.table    667.350  6.67350  0.2682
 3:       .GlobalEnv::foo    335.784  3.35784  0.1349
 4:              base::[[    163.044  1.63044  0.0655
 5:   base::[[.data.frame    133.790  1.33790  0.0537
 6:            base::%in%    120.512  1.20512  0.0484
 7:        base::sys.call     86.846  0.86846  0.0348
 8: NA::replace_dot_alias     27.824  0.27824  0.0112
 9:           base::which     23.536  0.23536  0.0095
10:          base::sapply     22.080  0.22080  0.0089

          pkg total_time avg_time avg_pct
1:       base   1397.770 13.97770  0.7938
2: .GlobalEnv    335.784  3.35784  0.1908
3: data.table     27.262  0.27262  0.0155

перекрестно размещен в github/data.table

Да (хотя, возможно, это того не стоит, на что указывает @Alex W).

Ниже приведен простой шаблон для этого. Для простоты изложения я использую пример, в котором это не стоит (используя mean функция), но это показывает, из шаблона.

Пример:

Предположим, вы хотите вычислить среднее значение Petal.Length по видам в наборе данных радужной оболочки.

Вы можете сделать это довольно напрямую, используя data.table как:

as.data.table(iris)[by=Species,,.(MPL=mean(Petal.Length))]
      Species   MPL
1:     setosa 1.462
2: versicolor 4.260
3:  virginica 5.552

Но если mean вместо этого было достаточно длительное и дорогое вычисление (возможно, как определено профилированием, хотя иногда это просто "очевидно"), вы можете использовать parallel::mclapply, Поскольку сведение к минимуму взаимодействия со всеми подпроцессами, порождаемые mclapply, могут значительно сократить общие вычисления, вместо передачи выборок из data.table в каждый подпроцесс, вы хотите передать только индексы выбора. Далее, сначала отсортировав таблицу data.table, вы можете передать только диапазон (max и min) этих индексов. Как это:

> o.dt<-as.data.table(iris)[order(Species)] # note: iris happens already to be ordered
> i.dt<-o.dt[,by=Species,.(irange=.(range(.I)))]
> i.dt
      Species  irange
1:     setosa    1,50
2: versicolor  51,100
3:  virginica 101,150


> result<-mclapply(seq(nrow(i.dt)),function(r) o.dt[do.call(seq,as.list(i.dt[r,irange][[1]])),.(MPL=mean(Petal.Length))])
> result
[[1]]
     MPL
1: 1.462

[[2]]
    MPL
1: 4.26

[[3]]
     MPL
1: 5.552

> result.dt<-cbind(i.dt,rbindlist(result))[,-2]
> result.dt
      Species   MPL
1:     setosa 1.462
2: versicolor 4.260
3:  virginica 5.552

Рассматривая шаблон:

  • Заказать вход.
  • Вычислить диапазон индекса для каждой группы.
  • Определить аноним function чтобы извлечь строки, содержащие членов группы, и выполнить необходимые вычисления (в данном случае, среднее).
  • Примените функцию к каждой группе, используя mclapply для индексов строк диапазонов индексов.
  • использование rbindlist чтобы получить результаты в виде data.table, cbind это к входу, и отбросьте это индексные столбцы (если Вы не должны держать их вокруг по некоторой другой причине).

Заметки:

  • Финал rbindlist как правило, дорого и может быть пропущено в зависимости от вашего приложения).

Сделать:

  • убедите команду data.table, что этот шаблон достаточно общий и достаточно полезный, чтобы его могли вызывать дополнительные параметры индексации data.table. Представьте, что передача mc=TRUE вызовет этот шаблон и поддержит дополнительные параллельные опции в...
iris.dt[by=Species,,.(MPL=mean(Petal.Length)), mc=TRUE, mc.preschedule=FALSE, mc.set.seed=TRUE,...]
Другие вопросы по тегам