R - doRedis - перезаписать getTask для управления порядком выполнения в параллельных циклах foreach.
Проблема: мне нужно контролировать порядок выполнения, в котором задачи обрабатываются параллельно циклом foreach. К сожалению, это не поддерживается foreach.
Решение на ум: использование doRedis для использования базы данных для хранения всех задач, которые выполняются в цикле foreach. Чтобы контролировать порядок, я хочу перезаписать getTask с помощью setGetTask, чтобы получить задачи, основанные на предварительно указанном порядке. Хотя я не мог найти много документации о том, как это сделать.
Дополнительная информация:
В setGetTask есть небольшой абзац с примером в документации redis.
getTask <- function ( queue , job_id , ...) { key <- sprintf(" redisEval("local x=redis.call('hkeys',KEYS[1])[1]; if x==nil then return nil end; local ans=redis.call('hget',KEYS[1],x); redis.call('hdel',KEYS[1],x);i return ans",key) } setGetTask(getTask)
Хотя я думаю, что код в документации синтаксически неверен (отсутствует imho "и закрывающая скобка")"). Я подумал, что это невозможно в CRAN, так как код для документации выполняется при представлении.
Изменение функции getTask ничего не меняет в отношении рабочих, получающих задачи (даже если вводит очевидный бессмысленный смысл в redisEval, например, изменяя его на redisEval("dddddddddd((("))
У меня был доступ к функции setGetTask только после установки пакета из исходного кода (который я скачал со официальной страницы пакета CRAN версии 1.1.1 (что imho не должно иметь никакого значения, чем установка его непосредственно из CRAN)
Данные: Dataframe задач для выполнения выглядит следующим образом:
taskName;taskQueuePosition;parameter1;paramterN
taskT;1;val1;10
taskK;2;val2;8
taskP;3;val3;7
taskA;4;val4;7
Я хочу использовать 'taskQueuePosition' для управления порядком, задачи с меньшими числами должны выполняться в первую очередь.
Вопросы:
- Кто-нибудь знает источники, где я могу получить больше информации о том, как сделать это с помощью doRedis или setGetTask?
- Кто-нибудь знает, как мне нужно изменить getTask для достижения вышеописанного?
- Любые другие умные идеи, чтобы управлять порядком выполнения в цикле foreach? Желательно, чтобы в какой-то момент я мог использовать doRedis в качестве параллельного бэкенда (изменение этого означало бы значительное изменение в обработке из-за сложных технических причин инфраструктуры).
Код (для удобства воспроизведения):
Далее предполагается, что redis-сервер запущен на локальной машине.
Redis DB Filling:
library(doRedis)
library(foreach)
options('redis:num'=TRUE) # needed for proper execution
REDIS_JOB_QUEUE = "jobs"
registerDoRedis(REDIS_JOB_QUEUE)
# filling up the data frame
taskDF = data.frame(taskName=c("taskT","taskK","taskP","taskA"),
taskQueuePosition=c(1,2,3,4),
parameter1=c("val1","val2","val3","val4"),
parameterN=c(10,8,7,7))
foreach(currTask=iter(taskDF, by='row'),
.verbose = T
) %dopar% {
print(paste("Executing task: ",currTask$taskName))
Sys.sleep(currTask$parameterN)
}
removeQueue(REDIS_JOB_QUEUE)
рабочий:
library(doRedis)
REDIS_JOB_QUEUE = "jobs"
startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE)
1 ответ
Я мог бы решить проблему и теперь могу контролировать порядок выполнения задач.
Дополнительная информация:
1. В документации есть опечатка, из-за которой пример getTask не работает. Учитывая форму функции default_getTask из файла task.R в пакете, она должна выглядеть примерно так:
getTaskDefault <- function ( queue , job_id , ...)
{
key <- sprintf("%s:%s",queue, job_id)
return(redisEval("local x=redis.call('hkeys',KEYS[1])[1];
if x==nil then return nil end;
local ans=redis.call('hget',KEYS[1],x);
redis.call('set', KEYS[1] .. '.start.' .. x, x);
redis.call('hdel',KEYS[1],x);
return ans",key))
}
Кажется, что буквы за первым знаком процента в первой строке функции потерялись. Это объясняет неравномерное количество скобок и кавычек.
2) setGetTask до сих пор не имеет никакого эффекта для меня. Когда я устанавливаю функцию getTask через.option, когда БД заполнена (как это описано в виньетке пакета), она успешно вызывается.
3) Информация о 2) означает, что мне не нужна функция getTask, поэтому я могу использовать пакет из CRAN.
----- Вопросы -----
1) Виньетка doRedis описывает, как можно успешно установить пользовательскую getTask.
2 и 3) Когда скрипт LUA в функции getTask изменяется, как показано ниже, задачи извлекаются из базы данных в порядке их отправки. Это не совсем то, о чем я просил, но из-за нехватки времени и того факта, что у меня есть (или, что лучше, было), не первая идея о сценарии LUA, это хорошее решение для управления порядком отправки в столбце taskQueuePosition.
getTaskInOrder <- function ( queue , job_id , ...)
{
key <- sprintf("%s:%s",queue, job_id)
return(redisEval("
local tasks=redis.call('hkeys',KEYS[1]); -- get all tasks
local x=tasks[1]; -- get first task available task
if x==nil then -- if there are no tasks left, stop processing
return nil
end;
local xMin = 65535; -- if we have more tasks than 65535, getting the
-- task with the lowest taskID is not guaranteed to be the first one
local i = 1;
-- local iMinFound = -1;
while (x ~= nil) do -- search the array until there are no tasks left
-- print('x: ',x)
local xNum = tonumber(x);
if(xNum<xMin) then
xMin = xNum;
-- iMinFound = i;
end
i=i+1;
-- print('i is now: ',i);
x=tasks[i];
end
-- print('Minimum is task number',xMin,' found at i ', iMinFound)
x=tostring(xMin) -- convert it back to a string (maybe it would
-- be better to keep the original string somewhere,
-- in case we loose some information whilst converting to number)
-- print('x is now:',x);
-- print(KEYS[1] .. '.start.' .. x, x);
-- print('');
local ans=redis.call('hget',KEYS[1],x);
redis.call('set', KEYS[1] .. '.start.' .. x, x);
redis.call('hdel',KEYS[1],x);
return ans",key))
}
Важное примечание: я заметил, что если задача отменяется, порядок облажается, и повторно отправленная задача (даже если номер задачи остается прежним) будет выполнена после первоначально отправленных задач. Это нормально для меня.
------ Код (для удобного воспроизведения):------
Это приводит к следующему примеру кода (с 12 записями во фрейме данных задачи вместо оригинальных 4):
Redis DB Filling:
library(doRedis)
library(foreach)
options('redis:num'=TRUE) # needed for proper execution
REDIS_JOB_QUEUE = "jobs"
getTaskInOrder <- function ( queue , job_id , ...)
{
...like above
}
registerDoRedis(REDIS_JOB_QUEUE)
# filling up the data frame already in order of tasks to be executed
# otherwise the dataframe has to be sorted by taskQueuePosition
taskDF = data.frame(taskName=c("taskA","taskB","taskC","taskD","taskE","taskF","taskG","taskH","taskI","taskJ","taskK","taskL"),
taskQueuePosition=c(1,2,3,4,5,6,7,8,9,10,11,12),
parameter1=c("val1","val2","val3","val4","val1","val2","val3","val4","val1","val2","val3","val4"),
parameterN=c(5,5,5,4,4,4,4,3,3,3,2,2))
foreach(currTask=iter(taskDF, by='row'),
.verbose = T,
.options.redis = list(getTask = getTaskInOrder
) %dopar% {
print(paste("Executing task: ",currTask$taskName))
Sys.sleep(currTask$parameterN)
}
removeQueue(REDIS_JOB_QUEUE)
рабочий:
library(doRedis)
REDIS_JOB_QUEUE = "jobs"
startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE)
Еще одно замечание: на случай, если вы обрабатываете длинные задания, как я, обратите внимание на ошибку в redis 1.1.1 (текущая версия в CRAN), которая приводит к повторной отправке задач (из-за тайм-аута), несмотря на то, что работники все еще работают на них.