Несколько выходов для ввода одного списка - объединение файлов BAM в Nextflow

Я пытаюсь объединить x файлов bam, созданных путем одновременного выполнения нескольких выравниваний (в пакетах из y файлов fastq), в один файл bam в Nextflow.

Пока у меня есть следующее при выполнении выравнивания и сортировки/индексации полученного файла bam:

      //Run minimap2 on concatenated fastqs
process miniMap2Bam {
        publishDir "$params.bamDir"
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        val dirString from dirStr
        val runString from stringRun
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        val runString into stringRun1
        file("${batchFastq}.bam") into bamFiles
        val dirString into dirStrSam

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

Где ${batchFastq}.bamпредставляет собой bam-файл, содержащий пакет из y файлов fastq.

Однако этот конвейер завершается нормально при попытке выполнить samtools mergeдля этих bam-файлов в другом процессе (samToolsMerge) процесс запускается каждый раз при выполнении выравнивания (в данном случае 4), а не один раз для всех собранных bam-файлов:

      //Run samtools merge
process samToolsMerge {
        echo true
        publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
        cache 'deep'
        errorStrategy 'retry'
        maxRetries 3
        maxForks 10
        memory { 14.GB * task.attempt }

        input:
        val runString from stringRun1
        file bamFile from bamFiles.collect()
        val dirString from dirStrSam

        output:
        file("**")

        script:
        """
        samtools merge ${runString}.bam ${bamFile} 
        """
}

С выходом:

      executor >  lsf (9)
[49/182ec0] process > catFastqs (1)     [100%] 1 of 1 ✔
[-        ] process > nanoPlotSummary   -
[0e/609a7a] process > miniMap2Bam (1)   [100%] 4 of 4 ✔
[42/72469d] process > samToolsMerge (2) [100%] 4 of 4 ✔




Completed at: 04-Mar-2021 14:54:21
Duration    : 5m 41s
CPU hours   : 0.2
Succeeded   : 9

Как я могу взять только полученные бам-файлы из miniMap2Bamи запустить их через samToolsMergeодин раз вместо того, чтобы процесс выполнялся несколько раз?

Заранее спасибо!

РЕДАКТИРОВАТЬ: Благодаря Pallie в комментариях ниже проблема заключалась в том, что значения runString и dirString из предыдущего процесса передаются в miniMap2Bam, а затем в samToolsMerge, что приводит к тому, что процесс повторяется каждый раз, когда значение передается.

Решение было таким же простым, как удаление vals из miniMap2Bam (следующим образом):

      //Run minimap2 on concatenated fastqs
process miniMap2Bam {
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        file("${batchFastq}.bam") into bamFiles

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

1 ответ

Самое простое исправление, вероятно, состоит в том, чтобы прекратить передачу статической строки каталога и строки запуска по каналам:

      // Instead of a hardcoded path use a parameter you passed via CLI like you did with bamDir
dirString = file("/path/to/fastqs/")
runString = file("/path/to/fastqs/").getParent()
fastqBatch = Channel.from("/path/to/fastqs/")

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        publishDir "$params.bamDir"
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        file("${batchFastq}.bam") into bamFiles

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

//Run samtools merge
process samToolsMerge {
        echo true
        publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
        cache 'deep'
        errorStrategy 'retry'
        maxRetries 3
        maxForks 10
        memory { 14.GB * task.attempt }

        input:
        file bamFile from bamFiles.collect()

        output:
        file("**")

        script:
        """
        samtools merge ${runString}.bam ${bamFile} 
        """

Другие вопросы по тегам