Требуется сделать набор файлов перед запуском функции в конвейере Ruffus

Я использую Ruffus, чтобы написать конвейер. У меня есть функция, которая вызывается параллельно много раз, и она создает несколько файлов. Я хотел бы сделать функцию "combFiles()", которая вызывается после того, как все эти файлы были сделаны. Поскольку они работают параллельно в кластере, они не будут все вместе заканчиваться. Я написал функцию 'getFilenames()', которая возвращает набор имен файлов, которые должны быть созданы, но как я могу заставить Объединить -файл () ждать их появления?

Я попробовал следующее:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

Я также попробовал декоратор:

@merge(getFilenames)

но это тоже не работает. Объединение файлов по-прежнему вызывается по ошибке до создания файлов, заданных в getFilenames. Как я могу сделать combFiles условным для тех файлов, которые там находятся?

Благодарю.

1 ответ

Я разработчик Ruffus. Я не уверен, что полностью понимаю, что вы пытаетесь сделать, но здесь идет речь:

Ожидание заданий, которые занимают различное количество времени, чтобы завершить следующую стадию вашего конвейера, - это именно то, чем занимается Раффус, так что, надеюсь, это просто.

Первый вопрос: знаете ли вы, какие файлы создаются заранее, то есть до запуска конвейера? Давайте начнем с предположения, что вы делаете.

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

Давайте напишем фиктивную функцию, которая создает файл каждый раз, когда он вызывается. В Ruffus любые имена входных и выходных файлов содержатся в первых двух параметрах соответственно. У нас нет имени входного файла, поэтому вызовы наших функций должны выглядеть так:

create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

Определение create_file будет выглядеть так:

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    open(output_file_name, "w").write("dummy file")

Каждый из этих файлов будет создан за 3 отдельных вызова create_file. Они могут быть запущены параллельно, если вы хотите.

pipeline_run([create_file], multiprocess = 5)

Теперь для объединения файлов. Декоратор "@Merge" действительно настроен именно для этого. Нам просто нужно связать его с предыдущей функцией:

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())

Это вызовет merge_file только тогда, когда все файлы готовы после трех вызовов create_file().

Весь код выглядит следующим образом:

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

И вот результат:

>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file
Другие вопросы по тегам