nf-parquet is a Nextflow plugin for reading and writing Parquet files.

This plugin provides several functions for working with Parquet files, such as splitParquet, which emits the contents of a file record by record, and toParquet, which writes a collection of records.

WARNING

This plugin relies heavily on the Java record concept introduced in recent versions of Java, so it requires Java 17 as a minimum.

CSV vs Parquet

A CSV file is a text file where each line represents a record and fields are separated by a special character (";" for example).

Parquet, by contrast, is a binary format: its files can't be opened with a simple text editor, but they are smaller and offer better read performance.
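
For illustration, the following CSV content encodes two records with an id and a name field ("alpha" and "beta" are made-up values); the same data stored as Parquet would be a compact, binary, column-oriented file:

id;name
1;alpha
2;beta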

Configuration

Add the plugin to your nextflow.config:

plugins {
    id "nf-parquet"
}
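
If you need to pin a specific plugin version, Nextflow's plugin syntax accepts a version suffix (the version number below is only illustrative):

plugins {
    id "nf-parquet@0.2.0"
}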

Basic example

include { splitParquet } from 'plugin/nf-parquet'

workflow {
    channel.fromPath( params.file ).splitParquet()
        | view
}

This pipeline reads the Parquet file params.file and emits each record as a separate item in the channel. By default every record is emitted as a raw Map (see the record option below).
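
Since records arrive as Maps by default, you can access their fields by key. A minimal sketch, assuming the file contains a name field:

include { splitParquet } from 'plugin/nf-parquet'

workflow {
    channel.fromPath( params.file )
        .splitParquet()
        .map { record -> record.name }  // 'name' is a hypothetical field
        | view
}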

Schemas

The first thing to do (only once, as long as your schema doesn't change) is to define and compile the schemas to use, represented as Java record classes.

In your Nextflow pipeline repository, create a folder schemas (for example) with a subfolder myrecords (for example).

Create two Java records:

SingleRecord.java
package myrecords;
record SingleRecord(long id, String name) {
}
CustomRecord.java
package myrecords;
record CustomRecord(long id, String name, long timestamp) {
}
INFO

As you can see, they're pure Java records.

CustomRecord represents a "full" record that we want to write to or read from a Parquet file, while SingleRecord represents a projection, i.e. a subset of CustomRecord's fields. Using projections can reduce the CPU time spent reading a huge file, as the Parquet reader is able to skip the uninteresting fields.
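
For instance, a projection that reads only the name field could be declared as one more record (a hypothetical extra schema, not required by the examples below):

NameRecord.java
package myrecords;
// Hypothetical projection of CustomRecord that reads only the 'name' field
record NameRecord(String name) {
}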

Now create a module-info.java file (in the same myrecords folder, so the compile command below picks it up):

module-info.java
module myrecords {
    opens myrecords;
}

This file is required to make our schemas accessible to other modules (and avoid classloader issues).

Now compile your schemas with:

javac --release 17 -d lib/ schemas/myrecords/*

If all goes well, you'll have three classes in your lib folder. Nextflow attaches these classes to the classpath, so nf-parquet is able to inspect them.
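
The resulting layout should look roughly like this (javac writes module-info.class at the root of the output directory):

lib/
├── module-info.class
└── myrecords/
    ├── CustomRecord.class
    └── SingleRecord.class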

WARNING

This step is only required again when your schemas change. If you need to add or remove fields, or create new schemas (Java records), you need to run javac again.

TIP

Remember to add the schema files to the repository. You may also want to commit the lib folder with the compiled binaries.

Reading a projection

In this example, we'll read these records, but only a subset of their fields:

include { splitParquet } from 'plugin/nf-parquet'

import myrecords.*

workflow {
    channel.fromPath( params.file ).splitParquet( record: SingleRecord ) //(1)
        | view
}
  1. Read only id and name (defined in SingleRecord).

Writing a parquet file

Since version 0.2.0, you can write a Parquet file using the toParquet function.

Say we want to read an input Parquet file as SingleRecord records and write them to another file as CustomRecord records:

include { splitParquet; toParquet } from 'plugin/nf-parquet'

import myrecords.*

workflow {
    Channel.fromPath( params.input )
        .splitParquet( [record: SingleRecord] )
        .map { record ->
            new CustomRecord( record.id, record.name, new Date().time )
        }
        .toParquet( params.output, [record: CustomRecord] )
        | view
}
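
You could then run it with something like this (the file names are illustrative):

nextflow run main.nf --input data.parquet --output out.parquet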

Operators

splitParquet

record

"record" option allows specifying a Record java class instead of a raw Map. This option can improve parsing time as the reader can use only a subset of the fields to read

by

"by" option allows specifying a chunk size to read records.

WARNING

Chunks are stored in memory by default. When splitting large files, specify file: true to save the chunks into files to avoid running out of memory. See the list of options below for details.
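
A minimal sketch, assuming these options follow the standard Nextflow splitter semantics, that reads chunks of 100000 records and saves each chunk to a file:

include { splitParquet } from 'plugin/nf-parquet'

workflow {
    channel.fromPath( params.file )
        .splitParquet( by: 100000, file: true )  // emits the Path of each chunk file
        | view
}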

elem

The index of the element to split when the source items are lists or tuples (default: first file object or first element).

file

When true, it saves each split to a file. Use a string instead of true to create split files with a specific name (a split index number is automatically appended). Finally, set this attribute to an existing directory to save the split files into that directory. In these cases, the objects emitted are the Paths of the split files.

each

With the "each" option you can provide a closure to be invoked for each record read.
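
A minimal sketch, assuming "each" follows the standard Nextflow splitter semantics where the closure result replaces the record in the output channel:

include { splitParquet } from 'plugin/nf-parquet'

workflow {
    channel.fromPath( params.file )
        .splitParquet( each: { record -> record.name } )  // 'name' is a hypothetical field
        | view
}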

countParquet

Similar to splitParquet, but instead of the records themselves it emits the number of records in the Parquet file.
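
A minimal sketch, assuming countParquet is included from the plugin in the same way as splitParquet:

include { countParquet } from 'plugin/nf-parquet'

workflow {
    channel.fromPath( params.file )
        .countParquet()
        | view  // emits a single value: the record count
}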

toParquet

record

"record" option allows specifying a Record java class instead of a raw Map. This option is required as the writer needs to know the structure of the record

by

"by" option allows specifying a chunk size to write records, so I/O operations performance can be improved

WARNING

Chunks are stored in memory, so choose a sensible value to avoid out-of-memory errors.
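
A minimal sketch combining both options, assuming they can be passed in the same options map (the chunk size is illustrative):

include { splitParquet; toParquet } from 'plugin/nf-parquet'

import myrecords.*

workflow {
    Channel.fromPath( params.input )
        .splitParquet( [record: CustomRecord] )
        .toParquet( params.output, [record: CustomRecord, by: 10000] )
}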