Ускорьте работу группы data.table с помощью нескольких ядер и параллельного программирования

У меня большой код, и шаг агрегации является текущим узким местом с точки зрения скорости.

В моем коде я хотел бы ускорить шаг группировки данных, чтобы быть быстрее. SNOTE (простой нетривиальный пример) моих данных выглядит так:

library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
system.time(c.dt <- dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)])
   user  system elapsed 
 60.107   3.143  63.534

Это довольно быстро для такого большого объема данных, но в моем случае я все еще ищу дальнейшее ускорение. В моем случае у меня несколько ядер, поэтому я почти уверен, что должен быть способ использовать такие вычислительные возможности.

Я открыт для изменения моего типа данных на объекты data.frame или idata.frame (в теории idata.frame предположительно работает быстрее, чем data.frames).

Я провел некоторое исследование и, похоже, пакет plyr обладает некоторыми параллельными возможностями, которые могут быть полезны, но я все еще пытаюсь понять, как это сделать для группы, которую я пытаюсь сделать. В другом посте SO они обсуждают некоторые из этих идей. Я до сих пор не уверен, насколько больше я достигну с этим распараллеливанием, поскольку он использует функцию foreach. По моему опыту, функция foreach не является хорошей идеей для миллионов быстрых операций, потому что обмен данными между ядрами в конечном итоге замедляет процесс распараллеливания.

2 ответа

Решение

Если у вас есть несколько доступных ядер, почему бы не использовать тот факт, что вы можете быстро фильтровать и группировать строки в data.table, используя его ключ:

library(doMC)
registerDoMC(cores=4)


setkey(dt, "a")

finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]

Обратите внимание, что если количество уникальных групп (т.е. length(unique(a))) относительно мала, быстрее будет .combine аргумент, получить результаты обратно в список, а затем вызвать rbindlist по результатам. В моем тестировании на двух ядрах и 8 ГБ ОЗУ порог составлял около 9 000 уникальных значений. Вот что я использовал для сравнения:

# (otion a)
round(rowMeans(replicate(3, system.time({
# ------- #
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
# ------- #
}))), 3) 
# [1]  1.243 elapsed for N ==  1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000



# (otion b)
round(rowMeans(replicate(3, system.time({
# ------- #
    results <- 
      foreach(x=unique(dt[["a"]])) %dopar% 
         dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
# ------- #
}))), 3)
# [1]  1.117 elapsed for N ==  1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000


## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")

Можете ли вы распараллелить агрегацию с data.table? Да.

Стоит ли оно того? Это ключевой момент, который предыдущий ответ не смог выделить.

Как объясняет Matt Dowle Доул в таблицах data.table и параллельных вычислениях, копии ("чанки") необходимо делать перед распределением при параллельном выполнении операций. Это замедляет ход событий. В некоторых случаях, когда вы не можете использовать data.table (например, запуск множества линейных регрессий), стоит разделить задачи между ядрами. Но не агрегация - по крайней мере, когда data.table вовлечен.

Короче говоря (и до тех пор, пока не доказано обратное), с помощью data.table и перестать беспокоиться о потенциальном увеличении скорости, используя doMC, data.table уже агрегирует быстро по сравнению с чем-либо еще доступным, когда дело доходит до агрегации - даже если это не многоядерный!


Вот некоторые тесты, которые вы можете запустить для себя, сравнивая data.table внутренняя агрегация с использованием by с foreach а также mclapply, Результаты перечислены первыми.

#-----------------------------------------------

# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1)  0.007 -- data.table using `by`
# (2)  3.548 -- mclapply with rbindlist
# (3)  5.557 -- foreach with rbindlist
# (4)  5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply

# ----------------------------------------------

library(data.table)

## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")

# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.tables `by` to aggregate
round(rowMeans(replicate(3, system.time({
    dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617

## using `lapply`
round(rowMeans(replicate(3, system.time({
    results <- lapply(unique(dt[["a"]]), function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
    })
    rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000

# USING `mclapply` FORKING ---------------------------------
## use mclapply
round(rowMeans(replicate(3, system.time({
    results <- mclapply(unique(dt[["a"]]),
    function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    }, mc.cores=4)
    rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000


# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4

## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000

## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    results <-
      foreach(x=unique(dt[["a"]])) %dopar%
        dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000

registerDoSEQ()
getDoParWorkers()
# [1] 1
Другие вопросы по тегам