我需要在aws batch中使用nextflow(dsl 2)管道处理超过1 k个样本。当前版本的工作流处理每次运行的单个输入。我正在寻找工作流语法(Map元组迭代)来处理多个输入,以在parralel中运行。输入应该是json或yaml格式,每个样本的输入文件路径是唯一的。
为了保留输入文件路径"s3://..."
,我在channel中使用了.fromPath
。
以下是我的单个输入配置示例input.yaml
(-parms-file
)
id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz
运行单个样本输入的工作流
process samtools_stats {
tag "${id}"
publishDir "${params.publishdir}/${id}/samtools", mode: "copy"
input:
path bam
output:
path "${id}.stats"
script:
"""
samtools stats ${bam} > ${id}.stats
"""
}
process mosdepth_bam {
tag "${id}"
publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"
input:
path bam
path bam_idx
output:
path "${id}.regions.bed.gz"
script:
"""
mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 ${id} ${bam}
"""
}
process mosdepth_cram {
tag "${id}"
publishDir "${params.publishdir}/${id}/mosdepth", mode: "copy"
input:
path bam
path bam_idx
path reference
path reference_idx
output:
path "${id}.regions.bed.gz"
script:
"""
mosdepth --no-per-base --by 1000 --mapq 20 --threads 4 --fasta ${reference} ${id} ${bam}
"""
}
process bcftools_stats {
tag "${id}"
publishDir "${params.publishdir}/${id}/bcftools", mode: "copy"
input:
path vcf
path vcf_idx
output:
path "*"
script:
"""
bcftools stats -f PASS ${vcf} > ${id}.pass.stats
"""
}
process multiqc {
tag "${id}"
publishDir "${params.publishdir}/${id}/multiqc", mode: "copy"
input:
path "*"
output:
path "multiqc_data/*", emit: multiqc_ch
script:
"""
multiqc . --data-format json --enable-npm-plugin
"""
}
process compile_metrics {
tag "${id}"
publishDir "${params.publishdir}/${id}", mode: "copy"
input:
path multiqc
output:
path "${params.id}.metrics.json", emit: compile_metrics_out
script:
"""
# parse and calculate all the metrics in the multiqc output to compile
compile_metrics.py \
--multiqc_json multiqc_data.json \
--output_json ${params.id}.metrics.json \
--biosample_id ${params.id}
"""
}
/*
----------------------------------------------------------------------
WORKFLOW
---------------------------------------------------------------------
*/
id = params.id
aln_file = file ( params.bam )
aln_file_type = aln_file.getExtension()
vcf_file = ( params.vcf )
vcf_idx = channel.fromPath(params.vcf + ".tbi", checkIfExists: true)
if (aln_file_type == "bam") {
cbam = channel.fromPath(params.bam, checkIfExists: true)
cbam_idx = channel.fromPath(params.bam + ".bai", checkIfExists: true)
}
else if (aln_file_type == "cram") {
cbam = channel.fromPath(params.bam, checkIfExists: true)
cbam_idx = channel.fromPath(params.bam + ".crai", checkIfExists: true)
}
reference = channel.fromPath(params.reference, checkIfExists: true)
reference_idx = channel.fromPath(params.reference + ".fai", checkIfExists: true)
// main
workflow {
if (aln_file_type == "bam") {
samtools_stats( bam )
mosdepth_bam( bam, bam_idx )
bcftools_stats ( vcf, vcf_idx )
multiqc( samtools_stats.out.mix( mosdepth_bam.out ).collect() )
compile_metrics(multiqc.out)
} else if (aln_file_type == "cram") {
samtools_stats( bam )
mosdepth_cram( bam, bam_idx, reference, reference_idx )
bcftools_stats ( vcf, vcf_idx )
multiqc( samtools_stats.out.mix( mosdepth_cram.out ).collect() )
compile_metrics(multiqc.out)
}
}
我想修改工作流,以便在以下多个样本输入中以并行方式运行
samples:
-
id: HLA1001
bam: s3://HLA/data/udf/HLA1001.bam
vcf: s3://HLA/data/udf/HLA1001.vcf.gz
-
id: NHLA1002
bam: s3://HLA/data/sdd/HLA1002.bam
vcf: s3://HLA/data/sdd/HLA1002.vcf.gz
-
id: NHLA1003
bam: s3://HLA/data/klm/HLA1003.bam
vcf: s3://HLA/data/klm/HLA1003.vcf.gz
-
id: NHLA2000
bam: s3://HLA/data/rcb/HLA2000.bam
vcf: s3://HLA/data/rcb/HLA2000.vcf.gz
多个示例的预期最终输出文件夹结构。
s3://mybucket/results/HLA1001/
samtools/
mosdepth/
bcftools/
multiqc/
HLA1001.metrics.json
s3://mybucket/results/HLA1002/
samtools/
mosdepth/
bcftools/
multiqc/
HLA1002.metrics.json
bam/cram、vcf的输入以及multiqc和compile_metrics的输入都必须在每个进程中获取相同的样本。
感谢你的帮助!谢谢
按照@steve回答的方法。main.nf
的内容:更新
include { compile_metrics } from './modules/compile_metrics'
Channel
.fromList( params.samples )
.map { it.biosample_id }
.set { sample_ids }
compile_metrics ( sample_ids, multiqc.out.json_data )
}
modules/compile_metrics/main.nf
的内容:
process compile_metrics {
tag { sample_ids }
input:
val(sample_ids)
path "multiqc_data.json"
output:
tuple val(sample_ids), path("${sample_ids}.metrics.json"), emit: compile_metrics_out
script:
"""
compile_metrics.py \
--multiqc_json multiqc_data.json \
--output_json "${sample_ids}.metrics.json" \\
--biosample_id "${sample_ids}" \\
"""
}
在另一个进程中,mosdepth_datamash
应用datamash处理mosdepth
的一个输出时,未能识别compile_metrics
中使用的同一个变量sample_ids
。
ERROR~无此变量:sample_ids
main.nf
的内容:
include { mosdepth_datamash } from './modules/mosdepth_datamash'
autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )
Channel.fromList( params.samples )
.map { it.biosample_id }
.set { sample_ids }
compile_metrics ( sample_ids, multiqc.out.json_data )
mosdepth_datamash( sample_ids, mosdepth.out.regions, autosomes_non_gap_regions )
.mix( mosdepth_datamash.out.coverage ) // multiqc( log_files )
modules/mosdepth_datamash/main.nf
的内容:
process mosdepth_datamash {
tag { sample_ids }
input:
val(sample_ids)
path autosomes_non_gap_regions
path "${sample_ids}.regions.bed.gz"
output:
tuple val(sample_ids), path("${sample_ids}.mosdepth.csv"), emit: coverage
script:
"""
zcat "${sample_ids}.regions.bed.gz" | bedtools intersect -a stdin -b ${autosomes_non_gap_regions} | gzip -9c > "${sample_ids}.regions.autosomes_non_gap_n_bases.bed.gz"
.......
.......
"""
}
更新main.nf
:
include { mosdepth_datamash } from './modules/mosdepth_datamash'
autosomes_non_gap_regions = file( params.autosomes_non_gap_regions )
mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ).collect() )
更新mosdepth_datamash
:
process mosdepth_datamash {
tag { sample }
input:
path autosomes_non_gap_regions
tuple val(sample), path(regions)
output:
tuple val(sample), path("${sample}.mosdepth.csv"), emit: coverage
script:
"""
zcat "${sample}.regions.bed.gz" | bedtools intersect -a stdin -b ${autosomes_non_gap_regions} | gzip -9c > "${sample}.regions.autosomes_non_gap_n_bases.bed.gz"
.....
}
**警告:**警告:输入元组与进程mosdepth_datamash
声明的输入集基数不匹配--违规值:[NA12878-chr14, /home/ubuntu/sample-qc/tests/NA12878-chr14_1000genomes-dragen-3.7.6/work/34/7f9ec33ef353c0276f71d2d7248ff2/NA12878-chr14.regions.bed.gz, NA12878-chr15, /home/ubuntu/sample-qc/tests/NA12878-chr15_1000genomes-dragen-3.7.6/work/0b/533ee8c0b8ec47418990a6d05f2f3c/NA12878-chr15.regions.bed.gz]
N E X T F L O W ~ version 23.04.1
......
executor > local (5)
[0b/533ee8] process > mosdepth_bam (NA12878-chr14) [100%] 2 of 2 ✔
executor > local (5)
[0b/533ee8] process > mosdepth_bam (NA12878-chr14) [100%] 2 of 2 ✔
executor > local (9)
[0b/533ee8] process > mosdepth_bam (NA12878-chr14) [100%] 2 of 2 ✔
[- ] process > mosdepth_cram -
[d6/3fb2e2] process > samtools_stats_bam (NA12878-chr15) [100%] 2 of 2 ✔
[- ] process > samtools_stats_cram -
[3e/031f1b] process > mosdepth_datamash (NA12878-chr14) [100%] 1 of 1 ✔
[4a/7e44ca] process > verifybamid2_bam (NA12878-chr14) [100%] 1 of 1 failed: 1✔
[- ] process > verifybamid2_cram -
[21/4a221a] process > multiqc [100%] 1 of 1 ✔
[54/e19285] process > compile_metrics (NA12878-chr14) [100%] 2 of 2 ✔
Started : 2023-05-16T05:39:06.759097Z
Completed : 2023-05-16T05:41:11.063286Z
更新main.nf
:修复-使用queue
通道而不是collect
mosdepth_datamash( autosomes_non_gap_regions, mosdepth_bam.out.regions.mix( mosdepth_cram.out.regions ) )
main.nf
`verifybamid`的另一个问题
vbi2_ud = channel.fromPath(params.vbi2_ud, checkIfExists: true)
vbi2_bed = channel.fromPath(params.vbi2_bed, checkIfExists: true)
vbi2_mean = channel.fromPath(params.vbi2_mean, checkIfExists: true)
include { verifybamid2 as verifybamid2_bam } from './modules/verifybamid2'
include { verifybamid2 as verifybamid2_cram } from './modules/verifybamid2'
verifybamid2_bam( aln_inputs.bam, ref_fasta, vbi2_ud, vbi2_bed, vbi2_mean )
verifybamid2_cram( aln_inputs.cram, ref_fasta, vbi2_ud, vbi2_bed, vbi2_mean )
.mix( verifybamid2_bam.out.freemix )
.mix( verifybamid2_cram.out.freemix )
.mix( verifybamid2_bam.out.ancestry )
.mix( verifybamid2_cram.out.ancestry )
/modules/verifybamid/main.nf
的内容与samtools_stats和mosdepth相同,但只运行一个样本!
process verifybamid2 {
tag { sample }
errorStrategy 'ignore'
input:
tuple val(sample), path(bam), path(bai)
path ref_fasta
path vbi2_ud
path vbi2_bed
path vbi2_mean
output:
tuple val(sample), path("result.selfSM"), emit: freemix
tuple val(sample), path("result.Ancestry"), emit: ancestry
script:
def reference = ref_fasta ? /--reference "${ref_fasta}"/ : ''
"""
# run verifybamid2
# get the percentage of cross-individual contamination rate
verifybamid2 --NumPC 4 --SVDPrefix ${params.vbi2_svdprefix} ${reference} --BamFile ${bam}
"""
}
.....
N E X T F L O W ~ version 23.04.1
......
executor > local (10)
[d6/c4f241] process > mosdepth_bam (NA12878-chr14-AKT2) [100%] 2 of 2 ✔
[- ] process > mosdepth_cram -
[51/3a1d21] process > samtools_stats_bam (NA12878-chr14-AKT1) [100%] 2 of 2 ✔
[- ] process > samtools_stats_cram -
[c7/d20ab6] process > mosdepth_datamash (NA12878-chr14-AKT2) [100%] 2 of 2 ✔
[36/77fbe4] process > **verifybamid2_bam** (NA12878-chr14-AKT1) [100%] 1 of 1, failed: 1 ✔
[- ] process > verifybamid2_cram -
[f9/4fb6b9] process > multiqc [100%] 1 of 1 ✔
[48/a86b33] process > compile_metrics (NA12878-chr14-AKT1) [100%] 2 of 2 ✔
Started : 2023-05-16T06:14:46.797216Z
Completed : 2023-05-16T06:16:47.723708Z
进程verifybamid
与file
配合使用而不是channel.fromPath
vbi2_ud = file( params.vbi2_ud )
vbi2_bed = file( params.vbi2_bed )
vbi2_mean = file( params.vbi2_mean )
N E X T F L O W ~ version 23.04.1 5866
......
executor > local (11)
[e8/1bcf62] process > mosdepth_bam (NA12878-chr14-AKT2) [100%] 2 of 2 ✔
[- ] process > mosdepth_cram -
[8d/c3e461] process > samtools_stats_bam (NA12878-chr14-AKT1) [100%] 2 of 2 ✔
[- ] process > samtools_stats_cram -
[d2/fd3ee6] process > mosdepth_datamash (NA12878-chr14-AKT2) [100%] 2 of 2 ✔
[be/1c2e03] process > verifybamid2_bam (NA12878-chr14-AKT2) [100%] 2 of 2, failed: 2 ✔
[- ] process > verifybamid2_cram -
[d8/31b375] process > multiqc [100%] 1 of 1 ✔
[7b/abd9a7] process > compile_metrics (NA12878-chr14-AKT1) [100%] 2 of 2 ✔
Started : 2023-05-16T12:10:36.684217Z
Completed : 2023-05-16T12:12:41.545206Z
Duration : 2m 5s
Status : true
Publish dir : ./output
如何修改通道以向后处理(工作流单样本输入格式的前一版本)兼容缺少sample
键的单样本输入格式
id: HLA1001
bam: s3://HLA/data/HLA1001.bam
vcf: s3://HLA/data/HLA1001.vcf.gz
处理输入mani.nf
的内容:
Channel
.fromList( params.samples )
.branch { rec ->
def aln_file = file( rec.bam )
bam: aln_file.extension == 'bam'
def bam_idx = file( "${rec.bam}.bai" )
return tuple( rec.id, aln_file, bam_idx )
cram: aln_file.extension == 'cram'
def cram_idx = file( "${rec.bam}.crai" )
return tuple( rec.id, aln_file, cram_idx )
}
.set { aln_inputs }
Channel
.fromList( params.samples )
.map { rec ->
def vcf_file = file( rec.vcf )
def vcf_idx = file( "${rec.vcf}.tbi" )
tuple( rec.id, vcf_file, vcf_idx )
}
.set { vcf_inputs }
1条答案
按热度按时间sbdsn5lh1#
使用通道,可以处理任意数量的样本,包括仅一个样本。这里有一种使用模块来处理BAM和CRAM输入的方法。请注意,下面的每个过程都需要输入
tuple
,其中第一个元素是样本名称或键。为了能够在下游合并通道,我们还应该确保输出具有相同样本名称或键的元组。以下内容尚未在AWS Batch上进行测试,但至少可以让您开始:main.nf
的内容:modules/samtools/main.nf
的内容:modules/mosdepth/main.nf
的内容:modules/bcftools/main.nf
的内容:modules/multiqc/main.nf
的内容:modules/compile_metrics/main.nf
的内容:./modules/mosdepth_datamash/main.nf
的内容:nextflow.config
的内容:并使用类似于以下内容运行: