Source code for janis_core.tool.tool

import operator
import sys
import os
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, List, Dict, Set

from janis_core.types.common_data_types import Array
from janis_core.tool.documentation import (
    InputDocumentation,
    OutputDocumentation,
    InputQualityType,
)
from janis_core.types import get_instantiated_type, DataType
from janis_core.utils import find_duplicates
from janis_core.utils.metadata import Metadata
from janis_core.utils.validators import Validators
from janis_core.tool.test_classes import (
    TTestCase,
    TTestExpectedOutput,
    TTestPreprocessor,
)
from nose.tools import nottest


class ToolType(Enum):
    Workflow = "workflow"
    CommandTool = "command-tool"
    CodeTool = "code-tool"

    def __str__(self):
        if self == ToolType.Workflow:
            return "Workflow"
        elif self == ToolType.CommandTool:
            return "CommandTool"
        elif self == ToolType.CodeTool:
            return "CodeTool"
        return "".join(a.title() for a in self.value.split("-"))


class TInput(object):
    def __init__(
        self, tag: str, intype: DataType, default=None, doc: InputDocumentation = None
    ):
        self.tag = tag
        self.intype = get_instantiated_type(intype)
        self.default = default
        self.doc = doc

    def __repr__(self):
        items = ["{self.id()}", self.intype.id()]
        if self.default is not None:
            items.append("default=" + str(self.default))
        return f"ToolOutput({', '.join(items)})"

    def id(self):
        return self.tag


class TOutput(object):
    def __init__(self, tag, outtype, doc: OutputDocumentation = None):
        self.tag = tag
        self.outtype = get_instantiated_type(outtype)
        self.doc: Optional[OutputDocumentation] = doc

    def __repr__(self):
        return f'ToolOutput("{self.id()}", {self.outtype.id()})'

    def id(self):
        return self.tag


class Tool(ABC, object):
    """
    One of Workflow, CommandLineTool, ExpressionTool* (* unimplemented)
    """

    TEST_DATA_FOLDER = "test_data"

    def __init__(self, metadata_class=Metadata, **connections):
        """
        :param metadata_class:
        :param connections:
        """

        self.metadata: metadata_class = metadata_class()
        meta = self.bind_metadata()
        if meta:
            self.metadata = meta

        self.connections = connections

    def __repr__(self):
        return f"{str(self.type())}<{self.id()}>"

    @classmethod
    @abstractmethod
    def type(cls) -> ToolType:
        raise Exception(f"'{cls}' must implement type() method")

    @abstractmethod
    def containers(self) -> Dict[str, str]:
        pass

    @abstractmethod
    def id(self) -> str:
        raise Exception("Must implement id() method")

    def versioned_id(self) -> str:
        if self.version() is not None:
            return Validators.transform_identifier_to_be_valid(
                f"{self.id()}/{self.version()}", "_"
            )
        return self.id()

    def tool_module(self):
        return None

    def tool_provider(self):
        return None

    @abstractmethod
    def tool_inputs(self) -> List[TInput]:
        raise Exception("Must implement inputs() method")

    @abstractmethod
    def tool_outputs(self) -> List[TOutput]:
        raise Exception("Must implement outputs() method")

    def inputs_map(self) -> Dict[str, TInput]:
        ins = self.tool_inputs()
        indict = {inp.tag: inp for inp in ins}

        if len(ins) != len(indict):
            dups = find_duplicates([i.tag for i in ins])
            dupstext = ", ".join(dups)
            raise Exception(
                f"There are {len(dups)} duplicate values in {self.id()}'s inputs: {dupstext}"
            )

        return indict

    def outputs_map(self) -> Dict[str, TOutput]:
        outs = self.tool_outputs()
        outdict = {outp.tag: outp for outp in outs}

        if len(outs) != len(outdict):
            dups = find_duplicates([o.tag for o in outs])
            dupstext = ", ".join(dups)
            raise Exception(
                f"There are {len(dups)} duplicate values in {self.id()}'s outputs: {dupstext}"
            )

        return outdict

    def friendly_name(self) -> Optional[str]:
        """
        Overriding this method is not required UNLESS you distribute your tool.
        Generating the docs will fail if your tool does not provide a name.

        :return: A friendly name of your tool
        """
        return None

    def all_input_keys(self) -> List[str]:
        return [t.id() for t in self.tool_inputs()]

    @abstractmethod
    def has_tool_with_no_container(self):
        pass

    @abstractmethod
    def generate_inputs_override(
        self,
        additional_inputs=None,
        with_resource_overrides=False,
        hints=None,
        include_defaults=True,
        values_to_ignore: Set[str] = None,
        quality_type: List[InputQualityType] = None,
    ):
        pass

    def __call__(self, **connections):
        self.connections = connections
        return self

    @abstractmethod
    def version(self):
        return None

    def doc(self) -> Optional[str]:
        return None

    @abstractmethod
    def translate(
        self,
        translation: str,
        to_console=True,
        to_disk=False,
        export_path=None,
        with_docker=True,
        with_resource_overrides=False,
        allow_empty_container=False,
        container_override=None,
    ):
        raise Exception("Subclass must provide implementation for 'translate()' method")

    def bind_metadata(self):
        """
        A convenient place to add metadata about the tool. You are guaranteed that self.metadata will exist.
        It's possible to return a new instance of the ToolMetadata / WorkflowMetadata which will be rebound.
        This is usually called after the initialiser, though it may be called multiple times.
        :return:
        """
        return self.metadata

    def help(self):
        import inspect

        tb = " " * 4

        path = inspect.getfile(self.__class__)

        ins = self.tool_inputs()

        metadata = self.metadata

        def input_format(t: TInput):
            return (
                f"{2 * tb}{t.id()} ({t.intype.id()}{('=' + str(t.default)) if t.default is not None else ''})"
                f": {'' if t.doc is None else t.doc}"
            )

        output_format = (
            lambda t: f"{2 * tb}{t.id()} ({t.outtype.id()}): {'' if t.doc is None else t.doc}"
        )

        requiredInputs = "\n".join(
            input_format(x) for x in ins if not x.intype.optional
        )
        optionalInputs = "\n".join(input_format(x) for x in ins if x.intype.optional)
        outputs = "\n".join(output_format(o) for o in self.tool_outputs())

        return f"""
Pipeline tool: {path} ({self.id()})
NAME
    {self.id()} ({self.friendly_name()})

DOCUMENTATION URL
    {metadata.documentationUrl if metadata.documentationUrl else "No url provided"}

DESCRIPTION
    {metadata.documentation if metadata.documentation else "No documentation provided"}

INPUTS:
    REQUIRED:
{requiredInputs}

    OPTIONAL:
{optionalInputs}

OUTPUTS:
{outputs}
"""

    @nottest
    def tests(self) -> Optional[List[TTestCase]]:
        """
        A list of test cases for this tool
        """
        return None

    def minimal_test(self) -> List[TTestExpectedOutput]:
        """
        A minimal test simply checks if output files exist (if their sizes are bigger than 0).
        It should be used when we don't know what outputs we can expect from a tool. It should
        be called within a TTestCase. Be aware that we still need to know the inputs.

        :return: List of expected outputs
        :rtype: List[TTestExpectedOutput]
        """
        outcome = []
        for i in self.tool_outputs():
            preprocessor = (
                TTestPreprocessor.ListOfFilesExist
                if i.outtype.is_base_type(Array)
                else TTestPreprocessor.FileSize
            )
            comparison = operator.eq if i.outtype.is_base_type(Array) else operator.gt
            expected_value = True if i.outtype.is_base_type(Array) else 0
            secondary_files_suffixes = (
                i.outtype.fundamental_type().secondary_files()
                if i.outtype.is_base_type(Array)
                else i.outtype.secondary_files()
            )
            outcome += [
                TTestExpectedOutput(
                    tag=i.tag,
                    preprocessor=preprocessor,
                    operator=comparison,
                    expected_value=expected_value,
                )
            ]
            if secondary_files_suffixes is not None:
                for suffix in secondary_files_suffixes:
                    outcome += [
                        TTestExpectedOutput(
                            tag=i.tag,
                            suffix_secondary_file=suffix,
                            preprocessor=preprocessor,
                            operator=comparison,
                            expected_value=expected_value,
                        )
                    ]
        return outcome

    @classmethod
    @nottest
    def test_data_path(cls):
        module_path = os.path.dirname(sys.modules[cls.__module__].__file__)
        return os.path.join(module_path, cls.TEST_DATA_FOLDER)

    @classmethod
    @nottest
    def skip_test(cls) -> bool:
        """
        Sometimes, we may want to skip tests for some tools because they are not ready yet
        """
        return False