This page collects recurring implementation patterns used in Nextflow applications. Contributions are welcome: open a pull request in the project's GitHub repository.
1. Basic patterns
1.1. Channel duplication
1.1.1. Problem
You need to use the same channel as input in two or more processes.
1.1.2. Solution
Use the into operator to create two (or more) copies of the source channel. Then, use the new channels as input for the processes.
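As a minimal sketch of what the into operator does on its own, assuming the DSL1 syntax used throughout this page, the snippet below duplicates a small value channel and prints each copy. The channel names letters_a_ch and letters_b_ch are illustrative and not part of the original example.

```groovy
// Duplicate a channel of three values into two independent channels.
// The channel names are illustrative only.
Channel
    .from('a', 'b', 'c')
    .into { letters_a_ch; letters_b_ch }

// Each copy receives every item emitted by the source channel.
letters_a_ch.view { "copy A: $it" }
letters_b_ch.view { "copy B: $it" }
```

Both copies emit a, b and c. The two channels are consumed independently, so the interleaving of the printed lines is not guaranteed.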
1.1.3. Code
Channel
    .fromPath('prots/*_?.fa')
    .into { prot1_ch; prot2_ch }

process foo {
    input: file x from prot1_ch
    script:
    """
    echo your_command --input $x
    """
}

process bar {
    input: file x from prot2_ch
    script:
    """
    your_command --input $x
    """
}
1.1.4. Run it
Use the following command to execute the example:
nextflow run patterns/channel-duplication.nf
2. Scatter executions
2.1. Process per file path
2.1.1. Problem
You need to execute a task for each file that matches a glob pattern.
2.1.2. Solution
Use the Channel.fromPath method to create a channel emitting all files matching the glob pattern. Then, use the channel as input of the process implementing your task.
2.1.3. Code
Channel.fromPath('reads/*_1.fq.gz').set{ samples_ch }

process foo {
    input:
    file x from samples_ch

    script:
    """
    your_command --input $x
    """
}
2.1.4. Run it
Use the following command to execute the example:
nextflow run patterns/process-per-file-path.nf
2.2. Process per file chunk
2.2.1. Problem
You need to split one or more input files into chunks and execute a task for each of them.
2.2.2. Solution
Use the splitText operator to split a file into chunks of a given size. Then use the resulting channel as input for the process implementing your task.
Caveat: By default, chunks are kept in memory. When splitting big files, specify the parameter file: true to save the chunks into files. See the documentation for details.
Splitters for specific file formats are also available, e.g. splitFasta and splitFastq.
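The caveat and the format-specific splitters can be sketched as follows. This is only an illustration under stated assumptions: proteins.fa is a hypothetical file name, and the chunk sizes are arbitrary.

```groovy
// Split a text file into 5-line chunks, saving each chunk to a file
// instead of keeping it in memory.
Channel
    .fromPath('poem.txt')
    .splitText(by: 5, file: true)
    .set { chunks_ch }

// Split a multi-FASTA file into chunks of 10 sequences each.
// 'proteins.fa' is a hypothetical file name.
Channel
    .fromPath('proteins.fa')
    .splitFasta(by: 10, file: true)
    .set { fasta_chunks_ch }
```

With file: true, each channel emits file paths rather than in-memory strings, so a downstream process can declare them as regular file inputs.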
2.2.3. Code
Channel
    .fromPath('poem.txt')
    .splitText(by: 5)
    .set{ chunks_ch }

process foo {
    echo true

    input:
    file x from chunks_ch

    script:
    """
    rev $x | rev
    """
}
2.2.4. Run it
Use the following command to execute the example:
nextflow run patterns/process-per-file-chunk.nf
2.3. Process per file pairs
2.3.1. Problem
You need to process the files in a directory, grouping them in pairs.
2.3.2. Solution
Use the Channel.fromFilePairs method to create a channel emitting the file pairs matching a glob pattern. The pattern must match a common prefix in the paired file names.
The matching files are emitted as tuples in which the first element is the grouping key of the matching files and the second element is the file pair itself.
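To inspect the shape of the emitted tuples before wiring them into a process, a minimal sketch such as the following can help. It assumes the same glob pattern as the example in this section.

```groovy
// Print the grouping key and the file pair of every emitted tuple.
// it[0] is the grouping key, it[1] is the list holding the two files.
Channel
    .fromFilePairs('reads/*_{1,2}.fq.gz')
    .view { "key: ${it[0]}; files: ${it[1]}" }
```

For a hypothetical pair reads/sampleA_1.fq.gz and reads/sampleA_2.fq.gz, the key would be expected to be sampleA, that is, the part of the name matched by the * wildcard.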
2.3.3. Code
Channel
    .fromFilePairs('reads/*_{1,2}.fq.gz')
    .set { samples_ch }

process foo {
    input:
    set sampleId, file(reads) from samples_ch

    script:
    """
    your_command --sample $sampleId --reads $reads
    """
}
2.3.4. Run it
nextflow run patterns/process-per-file-pairs.nf
2.3.5. Custom grouping strategy
When needed it is possible to define a custom grouping strategy. A common use case is for alignment BAM files (sample1.bam) that come along with their index file. The difficulty is that the index is sometimes called sample1.bai and sometimes sample1.bam.bai depending on the software used. The following example can accommodate both cases.
// The closure strips the .bam and .bai extensions, so that a BAM file and
// its index share the same grouping key whichever naming scheme is used.
Channel
    .fromFilePairs('alignment/*.{bam,bai}') { file -> file.name.replaceAll(/\.bam|\.bai$/,'') }
    .set { samples_ch }

process foo {
    input:
    set sampleId, file(bam) from samples_ch

    script:
    """
    your_command --sample $sampleId --bam ${sampleId}.bam
    """
}
2.3.6. Run it
nextflow run patterns/process-per-file-pairs-custom.nf
2.4. Process per file range
2.4.1. Problem
You need to execute a task over two or more series of files having a common index range.
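2.4.2. Solution
Use the Channel.from method to define the index range over which the task is repeated, then use the map operator to associate each index with the corresponding input files. Finally, use the resulting channel as input for the process.
2.4.3. Code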
Channel
    .from(1..23)
    .map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }
    .set { pairs_ch }

process foo {
    tag "$sampleId"

    input:
    set sampleId, file(indels), file(snps) from pairs_ch

    """
    echo foo_command --this $indels --that $snps
    """
}
2.4.4. Run it
nextflow run patterns/process-per-file-range.nf
2.5. Process per CSV record
2.5.1. Problem
You need to execute a task for each record in one or more CSV files.
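2.5.2. Solution
Use the splitCsv operator to split the file into records, then use the map operator to return a tuple with the required fields for each record, converting each path string into a file object with the file function. Finally, use the resulting channel as input for the process.
2.5.3. Code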
Given the file index.csv with the following content:
| sampleId | read1 | read2 |
|---|---|---|
| FC816RLABXX | reads/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_1.fq.gz | reads/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_2.fq.gz |
| FC812MWABXX | reads/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_1.fq.gz | reads/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_2.fq.gz |
| FC81DE8ABXX | reads/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_1.fq.gz | reads/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_2.fq.gz |
| FC81DB5ABXX | reads/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_1.fq.gz | reads/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_2.fq.gz |
| FC819P0ABXX | reads/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_1.fq.gz | reads/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_2.fq.gz |
This snippet parses the file and executes a process for each line:
params.index = 'index.csv'

Channel
    .fromPath(params.index)
    .splitCsv(header:true)
    .map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }
    .set { samples_ch }

process foo {
    input:
    set sampleId, file(read1), file(read2) from samples_ch

    script:
    """
    echo your_command --sample $sampleId --reads $read1 $read2
    """
}
Note: relative paths are resolved by the file function against the execution directory.
In a real use case prefer absolute file paths.
2.5.4. Run it
Use the following command to execute the example:
nextflow run patterns/process-per-csv-record.nf
2.6. Process per file output
2.6.1. Problem
A task in your workflow produces two or more files at a time. A downstream task needs to process each of these files independently.
2.6.2. Solution
Use the flatten operator to transform the outputs of the upstream process to a channel emitting each file separately. Then use this channel as input for the downstream process.
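The effect of flatten can be seen in isolation with plain values. The following is a minimal sketch using a hand-built channel rather than a process output.

```groovy
// Nested lists are unpacked so that every element is emitted on its own.
Channel
    .from( [1, [2, 3]], 4, [5, [6]] )
    .flatten()
    .view()
// prints the values 1 to 6, one per line
```

In the pattern described here the items are the output files of the upstream task rather than numbers, but the unpacking works the same way.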
2.6.3. Code
process foo {
    output:
    file '*.txt' into foo_ch

    script:
    '''
    echo Hello there! > file1.txt
    echo What a beautiful day > file2.txt
    echo I wish you are having fun! > file3.txt
    '''
}

process bar {
    input:
    file x from foo_ch.flatten()

    script:
    """
    cat $x
    """
}
2.6.4. Run it
Use the following command to execute the example:
nextflow run patterns/process-per-file-output.nf
3. Gather results
3.1. Process all outputs together
3.1.1. Problem
You need to process all the outputs of an upstream task together.
3.1.2. Solution
Use the collect operator to gather all the outputs produced by the upstream task and emit them as a single output. Then use the resulting channel as input for the process.
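As a minimal sketch of the collect operator on its own, using plain values instead of process outputs:

```groovy
// All items emitted by the source channel are gathered into one list,
// which is emitted as a single item.
Channel
    .from(1, 2, 3, 4)
    .collect()
    .view()
// prints: [1, 2, 3, 4]
```

Because the resulting channel emits only one item, a process reading from it runs a single task that receives everything at once.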
3.1.3. Code
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
    input:
    file x from samples_ch
    output:
    file 'file.fq' into unzipped_ch

    script:
    """
    < $x zcat > file.fq
    """
}

process bar {
    echo true

    input:
    file '*.fq' from unzipped_ch.collect()

    """
    cat *.fq
    """
}
3.1.4. Run it
Use the following command to execute the example:
nextflow run patterns/process-collect.nf
3.2. Process outputs into groups
3.2.1. Problem
You need to process in the same batch all files that have a matching key in the file name.
3.2.2. Solution
Use the map operator to associate with each file a key extracted from the file name. Then chain the resulting channel with the groupTuple operator to group together all files that have a matching key. Finally use the resulting channel as input for the process.
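The combination of map and groupTuple can be sketched with plain strings standing in for files. The names below are hypothetical and only illustrate the key extraction.

```groovy
// Hypothetical file names; the key is the part before the first underscore.
Channel
    .from('lib1_a.fq', 'lib2_a.fq', 'lib1_b.fq')
    .map { name -> tuple(name.tokenize('_').get(0), name) }
    .groupTuple()
    .view()
```

One tuple is emitted per key, for example [lib1, [lib1_a.fq, lib1_b.fq]] for the key lib1. The order in which the groups are emitted should not be relied upon.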
3.2.3. Code
Channel
    .fromPath('reads/*')
    .map { file ->
        def key = file.name.toString().tokenize('_').get(0)
        return tuple(key, file)
    }
    .groupTuple()
    .set{ groups_ch }

process foo {
    input:
    set key, file(samples) from groups_ch

    script:
    """
    echo your_command --batch $key --input $samples
    """
}
3.2.4. Run it
nextflow run patterns/process-into-groups.nf
3.3. Collect outputs into a file
3.3.1. Problem
You need to concatenate into a single file all output files produced by an upstream process.
3.3.2. Solution
Use the collectFile operator to merge all the output files into a single file.
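By default the merged file is created in a temporary location. As a hedged sketch, the name and storeDir options of collectFile can be used to control the file name and where it is stored. The file name merged.txt and the results directory are assumptions made for illustration.

```groovy
// Merge three text items into a single file named 'merged.txt',
// stored in the 'results' directory (both names are illustrative).
Channel
    .from('alpha', 'beta', 'gamma')
    .collectFile(name: 'merged.txt', storeDir: 'results', newLine: true)
    .view { "merged file: $it" }
```

The same options can be applied to a channel of process output files, in which case the content of each file is appended to the merged file.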
3.3.3. Code
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
    input:
    file x from samples_ch
    output:
    file 'file.fq' into unzipped_ch

    script:
    """
    < $x zcat > file.fq
    """
}

unzipped_ch
    .collectFile()
    .println{ it.text }
3.3.4. Run it
Use the following command to execute the example:
nextflow run patterns/collect-into-file.nf
4. Organize outputs
4.1. Store process outputs
4.1.1. Problem
You need to store the outputs of one or more processes into a directory structure of your choice.
4.1.2. Solution
Use the publishDir directive to set a custom directory where the process outputs need to be made available.
4.1.3. Code
params.reads = 'reads/*{1,2}.fq.gz'
params.outdir = 'my-results'

Channel.fromFilePairs(params.reads).set{ samples_ch }

process foo {
    publishDir "$params.outdir/$sampleId"

    input:
    set sampleId, file(samples) from samples_ch
    output:
    file '*.fq'

    script:
    """
    < ${samples[0]} zcat > sample_1.fq
    < ${samples[1]} zcat > sample_2.fq
    """
}
4.1.4. Run it
Run the script with the following command:
nextflow run patterns/publish-process-outputs.nf
4.2. Store outputs matching a glob pattern
4.2.1. Problem
A task in your workflow creates many output files that are required by a downstream task. You want to store some of those files in separate directories depending on the file name.
4.2.2. Solution
Use two or more publishDir directives to store the output files in separate paths. For each directive, use the pattern option to specify a different glob string, so that each directory receives only the files matching that pattern.
4.2.3. Code
params.reads = 'reads/*_{1,2}.fq.gz'
params.outdir = 'my-results'

Channel
    .fromFilePairs(params.reads, flat: true)
    .set{ samples_ch }

process foo {
    publishDir "$params.outdir/$sampleId/counts", pattern: "*_counts.txt"
    publishDir "$params.outdir/$sampleId/outlooks", pattern: '*_outlook.txt'
    publishDir "$params.outdir/$sampleId/", pattern: '*.fq'

    input:
    set sampleId, file('sample1.fq.gz'), file('sample2.fq.gz') from samples_ch
    output:
    file "*"

    script:
    """
    < sample1.fq.gz zcat > sample1.fq
    < sample2.fq.gz zcat > sample2.fq

    awk '{s++}END{print s/4}' sample1.fq > sample1_counts.txt
    awk '{s++}END{print s/4}' sample2.fq > sample2_counts.txt

    head -n 50 sample1.fq > sample1_outlook.txt
    head -n 50 sample2.fq > sample2_outlook.txt
    """
}
4.2.4. Run it
Run the script with the following command:
nextflow run patterns/publish-matching-glob.nf
4.3. Rename process outputs
4.3.1. Problem
You need to store the outputs of a process to a directory giving files a name of your choice.
4.3.2. Solution
The publishDir directive allows you to store the process outputs in a directory of your choice. Specify the saveAs parameter to give each file a name of your choice, providing a custom rule as a closure.
4.3.3. Code
process foo {
    publishDir 'results', saveAs: { filename -> "foo_$filename" }

    output:
    file '*.txt'

    '''
    touch this.txt
    touch that.txt
    '''
}
4.3.4. Run it
nextflow run patterns/publish-rename-outputs.nf
4.3.5. Save outputs in a sub-directory
The same pattern can be used to store specific files in separate directories depending on the actual file name.
process foo {
    publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }

    output:
    file '*'

    '''
    touch this.txt
    touch that.zip
    '''
}
Relative paths are resolved against the publishDir store path. Use an absolute path
to store files in a directory outside the publishDir store path.
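The note about absolute paths can be illustrated with the following sketch. The directory /data/archive/logs is a hypothetical location chosen for the example, not a path used by the original pattern.

```groovy
process foo {
    // Files ending in .log are stored in a hypothetical absolute directory
    // outside 'results'; all other files keep their name under 'results'.
    publishDir 'results', saveAs: { filename ->
        filename.endsWith('.log') ? "/data/archive/logs/$filename" : filename
    }

    output:
    file '*'

    '''
    touch this.txt
    touch that.log
    '''
}
```

Under these assumptions, this.txt would be published as results/this.txt and that.log as /data/archive/logs/that.log.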
4.3.6. Run it
nextflow run patterns/publish-rename-outputs-subdirs.nf
5. Other
5.1. Get process work directory
5.1.1. Problem
A tool needs the explicit path of the current task work directory.
5.1.2. Solution
Use the $PWD Bash variable or the pwd command to retrieve the task working directory path.
Note: Make sure to escape the $ variable placeholder when the command script is enclosed in double quote characters.
5.1.3. Example
process foo {
    echo true

    script:
    """
    echo foo task path: \$PWD
    """
}

process bar {
    echo true

    script:
    '''
    echo bar task path: $PWD
    '''
}
5.1.4. Run it
Use the following command to execute the example:
nextflow run patterns/process-get-workdir.nf
5.2. Ignore failing process
5.2.1. Problem
A task is expected to fail under certain conditions. You want to ignore the failure and continue the execution of the remaining tasks in the workflow.
5.2.2. Solution
Use the process directive errorStrategy 'ignore' to ignore the error condition.
5.2.3. Code
process foo {
    errorStrategy 'ignore'

    script:
    '''
    echo This is going to fail!
    exit 1
    '''
}

process bar {
    script:
    '''
    echo OK
    '''
}
5.2.4. Run it
Run the script with the following command:
nextflow run patterns/ignore-failing-process.nf
5.3. Mock dependency
5.3.1. Problem
You need to synchronize the execution of two processes
for which there isn’t a direct input-output relationship,
so that process bar is executed only after the
completion of process foo.
5.3.2. Solution
Add to the outputs of process foo a channel producing
a flag value.
Then use this channel as input for process bar to trigger
its execution when the other process completes.
5.3.3. Code
Channel
    .fromPath('.data/reads/*.fq.gz')
    .set{ reads_ch }

process foo {
    output:
    val true into done_ch

    script:
    """
    your_command_here
    """
}

process bar {
    input:
    val flag from done_ch
    file fq from reads_ch

    script:
    """
    other_command_here --reads $fq
    """
}
6. Advanced patterns
6.1. Conditional resources definition
6.1.1. Problem
A task in your workflow requires an amount of computing resources (e.g. memory) that depends on the size or the name of one or more input files.
6.1.2. Solution
Declare the resource requirements e.g. memory, cpus, etc.
in a dynamic manner using a closure.
The closure computes the required amount of resources using the file
attributes, such as size, etc., of the inputs declared in the process
definition.
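The problem statement also mentions the file name as a possible criterion. A minimal sketch of that variant follows, assuming a purely hypothetical naming convention in which files prefixed with big_ need more resources. The prefix and the resource values are illustrative.

```groovy
Channel
    .fromPath('reads/*_1.fq.gz')
    .set { reads_ch }

process foo {
    // Hypothetical convention: files whose name starts with 'big_'
    // get more memory and CPUs than the others.
    memory { reads.name.startsWith('big_') ? 8.GB : 2.GB }
    cpus   { reads.name.startsWith('big_') ? 4 : 1 }

    input:
    file reads from reads_ch

    """
    your_command_here --in $reads
    """
}
```

The closures are evaluated for each task, so every input file gets its own resource request.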
6.1.3. Code
Channel
    .fromPath('reads/*_1.fq.gz')
    .set { reads_ch }

process foo {
    memory { reads.size() < 70.KB ? 1.GB : 5.GB }

    input:
    file reads from reads_ch

    """
    your_command_here --in $reads
    """
}
6.1.4. Run it
nextflow run patterns/conditional-resources.nf
Note: requires version 0.32.0 or later.
6.2. Conditional process executions
6.2.1. Problem
Two different tasks need to be executed in a mutually exclusive manner, then a third task should post-process the results of the previous execution.
6.2.2. Solution
Use a when statement to conditionally execute two different processes. Each process declares its own output channel.
Then use the mix operator to create a new channel that will emit the outputs produced by the two processes and use it as the input for the third process.
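The pattern relies on the fact that a channel which never emits contributes nothing to the mixed channel. A minimal sketch with plain values, where the empty channel stands in for the output of the process that was not executed:

```groovy
// One channel emits a value, the other one is empty.
foo_ch = Channel.from('foo')
bar_ch = Channel.empty()

// The mixed channel emits only the items of the populated channel.
foo_ch
    .mix(bar_ch)
    .view()
// prints: foo
```

When both channels emit items, mix interleaves them in no guaranteed order.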
6.2.3. Code
params.flag = false

process foo {
    output:
    file 'x.txt' into foo_ch
    when:
    !params.flag

    script:
    '''
    echo foo > x.txt
    '''
}

process bar {
    output:
    file 'x.txt' into bar_ch
    when:
    params.flag

    script:
    '''
    echo bar > x.txt
    '''
}

process omega {
    echo true

    input:
    file x from foo_ch.mix(bar_ch)

    script:
    """
    cat $x
    """
}
6.2.4. Run it
Use the following command to execute the example:
nextflow run patterns/conditional-process.nf
The processes foo and omega are executed. Run the same command
with the --flag command line option.
nextflow run patterns/conditional-process.nf --flag
This time the processes bar and omega are executed.
6.2.5. Alternative solution
Conditionally create the input channels normally (with data) or as empty channels. The process consuming the individual input channels will only execute if the channel is populated. Each process still declares its own output channel.
Then use the mix operator to create a new channel that will emit the outputs produced by the two processes and use it as the input for the third process.
6.2.6. Code
params.flag = false

(foo_inch, bar_inch) = ( params.flag
    ? [ Channel.empty(), Channel.from(1,2,3) ]
    : [ Channel.from(4,5,6), Channel.empty() ] )

process foo {
    input:
    val(f) from foo_inch

    output:
    file 'x.txt' into foo_ch

    script:
    """
    echo $f > x.txt
    """
}

process bar {
    input:
    val(b) from bar_inch

    output:
    file 'x.txt' into bar_ch

    script:
    """
    echo $b > x.txt
    """
}

process omega {
    echo true

    input:
    file x from foo_ch.mix(bar_ch)

    script:
    """
    cat $x
    """
}
6.2.7. Run it
nextflow run patterns/conditional-process2.nf
6.3. Skip process execution
6.3.1. Problem
You have two sequential tasks in your workflow. When an optional flag is specified, the first task should be skipped and its input(s) processed directly by the second task.
6.3.2. Solution
Use an empty channel, created in a conditional expression, to skip the first process execution when an optional parameter is specified.
Then, define the second process input as a mix of the first process output (when executed) and the input channel.
6.3.3. Code
params.skip = false
params.input = "$baseDir/sample.fq.gz"

Channel.fromPath(params.input).set{ input_ch }

(foo_ch, bar_ch) = ( params.skip
    ? [Channel.empty(), input_ch]
    : [input_ch, Channel.empty()] )

process foo {
    input:
    file x from foo_ch

    output:
    file('*.fastq') into optional_ch

    script:
    """
    < $x zcat > ${x.simpleName}.fastq
    """
}

process bar {
    echo true

    input:
    file x from bar_ch.mix(optional_ch)

    """
    echo your_command --input $x
    """
}
6.3.4. Run it
Use the following command to execute the example:
nextflow run patterns/skip-process-execution.nf
The processes foo and bar are executed. Run the same command
with the --skip command line option.
nextflow run patterns/skip-process-execution.nf --skip
This time only process bar is executed.
6.4. Feedback loop
6.4.1. Problem
You need to repeat the execution of one or more tasks, using the output as the input for a new iteration, until a certain condition is reached.
6.4.2. Solution
Use the output of the last process in the iteration loop as the input for the first process.
To do so, explicitly create the output channel using the Channel.create method.
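The code for this pattern stops the loop with the until operator, which closes a channel as soon as an item satisfies the given condition. A minimal sketch of until on plain values:

```groovy
// Items are emitted until one of them satisfies the condition;
// that item and all the following ones are dropped.
Channel
    .from(3, 2, 1, 5, 1, 5)
    .until { it == 5 }
    .view()
// prints 3, 2 and 1, one per line
```

In the feedback loop, the condition is evaluated on each file coming back from the last process, so the loop ends when a file meets it.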
6.4.3. Code
params.input = 'hello.txt'

condition = { it.readLines().size()>3 }
feedback_ch = Channel.create()
input_ch = Channel.fromPath(params.input).mix( feedback_ch.until(condition) )

process foo {
    input:
    file x from input_ch
    output:
    file 'foo.txt' into foo_ch

    script:
    """
    cat $x > foo.txt
    """
}

process bar {
    input:
    file x from foo_ch
    output:
    file 'bar.txt' into feedback_ch
    file 'bar.txt' into result_ch

    script:
    """
    cat $x > bar.txt
    echo World >> bar.txt
    """
}

result_ch.last().println { "Result:\n${it.text.indent(' ')}" }
6.4.4. Run it
Use the following command to execute the example:
nextflow run patterns/feedback-loop.nf
6.5. Optional input
6.5.1. Problem
One or more processes have an optional input file.
6.5.2. Solution
Use a special file name to mark the absence of the file parameter.
6.5.3. Code
params.inputs = 'prots/*{1,2,3}.fa'
params.filter = 'NO_FILE'

prots_ch = Channel.fromPath(params.inputs)
opt_file = file(params.filter)

process foo {
    input:
    file seq from prots_ch
    file opt from opt_file

    script:
    def filter = opt.name != 'NO_FILE' ? "--filter $opt" : ''
    """
    your_command --input $seq $filter
    """
}
6.5.4. Run it
Run the script with the following command:
nextflow run patterns/optional-input.nf
Run the same script providing an optional file input:
nextflow run patterns/optional-input.nf --filter foo.txt
6.6. Optional output
6.6.1. Problem
A task in your workflow is expected to not create an output file in some circumstances.
6.6.2. Solution
Declare such output as an optional file.
6.6.3. Code
process foo {
    output:
    file 'foo.txt' optional true into foo_ch

    script:
    '''
    your_command
    '''
}
6.6.4. Run it
Use the following command to execute the example:
nextflow run patterns/optional-output.nf
6.7. Execute when empty
6.7.1. Problem
You need to execute a process if a channel is empty.
6.7.2. Solution
Use the ifEmpty operator to emit a marker value to trigger the execution of the process.
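The behaviour of ifEmpty can be checked in isolation. The following minimal sketch applies it to an empty channel and to a populated one.

```groovy
// An empty channel emits the marker value instead.
Channel
    .empty()
    .ifEmpty('EMPTY')
    .view()
// prints: EMPTY

// A populated channel is left unchanged.
Channel
    .from(1, 2, 3)
    .ifEmpty('EMPTY')
    .view()
// prints the values 1, 2 and 3
```

The two channels are independent, so the relative order of their printed lines is not guaranteed.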
6.7.3. Example
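// Assumed definition of the input channel (not shown in the original snippet):
// it is empty unless files are provided with the --inputs option.
params.inputs = ''
ch = params.inputs ? Channel.fromPath(params.inputs) : Channel.empty()

process foo {
    input:
    val x from ch.ifEmpty { 'EMPTY' }
    when:
    x == 'EMPTY'

    script:
    '''
    your_command
    '''
}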
6.7.4. Run it
The following command runs the script with an empty channel:
nextflow run patterns/process-when-empty.nf
Use the following command to provide the script with some input files, which prevents the process from being executed:
nextflow run patterns/process-when-empty.nf --inputs ../data/prots/\*
