flowcraft's Issues

Add definition of compute resources to Process classes

Add the cpu and ram attributes to the Process base class. These attributes will be used to build the nextflow configuration file from the preset values, which can later be edited in the nextflow config as usual.
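
A minimal sketch of what this could look like on the base class (the attribute names and defaults below are assumptions, not a final interface):

class Process:
    """Abstract process template (only the resource attributes are shown)."""

    def __init__(self, **kwargs):
        # Preset compute resources; these would be written to the nextflow
        # configuration file and could still be edited there as usual.
        self.cpus = kwargs.get("cpus", 1)
        self.ram = kwargs.get("ram", "1GB")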

Provide process directives at runtime

Some directives, such as version, cpus, ram or container, could be provided for each process at runtime. This could be accomplished with a JSON-like notation in the pipeline string:

-t processA={version:1.0,cpus:4} processB={memory:4GB}

This would be parsed by the engine, which would update the process's attributes accordingly.
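
A minimal sketch of how such a token could be split into a process name and a directives dict (the helper name is hypothetical, and the notation is treated as JSON-like rather than strict JSON, since the keys are unquoted):

def parse_directives_token(token):
    """Split a token like 'processA={version:1.0,cpus:4}' into the
    process name and a dict with its directives."""
    name, _, raw = token.partition("=")
    if not raw:
        return name, {}
    directives = {}
    # Parse the unquoted key:value pairs inside the braces
    for pair in raw.strip("{}").split(","):
        key, _, value = pair.partition(":")
        directives[key.strip()] = value.strip()
    return name, directives

# parse_directives_token("processA={version:1.0,cpus:4}")
# -> ('processA', {'version': '1.0', 'cpus': '4'})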

Move initial channel definition to templates

At the moment, the definition of the initial channels is done in the HeaderSkeleton file. However, this should be moved to each template so that only the relevant channels are defined when creating the pipeline. This raises the issue of channels that are used by multiple processes, so the system should be smart enough to remove any duplicate channel creation.
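
A minimal sketch of the deduplication step, assuming each template exposes its required raw channel definitions through a hypothetical channels attribute:

def build_channel_definitions(processes):
    """Collect channel definition strings from all templates, keeping only
    the first occurrence of each duplicate."""
    seen = set()
    definitions = []
    for proc in processes:
        for channel in proc.channels:  # hypothetical per-template attribute
            if channel not in seen:
                seen.add(channel)
                definitions.append(channel)
    return "\n".join(definitions)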

Future-proof Process structure

Context on assemblerflow's Processes

As detailed in the development docs, every new component that we want to include in assemblerflow requires the creation of a new python class that inherits from the Process abstract class. This Process class contains all the heavy-lifting code and attributes that allow assemblerflow's engine to build a pipeline, and it abstracts most of the work away from the person adding the new component. The new classes simply need to define attributes concerning the input and output types, parameters, secondary inputs, etc. So, adding a new process/module class can be as simple as:

class Skesa(Process):
    """Skesa process template interface"""

    def __init__(self, **kwargs):

        super().__init__(**kwargs)

        self.input_type = "fastq"
        self.output_type = "fasta"

        self.directives = {"skesa": {
            "cpus": 4,
            "memory": "{ 5.GB * task.attempt }",
            "container": "ummidock/skesa",
            "version": "0.2.0-3",
            "scratch": "true"
        }}

There are several other attributes that can be specified, but these classes are essentially descriptions of the component, with almost no additional code required.

The problem

Currently, both the Process abstract class and all the actual classes that describe assemblerflow's modules are stored in the same file, assemblerflow.generator.process. However, this is hardly sustainable in the medium/long term, as more and more component classes are expected to be added. It also creates a problem for the automated generation of the documentation, since the API and description of the Process abstract class are mainly dev-oriented, while the descriptions of the actual components, like fastqc, spades, etc., are user-oriented. Finally, putting everything in the same file without any sorting or categorisation will make it hard to navigate.
If we want to make it easy for other people to contribute, this has to be improved.

Potential solution

The most obvious solution is to move the component classes to another file or directory in the assemblerflow package and leave the abstract Process class where it is. Additionally, we could sort the classes into categories, such as "quality control" or "assembly". So the idea would be to create a directory in

assemblerflow/generator/process_market # name up for discussion

And inside, create a file structure according to the type of component:

assemblerflow/generator/process_market/quality_control.py
assemblerflow/generator/process_market/assembly.py
(...)

The suggested categories for what we have now could be:

  • downloads.py
  • quality_control.py
  • assembly.py
  • assembly_{polish|processing}.py
  • annotation.py
  • mlst.py
  • mapping.py

New categories could then be added as needed; we would just have to import the new modules in the engine. Another advantage of this approach would be the natural sorting of the classes into categories for the user documentation.
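
A minimal sketch of how the engine could gather the component classes from the category modules (the module paths and helper below are assumptions; the actual layout is still up for discussion):

import importlib
import inspect

CATEGORY_MODULES = [
    "assemblerflow.generator.process_market.quality_control",
    "assemblerflow.generator.process_market.assembly",
    # new categories are simply appended here
]

def collect_processes(base_class):
    """Map lower-case class names to every subclass of the Process base
    class found in the category modules."""
    collected = {}
    for module_path in CATEGORY_MODULES:
        module = importlib.import_module(module_path)
        for name, cls in inspect.getmembers(module, inspect.isclass):
            if issubclass(cls, base_class) and cls is not base_class:
                collected[name.lower()] = cls
    return collected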

Log files into results folder

It would be nice to have an option to write the logs of the modules to a folder specified by the user instead of the work directory.

Dynamic raw channels

Currently, assemblerflow has the limitation of accepting only FastQ files as the first input, and processes that receive these "raw" FastQ files must have the self.input_type = "raw" attribute.
This creates an obvious limitation where only three processes (integrity_coverage, seq_typing and patho_typing) can be applied at the beginning of the pipeline. Moreover, it is not possible to start the pipeline with other input types, such as assemblies in fasta or other generic inputs.

Creating dynamic raw channels

Raw channels are defined as channels that come directly from the user (e.g. --fastq for FastQ files).

Instead of defining processes with self.input_type = "raw" explicitly, each supported input_type could have a raw channel implicitly added by the assemblerflow engine.

If a process has self.input_type = "fastq", the engine would automatically set a class attribute like:

self.main_user_input = {
    "params": "fastq",
    "channel": "IN_fastq_raw = Channel.fromFilePairs(params.fastq)"
}

If the first process has self.input_type = "assembly", it would change to:

self.main_user_input = {
    "params": "assembly"
    "channel": "IN_fasta_raw = Channel.fromPath(params.assembly)"
}

This means that we would need to have a mapping between input_types and their raw channels:

# Different input_types (keys in this dict) may map to the same 
# raw input channels
raw_mapping = {
    "fastq": {"params": "fastq", "channel": "IN_fastq_raw = Channel.fromFilePairs(params.fastq)"},
    "assembly": {"params": "fasta", "channel": "IN_fasta_raw = Channel.fromPath(params.fasta)"}
}

Whenever a new raw type is required, this dictionary would need to be updated, as well as the nextflow.config file that holds all params. If a user specifies a process with an input_type that is not present in raw_mapping, it means the input_type is not supported as the first process, and an exception would be raised.

With these changes, there would be no need to change anything in the Process classes themselves, nor any particular addition by the process developers, and it would allow the pipeline to start from any process that has a supported input type.
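
A minimal sketch of the lookup the engine could perform for the first process of the pipeline (the exception name is hypothetical):

class ProcessError(Exception):
    """Hypothetical custom exception for unsupported configurations."""

def get_main_user_input(first_process, raw_mapping):
    """Return the raw channel information for the first process, or fail
    if its input_type cannot start a pipeline."""
    try:
        return raw_mapping[first_process.input_type]
    except KeyError:
        raise ProcessError(
            "The input type '{}' is not supported as the first process "
            "of the pipeline".format(first_process.input_type))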

Add nextflow tracer parser

This parser should return a JSON/dict object with the following information for each process execution (line) in the trace file:

  • task_id
  • work directory
  • tag (usually the sample id)
  • status
  • exit code
  • start time
  • container
  • cpus allocated
  • time allocated
  • disk allocated
  • memory allocated
  • duration
  • real time
  • % cpu
  • % mem
  • rss
  • vmem
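
A minimal sketch of such a parser, assuming the trace file has a header line and tab-separated fields:

import csv

def parse_trace(trace_file):
    """Return one dict per process execution (line) in the trace file,
    keyed by the column names in the header."""
    with open(trace_file) as fh:
        reader = csv.DictReader(fh, delimiter="\t")
        return [dict(row) for row in reader]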

Add params.config to pipeline generation

Akin to containers.config and resources.config, the parameters for a pipeline should be written to a params.config file containing only the parameters of the processes used in that pipeline.

assemblerflow_utils not in templates folder

When I run assemblerflow to generate a pipeline with --include-templates, the assemblerflow_utils folder does not appear. I have simply been getting it from GitHub each time I need it, but thought maybe I should let you know! Thanks
Cheers
Kristy

Add basic inspector overview

This overview should be the default inspect view of the pipeline run when using assemblerflow inspect. It should display:

  • Total number of processes
  • Rough estimate of the percentage of completion (based on the inferred samples and assuming that they should pass through each process)
  • An ordered list/TSV view of processes and the number of samples already completed for each

Add better error detection and handling for assembler processes

As of now, a failure in the assembler processes may trigger an error in the .status file, but it does not trigger an error for the nextflow process. As a result, when the assembly is not performed for some reason, resume cannot repeat the process, since it terminated successfully as far as nextflow knows.

The optional true tag should be removed from these processes to force the error, and the error strategy could be changed to ignore so that it does not compromise the rest of the pipeline. Coupled with the adaptive RAM directive in #26, this should allow for a more robust and informative approach.

Add tests for pipeline parsers

There are at least two layers of tests for the user-provided pipeline string:

  • General formatting tests that check whether the overall pipeline string is OK before the actual parsing.

  • Checking if the connections established in the pipeline string are compliant.

Add group IDs for compiler processes

Currently, Compiler processes receive data from all pipeline components that emit data to that compiler. This is usually the desired behaviour. The exception is when there is a fork in the pipeline and the Compiler should receive data from multiple groups of components instead of a single one. For instance, assuming that proc1 and proc2 emit data to the same compiler:

A ( B  proc1 proc2 | C proc1 proc2 )

It may be necessary to emit the data from the two lanes independently. To address this issue, a new directive could be included to assign a group ID to each component. With this approach, sending two batches could be accomplished in the following way:

A ( B proc1={'group':'1'} proc2={'group':'1'} | C proc1 proc2 )

For each group (including the default with no group ID), a compiler will be added to handle the emitted data.

Print process information

Add an option to print the processes currently available in assemblerflow. The short list option should include only the name and a short description (1-2 lines). A detailed list option should include:

  • Process name
  • Description
  • Input type
  • Output type
  • Dependencies
  • Conflicts
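
A minimal sketch of the listing, assuming the engine already provides a mapping from process names to classes and that these can be instantiated without required arguments (which may not match the real Process signature):

def print_process_list(process_map, detailed=False):
    """Print the available processes from a {name: class} mapping."""
    for name, cls in sorted(process_map.items()):
        doc = (cls.__doc__ or "").strip()
        description = doc.splitlines()[0] if doc else ""
        if not detailed:
            print("{}: {}".format(name, description))
            continue
        proc = cls()  # assumes no required constructor arguments
        print("Name: {}".format(name))
        print("  Description: {}".format(description))
        print("  Input type: {}".format(proc.input_type))
        print("  Output type: {}".format(proc.output_type))
        print("  Dependencies: {}".format(getattr(proc, "dependencies", [])))
        print("  Conflicts: {}".format(getattr(proc, "conflicts", [])))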

Improve template versioning system

The issue

The current versioning system is repeated in each template script. This is done with the build_versions function, which gathers the template metadata, and with other functions that get the versions of third-party software. The first issue is that the version gathering of the template itself is repeated in all templates. The simplest example is this:

def build_versions():
    logger.debug("Checking module versions")

    ver = [{
        "program": __template__,
        "version": __version__,
        "build": __build__
    }]
    logger.debug("Versions list set to: {}".format(ver))

    with open(".versions", "w") as fh:
        fh.write(json.dumps(ver, separators=(",", ":")))

Additional functions can then be created that expand the ver list:

    def get_fastqc_version():

        try:

            cli = ["fastqc", "--version"]
            p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
            stdout, _ = p.communicate()

            version = stdout.strip().split()[1][1:].decode("utf8")

        except Exception as e:
            logger.debug(e)
            version = "undefined"

        return {
            "program": "FastQC",
            "version": version,
        }

This can become quite cumbersome to include in all templates, particularly the parts that are the same for every template.

Potential solution

The main function in each template can be decorated with a class that handles all of this and provides some other features (such as error handling). A proposal for such a class is defined here.

Then, we decorate the main function:

@MainWrapper
def main(fastq_id, fastq_pair, gsize, minimum_coverage, opts):

This means that the main function will actually be executed within this wrapper:

    def __call__(self, *args, **kwargs):

        context = self.f.__globals__

        logger = context.get("logger", None)

        try:
            self.build_versions()
            self.f(*args, **kwargs)
        except:
            if logger:
                logger.error("Module exited unexpectedly with error:"
                             "\\n{}".format(traceback.format_exc()))
            log_error()

The build_versions method will do the actual heavy lifting.

    def build_versions(self):
        version_storage = []

        template_version = self.context.get("__version__", None)
        template_program = self.context.get("__template__", None)
        template_build = self.context.get("__build__", None)

        if template_version and template_program and template_build:
            version_storage.append({
                "program": template_program,
                "version": template_version,
                "build": template_build
            })

        for var, obj in self.context.items():
            if var.startswith("__set_version"):
                version_storage.append(obj())

        with open(".versions", "w") as fh:
            fh.write(json.dumps(version_storage, separators=(",", ":")))

It will automatically fetch the relevant metadata from the template, and scan the context for functions with a specific signature (name starts with __set_version). These are then executed and their result added to the version list.

Overall, this means that templates that do not use third-party software don't even need to set anything related to versions, other than defining the metadata. Whenever one or more third-party programs are used within a template, a version function can be created for each and will be automatically executed by the decorator.
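
As an illustration of the convention, the existing get_fastqc_version snippet above could simply be renamed so that the decorator picks it up automatically (the exact name is just an example; only the __set_version prefix matters):

import subprocess

def __set_version_fastqc():
    """Found and executed by MainWrapper.build_versions via its name prefix."""
    try:
        stdout = subprocess.check_output(["fastqc", "--version"])
        version = stdout.strip().split()[1][1:].decode("utf8")
    except Exception:
        version = "undefined"
    return {"program": "FastQC", "version": version}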

This new system is already implemented in the integrity_coverage and fastqc templates, as examples.

Add usage documentation

We should add usage documentation for assembling a pipeline, which should clearly explain the available processes (modules) and what they can do. Maybe we can also give some usage examples.

One cool idea related to this would be to create a pipeline assembler GUI, but that is way ahead of us atm. For now, we should focus on explaining the usage.

Wrong fork when there are multiple fork sources with the same name

The following pipeline:

    integrity_coverage (
        spades={'memory':'\'50GB\''} |
        skesa={'memory':'\'40GB\'','cpus':'4'} |
        trimmomatic fastqc (
            spades pilon (abricate={'extra_input':'default'} | prokka) |
            skesa pilon (abricate | prokka)
        )
    )

Forks from both pilon processes at the end of the pipeline. However, the resulting DAG reveals that the first pilon fork is being ignored and all abricate and prokka processes are added after the second pilon.


This is caused by a bug in the detection of the fork source when parsing the pipeline string.

Dynamic nextflow.config file

The nextflow config file currently contains information for all processes in assemblerflow. Ideally, only information for the processes in the pipeline should be included.

Allow merge

At the moment assemblerflow allows forking processes, but if you want to add another process after the fork, you have to add it after each process in the fork. Example:

assemblerflow.py build -t "integrity_coverage fastqc_trimmomatic remove_host (spades card_rgi | metaspades card_rgi | megahit card_rgi)"

It's much more intuitive to instead do:

assemblerflow.py build -t "integrity_coverage fastqc_trimmomatic remove_host (spades | metaspades | megahit) card_rgi "

This requires adding a new merge operator to assemblerflow.

Explicit forks

Here is a proposal for an explicit forking system for any process, which can be set at run time by the user without any special requirements in the Process class. The focus of this system should be on simplicity and readability for the user.

Proposal 1

A linear pipeline is currently defined simply by:

A B C

Forks could be initiated by () and different "lanes" could be separated by a special character like "," or "|". For instance, forking the A process into two could be as simple as:

A (B | C)

Process A could be forked N times by separating each process with the | special character.

A (B | C | D | E)

Each "lane" created by the fork, can then continue normally in a linear fashion. If we want to attach more processes after the B fork, this could be done like:

A (B Y Z | C | D | E)

Subsequent forks could be done using the () again:

A (B | C ( E | F))

If we support pipelines defined via an input file, indentation could help with the readability:

A (
    B Z Y |
    C (
        E | 
        F
    )
)

A more realistic example would be:

trimmomatic fastqc (spades pilon | skesa pilon)

# or

trimmomatic fastqc (
    spades pilon |
    skesa pilon
)
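
A minimal sketch of how this notation could be parsed into nested lanes (the tokenizer and the nested-list representation are assumptions, not the engine's actual data structures):

import re

def parse_pipeline(pipeline_str):
    """Parse the fork notation into nested lists, e.g.
    "A (B | C)" -> ['A', [['B'], ['C']]]."""
    tokens = re.findall(r"\(|\)|\||[^\s()|]+", pipeline_str)
    pos = 0

    def parse_lane():
        nonlocal pos
        lane = []
        while pos < len(tokens) and tokens[pos] not in ("|", ")"):
            if tokens[pos] == "(":
                pos += 1
                fork = [parse_lane()]
                while pos < len(tokens) and tokens[pos] == "|":
                    pos += 1
                    fork.append(parse_lane())
                pos += 1  # consume the closing ")"
                lane.append(fork)
            else:
                lane.append(tokens[pos])
                pos += 1
        return lane

    return parse_lane()

# parse_pipeline("trimmomatic fastqc (spades pilon | skesa pilon)")
# -> ['trimmomatic', 'fastqc', [['spades', 'pilon'], ['skesa', 'pilon']]]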

Consider renaming this project or breaking into 2 project

I've just noticed it's called assemblerflow and says "A NextFlow pipeline assembler for bacterial genomics", but it does a lot more than assemble genomes.

See also #42 about supporting contigs too. The goal of this project is to use the assemblies for AMR, Virulome, cgMLST etc. (i.e. replace Nullarbor).

If it's not, maybe separate into assembly, and then the other genotyping?

Add skesa module

Add the Skesa assembler module to the pipeline as an alternative to spades.

Add container attribute to Process

Add the container attribute to the Process base class. This will be used to determine the container directive in the nextflow configuration file.

Add skesa check for empty file at the end of assembly

Even when skesa crashes (out of RAM, for instance) it creates an empty assembly file, which passes through the nextflow output channels. A check should be added so that only non-empty files can pass; otherwise, the process fails.
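
A minimal sketch of such a check, as it could be run at the end of the skesa template (the function name and the way the failure is reported are assumptions; the real template would presumably also record the failure in its .status file):

import os
import sys

def check_assembly(assembly_file):
    """Fail the process if the assembly file is missing or empty."""
    if not os.path.isfile(assembly_file) or os.path.getsize(assembly_file) == 0:
        sys.stderr.write("Empty assembly file: {}\n".format(assembly_file))
        sys.exit(1)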

Module to collect reports

Create a terminal module that collects the .report.json files generated throughout the pipeline and adds the fields required for parsing by report-nf
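
A minimal sketch of the collection step (the glob pattern and output file name are assumptions, and the extra fields required by report-nf are not shown, since they depend on its parser):

import glob
import json

def compile_reports(pattern="*.report.json", output="pipeline_report.json"):
    """Merge all .report.json files produced by the pipeline into a single
    JSON document."""
    reports = []
    for report_file in sorted(glob.glob(pattern)):
        with open(report_file) as fh:
            reports.append(json.load(fh))
    with open(output, "w") as fh:
        json.dump(reports, fh, separators=(",", ":"))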

Sanity error when pipeline string includes return in the terminal

The following command:

assemblerflow.py build -t "trimmomatic fastqc ( spades | skesa pilon (abricate | prokka | chewbbaca) )
" -o my_pipe.nf

contains a valid pipeline, but raises an inSanityError:

Checking pipeline for errors...
inSANITY ERROR: After a fork it is not allowed to have any alphanumeric value.

Remove integrity_coverage dependency from spades

The dependency exists to provide the maximum read length to set the spades kmer list. However, this task could be easily and quickly performed in the spades process, when integrity_coverage is not included in the pipeline.

Add documentation for recipe creation.

We should add the necessary information to create an assemblerflow recipe. A placeholder doc file has been placed in docs/create_recipes.rst and should appear in the Contributing section of the docs.

Add bioconda recipe for assemblerflow

This would be the easiest way to install all dependencies of assemblerflow (including java and nextflow) with a single command.

We would also have to decide which container engine to support for the default execution. Out of the three that are supported by nextflow (docker, singularity and shifter), only singularity is installable on a laptop or server AND does not require sudo permissions to run. Singularity is also on bioconda, so perhaps this would be the ideal solution?

Improve logging and error handling

Currently, logging is only available in debug mode. Logging should be expanded to normal working conditions and provide information to the user about the pipeline that is being built.

In addition, custom exceptions with well-formatted messages should be raised instead of simply raising a generic exception.
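
A minimal sketch of what such an exception could look like (the class name and message format are assumptions):

class PipelineBuildError(Exception):
    """Raised when the pipeline cannot be built from the provided string."""

    def __init__(self, message, pipeline_str=None):
        if pipeline_str:
            message = "{}\nOffending pipeline string: {}".format(
                message, pipeline_str)
        super().__init__(message)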

Add real world test to .travis.yml

Maybe download a read set and a scheme and run allele caller on it?

Will need the skesa binary - spades is too slow for Travis to run in time.
