This page collects some recurrent implementation patterns used in Nextflow applications. Feel free to contribute by opening a pull request in the GitHub repository.

1. Basic patterns

1.1. Channel duplication

1.1.1. Problem

You need to use the same channel as input in two or more processes.

1.1.2. Solution

Use the into operator to create two (or more) copies of the source channel. Then, use the new channels as input for the processes.

1.1.3. Code

Channel
    .fromPath('prots/*_?.fa')
    .into { prot1_ch; prot2_ch }

process foo {
  input: file x from prot1_ch
  script:
  """
    echo your_command --input $x
  """
}

process bar {
  input: file x from prot2_ch
  script:
  """
    your_command --input $x
  """
}

1.1.4. Run it

Use the following command to execute the example:

nextflow run patterns/channel-duplication.nf

2. Scatter executions

2.1. Process per file path

2.1.1. Problem

You need to execute a task for each file that matches a glob pattern.

2.1.2. Solution

Use the Channel.fromPath method to create a channel emitting all files matching the glob pattern. Then, use the channel as input of the process implementing your task.

2.1.3. Code

Channel.fromPath('reads/*_1.fq.gz').set{ samples_ch }

process foo {
  input:
  file x from samples_ch

  script:
  """
  your_command --input $x
  """
}

2.1.4. Run it

Use the following command to execute the example:

nextflow run patterns/process-per-file-path.nf

2.2. Process per file chunk

2.2.1. Problem

You need to split one or more input files into chunks and execute a task for each of them.

2.2.2. Solution

Use the splitText operator to split a file into chunks of a given size. Then use the resulting channel as input for the process implementing your task.

Caveat: By default chunks are kept in memory. When splitting big files, specify the parameter file: true to save the chunks into files. See the documentation for details.

Splitters for specific file formats are available, e.g. splitFasta and splitFastq.
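Format-aware splitters follow the same usage pattern. For example, a FASTQ file could be split into chunks of reads and saved to files with a snippet like the following (the file name and chunk size are illustrative):

Channel
    .fromPath('reads/sample.fq.gz')
    .splitFastq(by: 1000, file: true)
    .set { chunks_ch }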

2.2.3. Code

Channel
    .fromPath('poem.txt')
    .splitText(by: 5)
    .set{ chunks_ch }

process foo {
  echo true
  input:
  file x from chunks_ch

  script:
  """
  rev $x | rev
  """
}

2.2.4. Run it

Use the following command to execute the example:

nextflow run patterns/process-per-file-chunk.nf

2.3. Process per file pairs

2.3.1. Problem

You need to process the files in a directory, grouping them by pairs.

2.3.2. Solution

Use the Channel.fromFilePairs method to create a channel emitting the file pairs matching a glob pattern. The pattern must match a common prefix in the paired file names.

The matching files are emitted as tuples in which the first element is the grouping key of the matching files and the second element is the file pair itself.
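For example, printing such a channel shows the emitted tuples (the sample name below is illustrative):

Channel
    .fromFilePairs('reads/*_{1,2}.fq.gz')
    .println()

Each emitted item has the shape [sampleId, [sampleId_1.fq.gz, sampleId_2.fq.gz]].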

2.3.3. Code

Channel
    .fromFilePairs('reads/*_{1,2}.fq.gz')
    .set { samples_ch }

process foo {
  input:
  set sampleId, file(reads) from samples_ch

  script:
  """
  your_command --sample $sampleId --reads $reads
  """
}

2.3.4. Run it

nextflow run patterns/process-per-file-pairs.nf

2.3.5. Custom grouping strategy

When needed, it is possible to define a custom grouping strategy. A common use case is alignment BAM files (e.g. sample1.bam) that come along with their index file. The difficulty is that the index is sometimes named sample1.bai and sometimes sample1.bam.bai, depending on the software used. The following example accommodates both cases.

Channel
    .fromFilePairs('alignment/*.{bam,bai}') { file -> file.name.replaceAll(/.bam|.bai$/,'') }
    .set { samples_ch }

process foo {
  input:
  set sampleId, file(bam) from samples_ch

  script:
  """
  your_command --sample $sampleId --bam ${sampleId}.bam
  """
}

2.3.6. Run it

nextflow run patterns/process-per-file-pairs-custom.nf

2.4. Process per file range

2.4.1. Problem

You need to execute a task over two or more series of files having a common index range.

2.4.2. Solution

Use the from method to define the range over which to repeat the task execution, then chain it with the map operator to associate each index with the corresponding input files. Finally, use the resulting channel as input for the process.

2.4.3. Code

Channel
  .from(1..23)
  .map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }
  .set { pairs_ch }


process foo {
  tag "$sampleId"

  input:
  set sampleId, file(indels), file(snps) from pairs_ch

  """
  echo foo_command --this $indels --that $snps
  """
}

2.4.4. Run it

nextflow run patterns/process-per-file-range.nf

2.5. Process per CSV record

2.5.1. Problem

You need to execute a task for each record in one or more CSV files.

2.5.2. Solution

Read the CSV file line by line using the splitCsv operator, then use the map operator to return a tuple with the required fields for each line, converting any string path into a file object using the file function. Finally, use the resulting channel as input for the process.

2.5.3. Code

Given the file index.csv with the following content:

sampleId,read1,read2
FC816RLABXX,reads/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_1.fq.gz,reads/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_2.fq.gz
FC812MWABXX,reads/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_1.fq.gz,reads/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_2.fq.gz
FC81DE8ABXX,reads/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_1.fq.gz,reads/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_2.fq.gz
FC81DB5ABXX,reads/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_1.fq.gz,reads/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_2.fq.gz
FC819P0ABXX,reads/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_1.fq.gz,reads/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_2.fq.gz

This snippet parses the file and executes a process for each line:

params.index = 'index.csv'

Channel
    .fromPath(params.index)
    .splitCsv(header:true)
    .map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }
    .set { samples_ch }

process foo {
    input:
    set sampleId, file(read1), file(read2) from samples_ch

    script:
    """
    echo your_command --sample $sampleId --reads $read1 $read2
    """
}

Note: relative paths are resolved by the file function against the execution directory. In a real use case, prefer absolute file paths.

2.5.4. Run it

Use the following command to execute the example:

nextflow run patterns/process-per-csv-record.nf

2.6. Process per file output

2.6.1. Problem

A task in your workflow produces two or more files at a time. A downstream task needs to process each of these files independently.

2.6.2. Solution

Use the flatten operator to transform the outputs of the upstream process to a channel emitting each file separately. Then use this channel as input for the downstream process.

2.6.3. Code

process foo {
  output:
  file '*.txt' into foo_ch
  script:
  '''
  echo Hello there! > file1.txt
  echo What a beautiful day > file2.txt
  echo I wish you are having fun! > file3.txt
  '''
}

process bar {
  input:
  file x from foo_ch.flatten()
  script:
  """
  cat $x
  """
}

2.6.4. Run it

Use the following command to execute the example:

nextflow run patterns/process-per-file-output.nf

3. Gather results

3.1. Process all outputs altogether

3.1.1. Problem

You need to process all the outputs of an upstream task altogether.

3.1.2. Solution

Use the collect operator to gather all the outputs produced by the upstream task and emit them as a single output. Then use the resulting channel as input for the process.

3.1.3. Code

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}

process bar {
  echo true
  input:
  file '*.fq' from unzipped_ch.collect()
  """
  cat *.fq
  """
}

3.1.4. Run it

Use the following command to execute the example:

nextflow run patterns/process-collect.nf

3.2. Process outputs into groups

3.2.1. Problem

You need to process in the same batch all files that have a matching key in the file name.

3.2.2. Solution

Use the map operator to associate with each file a key extracted from the file name. Then chain the resulting channel with the groupTuple operator to group together all files that have a matching key. Finally use the resulting channel as input for the process.

3.2.3. Code

Channel
    .fromPath('reads/*')
    .map { file ->
        def key = file.name.toString().tokenize('_').get(0)
        return tuple(key, file)
     }
    .groupTuple()
    .set{ groups_ch }


process foo {
  input:
  set key, file(samples) from groups_ch

  script:
  """
  echo your_command --batch $key --input $samples
  """
}

3.2.4. Run it

nextflow run patterns/process-into-groups.nf

3.3. Collect outputs into a file

3.3.1. Problem

You need to concatenate into a single file all output files produced by an upstream process.

3.3.2. Solution

Use the collectFile operator to merge all the output files into a single file.
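When needed, the merged file can be given a fixed name and stored in a directory of your choice using the name and storeDir options (the file and directory names below are illustrative):

unzipped_ch
      .collectFile(name: 'all_reads.fq', storeDir: 'results')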

3.3.3. Code

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}

unzipped_ch
      .collectFile()
      .println{ it.text }

3.3.4. Run it

Use the following command to execute the example:

nextflow run patterns/collect-into-file.nf

4. Organize outputs

4.1. Store process outputs

4.1.1. Problem

You need to store the outputs of one or more processes into a directory structure of your choice.

4.1.2. Solution

Use the publishDir directive to set a custom directory where the process outputs need to be made available.

4.1.3. Code

params.reads = 'reads/*{1,2}.fq.gz'
params.outdir = 'my-results'

Channel.fromFilePairs(params.reads).set{ samples_ch }

process foo {
  publishDir "$params.outdir/$sampleId"
  input:
  set sampleId, file(samples) from samples_ch
  output:
  file '*.fq'

  script:
  """
  < ${samples[0]} zcat > sample_1.fq
  < ${samples[1]} zcat > sample_2.fq
  """
}

4.1.4. Run it

Run the script with the following command:

nextflow run patterns/publish-process-outputs.nf

4.2. Store outputs matching a glob pattern

4.2.1. Problem

A task in your workflow creates many output files that are required by a downstream task. You want to store some of those files into separate directories depending on the file name.

4.2.2. Solution

Use two or more publishDir directives to store the output files into separate paths. For each directive specify a different glob string with the pattern option, so that each directory receives only the files matching that pattern.

4.2.3. Code

params.reads = 'reads/*_{1,2}.fq.gz'
params.outdir = 'my-results'

Channel
    .fromFilePairs(params.reads, flat: true)
    .set{ samples_ch }

process foo {
  publishDir "$params.outdir/$sampleId/counts", pattern: "*_counts.txt"
  publishDir "$params.outdir/$sampleId/outlooks", pattern: '*_outlook.txt'
  publishDir "$params.outdir/$sampleId/", pattern: '*.fq'

  input:
    set sampleId, file('sample1.fq.gz'), file('sample2.fq.gz') from samples_ch
  output:
    file "*"
  script:
  """
    < sample1.fq.gz zcat > sample1.fq
    < sample2.fq.gz zcat > sample2.fq

    awk '{s++}END{print s/4}' sample1.fq > sample1_counts.txt
    awk '{s++}END{print s/4}' sample2.fq > sample2_counts.txt

    head -n 50 sample1.fq > sample1_outlook.txt
    head -n 50 sample2.fq > sample2_outlook.txt
  """
}

4.2.4. Run it

Run the script with the following command:

nextflow run patterns/publish-matching-glob.nf

4.3. Rename process outputs

4.3.1. Problem

You need to store the outputs of a process to a directory giving files a name of your choice.

4.3.2. Solution

The publishDir directive allows you to store the process outputs in a directory of your choice.

Specify the saveAs parameter to give each file a name of your choice, providing a custom rule as a closure.

4.3.3. Code

process foo {
 publishDir 'results', saveAs: { filename -> "foo_$filename" }

 output:
 file '*.txt'

 '''
 touch this.txt
 touch that.txt
 '''
}

4.3.4. Run it

nextflow run patterns/publish-rename-outputs.nf

4.3.5. Save outputs in a sub-directory

The same pattern can be used to store specific files in separate directories depending on the file name.

process foo {
 publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }

 output:
 file '*'

 '''
 touch this.txt
 touch that.zip
 '''
}

Note: Relative paths are resolved against the publishDir store path. Use an absolute path to store files in a directory outside the publishDir store path.

4.3.6. Run it

nextflow run patterns/publish-rename-outputs-subdirs.nf

5. Other

5.1. Get process work directory

5.1.1. Problem

A tool needs the explicit path of the current task's work directory.

5.1.2. Solution

Use the $PWD Bash variable or the pwd command to retrieve the task working directory path.

Note: Make sure to escape the $ variable placeholder when the command script is enclosed in double quote characters.

5.1.3. Example

process foo {
  echo true
  script:
  """
  echo foo task path: \$PWD
  """
}

process bar {
  echo true
  script:
  '''
  echo bar task path: $PWD
  '''
}

5.1.4. Run it

Run the script with the following command:

nextflow run patterns/process-get-workdir.nf

5.2. Ignore failing process

5.2.1. Problem

A task is expected to fail under certain conditions. You want to ignore the failure and continue the execution of the remaining tasks in the workflow.

5.2.2. Solution

Use the process directive errorStrategy 'ignore' to ignore the error condition.

5.2.3. Code

process foo {
  errorStrategy 'ignore'
  script:
  '''
    echo This is going to fail!
    exit 1
  '''
}

process bar {
  script:
  '''
  echo OK
  '''
}

5.2.4. Run it

Run the script with the following command:

nextflow run patterns/ignore-failing-process.nf

5.3. Mock dependency

5.3.1. Problem

You need to synchronize the execution of two processes for which there isn’t a direct input-output relationship, so that process bar is executed only after the completion of process foo.

5.3.2. Solution

Add to the outputs of process foo a channel producing a flag value.

Then use this channel as input for process bar to trigger its execution when the other process completes.

5.3.3. Code

Channel
    .fromPath('.data/reads/*.fq.gz')
    .set{ reads_ch }

process foo {
    output:
    val true into done_ch

    script:
    """
    your_command_here
    """
}

process bar {
    input:
    val flag from done_ch
    file fq from reads_ch

    script:
    """
    other_command_here --reads $fq
    """
}

5.3.4. Run it

Run the example using this command:

nextflow run patterns/mock-dependency.nf

6. Advanced patterns

6.1. Conditional resources definition

6.1.1. Problem

A task in your workflow requires an amount of computing resources, e.g. memory, that depends on the size or the name of one or more input files.

6.1.2. Solution

Declare the resource requirements, e.g. memory, cpus, etc., in a dynamic manner using a closure.

The closure computes the required amount of resources using the attributes, such as size, of the input files declared in the process definition.
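A related use of dynamic directives, shown here only as a sketch, combines the implicit task.attempt variable with a retry error strategy so that each retry requests more memory:

process foo {
    memory { 2.GB * task.attempt }
    errorStrategy 'retry'
    maxRetries 3

    script:
    """
    your_command_here
    """
}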

6.1.3. Code

Channel
    .fromPath('reads/*_1.fq.gz')
    .set { reads_ch }

process foo {
    memory { reads.size() < 70.KB ? 1.GB : 5.GB }

    input:
    file reads from reads_ch

    """
    your_command_here --in $reads
    """
}

6.1.4. Run it

nextflow run patterns/conditional-resources.nf

Note: requires version 0.32.0 or later.

6.2. Conditional process executions

6.2.1. Problem

Two different tasks need to be executed in a mutually exclusive manner, then a third task should post-process the results of the previous execution.

6.2.2. Solution

Use a when statement to conditionally execute two different processes. Each process declares its own output channel.

Then use the mix operator to create a new channel that will emit the outputs produced by the two processes and use it as the input for the third process.

6.2.3. Code

params.flag = false

process foo {
  output:
  file 'x.txt' into foo_ch
  when:
  !params.flag

  script:
  '''
  echo foo > x.txt
  '''
}

process bar {
  output:
  file 'x.txt' into bar_ch
  when:
  params.flag

  script:
  '''
  echo bar > x.txt
  '''
}

process omega {
  echo true
  input:
  file x from foo_ch.mix(bar_ch)

  script:
  """
  cat $x
  """
}

6.2.4. Run it

Use the following command to execute the example:

nextflow run patterns/conditional-process.nf

The processes foo and omega are executed. Run the same command with the --flag command line option:

nextflow run patterns/conditional-process.nf --flag

This time the processes bar and omega are executed.

6.2.5. Alternative solution

Conditionally create the input channels normally (with data) or as empty channels. The process consuming the individual input channels will only execute if the channel is populated. Each process still declares its own output channel.

Then use the mix operator to create a new channel that will emit the outputs produced by the two processes and use it as the input for the third process.

6.2.6. Code

params.flag = false

(foo_inch, bar_inch) = ( params.flag
                     ? [ Channel.empty(), Channel.from(1,2,3) ]
                     : [ Channel.from(4,5,6), Channel.empty() ] )

process foo {

  input:
  val(f) from foo_inch

  output:
  file 'x.txt' into foo_ch

  script:
  """
  echo $f > x.txt
  """
}

process bar {
  input:
  val(b) from bar_inch

  output:
  file 'x.txt' into bar_ch

  script:
  """
  echo $b > x.txt
  """
}

process omega {
  echo true
  input:
  file x from foo_ch.mix(bar_ch)

  script:
  """
  cat $x
  """
}

6.2.7. Run it

nextflow run patterns/conditional-process2.nf

6.3. Skip process execution

6.3.1. Problem

You have two sequential tasks in your workflow. When an optional flag is specified, the first task should be skipped and its input(s) processed directly by the second task.

6.3.2. Solution

Use an empty channel, created in a conditional expression, to skip the first process execution when an optional parameter is specified.

Then, define the second process input as a mix of the first process output (when executed) and the input channel.

6.3.3. Code

params.skip = false
params.input = "$baseDir/sample.fq.gz"

Channel.fromPath(params.input).set{ input_ch }

(foo_ch, bar_ch) = ( params.skip
                 ? [Channel.empty(), input_ch]
                 : [input_ch, Channel.empty()] )

process foo {
  input:
  file x from foo_ch

  output:
  file('*.fastq') into optional_ch

  script:
  """
  < $x zcat > ${x.simpleName}.fastq
  """
}

process bar {
  echo true
  input:
  file x from bar_ch.mix(optional_ch)
  """
  echo your_command --input $x
  """
}

6.3.4. Run it

Use the following command to execute the example:

nextflow run patterns/skip-process-execution.nf

The processes foo and bar are executed. Run the same command with the --skip command line option.

nextflow run patterns/skip-process-execution.nf --skip

This time only process bar is executed.

6.4. Feedback loop

6.4.1. Problem

You need to repeat the execution of one or more tasks, using the output as the input for a new iteration, until a certain condition is reached.

6.4.2. Solution

Use the output of the last process in the iteration loop as the input for the first process.

To do so, explicitly create the output channel using the Channel.create method.

Then define the process input as a mix of the initial input and the process output, to which the until operator is applied to define the termination condition.

6.4.3. Code

params.input = 'hello.txt'

condition = { it.readLines().size()>3 }
feedback_ch = Channel.create()
input_ch = Channel.fromPath(params.input).mix( feedback_ch.until(condition) )

process foo {
    input:
    file x from input_ch
    output:
    file 'foo.txt' into foo_ch
    script:
    """
    cat $x > foo.txt
    """
}

process bar {
    input:
    file x from foo_ch
    output:
    file 'bar.txt' into feedback_ch
    file 'bar.txt' into result_ch
    script:
    """
    cat $x > bar.txt
    echo World >> bar.txt
    """
}

result_ch.last().println { "Result:\n${it.text.indent(' ')}" }

6.4.4. Run it

Use the following command to execute the example:

nextflow run patterns/feedback-loop.nf

6.5. Optional input

6.5.1. Problem

One or more processes have an optional input file.

6.5.2. Solution

Use a special file name to mark the absence of the file parameter.

6.5.3. Code

params.inputs = 'prots/*{1,2,3}.fa'
params.filter = 'NO_FILE'

prots_ch = Channel.fromPath(params.inputs)
opt_file = file(params.filter)

process foo {
  input:
  file seq from prots_ch
  file opt from opt_file

  script:
  def filter = opt.name != 'NO_FILE' ? "--filter $opt" : ''
  """
  your_command --input $seq $filter
  """
}

6.5.4. Run it

Run the script with the following command:

nextflow run patterns/optional-input.nf

Run the same script providing an optional file input:

nextflow run patterns/optional-input.nf --filter foo.txt

6.6. Optional output

6.6.1. Problem

A task in your workflow may not create an output file under some circumstances.

6.6.2. Solution

Declare such output as an optional file.

6.6.3. Code

process foo {
  output:
  file 'foo.txt' optional true into foo_ch

  script:
  '''
  your_command
  '''
}

6.6.4. Run it

Use the following command to execute the example:

nextflow run patterns/optional-output.nf

6.7. Execute when empty

6.7.1. Problem

You need to execute a process if a channel is empty.

6.7.2. Solution

Use the ifEmpty operator to emit a marker value to trigger the execution of the process.

6.7.3. Example

params.inputs = ''

// Create the channel from the optional --inputs parameter; when no
// inputs are given the channel is empty and ifEmpty emits the marker.
ch = params.inputs
        ? Channel.fromPath(params.inputs)
        : Channel.empty()

process foo {
  input:
  val x from ch.ifEmpty { 'EMPTY' }
  when:
  x == 'EMPTY'

  script:
  '''
  your_command
  '''
}

6.7.4. Run it

The following command runs the script with an empty channel:

nextflow run patterns/process-when-empty.nf

Use the following command to provide the script with some input files, which prevents the process from being executed:

nextflow run patterns/process-when-empty.nf --inputs ../data/prots/\*