Требуется сделать набор файлов перед запуском функции в конвейере Ruffus
Я использую Ruffus, чтобы написать конвейер. У меня есть функция, которая вызывается параллельно много раз, и она создает несколько файлов. Я хотел бы сделать функцию "combFiles()", которая вызывается после того, как все эти файлы были сделаны. Поскольку они работают параллельно в кластере, они не будут все вместе заканчиваться. Я написал функцию 'getFilenames()', которая возвращает набор имен файлов, которые должны быть созданы, но как я могу заставить Объединить -файл () ждать их появления?
Я попробовал следующее:
@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
# I should only be called if every file in the list 'filenames' exists
Я также попробовал декоратор:
@merge(getFilenames)
но это тоже не работает. Объединение файлов по-прежнему вызывается по ошибке до создания файлов, заданных в getFilenames. Как я могу сделать combFiles условным для тех файлов, которые там находятся?
Благодарю.
1 ответ
Я разработчик Ruffus. Я не уверен, что полностью понимаю, что вы пытаетесь сделать, но здесь идет речь:
Ожидание заданий, которые занимают различное количество времени, чтобы завершить следующую стадию вашего конвейера, - это именно то, чем занимается Раффус, так что, надеюсь, это просто.
Первый вопрос: знаете ли вы, какие файлы создаются заранее, то есть до запуска конвейера? Давайте начнем с предположения, что вы делаете.
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
Давайте напишем фиктивную функцию, которая создает файл каждый раз, когда он вызывается. В Ruffus любые имена входных и выходных файлов содержатся в первых двух параметрах соответственно. У нас нет имени входного файла, поэтому вызовы наших функций должны выглядеть так:
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
Определение create_file будет выглядеть так:
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
open(output_file_name, "w").write("dummy file")
Каждый из этих файлов будет создан за 3 отдельных вызова create_file. Они могут быть запущены параллельно, если вы хотите.
pipeline_run([create_file], multiprocess = 5)
Теперь для объединения файлов. Декоратор "@Merge" действительно настроен именно для этого. Нам просто нужно связать его с предыдущей функцией:
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
Это вызовет merge_file только тогда, когда все файлы готовы после трех вызовов create_file().
Весь код выглядит следующим образом:
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
from random import randint
from time import sleep
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
# simulate create file process of indeterminate complexity
sleep(randint(1,5))
open(output_file_name, "w").write("dummy file")
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
pipeline_run([merge_file], multiprocess = 5)
И вот результат:
>>> pipeline_run([merge_file], multiprocess = 5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file