# Source code for janis_core.tool.commandtool
import re
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Any, Union, Callable, Set, Tuple
from janis_core.tool.documentation import (
InputDocumentation,
DocumentationMeta,
OutputDocumentation,
InputQualityType,
)
from janis_core.utils.validators import Validators
from janis_core.types import ParseableType, get_instantiated_type, Stdout, Stderr
from janis_core.types.common_data_types import String, Filename
from janis_core.tool.tool import Tool, ToolType, TInput, TOutput
from janis_core.translationdeps.supportedtranslations import SupportedTranslation
from janis_core.utils.logger import Logger
from janis_core.operators import Selector, Operator
from janis_core.utils.metadata import ToolMetadata
class ToolArgument:
    """A fixed command line argument of a tool, bound by prefix and/or position."""

    # Regex used (via ``re.match``) to detect string values that begin with an
    # expression of the form ``$(...)``. Raw string so the backslashes reach
    # the regex engine without relying on invalid string escapes.
    expr_pattern = r"\$\(.*\)"

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` for public, non-callable attributes."""
        attrs = ", ".join(
            f"{k}={repr(v)}"
            for k, v in self.__dict__.items()
            if not k.startswith("_") and not callable(v)
        )
        return f"{self.__class__.__name__}({attrs})"

    def __init__(
        self,
        value: Any,
        prefix: Optional[str] = None,
        position: Optional[int] = 0,
        separate_value_from_prefix=None,
        doc: Optional[Union[str, DocumentationMeta]] = None,
        shell_quote: bool = None,
    ):
        """
        A ``ToolArgument`` is a CLI parameter that cannot be overridden (at runtime).
        The value can be a plain string or a selector (see the accepted types below).

        :param value: The value to place on the command line.
        :type value: ``str`` | ``janis.InputSelector`` | ``janis.StringFormatter``
        :param position: The position of the input to be applied. (Default = 0, after the base_command).
        :param prefix: The prefix to be appended before the element. (By default, a space will also be applied, see ``separate_value_from_prefix`` for more information)
        :param separate_value_from_prefix: (Default: True) Add a space between the prefix and value when ``True``.
        :param doc: Documentation string for the argument, this is used to generate the tool documentation and provide
            hints to the user.
        :param shell_quote: Stops shell quotes from being applied in all circumstances, useful when joining multiple commands together.
        """
        self.prefix: Optional[str] = prefix
        self.value = value
        self.position: Optional[int] = position

        # Tri-state flag: None when there is no value, otherwise True when the
        # value is a Selector or a string starting with a ``$(...)`` expression.
        if self.value is None:
            self.is_expression = None
        else:
            self.is_expression = isinstance(self.value, Selector) or (
                isinstance(self.value, str)
                and re.match(self.expr_pattern, self.value) is not None
            )

        self.separate_value_from_prefix = separate_value_from_prefix
        self.doc: DocumentationMeta = (
            doc if isinstance(doc, InputDocumentation) else InputDocumentation(doc)
        )
        self.shell_quote = shell_quote

        # Warn when the prefix is explicitly glued to the value (separation was
        # set to a falsy, non-None value) but the prefix does not end in "=".
        # NOTE(review): presumably this flags a likely-malformed "--keyvalue"
        # binding -- confirm the intent with the original author.
        explicitly_joined = (
            self.separate_value_from_prefix is not None
            and not self.separate_value_from_prefix
        )
        if self.prefix and explicitly_joined and not self.prefix.endswith("="):
            Logger.warn(
                f"Argument ({self.prefix}{self.value}) is not separating and did not end with '='"
            )
# This should really be a CommandToolInput
class ToolInput(ToolArgument):
    """A named, typed input of a command tool and its command line binding."""

    def __init__(
        self,
        tag: str,
        input_type: ParseableType,
        position: Optional[int] = None,
        prefix: Optional[str] = None,
        separate_value_from_prefix: bool = None,
        prefix_applies_to_all_elements: bool = None,
        presents_as: str = None,
        secondaries_present_as: Dict[str, str] = None,
        separator: str = None,
        shell_quote: bool = None,
        localise_file: bool = None,
        default: Any = None,
        doc: Optional[Union[str, InputDocumentation]] = None,
    ):
        """
        A ``ToolInput`` represents an input to a tool, with parameters that allow it to be bound on the command line.
        The ToolInput must have either a position or prefix set to be bound onto the command line.

        :param tag: The identifier of the input (unique to inputs and outputs of a tool)
        :param input_type: The data type that this input accepts
        :type input_type: ``janis.ParseableType``
        :param position: The position of the input to be applied. (Default: None)
        :param prefix: The prefix to be appended before the element. (By default, a space will also be applied, see ``separate_value_from_prefix`` for more information)
        :param separate_value_from_prefix: (Default: True) Add a space between the prefix and value when ``True``.
        :param prefix_applies_to_all_elements: Applies the prefix to each element of the array (Array inputs only)
        :param presents_as: Stored unchanged on the instance. NOTE(review): presumably the name the
            file is presented as inside the execution directory -- confirm.
        :param secondaries_present_as: Mapping whose keys must all be secondary files of ``input_type``;
            validated here and stored unchanged.
        :param shell_quote: Stops shell quotes from being applied in all circumstances, useful when joining multiple commands together.
        :param separator: The separator between each element of an array (defaults to ' ')
        :param localise_file: Ensures that the file(s) are localised into the execution directory.
        :param default: The default value to be applied if the input is not defined.
        :param doc: Documentation string for the ToolInput, this is used to generate the tool documentation and provide
            hints to the user.
        :raises Exception: if ``tag`` is not a valid identifier, or ``secondaries_present_as``
            refers to secondary files that ``input_type`` does not declare.
        """
        # An input has no fixed value; doc is set below with input-specific wrapping.
        super().__init__(
            value=None,
            prefix=prefix,
            position=position,
            separate_value_from_prefix=separate_value_from_prefix,
            doc=None,
            shell_quote=shell_quote,
        )

        self.doc: InputDocumentation = (
            doc if isinstance(doc, DocumentationMeta) else InputDocumentation(doc=doc)
        )

        # if default is not None:
        #     input_type.optional = True

        if not Validators.validate_identifier(tag):
            raise Exception(
                f"The identifier '{tag}' was not validated because {Validators.reason_for_failure(tag)}"
            )

        self.tag: str = tag
        self.input_type: ParseableType = get_instantiated_type(input_type)
        self.default = default
        self.prefix_applies_to_all_elements = prefix_applies_to_all_elements
        self.separator = separator
        self.localise_file = localise_file
        self.presents_as = presents_as
        self.secondaries_present_as = secondaries_present_as

        if self.secondaries_present_as:
            if not self.input_type.secondary_files():
                raise Exception(
                    f"The ToolInput '{self.id()}' requested a rewrite of secondary file extension through "
                    f"'secondaries_present_as', but the type {self.input_type.id()} does not have any secondary files."
                )

            secs = set(self.input_type.secondary_files())
            to_remap = set(self.secondaries_present_as.keys())
            invalid = to_remap - secs
            if invalid:
                # Sort so the message is stable regardless of set iteration order.
                raise Exception(
                    f"Error when constructing input '{self.id()}', the secondaries_present_as contained secondary "
                    f"files ({', '.join(sorted(invalid))}) that were not found in the input "
                    f"type '{self.input_type.id()}' ({', '.join(sorted(secs))})"
                )

    def id(self):
        """Return the identifier (tag) of this input."""
        return self.tag
# This should really be a CommandToolOutput
class ToolOutput:
    """A named, typed output of a command tool and the selector that collects it."""

    init_key_map = {
        # Skip glob when building python string initialiser
        "glob": None
    }

    def __init__(
        self,
        tag: str,
        output_type: ParseableType,
        selector: Optional[Union[Selector, str]] = None,
        presents_as: str = None,
        secondaries_present_as: Dict[str, str] = None,
        doc: Optional[Union[str, OutputDocumentation]] = None,
        glob: Optional[Union[Selector, str]] = None,
        _skip_output_quality_check=False,
    ):
        """
        A ToolOutput instructs the engine how to collect an output and how
        it may be referenced in a workflow.

        :param tag: The identifier of an output, must be unique in the inputs and outputs.
        :param output_type: The type of output that is being collected.
        :param selector: How to collect this output, can accept any :class:`janis.Selector`.
        :param presents_as: Stored unchanged on the instance; cannot be combined with an
            ``Operator`` selector.
        :param secondaries_present_as: Mapping whose keys must all be secondary files of
            ``output_type``; validated here and stored unchanged.
        :param glob: (DEPRECATED) An alias for `selector`
        :param doc: Documentation on what the output is, used to generate docs.
        :param _skip_output_quality_check: DO NOT USE THIS PARAMETER, it's an escape hatch for parsing
            CWL ExpressionTools when a cwl.output.json is generated
        :raises TypeError: if both ``selector`` and ``glob`` are supplied.
        :raises Exception: if ``tag`` is not a valid identifier, if no selector is given for a
            type other than Stdout / Stderr, if ``presents_as`` is combined with an Operator
            selector, or if ``secondaries_present_as`` names unknown secondary files.
        """
        if not Validators.validate_identifier(tag):
            raise Exception(
                f"The identifier '{tag}' was invalid because {Validators.reason_for_failure(tag)}"
            )

        self.tag = tag
        self.output_type: ParseableType = get_instantiated_type(output_type)
        self._skip_output_quality_check = _skip_output_quality_check

        # ``glob`` is only honoured as a fallback for ``selector``; giving both is ambiguous.
        if selector is not None and glob is not None:
            raise TypeError(
                f"ToolOutput({tag}) received inputs for both selector and glob. Please only use selector"
            )
        if selector is None and glob is not None:
            selector = glob

        # Only Stdout / Stderr outputs may omit a selector (unless the check is skipped).
        if (
            not _skip_output_quality_check
            and selector is None
            and not isinstance(self.output_type, (Stdout, Stderr))
        ):
            raise Exception(
                "ToolOutput expects a 'selector=' param when the output type is not Stdout / Stderr"
            )

        self.selector = selector
        self.presents_as = presents_as
        self.secondaries_present_as = secondaries_present_as
        self.doc = (
            doc
            if isinstance(doc, OutputDocumentation)
            else OutputDocumentation(doc=doc)
        )

        if isinstance(selector, Operator) and self.presents_as:
            raise Exception(
                f"Error when constructing output '{self.id()}', Janis does not support 'presents_as' AND "
                "operators within a ToolOutput selector. Please raise an issue if you think this is in error."
            )

        if self.secondaries_present_as:
            if not self.output_type.secondary_files():
                raise Exception(
                    f"The ToolOutput '{self.id()}' requested a rewrite of secondary file extension through "
                    f"'secondaries_present_as', but the type {self.output_type.id()} does not have any secondary files."
                )

            secs = set(self.output_type.secondary_files())
            to_remap = set(self.secondaries_present_as.keys())
            invalid = to_remap - secs
            if invalid:
                # Sort so the message is stable regardless of set iteration order.
                raise Exception(
                    f"Error when constructing output '{self.id()}', the secondaries_present_as contained secondary "
                    f"files ({', '.join(sorted(invalid))}) that were not found in the output "
                    f"type '{self.output_type.id()}' ({', '.join(sorted(secs))})"
                )

    def id(self):
        """Return the identifier (tag) of this output."""
        return self.tag

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` for public, non-callable, non-None attributes."""
        attrs = ", ".join(
            f"{k}={repr(v)}"
            for k, v in self.__dict__.items()
            if not k.startswith("_") and not callable(v) and v is not None
        )
        return f"{self.__class__.__name__}({attrs})"
class CommandTool(Tool, ABC):
    """
    A CommandTool is an interface between Janis and a program to be executed.
    Simply put, a CommandTool has a name, a command, inputs, outputs and a container to run in.

    This class can be inherited to create a CommandTool, else a CommandToolBuilder may be used.
    """

    def __init__(self, **connections):
        super().__init__(metadata_class=ToolMetadata, **connections)

    # Tool base

    @abstractmethod
    def tool(self) -> str:
        """
        Unique identifier of the tool

        :return: str
        """
        pass

    @abstractmethod
    def base_command(self) -> Optional[Union[str, List[str]]]:
        """
        The command of the tool to execute, usually the tool name or path and not related to any inputs.
        This field will always come before any inputs or arguments, though it's possible to omit this
        field and the program will use the first ordered argument / position.

        :return: Optional[Union[str, List[str]]]
        """
        pass

    @abstractmethod
    def inputs(self) -> List[ToolInput]:
        """
        A list of named tool inputs that will be used to create the command line. See :class:`janis.ToolInput`
        for options on how to configure this command line binding.

        :return: List[janis.ToolInput]
        """
        pass

    def arguments(self) -> Optional[List[ToolArgument]]:
        """
        A list of arguments that will be used to create the command line. Although they are not directly
        addressable as inputs, it's possible to use a :class:`janis.InputSelector` or
        :class:`janis.StringFormatter` in the value field. See :class:`janis.ToolArgument` for
        options on how to configure this command line binding.

        :return: List[janis.ToolArgument]
        """
        return None

    @abstractmethod
    def outputs(self) -> List[ToolOutput]:
        """
        A list of named outputs of the tool. Each :class:`janis.ToolOutput` has a ``selector`` field
        (``glob`` is a deprecated alias) that can be used to select the outputs, see its documentation
        for more information.

        :return: List[janis.ToolOutput]
        """
        pass

    def env_vars(self) -> Optional[Dict[str, Union[str, Selector]]]:
        """Environment variables for the tool; ``None`` unless overridden."""
        return None

    # Tool versions

    @abstractmethod
    def container(self) -> str:
        """
        A link to an OCI compliant container accessible by your engine. Previously, docker().

        :return: str
        """
        pass

    @abstractmethod
    def version(self) -> str:
        """
        Version of the tool. Janis supports multiple versions of tools with the same ``.tool()`` value.
        The recommended format is `SemVer <https://semver.org/>`_, though you should reflect the tool version.

        :return: str
        """
        pass

    # Other stuff

    def has_tool_with_no_container(self):
        """Return True when :meth:`container` is ``None``."""
        return self.container() is None

    def containers(self) -> Dict[str, str]:
        """Return a single-entry mapping of this tool's versioned id to its container."""
        return {self.versioned_id(): self.container()}

    def id(self):
        """Return the tool identifier (same as :meth:`tool`)."""
        return self.tool()

    def __hash__(self):
        return hash(self.tool())

    def full_name(self):
        """Return ``tool/version``, or just the tool id when the version is ``None``."""
        if self.version() is not None:
            return f"{self.tool()}/{self.version()}"
        return self.tool()

    def memory(self, hints: Dict[str, Any]) -> Optional[Union[float, Selector]]:
        """
        These values are used to generate a separate runtime.json / runtime.yaml input
        that can be passed to the execution engine to fill in for the specified hints.

        These are now (2019-04-10) to be kept out of the workflow, to leave the workflow
        truly portable.

        This memory must be in GB!
        """
        return None

    def cpus(self, hints: Dict[str, Any]) -> Optional[Union[int, Selector]]:
        """
        These values are used to generate a separate runtime.json / runtime.yaml input
        that can be passed to the execution engine to fill in for the specified hints.

        These are now (2019-04-10) to be kept out of the workflow, to leave the workflow
        truly portable.

        The CPU must be a whole number or a Selector that resolves to a whole number.

        :return:
        """
        return None

    def time(self, hints: Dict[str, Any]) -> Optional[Union[int, Selector]]:
        """
        These values are used to generate a separate runtime.json / runtime.yaml input
        that can be passed to the execution engine to fill in for the specified hints.

        These are now (2019-04-10) to be kept out of the workflow, to leave the workflow
        truly portable.

        The time is specified in SECONDS and must be a whole number.

        :return:
        """
        return None

    def disk(self, hints: Dict[str, Any]) -> Optional[Union[float, Selector]]:
        """
        These values are used to generate a separate runtime.json / runtime.yaml input
        that can be passed to the execution engine to fill in for the specified hints.

        These are now (2019-04-10) to be kept out of the workflow, to leave the workflow
        truly portable.

        The disk is specified in GB.

        :return:
        """
        return None

    def directories_to_create(self) -> Optional[Union[str, List[str]]]:
        """
        A list of directories to create. In WDL this is called before files_to_create as:

            mkdir -p directory

        In CWL, this is transformed to a InitialWorkDirRequirement.Directory. The listing
        is transformed by the files_to_create.

        :return: ``None`` unless overridden.
        """
        return None

    def files_to_create(self) -> Optional[Dict[str, Union[str, Selector]]]:
        """
        A list of files to create, keyed by their path. In WDL, this is executed AFTER directories_to_create.
        In CWL, this will get turned into a InitialWorkDirRequirement.

        :return: ``None`` unless overridden.
        """
        return None

    @classmethod
    def type(cls):
        return ToolType.CommandTool

    def translate(
        self,
        translation: Union[str, SupportedTranslation],
        to_console=True,
        to_disk=False,
        export_path=None,
        with_docker=True,
        with_resource_overrides=False,
        allow_empty_container=False,
        container_override=None,
    ):
        """
        Translate this tool by delegating to ``janis_core.translations.translate_tool``.

        A string ``container_override`` is first expanded to a mapping keyed by the
        lower-cased tool id; all other arguments are forwarded unchanged.
        """
        # Imported here rather than at module level, as in the original.
        import janis_core.translations

        if isinstance(container_override, str):
            container_override = {self.id().lower(): container_override}

        return janis_core.translations.translate_tool(
            self,
            translation,
            to_console=to_console,
            to_disk=to_disk,
            export_path=export_path,
            with_docker=with_docker,
            with_resource_overrides=with_resource_overrides,
            allow_empty_container=allow_empty_container,
            container_override=container_override,
        )

    def tool_inputs(self) -> List[TInput]:
        """Return one ``TInput`` (id, type, default, doc) per declared input."""
        return [
            TInput(t.id(), t.input_type, default=t.default, doc=t.doc)
            for t in self.inputs()
        ]

    def tool_outputs(self) -> List[TOutput]:
        """Return one ``TOutput`` (id, type, doc) per declared output."""
        return [TOutput(t.id(), t.output_type, doc=t.doc) for t in self.outputs()]

    def all_input_keys(self):
        """Return the parent's input keys plus the four ``runtime_*`` resource keys."""
        return super().all_input_keys() + [
            "runtime_memory",
            "runtime_cpu",
            "runtime_disks",
            "runtime_seconds",
        ]

    def help(self):
        """
        Build a man-page style help string for this tool: defining file, name, synopsis,
        container, documentation and the required / optional inputs and outputs.

        :return: str
        """
        import inspect

        tb = " " * 4
        path = inspect.getfile(self.__class__)

        # Inputs without a position sort as if they were at position 0.
        ins = sorted(
            self.inputs(), key=lambda i: i.position if i.position is not None else 0
        )

        metadata = self.metadata
        docker = self.container()

        # Evaluate base_command() once; it may be None/empty, a string, or a list of strings.
        base_command = self.base_command()
        if not base_command:
            base = ""
        elif isinstance(base_command, str):
            base = base_command
        else:
            base = " ".join(base_command)
        command = base + " [parameters]"

        def input_format(t: ToolInput):
            prefix_with_space = ""
            if t.prefix is not None:
                prefix_with_space = (
                    (t.prefix + ": ")
                    if (t.separate_value_from_prefix is not False)
                    else t.prefix
                )
            return (
                f"{2 * tb}{t.tag} ({prefix_with_space}{t.input_type.id()}{('=' + str(t.default)) if t.default is not None else ''})"
                f": {'' if t.doc is None else t.doc}"
            )

        def output_format(t: ToolOutput):
            return f"{2 * tb}{t.tag} ({t.output_type.id()}): {'' if t.doc is None else t.doc}"

        required_inputs = "\n".join(
            input_format(x) for x in ins if not x.input_type.optional
        )
        optional_inputs = "\n".join(
            input_format(x) for x in ins if x.input_type.optional
        )
        outputs = "\n".join(output_format(o) for o in self.outputs())

        # The template lines are deliberately flush-left: leading whitespace
        # inside the triple-quoted string would become part of the output.
        return f"""
Pipeline tool: {path} ({self.id()})
NAME
{self.id()}
SYNOPSIS
{command}
DOCKER
{docker}
DOCUMENTATION URL
{metadata.documentationUrl if metadata.documentationUrl else "No url provided"}
DESCRIPTION
{metadata.documentation if metadata.documentation else "No documentation provided"}
INPUTS:
REQUIRED:
{required_inputs}
OPTIONAL:
{optional_inputs}
OUTPUTS:
{outputs}
"""

    def generate_inputs_override(
        self,
        additional_inputs=None,
        with_resource_overrides=False,
        hints=None,
        include_defaults=True,
        values_to_ignore: Set[str] = None,
        quality_type: List[InputQualityType] = None,
    ):
        """
        Generate a dictionary of input values (overrides) for this tool.

        An input is included when it is required, appears in ``additional_inputs``, or
        (with ``include_defaults``) has a default other than ``None`` -- provided it is not
        in ``values_to_ignore`` and passes the ``quality_type`` filter.

        :param additional_inputs: Values keyed by input id; these take precedence over defaults.
        :param with_resource_overrides: Also add the ``runtime_*`` keys from
            :meth:`cpus`, :meth:`memory`, :meth:`disk` and :meth:`time`.
        :param hints: Passed unchanged to the resource methods.
        :param include_defaults: Include optional inputs that declare a default.
        :param values_to_ignore: Input ids to leave out.
        :param quality_type: When given (and the input has a doc), only inputs whose
            ``doc.quality`` is in this list are included.
        :return: dict of input id -> value
        """
        d, ad = {}, additional_inputs or {}

        for i in self.inputs():
            # ``is not None`` (not truthiness) so that defaults such as 0, False or ""
            # are still emitted, matching how ``help`` treats defaults.
            wanted = (
                not i.input_type.optional
                or i.id() in ad
                or (include_defaults and i.default is not None)
            )
            ignored = values_to_ignore and i.id() in values_to_ignore
            quality_ok = not (i.doc and quality_type) or i.doc.quality in quality_type

            if wanted and not ignored and quality_ok:
                d[i.id()] = ad.get(i.id(), i.default)

        if with_resource_overrides:
            cpus = self.cpus(hints)
            mem = self.memory(hints)
            disk = self.disk(hints)
            secs = self.time(hints)

            # Selectors are emitted as None rather than as a concrete value;
            # an unspecified CPU count falls back to 1.
            if cpus is None:
                cpus = 1
            elif isinstance(cpus, Selector):
                cpus = None

            if isinstance(mem, Selector):
                mem = None
            if isinstance(secs, Selector):
                secs = None
            if isinstance(disk, Selector):
                disk = None

            d.update(
                {
                    "runtime_memory": mem,
                    "runtime_cpu": cpus,
                    "runtime_disks": disk,
                    "runtime_seconds": secs,
                }
            )

        return d

    def wrapped_in_wf(self):
        """
        Wrap this tool in a single-step workflow named ``<id>Wf`` whose inputs and
        outputs mirror the tool's own.

        ``Filename`` inputs become optional ``String`` inputs; any other input with a
        default (other than ``None``) is made optional on a copy of its type.

        :return: the constructed ``WorkflowBuilder``
        """
        from copy import copy
        from janis_core.workflow.workflow import WorkflowBuilder

        wf = WorkflowBuilder(self.id() + "Wf")
        inpmap = {}
        for i in self.inputs():
            if isinstance(i.input_type, Filename):
                intp = String(optional=True)
            else:
                # Copy so the tool's own input type is not mutated.
                intp = copy(i.input_type)
                # ``is not None`` so falsy defaults (0, False, "") still make the input optional.
                if i.default is not None:
                    intp.optional = True

            inpmap[i.id()] = wf.input(i.id(), intp)

        stp = wf.step(self.tool().lower(), self(**inpmap))

        for o in self.outputs():
            wf.output(o.id(), source=stp[o.id()])

        return wf

    def to_command_tool_builder(self):
        """Return a ``CommandToolBuilder`` populated from this tool's current definition."""
        return CommandToolBuilder(
            tool=self.tool(),
            base_command=self.base_command(),
            inputs=self.inputs(),
            outputs=self.outputs(),
            container=self.container(),
            version=self.version(),
            friendly_name=self.friendly_name(),
            arguments=self.arguments(),
            env_vars=self.env_vars(),
            tool_module=self.tool_module(),
            tool_provider=self.tool_provider(),
            metadata=self.bind_metadata() or self.metadata,
            cpus=self.cpus({}),
            memory=self.memory({}),
            time=self.time({}),
            disk=self.disk({}),
            directories_to_create=self.directories_to_create(),
            files_to_create=self.files_to_create(),
        )
# Type alias: either a Janis Selector or a plain string.
SELECTOR_OR_VALUE = Union[Selector, str]
# Type alias: a single selector/string, or a list of them.
POTENTIAL_LIST_SElECTOR = Union[SELECTOR_OR_VALUE, List[SELECTOR_OR_VALUE]]
class CommandToolBuilder(CommandTool):
    """A ``CommandTool`` whose definition is supplied through constructor arguments."""

    def tool(self) -> str:
        return self._tool

    def friendly_name(self):
        return self._friendly_name

    def base_command(self) -> Optional[Union[str, List[str]]]:
        return self._base_command

    def inputs(self) -> List[ToolInput]:
        return self._inputs

    def arguments(self):
        return self._arguments

    def outputs(self) -> List[ToolOutput]:
        return self._outputs

    def container(self) -> str:
        return self._container

    def version(self) -> str:
        return self._version

    def tool_provider(self):
        return self._tool_provider

    def tool_module(self):
        return self._tool_module

    def env_vars(self):
        return self._env_vars

    # Each resource accessor below follows the same rules: None stays None,
    # a number or Selector is returned as-is, a callable is invoked with the
    # hints, and anything else logs a warning and yields a fallback value.

    def cpus(self, hints: Dict[str, Any]):
        """Resolve the configured CPU count; unrecognised values warn and return 1."""
        if self._cpus is None:
            return None
        if isinstance(self._cpus, (int, float, Selector)):
            return self._cpus
        if callable(self._cpus):
            return self._cpus(hints)

        Logger.warn(
            f"Janis does not recognise {self._cpus} ({type(self._cpus)}) as a valid CPU value, returning 1"
        )
        return 1

    def memory(self, hints: Dict[str, Any]):
        """Resolve the configured memory (GB); unrecognised values warn and return 4."""
        if self._memory is None:
            return None
        if isinstance(self._memory, (int, float, Selector)):
            return self._memory
        if callable(self._memory):
            return self._memory(hints)

        Logger.warn(
            f"Janis does not recognise {self._memory} ({type(self._memory)}) as a valid value for memory, returning 4GB"
        )
        return 4

    def time(self, hints: Dict[str, Any]) -> Optional[Union[int, Selector]]:
        """Resolve the configured time (seconds); unrecognised values warn and return 86400."""
        if self._time is None:
            return None
        if isinstance(self._time, (int, float, Selector)):
            return self._time
        if callable(self._time):
            return self._time(hints)

        # Report the offending *time* value (previously this printed the memory value).
        Logger.warn(
            f"Janis does not recognise {self._time} ({type(self._time)}) as a valid value for time, returning 86400 seconds"
        )
        return 86400

    def disk(self, hints: Dict[str, Any]) -> Optional[Union[float, Selector]]:
        """Resolve the configured disk (GB); unrecognised values warn and return None."""
        if self._disk is None:
            return None
        if isinstance(self._disk, (int, float, Selector)):
            return self._disk
        if callable(self._disk):
            return self._disk(hints)

        Logger.warn(
            f"Janis does not recognise {type(self._disk)} as a valid value for disk, returning None"
        )
        return None

    def directories_to_create(self) -> Union[str, List[str]]:
        return self._directories_to_create

    def files_to_create(self) -> Dict[str, Union[str, Selector]]:
        return self._files_to_create

    # Maps each constructor keyword to the private attribute that stores it.
    init_key_map = {
        "tool": "_tool",
        "base_command": "_base_command",
        "inputs": "_inputs",
        "outputs": "_outputs",
        "container": "_container",
        "version": "_version",
        "friendly_name": "_friendly_name",
        "arguments": "_arguments",
        "env_vars": "_env_vars",
        "tool_module": "_tool_module",
        "tool_provider": "_tool_provider",
        "metadata": "_metadata",
        "cpus": "_cpus",
        "memory": "_memory",
        "time": "_time",
        "disk": "_disk",
        "directories_to_create": "_directories_to_create",
        "files_to_create": "_files_to_create",
        "doc": "_doc",
    }

    def doc(self) -> Optional[str]:
        return self._doc

    def __init__(
        self,
        tool: str,
        base_command: Optional[Union[str, List[str]]],
        inputs: List[ToolInput],
        outputs: List[ToolOutput],
        container: str,
        version: str,
        friendly_name: Optional[str] = None,
        arguments: List[ToolArgument] = None,
        env_vars: Dict = None,
        tool_module: str = None,
        tool_provider: str = None,
        metadata: ToolMetadata = None,
        cpus: Union[int, Callable[[Dict[str, Any]], int]] = None,
        memory: Union[int, Callable[[Dict[str, Any]], int]] = None,
        time: Union[int, Callable[[Dict[str, Any]], int]] = None,
        disk: Union[int, Callable[[Dict[str, Any]], int]] = None,
        directories_to_create: POTENTIAL_LIST_SElECTOR = None,
        files_to_create: Union[
            Dict[str, SELECTOR_OR_VALUE],
            List[Tuple[SELECTOR_OR_VALUE, SELECTOR_OR_VALUE]],
        ] = None,
        doc: str = None,
    ):
        """
        Builder for a CommandTool.

        :param tool: Unique identifier of the tool
        :param friendly_name: A user friendly name of your tool (must be implemented for generated docs)
        :param base_command: The command of the tool to execute, usually the tool name or path and not related to any inputs.
        :param inputs: A list of named tool inputs that will be used to create the command line.
        :param outputs: A list of named outputs of the tool; a ``ToolOutput`` declares how the output is captured.
        :param arguments: A list of arguments that will be used to create the command line.
        :param container: A link to an OCI compliant container accessible by the engine.
        :param version: Version of the tool.
        :param env_vars: A dictionary of environment variables that should be defined within the container.
        :param tool_module: Unix, bioinformatics, etc.
        :param tool_provider: The manufacturer of the tool, eg: Illumina, Samtools
        :param metadata: Metadata object describing the Janis tool interface
        :param cpus: An integer, or function that takes a dictionary of hints and returns an integer in 'number of CPUs'
        :param memory: An integer, or function that takes a dictionary of hints and returns an integer in 'GBs'
        :param time: An integer, or function that takes a dictionary of hints and returns an integer in 'seconds'
        :param disk: An integer, or function that takes a dictionary of hints and returns an integer in 'GBs'
        :param directories_to_create: A list of directories to create, accepts an expression (selector / operator)
        :param files_to_create: Either a List of tuples [path: Selector, contents: Selector],
            or a dictionary {"path": contents}. The list of tuples allows you to use an operator for the pathname
        :param doc: Documentation string
        """
        super().__init__()

        self._tool = tool
        self._friendly_name = friendly_name
        self._base_command = base_command
        self._inputs = inputs
        self._outputs = outputs
        self._container = container
        self._version = version
        self._arguments = arguments
        self._env_vars = env_vars
        self._tool_module = tool_module
        self._tool_provider = tool_provider
        self._metadata = metadata
        self._cpus = cpus
        self._memory = memory
        self._time = time
        self._disk = disk
        self._directories_to_create = directories_to_create
        self._files_to_create = files_to_create
        self._doc = doc