module load singularity/3.4.1
module load java/1.8.0
The nextflow
executable has been placed in /data/MSc/2022/
for you. Add this to your bin:
cp /data/MSc/2022/nextflow ~/bin/
Double check you can access the executable:
nextflow -v
nextflow version 21.04.0-edge.5543
We need to tell nextflow to run on the MSC
queue.
You should already have a config file present in your ~/.nextflow
directory (we used this to run the variant calling pipeline).
Replace the following line:
queue = { task.cpus > 8 ? 'highmem' : 'normal' }
with
queue = 'MSC'
The first step we will cover is reading files into nextflow
scripts and the data structure they are stored in.
Copy the contents of the code below, save it in a script called stage_files.nf
and run it:
nextflow run stage_files.nf
#!/usr/bin/env nextflow
ch_reads = Channel.fromFilePairs("/data/MSc/2022/chip_scratch/files/*_R{1,2}.fastq.gz")
ch_fasta = Channel.value(file("/data/MSc/2022/chip_scratch/files/genome.fa"))
ch_gtf = Channel.value(file("/data/MSc/2022/chip_scratch/files/genes.gtf"))
ch_reads.view()
ch_fasta.view()
ch_gtf.view()
We will build on our previous script and include a process to perform indexing using bwa
.
We will also add a parameter
called outdir
which we will use to specify the path to write files to.
parameters can be hardcoded in the script, passed via the command line, or config file.
The flag -with-singularity
needs to be included in the command line, the script needs to know where to find the tools to run the process.
nextflow run stage_files.nf --outdir "./" -with-singularity "/data/containers/nfcore-chipseq-1.2.2.img"
#!/usr/bin/env nextflow
ch_reads = Channel.fromFilePairs("/data/MSc/2022/chip_scratch/files/*_R{1,2}.fastq.gz")
ch_fasta = Channel.value(file("/data/MSc/2022/chip_scratch/files/genome.fa"))
ch_gtf = Channel.value(file("/data/MSc/2022/chip_scratch/files/genes.gtf"))
process BWA_Index{
tag "Indexing $fasta"
publishDir "${params.outdir}", mode:'copy'
input:
file(fasta) from ch_fasta
output:
file("BWAIndex/*") into ch_index
script:
"""
bwa index -a bwtsw $fasta
mkdir -p BWAIndex
mv ${fasta}* BWAIndex/
"""
}
ch_index.view()
Note: generally you cannot reuse channels twice. Running
.view()
‘consumes’ the channel, so after your sanity check remove the.view()
line.
Build on the previous script by adding alignment using bwa
.
#!/usr/bin/env nextflow
ch_reads = Channel.fromFilePairs("/data/MSc/2022/chip_scratch/files/*_R{1,2}.fastq.gz")
ch_fasta = Channel.value(file("/data/MSc/2022/chip_scratch/files/genome.fa"))
ch_gtf = Channel.value(file("/data/MSc/2022/chip_scratch/files/genes.gtf"))
process BWA_Index{
tag "Indexing $fasta"
publishDir "${params.outdir}", mode:'copy'
input:
file(fasta) from ch_fasta
output:
file("BWAIndex/*") into ch_index
script:
"""
bwa index -a bwtsw $fasta
mkdir -p BWAIndex
mv ${fasta}* BWAIndex/
"""
}
process BWA_Align{
tag "Aligning $base to genome"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(reads) from ch_reads
file(index) from ch_index.collect()
output:
tuple val(base), file("*.bam") into ch_aligned_bams
script:
RG = "\'@RG\\tID:${base}\\tSM:${base.split('_')[0..-2].join('_')}\\tPL:ILLUMINA\\tLB:${base}\\tPU:1\'"
genome = index[0]
"""
bwa mem \
-t 2 \
-M \
-R $RG \
$genome \
$reads | samtools view -@ 2 -b -h -F 0x0100 -O BAM -o ${base}.bam
"""
}
Update the script to include the sorting of aligned bam
files:
#!/usr/bin/env nextflow
ch_reads = Channel.fromFilePairs("/data/MSc/2022/chip_scratch/files/*_R{1,2}.fastq.gz")
ch_fasta = Channel.value(file("/data/MSc/2022/chip_scratch/files/genome.fa"))
ch_gtf = Channel.value(file("/data/MSc/2022/chip_scratch/files/genes.gtf"))
process BWA_Index{
tag "Indexing $fasta"
publishDir "${params.outdir}", mode:'copy'
input:
file(fasta) from ch_fasta
output:
file("BWAIndex/*") into ch_index
script:
"""
bwa index -a bwtsw $fasta
mkdir -p BWAIndex
mv ${fasta}* BWAIndex/
"""
}
process BWA_Align{
tag "Aligning $base to genome"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(reads) from ch_reads
file(index) from ch_index.collect()
output:
tuple val(base), file("*.bam") into ch_aligned_bams
script:
RG = "\'@RG\\tID:${base}\\tSM:${base.split('_')[0..-2].join('_')}\\tPL:ILLUMINA\\tLB:${base}\\tPU:1\'"
genome = index[0]
"""
bwa mem \
-t 2 \
-M \
-R $RG \
$genome \
$reads | samtools view -@ 2 -b -h -F 0x0100 -O BAM -o ${base}.bam
"""
}
process Samtools_Sort{
tag "Sorting sample $base"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(bam) from ch_aligned_bams
output:
tuple val(base), file("*.{bam,bai}") into ch_aligned_bams_sorted
script:
"""
samtools sort -@ 2 -o ${base}.sorted.bam $bam
samtools index ${base}.sorted.bam
"""
}
One of the hardest things to conceptualise in nextflow is merging tuples. The schematic below shows how to merge our sorted bam
files according to a common grouping key i.e the Sample
identifier, which we can grab from the file names.
There exists multiple ways to go about this!
If you need help with this sort of operation in your final assignment please do not hesitate to ask me for help.
#!/usr/bin/env nextflow
ch_reads = Channel.fromFilePairs("/data/MSc/2022/chip_scratch/files/*_R{1,2}.fastq.gz")
ch_fasta = Channel.value(file("/data/MSc/2022/chip_scratch/files/genome.fa"))
ch_gtf = Channel.value(file("/data/MSc/2022/chip_scratch/files/genes.gtf"))
process BWA_Index{
tag "Indexing $fasta"
publishDir "${params.outdir}", mode:'copy'
input:
file(fasta) from ch_fasta
output:
file("BWAIndex/*") into ch_index
script:
"""
bwa index -a bwtsw $fasta
mkdir -p BWAIndex
mv ${fasta}* BWAIndex/
"""
}
process BWA_Align{
tag "Aligning $base to genome"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(reads) from ch_reads
file(index) from ch_index.collect()
output:
tuple val(base), file("*.bam") into ch_aligned_bams
script:
RG = "\'@RG\\tID:${base}\\tSM:${base.split('_')[0..-2].join('_')}\\tPL:ILLUMINA\\tLB:${base}\\tPU:1\'"
genome = index[0]
"""
bwa mem \
-t 2 \
-M \
-R $RG \
$genome \
$reads | samtools view -@ 2 -b -h -F 0x0100 -O BAM -o ${base}.bam
"""
}
process Samtools_Sort{
tag "Sorting sample $base"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(bam) from ch_aligned_bams
output:
tuple val(base), file("*.{bam,bai}") into ch_aligned_bams_sorted
script:
"""
samtools sort -@ 2 -o ${base}.sorted.bam $bam
samtools index ${base}.sorted.bam
"""
}
ch_aligned_bams_sorted
.map{ it -> [ it[0].split('_')[0..-2].join('_'), it[1] ] }
.groupTuple(by: [0])
.map{ it -> [ it[0], it[1].flatten() ] }
.set{ ch_merge_bams }
process Merge_Bams{
tag "Merging $base BAM files"
publishDir "${params.outdir}/merged_bams", mode:'copy'
input:
tuple val(base), file(bam) from ch_merge_bams
output:
tuple val(base), file("*.{bam,bai}") into ch_merged_bams
script:
bams = bam.findAll{ it.toString().endsWith('.bam') }.sort()
"""
picard MergeSamFiles \
${'INPUT='+bams.join(' INPUT=')} \
OUTPUT=${base}.sorted.bam \
SORT_ORDER=coordinate \
VALIDATION_STRINGENCY=LENIENT \
samtools index ${base}.sorted.bam
"""
}
This process is straight forward now that we have merged our bam
files.
#!/usr/bin/env nextflow
ch_reads = Channel.fromFilePairs("/data/MSc/2022/chip_scratch/files/*_R{1,2}.fastq.gz")
ch_fasta = Channel.value(file("/data/MSc/2022/chip_scratch/files/genome.fa"))
ch_gtf = Channel.value(file("/data/MSc/2022/chip_scratch/files/genes.gtf"))
process BWA_Index{
tag "Indexing $fasta"
publishDir "${params.outdir}", mode:'copy'
input:
file(fasta) from ch_fasta
output:
file("BWAIndex/*") into ch_index
script:
"""
bwa index -a bwtsw $fasta
mkdir -p BWAIndex
mv ${fasta}* BWAIndex/
"""
}
process BWA_Align{
tag "Aligning $base to genome"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(reads) from ch_reads
file(index) from ch_index.collect()
output:
tuple val(base), file("*.bam") into ch_aligned_bams
script:
RG = "\'@RG\\tID:${base}\\tSM:${base.split('_')[0..-2].join('_')}\\tPL:ILLUMINA\\tLB:${base}\\tPU:1\'"
genome = index[0]
"""
bwa mem \
-t 2 \
-M \
-R $RG \
$genome \
$reads | samtools view -@ 2 -b -h -F 0x0100 -O BAM -o ${base}.bam
"""
}
process Samtools_Sort{
tag "Sorting sample $base"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(bam) from ch_aligned_bams
output:
tuple val(base), file("*.{bam,bai}") into ch_aligned_bams_sorted
script:
"""
samtools sort -@ 2 -o ${base}.sorted.bam $bam
samtools index ${base}.sorted.bam
"""
}
ch_aligned_bams_sorted
.map{ it -> [ it[0].split('_')[0..-2].join('_'), it[1] ] }
.groupTuple(by: [0])
.map{ it -> [ it[0], it[1].flatten() ] }
.set{ ch_merge_bams }
process Merge_Bams{
tag "Merging $base BAM files"
publishDir "${params.outdir}/merged_bams", mode:'copy'
input:
tuple val(base), file(bam) from ch_merge_bams
output:
tuple val(base), file("*.bam"), file("*.bai") into ch_merged_bams
script:
bams = bam.findAll{ it.toString().endsWith('.bam') }.sort()
"""
picard MergeSamFiles \
${'INPUT='+bams.join(' INPUT=')} \
OUTPUT=${base}.sorted.bam \
SORT_ORDER=coordinate \
VALIDATION_STRINGENCY=LENIENT \
samtools index ${base}.sorted.bam
"""
}
process MarkDuplicates{
tag "Marking Duplicates in $base"
publishDir "${params.outdir}/markdups", mode:'copy'
input:
tuple val(base), file(bam), file(bai) from ch_merged_bams
output:
tuple val(base), file("*.bam"), file("*.bai") into ch_markdup_bams
script:
"""
picard MarkDuplicates \
INPUT=$bam \
OUTPUT=${base}.markdups.bam \
ASSUME_SORTED=true \
REMOVE_DUPLICATES=false \
VALIDATION_STRINGENCY=LENIENT \
METRICS_FILE=${base}.markdups.metrics.txt
samtools index ${base}.markdups.bam
"""
}
I was unable to expicitly pass SPT5_T0
and SPT5_Input
to the -t
and -c
flags dynamically.
This will work for the tutorial but is not scalable to other samples..
#!/usr/bin/env nextflow
ch_reads = Channel.fromFilePairs("/data/MSc/2022/chip_scratch/files/*_R{1,2}.fastq.gz")
ch_fasta = Channel.value(file("/data/MSc/2022/chip_scratch/files/genome.fa"))
ch_gtf = Channel.value(file("/data/MSc/2022/chip_scratch/files/genes.gtf"))
process BWA_Index{
tag "Indexing $fasta"
publishDir "${params.outdir}", mode:'copy'
input:
file(fasta) from ch_fasta
output:
file("BWAIndex/*") into ch_index
script:
"""
bwa index -a bwtsw $fasta
mkdir -p BWAIndex
mv ${fasta}* BWAIndex/
"""
}
process BWA_Align{
tag "Aligning $base to genome"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(reads) from ch_reads
file(index) from ch_index.collect()
output:
tuple val(base), file("*.bam") into ch_aligned_bams
script:
RG = "\'@RG\\tID:${base}\\tSM:${base.split('_')[0..-2].join('_')}\\tPL:ILLUMINA\\tLB:${base}\\tPU:1\'"
genome = index[0]
"""
bwa mem \
-t 2 \
-M \
-R $RG \
$genome \
$reads | samtools view -@ 2 -b -h -F 0x0100 -O BAM -o ${base}.bam
"""
}
process Samtools_Sort{
tag "Sorting sample $base"
publishDir "${params.outdir}/bwa", mode:'copy'
input:
tuple val(base), file(bam) from ch_aligned_bams
output:
tuple val(base), file("*.{bam,bai}") into ch_aligned_bams_sorted
script:
"""
samtools sort -@ 2 -o ${base}.sorted.bam $bam
samtools index ${base}.sorted.bam
"""
}
ch_aligned_bams_sorted
.map{ it -> [ it[0].split('_')[0..-2].join('_'), it[1] ] }
.groupTuple(by: [0])
.map{ it -> [ it[0], it[1].flatten() ] }
.set{ ch_merge_bams }
process Merge_Bams{
tag "Merging $base BAM files"
publishDir "${params.outdir}/merged_bams", mode:'copy'
input:
tuple val(base), file(bam) from ch_merge_bams
output:
tuple val(base), file("*.bam"), file("*.bai") into ch_merged_bams
script:
bams = bam.findAll{ it.toString().endsWith('.bam') }.sort()
"""
picard MergeSamFiles \
${'INPUT='+bams.join(' INPUT=')} \
OUTPUT=${base}.sorted.bam \
SORT_ORDER=coordinate \
VALIDATION_STRINGENCY=LENIENT \
samtools index ${base}.sorted.bam
"""
}
process MarkDuplicates{
tag "Marking Duplicates in $base"
publishDir "${params.outdir}/markdups", mode:'copy'
input:
tuple val(base), file(bam), file(bai) from ch_merged_bams
output:
tuple val(base), file("*.bam"), file("*.bai") into ch_markdup_bams
script:
"""
picard MarkDuplicates \
INPUT=$bam \
OUTPUT=${base}.markdups.bam \
ASSUME_SORTED=true \
REMOVE_DUPLICATES=false \
VALIDATION_STRINGENCY=LENIENT \
METRICS_FILE=${base}.markdups.metrics.txt
samtools index ${base}.markdups.bam
"""
}
ch_markdup_bams
.map{ it -> [ it[0].split('_')[0], it[1] ] }
.groupTuple( by:[0] )
.map{ it -> [ it[0], it[1].flatten() ] }
.set{ ch_macs_bams }
process Macs2{
tag "Performing Peak Calling on $base"
publishDir "${params.outdir}/macs2", mode:'copy'
input:
tuple val(base), file(bam) from ch_macs_bams
output:
file("${base}_*") into ch_macs2_output
file("${base}_summits.bed") into ch_macs2_bed
script:
"""
macs2 callpeak \
-t SPT5_T0.markdups.bam \
-c SPT5_Input.markdups.bam \
-f BAMPE \
--outdir . \
--name $base
"""
}
Finish the script by adding the final process to annotate the peaks using annotatePeaks.pl
.
I have placed the SPT5_summits.bed
file in it’s own channel - calling it should be straightforward.