Как мне собрать упорядоченные результаты процесса Nextflow?

Я хочу собрать результаты процесса Nextflow в том же порядке, в котором они были введены.

Я знаю, что могу просто передавать значения из всех каналов через все процессы. Это гарантирует, что пары будут переданы всем процессам вместе. Однако это решение не работает, когда вы начинаете добавлять несколько процессов, потому что это нарушает способность этих процессов работать параллельно. Например, в приведенном примере кода, если вы добавляете процесс add_twenty, а затем собираете выходные данные от add_ten, add_twenty и vals2.

Другое возможное решение, с которым я работал, заключалось в добавлении ключа к каждому значению в исходных каналах, что по сути превращает исходные каналы в словарь (то есть хэш). Но я не мог заставить это работать. Если нужно, могу привести пример.

Я создал игрушечный пример, в котором я создаю два канала, отправляю один в процесс, а затем отправляю обработанный вывод и один из исходных каналов новому процессу.

vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)


process add_ten {
    input:
    val(vals1)

    output:
    val(new_int) into new_vals1

    exec:
    new_int = vals1 + 10
}

process pair {
    echo true

    input:
    val(new_vals1)
    val(vals2)

    script:
    """
    echo "${new_vals1}, ${vals2}"
    """
}

Я надеялся увидеть что-то вроде этого, где цифры совпадают:

11, 1
12, 2
13, 3
14, 4
15, 5

Даже если эти строки будут перемешаны, все будет нормально, пока пары сохраняются. Например,

14, 4
11, 1
13, 3
15, 5
12, 2

Однако я вижу следующее:

15, 1
13, 2
11, 3
12, 4
14, 5

1 ответ

Решение

Вы можете сделать это с помощью кортежей и оператора объединения nextflow:

https://www.nextflow.io/docs/latest/operator.html

Это пример:

vals1 = Channel.from([1, 'the'], [2, 'brown'], [3, 'jumps'], [4, 'a'], [5, 'fox'])
vals2 = Channel.from([5,'.'], [4, 'lazy'], [3, 'over'], [2, 'fox'], [1, 'quick'])

vals1
  .combine(vals2, by: 0)
  .println()

Когда вы запустите это, используйте опцию -ansi-log false. Ваш пример с некоторыми изменениями выглядит следующим образом:

vals1 = Channel.from(1,2,3,4,5)
vals2 = Channel.from(1,2,3,4,5)

i=0; vals1.map{[i++, it]}.view().set{keyed_vals1}
j=0; vals2.map{[j++, it]}.view().set{keyed_vals2}

process add_ten {

  input: set val(key), val(vals1) from keyed_vals1
  output: set val(key), val(new_vals1) into new_vals1

  exec: new_vals1 = vals1 + 10
}

process pair {
  echo true
  tag "$key $one $two"

  input: set val(key), val(one), val(two) from new_vals1.combine(keyed_vals2, by: 0).view()

  script: "echo '${key} ${one} ${two}'"
}
Другие вопросы по тегам