Применить функцию к нескольким группам, используя функции Rcpp и R

Я пытаюсь применить функцию к нескольким группам / идентификаторам в r, используя foreach пакет. Работа с параллельной обработкой через %dopar%так что мне было интересно, можно ли запустить apply или для части петли в c++ с помощью rcpp или другие пакеты, чтобы сделать это быстрее. Я не знаком с c++ или другие пакеты, которые могут сделать это, поэтому я надеюсь узнать, возможно ли это. Пример кода приведен ниже. Моя фактическая функция длиннее с более чем 20 входами и занимает даже больше времени, чем то, что я публикую

Я ценю помощь.

РЕДАКТИРОВАТЬ: я понял, что мой первоначальный вопрос был расплывчатым, поэтому я постараюсь сделать лучше. У меня есть таблица с данными временных рядов по группам. Каждая группа имеет> 10K строк. Я написал функцию в c++ с помощью rcpp которая фильтрует таблицу по группам и применяет функцию. Я хотел бы просмотреть уникальные группы и объединить результаты, например: rbind использует rcpp чтобы он работал быстрее. Смотрите пример кода ниже (моя реальная функция длиннее)

library(data.table)
library(inline)
library(Rcpp)
library(stringi)
library(Runuran)

# Fake data
DT <- data.table(Group = rep(do.call(paste0, Map(stri_rand_strings, n=10, length=c(5, 4, 1),
                                                   pattern = c('[A-Z]', '[0-9]', '[A-Z]'))), 180))

df <- DT[order(Group)][
  , .(Month = seq(1, 180, 1),
      Col1 = urnorm(180, mean = 500, sd = 1, lb = 5, ub = 1000), 
      Col2 = urnorm(180, mean = 1000, sd = 1, lb = 5, ub = 1000), 
      Col3 = urnorm(180, mean = 300, sd = 1, lb = 5, ub = 1000)), 
  by = Group
  ]

# Rcpp function
#include <Rcpp.h>
using namespace Rcpp;
// [[Rcpp::plugins(cpp11)]]

// [[Rcpp::export]]
DataFrame testFunc(DataFrame df, StringVector ids, double var1, double var2) {

  // Filter by group
  using namespace std;  
  StringVector sub = df["Group"];
  std::string level = Rcpp::as<std::string>(ids[0]);
  Rcpp::LogicalVector ind(sub.size());
  for (int i = 0; i < sub.size(); i++){
    ind[i] = (sub[i] == level);
  }

  // Access the columns
  CharacterVector Group = df["Group"];
  DoubleVector Month = df["Month"];
  DoubleVector Col1 = df["Col1"];
  DoubleVector Col2 = df["Col2"];
  DoubleVector Col3 = df["Col3"];


  // Create calculations
  DoubleVector Cola = Col1 * (var1 * var2);
  DoubleVector Colb = Col2 * (var1 * var2);
  DoubleVector Colc = Col3 * (var1 * var2);
  DoubleVector Cold = (Cola + Colb + Colc);

  // Result summary
  std::string Group_ID = level;
  double SumCol1 = sum(Col1);
  double SumCol2 = sum(Col2);
  double SumCol3 = sum(Col3);
  double SumColAll = sum(Cold);

  // return a new data frame
  return DataFrame::create(_["Group_ID"]= Group_ID, _["SumCol1"]= SumCol1,
                            _["SumCol2"]= SumCol2, _["SumCol3"]= SumCol3, _["SumColAll"]= SumColAll);
}

# Test function
Rcpp::sourceCpp('sample.cpp')
testFunc(df, ids = "BFTHU1315C", var1 = 24, var2 = 76) # ideally I would like to loop through all groups (unique(df$Group))

#     Group_ID  SumCol1 SumCol2  SumCol3  SumColAll
# 1 BFTHU1315C 899994.6 1798561 540001.6 5907129174

Заранее спасибо.

1 ответ

Я бы предложил переосмыслить наш подход. Ваш набор тестовых данных, который, как я полагаю, сопоставим с вашим реальным набором данных, содержит 3e8 строк. Я оцениваю около 10 ГБ данных. Похоже, вы делаете следующее с этими данными:

  • Определить список уникальных идентификаторов (около 5e5)
  • Создайте одну задачу на уникальный идентификатор
  • Каждая из этих задач получает полный набор данных и отфильтровывает все данные, которые не относятся к рассматриваемому идентификатору.
  • Каждая из этих задач добавляет несколько дополнительных столбцов, которые не зависят от идентификатора
  • Каждая из задач делает group_b(ID), но в наборе данных остался только один идентификатор
  • Каждая из задач рассчитывает несколько простых средств

Мне это кажется очень неэффективным в отношении использования памяти. Вообще говоря, для таких проблем вы бы хотели "параллелизм общей памяти", но foreach дает вам только "параллелизм процесса". Недостатком параллелизма процессов является то, что он увеличивает стоимость памяти.

Кроме того, вы отбрасываете весь код группировки и агрегации, который существует в базе данных R / dplyr / data.table / SQL engine / ... Очень маловероятно, что вы или кто-либо, кто читает ваш вопрос здесь, сможет улучшить эти существующие кодовые базы.

Мои предложения:

  • Забудьте о "параллелизме процесса" (пока)
  • Если у вас достаточно оперативной памяти, попробуйте с простым dplyr труба с mutate / group_by / summarize,
  • Если это не достаточно быстро, узнайте, как работает агрегация data.table, который, как известно, быстрее и предлагает "параллелизм совместно используемой памяти" через OpenMP.
  • Если на вашем компьютере недостаточно памяти и выполняется обмен данными, изучите возможности вычислений вне памяти. Лично я бы использовал (встроенную) базу данных.

Чтобы сделать это более явным. Здесь data.table Единственное решение:

library(data.table)
library(stringi)

# Fake data
set.seed(42)
var1 <- 24
var2 <- 76

DT <- data.table(Group = rep(do.call(paste0, Map(stri_rand_strings, n=10, length=c(5, 4, 1),
                                                 pattern = c('[A-Z]', '[0-9]', '[A-Z]'))), 180))
setkey(df, Group)

df <- DT[order(Group)][
  , .(Month = seq(1, 180, 1),
      Col1 = rnorm(180, mean = 500, sd = 1), 
      Col2 = rnorm(180, mean = 1000, sd = 1), 
      Col3 = rnorm(180, mean = 300, sd = 1)), 
  by = Group
  ][, c("Cola", "Colb", "Colc") := .(Col1 * (var1 * var2), 
                                     Col2 * (var1 * var2),
                                     Col3 * (var1 * var2))
    ][, Cold := Cola + Colb + Colc]


# aggregagation
df[, .(SumCol1 = sum(Col1),
       SumCol2 = sum(Col2),
       SumCol3 = sum(Col3),
       SumColAll = sum(Cold)), by = Group]

Я добавляю вычисленные столбцы по ссылке. На этапе агрегирования используются функции группировки, предоставляемые data.table, Если ваша агрегация сложнее, вы также можете использовать функцию:

# aggregation function
mySum <- function(Col1, Col2, Col3, Cold) {
  list(SumCol1 = sum(Col1),
       SumCol2 = sum(Col2),
       SumCol3 = sum(Col3),
       SumColAll = sum(Cold))
}

df[, mySum(Col1, Col2, Col3, Cold), by = Group]

И если агрегация может быть быстрее при использовании C++ (это не относится к таким вещам, как sum!), вы даже можете использовать это:

# aggregation function in C++
Rcpp::cppFunction('
Rcpp::List mySum(Rcpp::NumericVector Col1, 
                 Rcpp::NumericVector Col2, 
                 Rcpp::NumericVector Col3, 
                 Rcpp::NumericVector Cold) {
    double SumCol1 = Rcpp::sum(Col1);
    double SumCol2 = Rcpp::sum(Col2);
    double SumCol3 = Rcpp::sum(Col3);
    double SumColAll = Rcpp::sum(Cold);             
    return Rcpp::List::create(Rcpp::Named("SumCol1") = SumCol1,
                              Rcpp::Named("SumCol2") = SumCol2,
                              Rcpp::Named("SumCol3") = SumCol3,
                              Rcpp::Named("SumColAll") = SumColAll);
}
')

df[, mySum(Col1, Col2, Col3, Cold), by = Group]

Во всех этих примерах нащупывание и зацикливание оставлено data.table, так как вы ничего не получите, делая это самостоятельно.

Другие вопросы по тегам