Обеспечение зависимости функции для распараллеливания через multidplyr на всех узлах кластера
Я написал функцию, которая помогает мне в подготовке всех узлов кластера, охватываемых multidplyr::get_default_cluster()
для выполнения параллельного задания.
Это работает, но теперь я пропускаю следующий шаг: любые "готовые" функции в multidplyr
(или друзей), который помогает мне определить все функции (и в идеале также пакет) зависимости функции, которую я хочу распараллелить.
В идеале, был бы вызов к чему-то identify_dependencies_of("foo")
который дал бы мне список с элементами
functions
(перечисляя все функции, которые вызываютсяfoo()
звонит стеком)packages
(перечисление всех зависимостей пакета, используемых вfoo()
звонит стеком)
В настоящее время выяснение того, какие зависимости на самом деле необходимы для узлов, полностью "методом проб и ошибок", и я ищу что-то, что упрощает этот процесс.
пример
Определение функции подготовки
# devtools::install_github("hadley/multidplyr")
library(multidplyr)
library(magrittr)
#> Warning: package 'magrittr' was built under R version 3.5.2
prepare_cluster_nodes <- function(
...,
funs = Character(),
pkgs = Character()
) {
# Capture dots -----
dots <- rlang::dots_list(...)
dots_env <- rlang::as_env(dots)
# Initialize cluster -----
cl <- multidplyr::get_default_cluster()
# Variables -----
dots %>%
names() %>%
purrr::map(function(.x) {
value <- rlang::env_get(dots_env, nm = .x)
multidplyr::cluster_assign_value(cl, .x, value)
})
# Functions -----
funs %>%
purrr::map(function(.x) {
value <- get(.x)
multidplyr::cluster_assign_value(cl, .x, value)
})
# Packages -----
pkgs %>%
purrr::map(function(.x) {
multidplyr::cluster_library(cl, .x)
})
cl
}
Определение функций для примерного распараллеленного вызова
# Function defs -----
foo <- function(x, y) {
x %>%
dplyr::mutate(
value = bar(value) * y
)
}
bar <- function(x) x * 1000
Подготовка узлов
# Prepare nodes -----
y <- -1
cl <- prepare_cluster_nodes(
y = y,
funs = c("foo", "bar"),
pkgs = c("dplyr", "ggplot2")
)
#> Initialising 3 core cluster.
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
cl %>% multidplyr::cluster_ls()
#> [[1]]
#> [1] "bar" "foo" "y"
#>
#> [[2]]
#> [1] "bar" "foo" "y"
#>
#> [[3]]
#> [1] "bar" "foo" "y"
Параллельный звонок
# Parallelized call -----
df <- tibble::tibble(id = 1:10000, value = 1:10000)
df %>%
multidplyr::partition(id) %>%
dplyr::do(foo(., y = y))
#> Warning: group_indices_.grouped_df ignores extra arguments
#> Source: party_df [10,000 x 2]
#> Groups: id
#> Shards: 3 [3,333--3,334 rows]
#>
#> # Description: S3: party_df
#> id value
#> <int> <dbl>
#> 1 6 -6000
#> 2 8 -8000
#> 3 9 -9000
#> 4 12 -12000
#> 5 13 -13000
#> 6 16 -16000
#> 7 18 -18000
#> 8 28 -28000
#> 9 33 -33000
#> 10 37 -37000
#> # ... with 9,990 more rows
Создано 2019-02-20 пакетом представлением (v0.2.1)