Ошибка в десериализации (список соединений [[n]]): ошибка чтения из соединения

Я использую R и запускаю параллельный код на 7 ядрах на машине с Linux.

При использовании небольшого набора данных мой код занимает около 2 часов и работает нормально. При использовании набора данных большего размера, в 6 раз, коды занимают гораздо больше времени (вероятно, потому что их нужно поменять местами), но затем они заканчиваются случайным образом, иногда при 10%, иногда при 18, 20, 30% или около того. Выглядит совершенно случайно. Использование ОЗУ обычно составляет около 90%, использование SWAP <50%.

я использую foreach с doSNOW бэкенд. Это код ошибки:

Error in unserialize(socklist[[n]]) : error reading from connection
Calls: %dopar% ... tryCatch -> tryCatchList -> tryCatchOne -> <Anonymous>
Execution halted

 *** caught bus error ***
address 0x7f829d2adbd0, cause 'non-existent physical address'
An irrecoverable exception occurred. R is aborting now ...

И это файл SNOW, полученный путем установки outfile="outfile.out" в makeCluster вызов:

starting worker for localhost:11567 
Type: EXEC 
Loading required package: MASS
Loading required package: survival
Loading required package: sp

Attaching package: 'raster'

The following objects are masked from 'package:MASS':

    area, select


Attaching package: 'data.table'

The following object is masked from 'package:raster':

    shift

Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 
Type: EXEC 

 *** caught bus error ***
address 0x7ffada863636, cause 'non-existent physical address'

Traceback:
 1: .Call("Rsx_nc4_get_vara_double", as.integer(ncid), as.integer(varid),     as.integer(c.start), as.integer(c.count), fixmiss, imvstate,     as.double(passed_missval), PACKAGE = "ncdf4")
 2: ncvar_get_inner(ncid2use, varid2use, nc$var[[li]]$missval, addOffset,     scaleFact, start = start, count = count, verbose = verbose,     signedbyte = signedbyte, collapse_degen = collapse_degen)
 3: getfun(nc, varid = zvar, start = start, count = count)
 4: .readBrickCellsNetCDF(x, cells, layer, nl)
 5: .cellValues(x, i)
 6: .doExtract(x, i, drop = drop)
 7: por[i]
 8: por[i]
 9: as.vector(por[i])
10: mainF(as.vector(por[i]))
11: eval(expr, envir, enclos)
12: eval(.doSnowGlobals$expr, envir = .doSnowGlobals$exportenv)
13: doTryCatch(return(expr), name, parentenv, handler)
14: tryCatchOne(expr, names, parentenv, handlers[[1L]])
15: tryCatchList(expr, classes, parentenv, handlers)
16: tryCatch(eval(.doSnowGlobals$expr, envir = .doSnowGlobals$exportenv),     error = function(e) e)
17: fun(quote(list(i = 23339L)))
18: do.call("fun", lapply(args, enquote))
19: docall(msg$data$fun, msg$data$args)
20: doTryCatch(return(expr), name, parentenv, handler)
21: tryCatchOne(expr, names, parentenv, handlers[[1L]])
22: tryCatchList(expr, classes, parentenv, handlers)
23: tryCatch(docall(msg$data$fun, msg$data$args), error = handler)
24: doTryCatch(return(expr), name, parentenv, handler)
25: tryCatchOne(expr, names, parentenv, handlers[[1L]])
26: tryCatchList(expr, classes, parentenv, handlers)
27: tryCatch({    msg <- recvData(master)    cat(paste("Type:", msg$type, "\n"))    if (msg$type == "DONE") {        closeNode(master)        break    }    else if (msg$type == "EXEC") {        success <- TRUE        handler <- function(e) {            success <<- FALSE            structure(conditionMessage(e), class = c("snow-try-error",                 "try-error"))        }        t1 <- proc.time()        value <- tryCatch(docall(msg$data$fun, msg$data$args),             error = handler)        t2 <- proc.time()        value <- list(type = "VALUE", value = value, success = success,             time = t2 - t1, tag = msg$data$tag)        sendData(master, value)    }}, interrupt = function(e) NULL)
28: slaveLoop(makeSOCKmaster(master, port))
29: eval(expr, envir, enclos)
30: eval(quote({    master <- "localhost"    port <- ""    snowlib <- Sys.getenv("R_SNOW_LIB")    outfile <- Sys.getenv("R_SNOW_OUTFILE")    args <- commandArgs()    pos <- match("--args", args)    args <- args[-(1:pos)]    for (a in args) {        pos <- regexpr("=", a)        name <- substr(a, 1, pos - 1)        value <- substr(a, pos + 1, nchar(a))        switch(name, MASTER = master <- value, PORT = port <- value,             SNOWLIB = snowlib <- value, OUT = outfile <- value)    }    if (!(snowlib %in% .libPaths())) .libPaths(c(snowlib, .libPaths()))    library(methods)    library(snow)    if (port == "") port <- getClusterOption("port")    sinkWorkerOutput(outfile)    cat("starting worker for", paste(master, port, sep = ":"),         "\n")    slaveLoop(makeSOCKmaster(master, port))}), new.env())
31: eval(expr, envir, enclos)
32: eval(expr, p)
33: eval.parent(substitute(eval(quote(expr), envir)))
34: local({    master <- "localhost"    port <- ""    snowlib <- Sys.getenv("R_SNOW_LIB")    outfile <- Sys.getenv("R_SNOW_OUTFILE")    args <- commandArgs()    pos <- match("--args", args)    args <- args[-(1:pos)]    for (a in args) {        pos <- regexpr("=", a)        name <- substr(a, 1, pos - 1)        value <- substr(a, pos + 1, nchar(a))        switch(name, MASTER = master <- value, PORT = port <- value,             SNOWLIB = snowlib <- value, OUT = outfile <- value)    }    if (!(snowlib %in% .libPaths()))         .libPaths(c(snowlib, .libPaths()))    library(methods)    library(snow)    if (port == "")         port <- getClusterOption("port")    sinkWorkerOutput(outfile)    cat("starting worker for", paste(master, port, sep = ":"),         "\n")    slaveLoop(makeSOCKmaster(master, port))})
An irrecoverable exception occurred. R is aborting now ...

Код использует 7 потоков из 8 локальных машин. Вызов foreach сделан так:

#Packages
packageVec <- c("RcppRoll", "FAdist", "fitdistrplus", "minpack.lm", "raster", "foreach", "data.table")
#Register cluster
cl <- makeCluster(nThreads, outfile=paste0(dischargefile, ".out"))
registerDoSNOW(cl)
#Create progress bar
pb <- txtProgressBar(max = ncells, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
SNOWopts <- list(progress = progress)
#Compute
a <-
    foreach(i=cells, .packages=packageVec, .combine='rbind', .options.snow = SNOWopts) %dopar% {
      mainF(as.vector(por[i]))
    }
stopCluster(cl)

Описание функции mainF()

Трудно произвести MRE для этой функции, так как это сложно. Код здесь, и я опишу, что функция делает ниже. Я хочу подчеркнуть тот факт, что все это прекрасно работает на небольшом наборе данных, но не работает на большом наборе данных, даже если он содержит одни и те же данные небольшого набора данных, реплицированные несколько раз.

mainF() это функция, чей вход, por[i], является 125000 элементным вектором, и чей выходной сигнал составляет 244 элементного вектора. Эта функция в основном выполняет 240 скользящих средств на входном векторе (используя RcppRoll) и принимает максимумы (используя data.table) для каждого временного среза (обычно 15) для каждого среднего значения, соответствует данным для данного распределения (используя FAdist, raster а также fitdistrplus), выполняет еще один цикл для расчета еще 240 значений и подгонки (используя minpack.lm) эти значения для функции, чтобы получить больше параметров. Возвращено 240 элементов и возвращены 4 параметра подгонки (по 2 для каждого подбора).

Как я могу исправить вышеуказанную ошибку? Что это значит?

0 ответов

Другие вопросы по тегам