For a Nextflow pipeline, I want to read in a CSV file with five columns:
sample1,path/normal_R1.fastq,path/normal_R2.fastq,path/tumor_R1.fastq,path/tumor_R2.fastq
sample2,path/normal_R1.fastq,path/normal_R2.fastq,path/tumor_R1.fastq,path/tumor_R2.fastq
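I read the file and build a LinkedHashMap from it. For each entry, I want to run several processes. These processes worked fine before I added the CSV iteration, when they were fed by a tumor file channel and a normal file channel.
After editing the code to use the CSV, I get this error:
Process 'FASTP' has been already used -- If you need to reuse the same component, include it with a different name or include it in a different workflow context
Here is the code: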
include { FASTP } from './fastp_process.nf'
include { bwa_index } from './index_process.nf'
include { align_bwa_mem } from './bwamem_process_already_index.nf'
include { gatk_markduplicates } from './gatk_markduplicates_process.nf'
include { setupnmdtags } from './setupnmdtags_process.nf'
include { recalibrate_bam } from './recalibratebam_process.nf'
include { applybqsr } from './applybqsr_process.nf'
include { mutect2 } from './mutect2_process.nf'
include { lancet } from './lancet_process.nf'
include { manta } from './manta_process.nf'
include { strelka } from './strelka_process.nf'
include { gatk_merge_vcfs } from './gatk_merge_vcfs.nf'

workflow {
    def csvFile = file("input_nextflow_files.csv")
    def csvLines = csvFile.text.readLines()
    def sampleMap = csvLines.collectEntries { line ->
        def lineCols = line.split(',')
        if (lineCols.size() >= 5) {
            def sampleName = lineCols[0]
            def normalR1 = file(lineCols[1])
            def normalR2 = file(lineCols[2])
            def tumorR1 = file(lineCols[3])
            def tumorR2 = file(lineCols[4])
            [(sampleName): [tuple(normalR1, normalR2), tuple(tumorR1, tumorR2)]]
        } else {
            return [:]
        }
    }
    sampleMap.each { sampleName, pairList ->
        def normalPair = pairList[0]
        def tumorPair = pairList[1]
        FASTP(tumorPair, normalPair, sampleName)
        align_bwa_mem(FASTP.out.reads_tumor, FASTP.out.reads_normal) // already_created index
    }
}
I believe this has something to do with the inputs of the FASTP process below:
process FASTP {
    maxForks 3
    debug true

    input:
    path(reads_tumor) //val outdir //doesn't work with path (outdir) // we pass multiple reads - for tumor and normal
    path(reads_normal) //val outdir //doesn't work with path (outdir)
    val(sample_name)

    output:
    tuple val(sample_name), path("${sample_id_tumor}_trim_{1,2}.fq.gz"), emit: reads_tumor
    path("${sample_id_tumor}.fastp.json"), emit: json_tumor
    path("${sample_id_tumor}.fastp.html"), emit: html_tumor
    tuple val(sample_id_normal), path("${sample_id_normal}_trim_{1,2}.fq.gz"), emit: reads_normal
    path("${sample_id_normal}.fastp.json"), emit: json_normal
    path("${sample_id_normal}.fastp.html"), emit: html_normal

    script:
    def (r1_normal, r2_normal) = reads_normal
    def (r1_tumor, r2_tumor) = reads_tumor
    """
    ml fastp
    fastp --in1 "${r1_normal}" --in2 "${r2_normal}" -q 20 -u 20 -l 40 --detect_adapter_for_pe --out1 "${sample_id_normal}_trim_1.fq.gz" --out2 "${sample_id_normal}_trim_2.fq.gz" --json "${sample_id_normal}.fastp.json" --html "${sample_id_normal}.fastp.html" --thread 12
    fastp --in1 "${r1_tumor}" --in2 "${r2_tumor}" -q 20 -u 20 -l 40 --detect_adapter_for_pe --out1 "${sample_id_tumor}_trim_1.fq.gz" --out2 "${sample_id_tumor}_trim_2.fq.gz" --json "${sample_id_tumor}.fastp.json" --html "${sample_id_tumor}.fastp.html" --thread 12
    echo "Exiting fastp"
    """
}
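I don't know how to fix this error. I have checked several times: if I don't include the FASTP process, everything is fine. I also tried removing the include statements and the FASTP call, but that did not work either, so I don't understand what is going on.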
1 Answer
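When you use each to iterate over your sample map, you are effectively trying to reuse the FASTP and align_bwa_mem processes on every iteration. Nextflow is simply complaining that if they (i.e. the processes) need to be reused, they must either be included under different names (i.e. using module aliases) or be called in a different workflow context (i.e. using subworkflows). A better way to achieve what you want is to use channels and the splitCsv operator, so that each process is called only once and the channel carries every sample through it.
Alternatively, if you want more flexibility, another approach is to import FASTP using module aliases.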
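Both suggestions can be combined in one sketch: splitCsv turns the CSV rows into channel items, and module aliases let FASTP run separately on the normal and tumor reads. This is only an illustration under stated assumptions, not the code from the original answer. It assumes that ./fastp_process.nf has been refactored into a single-sample process taking one `tuple val(sample_id), path(reads)` input and emitting `reads`, which differs from the three-input process in the question. The names FASTP_NORMAL, FASTP_TUMOR and params.input are hypothetical.

```nextflow
// main.nf: a sketch, assuming a single-sample FASTP process in ./fastp_process.nf
nextflow.enable.dsl = 2

// The same process is included twice under different (hypothetical) alias names
include { FASTP as FASTP_NORMAL } from './fastp_process.nf'
include { FASTP as FASTP_TUMOR  } from './fastp_process.nf'

// Hypothetical parameter; the CSV in the question has no header row
params.input = 'input_nextflow_files.csv'

workflow {
    // splitCsv emits one item per row; without a header each row is a list of columns
    Channel
        .fromPath(params.input)
        .splitCsv()
        .multiMap { row ->
            normal: tuple("${row[0]}_normal", [file(row[1]), file(row[2])])
            tumor:  tuple("${row[0]}_tumor",  [file(row[3]), file(row[4])])
        }
        .set { samples }

    // Each alias is called exactly once; the channels carry every sample
    FASTP_NORMAL(samples.normal)
    FASTP_TUMOR(samples.tumor)

    FASTP_TUMOR.out.reads.view()
}
```

Because each alias is invoked only once in the workflow body, the "already used" error should not be triggered, and Nextflow runs one task per channel item. Pairing the tumor and normal outputs for a later step such as align_bwa_mem is not shown here.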
Contents of ./fastp_process.nf:
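A minimal sketch of what such a module file might look like, assuming the process is reduced to a single read pair per task so that it can be imported under aliases (for example as FASTP_NORMAL and FASTP_TUMOR). This is an illustrative refactoring of the process from the question, not the original answer's file. The input and output names sample_id, reads, json and html are assumptions; the fastp options are copied from the question.

```nextflow
// ./fastp_process.nf: illustrative single-sample version (an assumption)
process FASTP {
    maxForks 3

    input:
    // reads is expected to be a two-element list: [R1, R2]
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("${sample_id}_trim_{1,2}.fq.gz"), emit: reads
    path("${sample_id}.fastp.json"), emit: json
    path("${sample_id}.fastp.html"), emit: html

    script:
    def (r1, r2) = reads
    """
    ml fastp
    fastp --in1 "${r1}" --in2 "${r2}" -q 20 -u 20 -l 40 --detect_adapter_for_pe \\
        --out1 "${sample_id}_trim_1.fq.gz" --out2 "${sample_id}_trim_2.fq.gz" \\
        --json "${sample_id}.fastp.json" --html "${sample_id}.fastp.html" \\
        --thread 12
    """
}
```

Every variable used in the output block is declared as an input here, whereas the process in the question refers to sample_id_tumor and sample_id_normal without defining them. Keeping the process to one read pair also means the same definition can serve both tumor and normal samples.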