Управляйте переменными Nextflow вне скриптов
У меня есть процесс iterate_list. Процесс iterate_list принимает список и что-то делает с каждым элементом в списке. При запуске скрипта требуется два входа. Список и элемент, который он должен обработать (который он получает как потребитель из очереди rabbitmq)
В настоящее время я даю скрипту python весь список, и он перебирает каждый из них, выполняет обработку (как один большой кусок) и возвращается после завершения. Это нормально, однако, если система перезагружается, она запускается заново.
Мне было интересно, как я могу сделать так, чтобы каждый раз, когда мой скрипт python обрабатывал один элемент, он возвращал этот элемент, я удалял его из списка, а затем передавал новый список процессу. Таким образом, в случае перезапуска / сбоя системы nextflow знает, где он остановился, и может продолжить работу оттуда.
import groovy.json.JsonSlurper
def jsonSlurper = new JsonSlurper()
def cfg_file = new File('/config.json')
def analysis_config = jsonSlurper.parse(cfg_file)
def cfg_json = cfg_file.getText()
def list_of_items_to_process = []
items = Channel.from(analysis_config.items.keySet())
for (String item : items) {
list_of_items_to_process << item
}
process iterate_list{
echo true
input:
list_of_items_to_process
output:
val 1 into typing_cur
script:
"""
python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
"""
}
process signal_completion{
echo true
input:
val typing_cur
script:
"""
echo "all done!"
"""
}
По сути, процесс iterate_list берет один "элемент" из очереди в брокере сообщений. Процесс iterate_list должен выглядеть примерно так:
process iterate_list{
echo true
input:
list_of_items_to_process
output:
val 1 into typing_cur
script:
"""
python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
list_of_items_to_process.remove(<output from python script>)
"""
}
И поэтому для каждого из них он запускается, удаляет обработанный элемент и перезапускается с новым списком.
initial_list = [1,2,3,4]
after_first_process_completes = [2,3,4]
and_eventually = [] <- This is when it should move on to the next process.
1 ответ
Похоже, что вы действительно пытаетесь манипулировать глобальным ArrayList
из процесса Nextflow. AFAIK, нет способа сделать это точно. Вот для чего нужны каналы.
Неясно, действительно ли вам нужно удалить какие-либо элементы из списка элементов для обработки. Nextflow уже может использовать кешированные результаты, используя-resume
вариант. Так почему бы просто не передать полный список и один элемент для обработки?
items = Channel.from(['foo', 'bar', 'baz'])
items.into {
items_ch1
items_ch2
}
process iterate_list{
input:
val item from items_ch1
val list_of_items_to_process from items_ch2.collect()
"""
python3.7 process_list_items.py "${item}" '${list_of_items_to_process}'
"""
}
Я могу только догадываться о том, как ваш скрипт Python использует свои аргументы, но если ваш список элементов для обработки является просто заполнителем, вы даже можете ввести один элемент списка элементов для обработки:
items = Channel.from(['foo', 'bar', 'baz'])
process iterate_list{
input:
val item from items
"""
python3.7 process_list_items.py "${item}" '[${item}]'
"""
}