Workflow

Manages the connections between tools

Declaration

There are two major ways to construct a workflow:

class janis.WorkflowBuilder(identifier: str = None, friendly_name: str = None, version: str = None, metadata: janis_core.utils.metadata.WorkflowMetadata = None, tool_provider: str = None, tool_module: str = None, doc: str = None)
class janis.Workflow(**connections)

Advanced Workflows

Janis allows you to dynamically create workflows based on inputs. More information can be found on the Dynamic Workflows page.

Overview

The janis.Workflow and janis.WorkflowBuilder classes expose inputs, manage the connections between those inputs and tools, and expose outputs.

A janis.WorkflowBuilder is the class used to declare workflows inline. The janis.Workflow class should only be used by subclassing it.

A workflow does not directly execute, but declares what inputs a janis.CommandTool should receive.

A representation of a workflow can be exported to CWL or WDL through the janis.Workflow.translate() method.


Translating

Currently Janis supports two translation targets:

  1. Common Workflow Language
  2. Workflow Description Language

Workflow.translate(translation: Union[str, janis_core.translationdeps.supportedtranslations.SupportedTranslation], to_console=True, tool_to_console=False, to_disk=False, write_inputs_file=True, with_docker=True, with_hints=False, with_resource_overrides=False, validate=False, should_zip=True, export_path='./{name}', merge_resources=False, hints=None, allow_null_if_not_optional=True, additional_inputs: Dict[KT, VT] = None, max_cores=None, max_mem=None, max_duration=None, allow_empty_container=False, container_override: dict = None)

Structure of a workflow

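A workflow has the following _nodes_:

  • Inputs – janis.Workflow.input()
  • Steps – janis.Workflow.step()
  • Outputs – janis.Workflow.output()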

Once a node has been added to the workflow, it may be referenced through dot-notation on the workflow. For this reason, identifiers have certain naming restrictions. In the following examples we’re going to create an inline workflow using the WorkflowBuilder class.
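The dot-notation and index access described above can be pictured with a small stand-alone sketch. It is not Janis's implementation: ToyWorkflow, ToyNode and the identifier check are hypothetical names made up for illustration, and the real naming restrictions may differ. It only shows why an identifier that is not a valid Python attribute name, or that clashes with an existing attribute, could not be reached as workflow.identifier.

```python
import keyword


class ToyNode:
    """Hypothetical stand-in for a workflow node (input, step or output)."""

    def __init__(self, identifier: str):
        self.identifier = identifier


class ToyWorkflow:
    """Illustrative only: stores nodes by identifier and exposes them two ways."""

    def __init__(self):
        self._nodes = {}

    def add(self, identifier: str) -> ToyNode:
        # An identifier must be usable as a Python attribute name, must not
        # shadow an existing attribute, and must be unique in the workflow.
        if not identifier.isidentifier() or keyword.iskeyword(identifier):
            raise ValueError(f"'{identifier}' cannot be used with dot-notation")
        if identifier in self._nodes or hasattr(type(self), identifier):
            raise ValueError(f"'{identifier}' is already in use")
        node = ToyNode(identifier)
        self._nodes[identifier] = node
        return node

    def __getattr__(self, name: str) -> ToyNode:
        # Only called when normal attribute lookup fails, so real attributes win.
        try:
            return self.__dict__["_nodes"][name]
        except KeyError:
            raise AttributeError(name) from None

    def __getitem__(self, name: str) -> ToyNode:
        return self._nodes[name]


w = ToyWorkflow()
node = w.add("myInput")
same_node = node is w.myInput is w["myInput"]
```

In this toy model, w.add("my-input") or w.add("add") raises a ValueError, because neither name could be looked up safely as an attribute of the workflow.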

Creating an input

An input requires a unique identifier (string) and a janis.DataType.

Workflow.input(identifier: str, datatype: Union[Type[Union[str, float, int, bool]], janis_core.types.data_types.DataType, Type[janis_core.types.data_types.DataType]], default: any = None, value: any = None, doc: Union[str, janis_core.tool.documentation.InputDocumentation, Dict[str, any]] = None)

Create an input node on a workflow.

The input node is returned from this function, and is also available as a property on a workflow (accessible through dot-notation OR index notation).

import janis as j

w = j.WorkflowBuilder("myworkflow")
myInput = w.input("myInput", j.String)
myInput == w.myInput == w["myInput"] # True

Note

Default vs Value: The input

Creating a step

A step requires a unique identifier (string), a mapped tool (either a janis.CommandTool or a janis.Workflow, called with its inputs) and, if required, scattering information.

Workflow.step(identifier: str, tool: janis_core.tool.tool.Tool, scatter: Union[str, List[str], janis_core.utils.scatter.ScatterDescription] = None, _foreach: Union[janis_core.operators.selectors.Selector, List[janis_core.operators.selectors.Selector]] = None, when: Optional[janis_core.operators.operator.Operator] = None, ignore_missing=False, doc: str = None)

Construct a step on this workflow.

Parameters:
  • identifier – The identifier of the step, unique within the workflow.
  • tool – The tool that should run for this step.
  • scatter (Union[str, ScatterDescription]) – Indicate whether a scatter should occur, on what, and how.
  • when (Optional[Operator]) – An operator / condition that determines whether the step should run
  • ignore_missing – Don’t throw an error if required params are missing from this function
  • _foreach – NB: this is unimplemented. Iterate over each value of this resolved list; use the “ForEachSelector” to select each value in the iterable.
Returns:

Janis will throw an error if any required input is not provided. Pass ignore_missing=True to the step function to skip this check.

from janis.unix.tools.echo import Echo

# Echo has the required input: "inp": String
# https://janis.readthedocs.io/en/latest/tools/unix/echo.html

echoStep = w.step("echoStep", Echo(inp=w.myInput))
echoStep == w.echoStep == w["echoStep"] # True
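The required-input check and the effect of ignore_missing can be sketched in plain Python. This is a simplified model, not the code Janis runs: check_step_inputs is a hypothetical helper, and it treats required inputs as a plain list of names.

```python
def check_step_inputs(required, provided, ignore_missing=False):
    """Return the sorted names of missing required inputs.

    Raises ValueError if any are missing, unless ignore_missing is True.
    """
    missing = sorted(set(required) - set(provided))
    if missing and not ignore_missing:
        raise ValueError("Missing required inputs: " + ", ".join(missing))
    return missing


# An Echo-like tool with one required input, "inp"
complete = check_step_inputs(["inp"], {"inp": "hello"})
skipped = check_step_inputs(["inp"], {}, ignore_missing=True)
```

Here complete is an empty list, while skipped still reports "inp" as missing but does not raise, which mirrors skipping the check rather than satisfying it.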

Creating an output

An output requires a unique identifier (string), an output source and an optional janis.DataType. If a data type is provided, it is type-checked against the output source. Don’t be put off by the automatically generated interface for the output method; it is there to be exhaustive for the type definitions.

Here is the (simplified) method definition:

def output(
    self,
    identifier: str,
    datatype: Optional[ParseableType] = None,
    source: Union[Selector, ConnectionSource] = None,  # or List[Selector, ConnectionSource]
    output_folder: Union[str, Selector, List[Union[str, Selector]]] = None,
    output_name: Union[bool, str, Selector, ConnectionSource] = True, # let janis decide output name
    extension: Optional[str] = None, # file extension if janis names file
    doc: Union[str, OutputDocumentation] = None,
):
    ...

Workflow.output(identifier: str, datatype: Union[Type[Union[str, float, int, bool]], janis_core.types.data_types.DataType, Type[janis_core.types.data_types.DataType], None] = None, source: Union[List[Union[janis_core.operators.selectors.Selector, janis_core.graph.node.Node, janis_core.operators.selectors.StepOutputSelector, Tuple[janis_core.graph.node.Node, str]]], janis_core.operators.selectors.Selector, janis_core.graph.node.Node, janis_core.operators.selectors.StepOutputSelector, Tuple[janis_core.graph.node.Node, str]] = None, output_folder: Union[str, janis_core.operators.selectors.Selector, List[Union[janis_core.operators.selectors.Selector, str]]] = None, output_name: Union[bool, str, janis_core.operators.selectors.Selector, janis_core.graph.node.Node, janis_core.operators.selectors.StepOutputSelector, Tuple[janis_core.graph.node.Node, str]] = True, extension: Optional[str] = None, doc: Union[str, janis_core.tool.documentation.OutputDocumentation] = None)

Create an output on a workflow

Parameters:
  • identifier – The identifier for the output
  • datatype – Optional data type of the output to check. This will be automatically inferred if not provided.
  • source – The source of the output, must be an output to a step node
  • output_folder

    Decides the output folder(s) where the output will reside. If a list is passed, it represents a structure of nested directories, the first element being the root directory.

    • None (default): the assistant will copy to the root of the output directory
    • Type[Selector]: will be resolved before the workflow is run, this means it may only depend on the inputs

    NB: If the output_source is an array, a “shard_n” will be appended to the output_name UNLESS the output_source also resolves to an array, in which case the assistant can unwrap multiple dimensions of arrays, but ONLY if the number of elements in the scattered output source equals the number of resolved elements.

  • output_name
    Decides the name of the output (without extension) that an output will have:
    • True (default): the assistant will choose an output name based on output identifier (tag),
    • None / False: the assistant will use the original filename (this might cause filename conflicts)
    • Type[Selector]: will be resolved before the workflow is run, this means it may only depend on the inputs
    NB: If the output_source is an array, a “shard_n” will be appended to the output_name UNLESS the output_source
    also resolves to an array, in which case the assistant can unwrap multiple dimensions of arrays.
  • extension – The extension to use if janis renames the output. By default, it will pull the extension from the inherited data type (eg: CSV -> “.csv”), or it will attempt to pull the extension from the file.
Returns:

janis.WorkflowOutputNode
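The output_folder, output_name and extension rules above can be approximated with a small stand-alone function. This is a toy model under stated assumptions, not the janis-assistant code: resolve_output_path is a hypothetical name, selectors are assumed to be already resolved to strings, the extension derived from a data type must be passed in by the caller, and the exact format of the “shard_n” suffix is a guess.

```python
import posixpath


def resolve_output_path(identifier, original_filename, output_folder=None,
                        output_name=True, extension=None, shard=None):
    """Toy model of the naming rules above; all names here are hypothetical."""
    original_stem, original_ext = posixpath.splitext(original_filename)

    if output_name is None or output_name is False:
        # Not renamed: keep the original filename, extension included.
        stem, ext = original_stem, original_ext
    else:
        # True: name based on the output identifier; otherwise an explicit name.
        stem = identifier if output_name is True else str(output_name)
        # Use the given extension, else fall back to the file's own extension.
        ext = extension if extension is not None else original_ext

    if shard is not None:
        # Assumed format for the "shard_n" suffix on array outputs.
        stem = f"{stem}_shard_{shard}"

    # A list of folders is a nested directory structure, root first.
    if output_folder is None:
        folders = []
    elif isinstance(output_folder, str):
        folders = [output_folder]
    else:
        folders = list(output_folder)

    return posixpath.join(*folders, stem + ext)


default_name = resolve_output_path("out", "tmp123.csv")
kept_name = resolve_output_path("out", "tmp123.csv",
                                output_folder=["results", "tables"],
                                output_name=None)
sharded = resolve_output_path("out", "tmp123.txt", output_name="report",
                              extension=".csv", shard=0)
```

With these inputs, default_name is "out.csv", kept_name is "results/tables/tmp123.csv" and sharded is "report_shard_0.csv".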

You are unable to connect an input node directly to an output node, and an output node cannot be referenced as a step input.

# w.echoStep added to workflow
w.output("out", source=w.echoStep)

Subclassing Workflow

Instead of creating inline workflows, it’s possible to subclass janis.Workflow and implement the required methods, which allows documentation to be generated automatically for the tool.

Required methods:

Workflow.id() → str
Workflow.friendly_name()

Overriding this method is not required UNLESS you distribute your tool. Generating the docs will fail if your tool does not provide a name.

Returns: A friendly name of your tool

Workflow.constructor()

A place to construct your workflows. This is called directly after initialisation.

Within the constructor method, you have access to self to add inputs, steps and outputs.

OPTIONAL:

Workflow.bind_metadata()

A convenient place to add metadata about the tool. You are guaranteed that self.metadata will exist. It’s possible to return a new instance of the ToolMetadata / WorkflowMetadata, which will be rebound. This is usually called after the initialiser, though it may be called multiple times.
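The way these methods fit together (a constructor hook called directly after initialisation, with metadata bound afterwards) resembles the template-method pattern. The sketch below is an illustration only: ToyWorkflowBase and ToyEchoWorkflow are hypothetical classes, node registration is reduced to a list of names, and metadata is a plain dict rather than a WorkflowMetadata object.

```python
class ToyWorkflowBase:
    """Hypothetical base class: calls constructor() right after initialisation."""

    def __init__(self):
        self.nodes = []
        self.metadata = {}      # exists before bind_metadata() is called
        self.constructor()
        self.bind_metadata()

    def id(self) -> str:
        raise NotImplementedError("subclasses must provide an id")

    def friendly_name(self):
        return None             # only needed if the tool is distributed

    def constructor(self):
        raise NotImplementedError("subclasses must build the workflow here")

    def bind_metadata(self):
        pass                    # optional hook


class ToyEchoWorkflow(ToyWorkflowBase):
    def id(self) -> str:
        return "toy_echo"

    def constructor(self):
        # self is already initialised here, so nodes can be registered on it.
        self.nodes.extend(["my_input", "echo_step", "out"])

    def bind_metadata(self):
        self.metadata["version"] = "v1.0.0"


wf = ToyEchoWorkflow()
```

Creating ToyEchoWorkflow() is enough to populate wf.nodes and wf.metadata, because the base initialiser calls both hooks; the subclass never has to override __init__.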

Examples

Inline example

The Echo tool has one input, inp, of type String, and one output, out.

import janis as j
from janis.unix.tools.echo import Echo

w = j.WorkflowBuilder("my_workflow")
w.input("my_input", j.String)
echoStep = w.step("echo_step", Echo(inp=w.my_input))
w.output("out", source=w.echo_step)

# Will print the CWL, input file and relevant tools to the console
w.translate("cwl", to_disk=False)  # or "wdl"

Subclass example

import janis as j
from janis.unix.tools.echo import Echo

class MyWorkflow(j.Workflow):

    def id(self):
        return "my_workflow"

    def friendly_name(self):
        return "My workflow"

    def constructor(self):
        self.input("my_input", j.String)
        echoStep = self.step("echo_step", Echo(inp=self.my_input))
        self.output("out", source=self.echo_step)

    # optional

    def bind_metadata(self):
        self.metadata.author = "Michael Franklin"
        self.metadata.version = "v1.0.0"
        self.metadata.documentation = "A tool that echos the input to standard_out"