Как мне собрать упорядоченные результаты процесса Nextflow?
Я хочу собрать результаты процесса Nextflow в том же порядке, в котором они были введены.
Я знаю, что могу просто передавать значения из всех каналов через все процессы. Это гарантирует, что пары будут переданы всем процессам вместе. Однако это решение не работает, когда вы начинаете добавлять несколько процессов, потому что это нарушает способность этих процессов работать параллельно. Например, в приведенном примере кода, если вы добавляете процесс add_twenty, а затем собираете выходные данные от add_ten, add_twenty и vals2.
Другое возможное решение, с которым я работал, заключалось в добавлении ключа к каждому значению в исходных каналах, что по сути превращает исходные каналы в словарь (то есть хэш). Но я не мог заставить это работать. Если нужно, могу привести пример.
Я создал игрушечный пример, в котором я создаю два канала, отправляю один в процесс, а затем отправляю обработанный вывод и один из исходных каналов новому процессу.
vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)
process add_ten {
input:
val(vals1)
output:
val(new_int) into new_vals1
exec:
new_int = vals1 + 10
}
process pair {
echo true
input:
val(new_vals1)
val(vals2)
script:
"""
echo "${new_vals1}, ${vals2}"
"""
}
Я надеялся увидеть что-то вроде этого, где цифры совпадают:
11, 1
12, 2
13, 3
14, 4
15, 5
Даже если эти строки будут перемешаны, все будет нормально, пока пары сохраняются. Например,
14, 4
11, 1
13, 3
15, 5
12, 2
Однако я вижу следующее:
15, 1
13, 2
11, 3
12, 4
14, 5
1 ответ
Вы можете сделать это с помощью кортежей и оператора объединения nextflow:
https://www.nextflow.io/docs/latest/operator.html
Это пример:
vals1 = Channel.from([1, 'the'], [2, 'brown'], [3, 'jumps'], [4, 'a'], [5, 'fox'])
vals2 = Channel.from([5,'.'], [4, 'lazy'], [3, 'over'], [2, 'fox'], [1, 'quick'])
vals1
.combine(vals2, by: 0)
.println()
Когда вы запустите это, используйте опцию -ansi-log false
. Ваш пример с некоторыми изменениями выглядит следующим образом:
vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)
i=0; vals1.map{[i++, it]}.view().set{keyed_vals1}
j=0; vals2.map{[j++, it]}.view().set{keyed_vals2}
process add_ten {
input: set val(key), val(vals1) from keyed_vals1
output: set val(key), val(new_vals1) into new_vals1
exec: new_vals1 = vals1 + 10
}
process pair {
echo true
tag "$key $one $two"
input: set val(key), val(one), val(two) from new_vals1.combine(keyed_vals2, by: 0).view()
script: "echo '${key} ${one} ${two}'"
}