R - doRedis - перезаписать getTask для управления порядком выполнения в параллельных циклах foreach.

Проблема: мне нужно контролировать порядок выполнения, в котором задачи обрабатываются параллельно циклом foreach. К сожалению, это не поддерживается foreach.

Решение на ум: использование doRedis для использования базы данных для хранения всех задач, которые выполняются в цикле foreach. Чтобы контролировать порядок, я хочу перезаписать getTask с помощью setGetTask, чтобы получить задачи, основанные на предварительно указанном порядке. Хотя я не мог найти много документации о том, как это сделать.

Дополнительная информация:

  1. В setGetTask есть небольшой абзац с примером в документации redis.

    getTask <- function ( queue , job_id , ...)
    {
    
      key <- sprintf("
      redisEval("local x=redis.call('hkeys',KEYS[1])[1];
                   if x==nil then return nil end;
                   local ans=redis.call('hget',KEYS[1],x);
                   redis.call('hdel',KEYS[1],x);i
                   return ans",key)
    }
    
    setGetTask(getTask)
    

    Хотя я думаю, что код в документации синтаксически неверен (отсутствует imho "и закрывающая скобка")"). Я подумал, что это невозможно в CRAN, так как код для документации выполняется при представлении.

  2. Изменение функции getTask ничего не меняет в отношении рабочих, получающих задачи (даже если вводит очевидный бессмысленный смысл в redisEval, например, изменяя его на redisEval("dddddddddd((("))

  3. У меня был доступ к функции setGetTask только после установки пакета из исходного кода (который я скачал со официальной страницы пакета CRAN версии 1.1.1 (что imho не должно иметь никакого значения, чем установка его непосредственно из CRAN)

Данные: Dataframe задач для выполнения выглядит следующим образом:

taskName;taskQueuePosition;parameter1;paramterN
taskT;1;val1;10
taskK;2;val2;8
taskP;3;val3;7
taskA;4;val4;7

Я хочу использовать 'taskQueuePosition' для управления порядком, задачи с меньшими числами должны выполняться в первую очередь.

Вопросы:

  1. Кто-нибудь знает источники, где я могу получить больше информации о том, как сделать это с помощью doRedis или setGetTask?
  2. Кто-нибудь знает, как мне нужно изменить getTask для достижения вышеописанного?
  3. Любые другие умные идеи, чтобы управлять порядком выполнения в цикле foreach? Желательно, чтобы в какой-то момент я мог использовать doRedis в качестве параллельного бэкенда (изменение этого означало бы значительное изменение в обработке из-за сложных технических причин инфраструктуры).

Код (для удобства воспроизведения):

Далее предполагается, что redis-сервер запущен на локальной машине.

Redis DB Filling:

library(doRedis)
library(foreach)

options('redis:num'=TRUE) # needed for proper execution

REDIS_JOB_QUEUE = "jobs"
registerDoRedis(REDIS_JOB_QUEUE)

# filling up the data frame
taskDF = data.frame(taskName=c("taskT","taskK","taskP","taskA"),
           taskQueuePosition=c(1,2,3,4),
           parameter1=c("val1","val2","val3","val4"),
           parameterN=c(10,8,7,7))

foreach(currTask=iter(taskDF, by='row'), 
        .verbose = T
) %dopar% {
  print(paste("Executing task: ",currTask$taskName))
  Sys.sleep(currTask$parameterN)
}

removeQueue(REDIS_JOB_QUEUE)

рабочий:

library(doRedis)
REDIS_JOB_QUEUE = "jobs"

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE)

1 ответ

Решение

Я мог бы решить проблему и теперь могу контролировать порядок выполнения задач.

Дополнительная информация:

1. В документации есть опечатка, из-за которой пример getTask не работает. Учитывая форму функции default_getTask из файла task.R в пакете, она должна выглядеть примерно так:

getTaskDefault <- function ( queue , job_id , ...)
{
  key <- sprintf("%s:%s",queue, job_id)
  return(redisEval("local x=redis.call('hkeys',KEYS[1])[1];
                   if x==nil then return nil end;
                   local ans=redis.call('hget',KEYS[1],x);
                   redis.call('set', KEYS[1] .. '.start.' .. x, x);
                   redis.call('hdel',KEYS[1],x);
                   return ans",key))
}

Кажется, что буквы за первым знаком процента в первой строке функции потерялись. Это объясняет неравномерное количество скобок и кавычек.

2) setGetTask до сих пор не имеет никакого эффекта для меня. Когда я устанавливаю функцию getTask через.option, когда БД заполнена (как это описано в виньетке пакета), она успешно вызывается.

3) Информация о 2) означает, что мне не нужна функция getTask, поэтому я могу использовать пакет из CRAN.

----- Вопросы -----

1) Виньетка doRedis описывает, как можно успешно установить пользовательскую getTask.

2 и 3) Когда скрипт LUA в функции getTask изменяется, как показано ниже, задачи извлекаются из базы данных в порядке их отправки. Это не совсем то, о чем я просил, но из-за нехватки времени и того факта, что у меня есть (или, что лучше, было), не первая идея о сценарии LUA, это хорошее решение для управления порядком отправки в столбце taskQueuePosition.

getTaskInOrder <- function ( queue , job_id , ...)
{

  key <- sprintf("%s:%s",queue, job_id)
  return(redisEval("

        local tasks=redis.call('hkeys',KEYS[1]); -- get all tasks

        local x=tasks[1];           -- get first task available task
        if x==nil then              -- if there are no tasks left, stop processing
          return nil 
        end;  

        local xMin = 65535;         -- if we have more tasks than 65535, getting the 
        -- task with the lowest taskID is not guaranteed to be the first one
        local i = 1;
        -- local iMinFound = -1;
        while (x ~= nil) do         -- search the array until there are no tasks left
        -- print('x: ',x)
        local xNum = tonumber(x);
        if(xNum<xMin) then
          xMin = xNum;
          -- iMinFound = i;
        end
        i=i+1;
        -- print('i is now: ',i);
        x=tasks[i];
        end
        -- print('Minimum is task number',xMin,' found at i ', iMinFound)
        x=tostring(xMin)            -- convert it back to a string (maybe it would 
                                    -- be better to keep the original string somewhere, 
                                    -- in case we loose some information whilst converting to number)

        -- print('x is now:',x);
        -- print(KEYS[1] .. '.start.' .. x, x);
        -- print('');
        local ans=redis.call('hget',KEYS[1],x);
        redis.call('set', KEYS[1] .. '.start.' .. x, x);
        redis.call('hdel',KEYS[1],x);
        return ans",key))
}

Важное примечание: я заметил, что если задача отменяется, порядок облажается, и повторно отправленная задача (даже если номер задачи остается прежним) будет выполнена после первоначально отправленных задач. Это нормально для меня.

------ Код (для удобного воспроизведения):------

Это приводит к следующему примеру кода (с 12 записями во фрейме данных задачи вместо оригинальных 4):

Redis DB Filling:

library(doRedis)
library(foreach)

options('redis:num'=TRUE) # needed for proper execution

REDIS_JOB_QUEUE = "jobs"

getTaskInOrder <- function ( queue , job_id , ...)
{
  ...like above
}

registerDoRedis(REDIS_JOB_QUEUE)

# filling up the data frame already in order of tasks to be executed
# otherwise the dataframe has to be sorted by taskQueuePosition
taskDF = data.frame(taskName=c("taskA","taskB","taskC","taskD","taskE","taskF","taskG","taskH","taskI","taskJ","taskK","taskL"),
       taskQueuePosition=c(1,2,3,4,5,6,7,8,9,10,11,12),
       parameter1=c("val1","val2","val3","val4","val1","val2","val3","val4","val1","val2","val3","val4"),
       parameterN=c(5,5,5,4,4,4,4,3,3,3,2,2))

foreach(currTask=iter(taskDF, by='row'), 
        .verbose = T,
        .options.redis = list(getTask = getTaskInOrder
) %dopar% {
  print(paste("Executing task: ",currTask$taskName))
  Sys.sleep(currTask$parameterN)
}

removeQueue(REDIS_JOB_QUEUE)

рабочий:

library(doRedis)
REDIS_JOB_QUEUE = "jobs"

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE)

Еще одно замечание: на случай, если вы обрабатываете длинные задания, как я, обратите внимание на ошибку в redis 1.1.1 (текущая версия в CRAN), которая приводит к повторной отправке задач (из-за тайм-аута), несмотря на то, что работники все еще работают на них.

Другие вопросы по тегам