Улучшение параллельной производительности с помощью пакетной обработки в статико-динамическом ветвящемся конвейере

BLUF: Я изо всех сил пытаюсь понять, как использовать пакетную обработку в пакете целей R для повышения производительности в статическом и динамическом конвейере ветвления, обрабатываемом параллельно с использованием. Я предполагаю, что мне нужно выполнять пакетную обработку в каждой динамической ветке, но я не уверен, как это сделать.

Вот репрезент, который использует динамическое ветвление, вложенное в статическое ветвление, аналогично тому, что делает мой настоящий конвейер. Сначала он статически разветвляется для каждого значения в all_types, а затем динамически разветвляется внутри каждой категории. Этот код создает 1000 ветвей и 1010 целей. В реальном рабочем процессе я явно не использую replicate, а количество динамических ветвей различается в зависимости от type ценность.

      # _targets.r

library(targets)
library(tarchetypes)
library(future)
library(future.callr)

plan(callr)

all_types = data.frame(type = LETTERS[1:10])

tar_map(values = all_types, names = "type",
  tar_target(
    make_data,
    replicate(100,
      data.frame(x = seq(1000) + rnorm(1000, 0, 5),
                 y = seq(1000) + rnorm(1000, 20, 20)),
      simplify = FALSE
    ),
    iteration = "list"
  ),
  tar_target(
    fit_model,
    lm(make_data),
    pattern = map(make_data),
    iteration = "list"
  )
)

А вот временное сравнение tar_make() против tar_make_future() с восемью рабочими:

      # tar_destroy()
t1 <- system.time(tar_make())
# tar_destroy()
t2 <- system.time(tar_make_future(workers = 8))

rbind(serial = t1, parallel = t2)

##          user.self sys.self elapsed user.child sys.child
## serial        2.12     0.11   25.59         NA        NA
## parallel      2.07     0.24  184.68         NA        NA

Я не думаю user или system здесь полезны, так как задание отправляется отдельным процессам R, но elapsed время на параллельное задание примерно в 7 раз больше, чем на последовательное задание.

Я предполагаю, что это замедление вызвано большим количеством целей. Улучшит ли пакетная обработка производительность в этом случае, и если да, то как я могу реализовать пакетную обработку в динамической ветви?

1 ответ

Вы на правильном пути с пакетированием. В вашем случае это вопрос разбиения вашего списка из 100 наборов данных на группы, скажем, по 10 или около того. Вы можете сделать это с помощью вложенного списка наборов данных, но это много работы. К счастью, есть более простой способ.

Ваш вопрос на самом деле очень своевременный. Я только что написал несколько новых целевых фабрик, которые могут помочь. Чтобы получить к ним доступ, вам понадобится версия для разработчиков tarchetypesиз Гитхаба:

      remotes::install_github("ropensci/tarchetypes")

Затем, с tar_map2_count(), будет намного проще составить список из 100 наборов данных для каждого сценария.

      library(targets)
tar_script({
  library(broom)
  library(targets)
  library(tarchetypes)
  library(tibble)

  make_data <- function(n) {
    datasets_per_batch <- replicate(
      100,
      tibble(
        x = seq(n) + rnorm(n, 0, 5),
        y = seq(n) + rnorm(n, 20, 20)
      ),
      simplify = FALSE
    )
    tibble(dataset = datasets_per_batch, rep = seq_along(datasets_per_batch))
  }

  tar_map2_count(
    name = model,
    command1 = make_data(n = rows),
    command2 = tidy(lm(y ~ x, data = dataset)), # Need dataset[[1]] in tarchetypes 0.4.0
    values = data_frame(
      scenario = LETTERS[seq_len(10)],
      rows = seq(10, 100, length.out = 10)
    ),
    columns2 = NULL,
    batches = 10
  )
})
tar_make(reporter = "silent")
#> Warning message:
#> `data_frame()` was deprecated in tibble 1.1.0.
#> Please use `tibble()` instead.
#> This warning is displayed once every 8 hours.
#> Call `lifecycle::last_lifecycle_warnings()` to see where this warning was generated.
tar_read(model)
#> # A tibble: 2,000 × 8
#>    term        estimate std.error statistic   p.value scenario  rows tar_group
#>    <chr>          <dbl>     <dbl>     <dbl>     <dbl> <chr>    <dbl>     <int>
#>  1 (Intercept)   17.1      12.8       1.34  0.218     A           10        10
#>  2 x              1.39      1.35      1.03  0.333     A           10        10
#>  3 (Intercept)    6.42     14.0       0.459 0.658     A           10        10
#>  4 x              1.75      1.28      1.37  0.209     A           10        10
#>  5 (Intercept)   32.8       7.14      4.60  0.00176   A           10        10
#>  6 x             -0.300     1.14     -0.263 0.799     A           10        10
#>  7 (Intercept)   29.7       3.24      9.18  0.0000160 A           10        10
#>  8 x              0.314     0.414     0.758 0.470     A           10        10
#>  9 (Intercept)   20.0      13.6       1.47  0.179     A           10        10
#> 10 x              1.23      1.77      0.698 0.505     A           10        10
#> # … with 1,990 more rows

Создано 10 декабря 2021 г. reprex (v2.0.1)

Существует также tar_map_rep(), что может быть проще, если все ваши наборы данных генерируются случайным образом, но я не уверен, что я переоснащаю ваш вариант использования.

      library(targets)
tar_script({
  library(broom)
  library(targets)
  library(tarchetypes)
  library(tibble)

  make_one_dataset <- function(n) {
    tibble(
      x = seq(n) + rnorm(n, 0, 5),
      y = seq(n) + rnorm(n, 20, 20)
    )
  }

  tar_map_rep(
    name = model,
    command = tidy(lm(y ~ x, data = make_one_dataset(n = rows))),
    values = data_frame(
      scenario = LETTERS[seq_len(10)],
      rows = seq(10, 100, length.out = 10)
    ),
    batches = 10,
    reps = 10
  )
})
tar_make(reporter = "silent")
#> Warning message:
#> `data_frame()` was deprecated in tibble 1.1.0.
#> Please use `tibble()` instead.
#> This warning is displayed once every 8 hours.
#> Call `lifecycle::last_lifecycle_warnings()` to see where this warning was generated.
tar_read(model)
#> # A tibble: 2,000 × 10
#>    term    estimate std.error statistic p.value scenario  rows tar_batch tar_rep
#>    <chr>      <dbl>     <dbl>     <dbl>   <dbl> <chr>    <dbl>     <int>   <int>
#>  1 (Inter…   37.5        7.50     5.00  0.00105 A           10         1       1
#>  2 x         -0.701      1.17    -0.601 0.564   A           10         1       1
#>  3 (Inter…   21.5        9.64     2.23  0.0567  A           10         1       2
#>  4 x         -0.213      1.55    -0.138 0.894   A           10         1       2
#>  5 (Inter…   20.6        9.51     2.17  0.0620  A           10         1       3
#>  6 x          1.40       1.79     0.783 0.456   A           10         1       3
#>  7 (Inter…   11.6       11.2      1.04  0.329   A           10         1       4
#>  8 x          2.34       1.39     1.68  0.131   A           10         1       4
#>  9 (Inter…   26.8        9.16     2.93  0.0191  A           10         1       5
#> 10 x          0.288      1.10     0.262 0.800   A           10         1       5
#> # … with 1,990 more rows, and 1 more variable: tar_group <int>

Создано 10 декабря 2021 г. пакетомпакетом reprex (v2.0.1)

К сожалению, фьючерсы сопряжены с накладными расходами. Может быть, в вашем случае будет быстрее, если вы попробуете tar_make_clustermq()?

Другие вопросы по тегам