nf-parquet is a Nextflow plugin able to read and write parquet files.
It provides several functions to work with parquet files, such as splitParquet, to emit the content of a file, or toParquet, to write a collection of records.
WARNING: This plugin relies heavily on the Java record concept introduced in recent versions of Java, so it requires Java 17 as a minimum.
CSV vs Parquet
A csv file is a text file where each line represents a record and fields are separated by some special character (";" for example).
Parquet, by contrast, is a binary format that can't be opened with a simple editor, but its files are smaller and offer better read performance.
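For example, two hypothetical records with an id and a name would be stored in a csv file as plain, human-readable text:

id;name
1;alice
2;bob

The same records stored in parquet would be laid out column by column in a compressed, binary encoding.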
Configuration
Add the plugin to your nextflow.config file:
plugins {
    id "nf-parquet"
}
Basic example
include { splitParquet } from 'plugin/nf-parquet'
channel.fromPath( params.file ).splitParquet()
| view
This pipeline will read the parquet file params.file and emit each record as a single item in the channel.
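Since no schema is given, each record is emitted as a raw Map. For a hypothetical file with id and name fields, the output would look something like:

[id:1, name:alice]
[id:2, name:bob]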
Schemas
The first thing to do (only once, as long as your schema doesn't change) is to define and compile the schemas to use,
represented as Java record classes.
In your nextflow pipeline repository, create a folder schemas (for example) with a subfolder myrecords (for example).
Create two Java records:
package myrecords;
record SingleRecord(long id, String name) {
}
package myrecords;
record CustomRecord(long id, String name, long timestamp) {
}
INFO: As you can see, they're pure Java records.
CustomRecord represents a "full" record that we want to write to or read from a parquet file, while SingleRecord
represents a projection, a subset of the fields of CustomRecord.
Using projections can reduce the CPU and time spent reading a huge file, as the parquet reader is able to skip
the fields that are not needed.
Now create a module-info.java file:
module myrecords {
    opens myrecords;
}
This file is required to make our schemas accessible to all modules (and to avoid classpath loader issues).
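Assuming the file names follow the usual Java convention of matching the type they declare, the repository layout at this point looks like:

schemas/
└── myrecords/
    ├── SingleRecord.java
    ├── CustomRecord.java
    └── module-info.java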
Now compile your schemas with:
javac --release 17 -d lib/ schemas/myrecords/*
If all goes well, you'll have 3 classes in your lib folder. Nextflow will attach these classes to the classpath,
so nf-parquet will be able to inspect them.
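The lib folder would then typically look like this (module-info.class usually lands at the root of the output directory because it declares no package):

lib/
├── module-info.class
└── myrecords/
    ├── SingleRecord.class
    └── CustomRecord.class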
WARNING: This step only needs to be repeated when your schemas change. If you need to add/remove fields or create new schemas (Java records), you need to execute javac again.
TIP: Remember to add the schema files to the repository. You may also want to add the lib folder with the compiled binaries.
Reading a projection
In this example, we'll read these records, but only a subset of their fields:
include { splitParquet } from 'plugin/nf-parquet'
import myrecords.*
channel.fromPath( params.file ).splitParquet( record: SingleRecord ) // (1)
| view

(1) Read only id and name (the fields defined in SingleRecord)
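Each emitted item is a SingleRecord instance, so its fields can be accessed directly, as in this minimal sketch:

include { splitParquet } from 'plugin/nf-parquet'
import myrecords.*

channel.fromPath( params.file )
    .splitParquet( record: SingleRecord )
    .map { r -> "id=${r.id} name=${r.name}" } // access the projected fields
    | view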
Writing a parquet file
From version 0.2.0, you can write a parquet file using the toParquet function.
Say we want to read an input parquet file as SingleRecord and write it to another file as CustomRecord:
include { splitParquet; toParquet } from 'plugin/nf-parquet'
import myrecords.*
workflow {
    Channel.fromPath( params.input )
        .splitParquet( [record: SingleRecord] )
        .map { record ->
            new CustomRecord( record.id, record.name, new Date().time )
        }
        .toParquet( params.output, [record: CustomRecord] )
        | view
}
Operators
splitParquet
record
The record option allows specifying a Java record class instead of a raw Map. This option can improve parsing time, as the reader will read only the subset of fields defined in the record.
by
The by option allows specifying a chunk size for reading records.
WARNING: Chunks are stored in memory by default. When splitting large files, specify file: true to save the chunks into files and avoid running out of memory. See the list of options below for details.
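A minimal sketch, assuming by behaves like Nextflow's built-in splitters and emits the records in groups:

include { splitParquet } from 'plugin/nf-parquet'

channel.fromPath( params.file )
    .splitParquet( by: 1000 ) // emit chunks of (up to) 1000 records
    | view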
elem
The index of the element to split when the source items are lists or tuples (default: first file object or first element).
file
When true, it saves each split to a file. Use a string instead of true to create split files with a specific name (the split index number is added automatically). Finally, set this attribute to an existing directory to save the split files into that directory. In this case, the objects emitted are the Paths of every file.
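For example, combining both options (a sketch, with hypothetical option values):

channel.fromPath( params.file )
    .splitParquet( by: 1000, file: true ) // save each chunk of 1000 records to its own file
    | view                                // emits the Path of every split file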
each
The each option lets you provide a closure to be called for each row read.
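A minimal sketch, assuming each works like in Nextflow's built-in splitters, where the closure's return value is what gets emitted:

channel.fromPath( params.file )
    .splitParquet( each: { row -> row.name } ) // emit only the name field of each row
    | view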
countParquet
Similar to splitParquet, but it emits the number of records in the parquet file.
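A minimal sketch, assuming countParquet is invoked the same way as splitParquet:

include { countParquet } from 'plugin/nf-parquet'

channel.fromPath( params.file )
    .countParquet()
    | view // e.g. 1000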
toParquet
record
The record option allows specifying a Java record class instead of a raw Map. This option is required, as the writer needs to know the structure of the records.
by
The by option allows specifying a chunk size for writing records, which can improve the performance of I/O operations.
WARNING: Chunks are stored in memory, so choose a sensible value to avoid memory exceptions.
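A minimal sketch combining both options:

include { splitParquet; toParquet } from 'plugin/nf-parquet'
import myrecords.*

workflow {
    Channel.fromPath( params.input )
        .splitParquet( [record: CustomRecord] )
        .toParquet( params.output, [record: CustomRecord, by: 1000] ) // write in chunks of 1000 records
}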