shell 如何使用Nextflow dsl 2处理来自S3的多个输入(yaml/json)

svujldwt  于 2023-05-18  发布在  Shell
关注(0)|答案(1)|浏览(190)

我需要在aws batch中使用nextflow(dsl 2)管道处理超过1 k个样本。当前版本的工作流处理每次运行的单个输入。我正在寻找工作流语法(Map元组迭代)来处理多个输入,以在parralel中运行。输入应该是json或yaml格式,每个样本的输入文件路径是唯一的。
为了保留输入文件路径"s3://...",我在channel中使用了.fromPath
以下是我的单个输入配置示例input.yaml-parms-file

id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz

运行单个样本输入的工作流

process samtools_stats {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/samtools", mode: "copy"

    input:
    path bam
    
    output:
    path "${id}.stats"

    script:
    """
      samtools stats ${bam} > ${id}.stats
    """
}

process mosdepth_bam {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"

    input:
    path bam
    path bam_idx

    output:
    path "${id}.regions.bed.gz"

    script:
    """
    mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 ${id} ${bam}
    """
}

process mosdepth_cram {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"

    input:
    path bam
    path bam_idx
    path reference
    path reference_idx

    output:
    path "${id}.regions.bed.gz"

    script:
    """
    mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 --fasta ${reference} ${id} ${bam}

    """
}

process bcftools_stats {
    tag "${id}"
    publishDir "${params.publishdir}/${id}/bcftools", mode: "copy"

    input:
    path vcf
    path vcf_idx

    output:
    path "*"

    script:
    """
    bcftools stats -f PASS ${vcf} > ${id}.pass.stats
    """
}

process multiqc {

    tag "${id}"
    publishDir "${params.publishdir}/${id}/multiqc", mode: "copy"
    
    input:
    path "*"

    output:
    path  "multiqc_data/*", emit: multiqc_ch

    script:
    """
    multiqc . --data-format json --enable-npm-plugin
    """
}

process compile_metrics {

    tag "${id}"
    publishDir "${params.publishdir}/${id}", mode: "copy"

    input:
    path multiqc

    output:
    path "${params.id}.metrics.json", emit: compile_metrics_out

    script:
    """
    # parse and calculate all the metrics in the multiqc output to compile

    compile_metrics.py \
        --multiqc_json multiqc_data.json \
        --output_json ${params.id}.metrics.json \
        --biosample_id ${params.id}
    """
}

/*
----------------------------------------------------------------------
WORKFLOW
---------------------------------------------------------------------
*/

id = params.id

aln_file = file ( params.bam )
aln_file_type = aln_file.getExtension()
vcf_file = ( params.vcf )
vcf_idx = channel.fromPath(params.vcf + ".tbi", checkIfExists: true)

if (aln_file_type == "bam") {
    cbam = channel.fromPath(params.bam, checkIfExists: true)
    cbam_idx = channel.fromPath(params.bam + ".bai", checkIfExists: true)
}
else if (aln_file_type == "cram") {
    cbam = channel.fromPath(params.bam, checkIfExists: true)
    cbam_idx = channel.fromPath(params.bam + ".crai", checkIfExists: true)
}

reference = channel.fromPath(params.reference, checkIfExists: true)
reference_idx = channel.fromPath(params.reference + ".fai", checkIfExists: true)

// main
workflow {
    if (aln_file_type == "bam") {
        samtools_stats( bam )
        mosdepth_bam( bam, bam_idx )
        bcftools_stats ( vcf, vcf_idx )
        multiqc( samtools_stats.out.mix( mosdepth_bam.out ).collect() )
        compile_metrics(multiqc.out)
    } else if (aln_file_type == "cram") {
        samtools_stats( bam )
        mosdepth_cram( bam, bam_idx, reference, reference_idx )
        bcftools_stats ( vcf, vcf_idx )
        multiqc( samtools_stats.out.mix( mosdepth_cram.out ).collect() )
        compile_metrics(multiqc.out)
    }
}

我想修改工作流,以便在以下多个样本输入中以并行方式运行

samples:
-
 id: HLA1001
 bam: s3://HLA/data/udf/HLA1001.bam
 vcf: s3://HLA/data/udf/HLA1001.vcf.gz
-
 id: NHLA1002
 bam: s3://HLA/data/sdd/HLA1002.bam
 vcf: s3://HLA/data/sdd/HLA1002.vcf.gz
-
 id: NHLA1003
 bam: s3://HLA/data/klm/HLA1003.bam
 vcf: s3://HLA/data/klm/HLA1003.vcf.gz
-
 id: NHLA2000
 bam: s3://HLA/data/rcb/HLA2000.bam
 vcf: s3://HLA/data/rcb/HLA2000.vcf.gz

多个示例的预期最终输出文件夹结构。

s3://mybucket/results/HLA1001/
samtools/
mosdepth/
bcftools/
multiqc/
HLA1001.metrics.json

s3://mybucket/results/HLA1002/
samtools/
mosdepth/
bcftools/
multiqc/
HLA1002.metrics.json

bam/cram、vcf的输入以及multiqc和compile_metrics的输入都必须在每个进程中获取相同的样本。
感谢你的帮助!谢谢
按照@steve回答的方法。
main.nf的内容:更新

include { compile_metrics } from './modules/compile_metrics'

    Channel
        .fromList( params.samples )
        .map { it.biosample_id }
        .set { sample_ids }

    compile_metrics ( sample_ids, multiqc.out.json_data )
}

modules/compile_metrics/main.nf的内容:

process compile_metrics {

    tag { sample_ids }

    input:
    val(sample_ids)
    path "multiqc_data.json"

    output:
    tuple val(sample_ids), path("${sample_ids}.metrics.json"), emit: compile_metrics_out

    script:
    """
    compile_metrics.py \
        --multiqc_json multiqc_data.json \
        --output_json "${sample_ids}.metrics.json" \\
        --biosample_id "${sample_ids}" \\
    """
}

在另一个进程中,mosdepth_datamash应用datamash处理mosdepth的一个输出时,未能识别compile_metrics中使用的同一个变量sample_ids

ERROR~无此变量:sample_ids

main.nf的内容:

include { mosdepth_datamash } from './modules/mosdepth_datamash'

autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )

Channel.fromList( params.samples )
       .map { it.biosample_id }
       .set { sample_ids }

compile_metrics ( sample_ids, multiqc.out.json_data )
 
mosdepth_datamash( sample_ids, mosdepth.out.regions, autosomes_non_gap_regions ) 

.mix( mosdepth_datamash.out.coverage ) // multiqc( log_files )

modules/mosdepth_datamash/main.nf的内容:

process mosdepth_datamash {

    tag { sample_ids }

    input:
    val(sample_ids)
    path autosomes_non_gap_regions
    path "${sample_ids}.regions.bed.gz"

    output:
    tuple val(sample_ids), path("${sample_ids}.mosdepth.csv"), emit: coverage

    script:
    """

    zcat "${sample_ids}.regions.bed.gz" | bedtools intersect -a stdin -b ${autosomes_non_gap_regions} | gzip -9c > "${sample_ids}.regions.autosomes_non_gap_n_bases.bed.gz"
    .......
    .......
    """
}

更新main.nf

include { mosdepth_datamash } from './modules/mosdepth_datamash'

autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )

mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ).collect() )

更新mosdepth_datamash

process mosdepth_datamash {

    tag { sample }

    input:
    path autosomes_non_gap_regions
    tuple val(sample), path(regions)

    output:
    tuple val(sample), path("${sample}.mosdepth.csv"), emit: coverage

    script:
    """
    zcat "${sample}.regions.bed.gz" | bedtools intersect -a stdin -b ${autosomes_non_gap_regions} | gzip -9c > "${sample}.regions.autosomes_non_gap_n_bases.bed.gz"
    .....
}

**警告:**警告:输入元组与进程mosdepth_datamash声明的输入集基数不匹配--违规值:[NA12878-chr14, /home/ubuntu/sample-qc/tests/NA12878-chr14_1000genomes-dragen-3.7.6/work/34/7f9ec33ef353c0276f71d2d7248ff2/NA12878-chr14.regions.bed.gz, NA12878-chr15, /home/ubuntu/sample-qc/tests/NA12878-chr15_1000genomes-dragen-3.7.6/work/0b/533ee8c0b8ec47418990a6d05f2f3c/NA12878-chr15.regions.bed.gz]

N E X T F L O W  ~  version 23.04.1
......
executor >  local (5)
[0b/533ee8] process > mosdepth_bam (NA12878-chr14)       [100%] 2 of 2 ✔
executor >  local (5)
[0b/533ee8] process > mosdepth_bam (NA12878-chr14)       [100%] 2 of 2 ✔
executor >  local (9)
[0b/533ee8] process > mosdepth_bam (NA12878-chr14)       [100%] 2 of 2 ✔
[-        ] process > mosdepth_cram                           -
[d6/3fb2e2] process > samtools_stats_bam (NA12878-chr15) [100%] 2 of 2 ✔
[-        ] process > samtools_stats_cram                     -
[3e/031f1b] process > mosdepth_datamash (NA12878-chr14)  [100%] 1 of 1 ✔
[4a/7e44ca] process > verifybamid2_bam (NA12878-chr14)   [100%] 1 of 1  failed: 1✔
[-        ] process > verifybamid2_cram                       -
[21/4a221a] process > multiqc                                 [100%] 1 of 1 ✔
[54/e19285] process > compile_metrics (NA12878-chr14)    [100%] 2 of 2 ✔
Started     : 2023-05-16T05:39:06.759097Z
Completed   : 2023-05-16T05:41:11.063286Z

更新main.nf:修复-使用queue通道而不是collect

mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ) )

main.nf`verifybamid`的另一个问题

vbi2_ud = channel.fromPath(params.vbi2_ud, checkIfExists: true)
    vbi2_bed = channel.fromPath(params.vbi2_bed, checkIfExists: true)
    vbi2_mean = channel.fromPath(params.vbi2_mean, checkIfExists: true)

    include { verifybamid2 as verifybamid2_bam } from './modules/verifybamid2'
    include { verifybamid2 as verifybamid2_cram } from './modules/verifybamid2'
    
    verifybamid2_bam( aln_inputs.bam, ref_fasta, vbi2_ud, vbi2_bed, vbi2_mean )
    verifybamid2_cram( aln_inputs.cram, ref_fasta, vbi2_ud, vbi2_bed, vbi2_mean )

        .mix( verifybamid2_bam.out.freemix )
        .mix( verifybamid2_cram.out.freemix )
        .mix( verifybamid2_bam.out.ancestry )
        .mix( verifybamid2_cram.out.ancestry )

/modules/verifybamid/main.nf的内容与samtools_stats和mosdepth相同,但只运行一个样本!

process verifybamid2 {
    tag { sample }
    errorStrategy 'ignore'

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta
    path vbi2_ud
    path vbi2_bed
    path vbi2_mean

    output:
    tuple val(sample), path("result.selfSM"), emit: freemix
    tuple val(sample), path("result.Ancestry"), emit: ancestry

    script:
    def reference = ref_fasta ? /--reference "${ref_fasta}"/ : ''
     
    """
   # run verifybamid2
   # get the percentage of cross-individual contamination rate

    verifybamid2 --NumPC 4 --SVDPrefix ${params.vbi2_svdprefix} ${reference} --BamFile ${bam}

    """
}

.....

N E X T F L O W  ~  version 23.04.1
    ......
    executor >  local (10)
    [d6/c4f241] process > mosdepth_bam (NA12878-chr14-AKT2)       [100%] 2 of 2 ✔
    [-        ] process > mosdepth_cram                           -
    [51/3a1d21] process > samtools_stats_bam (NA12878-chr14-AKT1) [100%] 2 of 2 ✔
    [-        ] process > samtools_stats_cram                     -
    [c7/d20ab6] process > mosdepth_datamash (NA12878-chr14-AKT2)  [100%] 2 of 2 ✔
    [36/77fbe4] process > **verifybamid2_bam** (NA12878-chr14-AKT1)   [100%] 1 of 1, failed: 1 ✔
    [-        ] process > verifybamid2_cram                       -
    [f9/4fb6b9] process > multiqc                                 [100%] 1 of 1 ✔
    [48/a86b33] process > compile_metrics (NA12878-chr14-AKT1)    [100%] 2 of 2 ✔
    Started     : 2023-05-16T06:14:46.797216Z
    Completed   : 2023-05-16T06:16:47.723708Z

进程verifybamidfile配合使用而不是channel.fromPath

vbi2_ud = file( params.vbi2_ud )
vbi2_bed = file( params.vbi2_bed )
vbi2_mean = file( params.vbi2_mean )

N E X T F L O W  ~  version 23.04.1 5866
......
executor >  local (11)
[e8/1bcf62] process > mosdepth_bam (NA12878-chr14-AKT2)       [100%] 2 of 2 ✔
[-        ] process > mosdepth_cram                           -
[8d/c3e461] process > samtools_stats_bam (NA12878-chr14-AKT1) [100%] 2 of 2 ✔
[-        ] process > samtools_stats_cram                     -
[d2/fd3ee6] process > mosdepth_datamash (NA12878-chr14-AKT2)  [100%] 2 of 2 ✔
[be/1c2e03] process > verifybamid2_bam (NA12878-chr14-AKT2)   [100%] 2 of 2, failed: 2 ✔
[-        ] process > verifybamid2_cram                       -
[d8/31b375] process > multiqc                                 [100%] 1 of 1 ✔
[7b/abd9a7] process > compile_metrics (NA12878-chr14-AKT1)    [100%] 2 of 2 ✔
Started     : 2023-05-16T12:10:36.684217Z
Completed   : 2023-05-16T12:12:41.545206Z
Duration    : 2m 5s
Status      : true
Publish dir : ./output

如何修改通道以向后处理(工作流单样本输入格式的前一版本)兼容缺少sample键的单样本输入格式

id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz

处理输入mani.nf的内容:

Channel
        .fromList( params.samples )
        .branch { rec ->
            def aln_file = file( rec.bam )

            bam: aln_file.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )

                return tuple( rec.id, aln_file, bam_idx )

            cram: aln_file.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )

                return tuple( rec.id, aln_file, cram_idx )
        }
        .set { aln_inputs }

    Channel
        .fromList( params.samples )
        .map { rec ->
            def vcf_file = file( rec.vcf )
            def vcf_idx = file( "${rec.vcf}.tbi" )

            tuple( rec.id, vcf_file, vcf_idx )
        }
        .set { vcf_inputs }
sbdsn5lh

sbdsn5lh1#

使用通道,可以处理任意数量的样本,包括仅一个样本。这里有一种使用模块来处理BAM和CRAM输入的方法。请注意,下面的每个过程都需要输入tuple,其中第一个元素是样本名称或键。为了能够在下游合并通道,我们还应该确保输出具有相同样本名称或键的元组。以下内容尚未在AWS Batch上进行测试,但至少可以让您开始:
main.nf的内容:

include { bcftools_stats } from './modules/bcftools'
include { mosdepth as mosdepth_bam } from './modules/mosdepth'
include { mosdepth as mosdepth_cram } from './modules/mosdepth'
include { multiqc } from './modules/multiqc'
include { samtools_stats as samtools_stats_bam } from './modules/samtools'
include { samtools_stats as samtools_stats_cram } from './modules/samtools'
include { mosdepth_datamash } from './modules/mosdepth_datamash'
workflow {

    ref_fasta = file( params.ref_fasta )
    autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )

    Channel
        .fromList( params.samples )
        .ifEmpty { ['id': params.id, 'bam': params.bam] }
        .branch { rec ->
            def aln_file = rec.bam ? file( rec.bam ) : null

            bam: rec.id && aln_file?.extension == 'bam'
                def bam_idx = file( "${rec.bam}.bai" )

                return tuple( rec.id, aln_file, bam_idx )

            cram: rec.id && aln_file?.extension == 'cram'
                def cram_idx = file( "${rec.bam}.crai" )

                return tuple( rec.id, aln_file, cram_idx )
        }
        .set { aln_inputs }

    Channel
        .fromList( params.samples )
        .ifEmpty { ['id': params.id, 'vcf': params.vcf] }
        .branch { rec ->
            def vcf_file = rec.vcf ? file( rec.vcf ) : null

            output: rec.id && vcf_file
                def vcf_idx = file( "${rec.vcf}.tbi" )

                return tuple( rec.id, vcf_file, vcf_idx )
        }
        .set { vcf_inputs }

    mosdepth_bam( aln_inputs.bam, [] )
    mosdepth_cram( aln_inputs.cram, ref_fasta )

    samtools_stats_bam( aln_inputs.bam, [] )
    samtools_stats_cram( aln_inputs.cram, ref_fasta )

    bcftools_stats( vcf_inputs )

    Channel
        .empty()
        .mix( mosdepth_bam.out.regions )
        .mix( mosdepth_cram.out.regions )
        .set { mosdepth_regions }

    mosdepth_datamash( mosdepth_regions, autosomes_non_gap_regions )

    Channel
        .empty()
        .mix( mosdepth_bam.out.dists )
        .mix( mosdepth_bam.out.summary )
        .mix( mosdepth_cram.out.dists )
        .mix( mosdepth_cram.out.summary )
        .mix( samtools_stats_bam.out )
        .mix( samtools_stats_cram.out )
        .mix( bcftools_stats.out )
        .mix( mosdepth_datamash.out )
        .map { sample, files -> files }
        .collect()
        .set { log_files }

    multiqc( log_files )
}

modules/samtools/main.nf的内容:

process samtools_stats {

    tag { sample }

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta

    output:
    tuple val(sample), path("${sample}.stats")

    script:
    def reference = ref_fasta ? /--reference "${ref_fasta}"/ : ''

    """
    samtools stats \\
        ${reference} \\
        "${bam}" \\
        > "${sample}.stats"
    """
}

modules/mosdepth/main.nf的内容:

process mosdepth {

    tag { sample }

    input:
    tuple val(sample), path(bam), path(bai)
    path ref_fasta

    output:
    tuple val(sample), path("*.regions.bed.gz"), emit: regions
    tuple val(sample), path("*.dist.txt"), emit: dists
    tuple val(sample), path("*.summary.txt"), emit: summary

    script:
    def fasta = ref_fasta ? /--fasta "${ref_fasta}"/ : ''

    """
    mosdepth \\
        --no-per-base \\
        --by 1000 \\
        --mapq 20 \\
        --threads ${task.cpus} \\
        ${fasta} \\
        "${sample}" \\
        "${bam}"
    """
}

modules/bcftools/main.nf的内容:

process bcftools_stats {

    tag { sample }

    input:
    tuple val(sample), path(vcf), path(tbi)

    output:
    tuple val(sample), path("${sample}.pass.stats")

    """
    bcftools stats \\
        -f PASS \\
        "${vcf}" \\
        > "${sample}.pass.stats"
    """
}

modules/multiqc/main.nf的内容:

process multiqc {

    input:
    path 'data/*'

    output:
    path "multiqc_report.html", emit: report
    path "multiqc_data", emit: data

    """
    multiqc \\
        --data-format json \\
        .
    """
}

modules/compile_metrics/main.nf的内容:

process compile_metrics {

    tag { sample_id }

    input:
    val sample_id
    path multiqc_json

    output:
    tuple val(sample_id), path("${sample_id}.metrics.json")

    """
    compile_metrics.py \\
        --multiqc_json "${multiqc_json}" \\
        --output_json "${sample_id}.metrics.json" \\
        --biosample_id "${sample_id}"
    """
}

./modules/mosdepth_datamash/main.nf的内容:

process mosdepth_datamash {

    tag { sample_id }

    input:
    tuple val(sample_id), path(regions_bed)
    path autosomes_non_gap_regions

    output:
    tuple val(sample_id), path("${sample_id}.mosdepth.csv")

    """
    zcat -f "${regions_bed}" |
        bedtools intersect -a stdin -b "${autosomes_non_gap_regions}" |
        gzip -9 > "${sample_id}.regions.autosomes_non_gap_n_bases.bed.gz"

    # do something

    touch "${sample_id}.mosdepth.csv"
    """
}

nextflow.config的内容:

plugins {

    id 'nf-amazon'
}

params {

    ref_fasta = null
    autosomes_non_gap_regions = null

    samples = null

    id = null
    bam = null
    vcf = null

    publish_dir = './results'
    publish_mode = 'copy'
}

process {

    executor = 'awsbatch'
    queue = 'test-queue'

    errorStrategy = 'retry'
    maxRetries = 3

    withName: 'samtools_stats' {

        publishDir = [
            path: "${params.publish_dir}/samtools",
            mode: params.publish_mode,
        ]
    }

    withName: 'bcftools_stats' {

        publishDir = [
            path: "${params.publish_dir}/bcftools",
            mode: params.publish_mode,
        ]
    }

    withName: 'mosdepth' {

        cpus = 4

        publishDir = [
            path: "${params.publish_dir}/mosdepth",
            mode: params.publish_mode,
        ]
    }

    withName: 'multiqc' {

        publishDir = [
            path: "${params.publish_dir}/multiqc",
            mode: params.publish_mode,
        ]
    }
}

aws {

    region = 'us-east-1'

    batch {
        cliPath = '/home/ec2-user/miniconda/bin/aws'
    }
    
}

并使用类似于以下内容运行:

$ nextflow run main.nf \
    -ansi-log false \
    -params-file input.yaml \
    -work 's3://mybucket/work' \
    --publish_dir 's3://mybucket/results' \
    --ref_fasta 's3://mybucket/ref.fa'

相关问题