Source code for janis_core.code.pythontool

from abc import ABC, abstractmethod
from textwrap import dedent, indent
from typing import Dict, Any, Optional, Type, List

from janis_core.translationdeps.supportedtranslations import SupportedTranslation
from janis_core.tool.documentation import InputDocumentation
from janis_core.types.data_types import NativeTypes

from janis_core.utils.docparser_info import parse_docstring
from janis_core.utils.metadata import ToolMetadata

from janis_core.code.codetool import CodeTool
from janis_core.tool.tool import TInput
from janis_core.types import (
    Boolean,
    Array,
    Int,
    Float,
    String,
    DataType,
    File,
    Directory,
    get_instantiated_type,
    all_types,
)

inspect_ignore_keys = {"self", "args", "kwargs", "cls"}


[docs]class PythonTool(CodeTool, ABC):
[docs] @staticmethod @abstractmethod def code_block(**kwargs) -> Dict[str, Any]: """ This code block must be 100% self contained. All libraries and functions must be imported and declared from within this block. :param kwargs: :return: """ pass
def __init__(self, **connections): self._cached_input_signature = None super().__init__(metadata_class=ToolMetadata, **connections) # Other internal methods def inputs(self): if self._cached_input_signature is None: import inspect argspec = inspect.signature(self.code_block) docstrings = parse_docstring(self.code_block.__doc__) paramlist = docstrings.get("params", []) paramdocs = { p["name"]: p.get("doc").strip() for p in paramlist if "name" in p } missing_annotations = set() unsupported_types = {} ins = [] for inp in argspec.parameters.values(): if inp.name in inspect_ignore_keys: continue fdefault = inp.default optional = (fdefault is not inspect.Parameter.empty) or fdefault is None default = fdefault if optional else None defaulttype = type(fdefault) if fdefault is not None else None annotation = ( defaulttype if inp.annotation is inspect.Parameter.empty else inp.annotation ) if not annotation: missing_annotations.add(inp.name) continue dt_type: Optional[DataType] = get_instantiated_type( annotation, optional=optional ) if not dt_type: unsupported_types[inp.name] = annotation continue ins.append( TInput( tag=inp.name, intype=dt_type, default=default, doc=InputDocumentation(paramdocs.get(inp.name)), ) ) if missing_annotations: raise Exception( f"The following types on the PythonTool '{self.id()}' were missing type annotations (REQUIRED): " + ", ".join(missing_annotations) ) if unsupported_types: raise Exception( f"Unsupported types for inputs: " + ", ".join(f"{k}: {v}" for k, v in unsupported_types.items()) ) self._cached_input_signature = ins return self._cached_input_signature def base_command(self): return ["python", self.script_name()] def script_name(self): return f"{self.id()}-script.py" def container(self): return "python:3.8.1" def prepared_script(self, translation: SupportedTranslation): import inspect nl = "\n" argkwargs = ", ".join(f"{t.id()}=args.{t.id()}" for t in self.inputs()) codeblock_without_static = nl.join( dedent(inspect.getsource(self.code_block)).split(nl)[1:] ) ins = self.tool_inputs() pt_decl = """\ class PythonTool: File = str Directory = str """ type_annotation_declarations = nl.join( f"{k} = {v}" for k, v in { **{inp.intype.__class__.__name__: "str" for inp in ins}, **{ t.__name__: NativeTypes.map_to_primitive(t.primitive()).__name__ for t in all_types }, "Array": "List", }.items() ) extra_param_parsing = "" if translation == SupportedTranslation.CWL: extra_param_parsing = """ from os import getcwd, path cwd = getcwd() def prepare_file_or_directory_type(file_or_directory, value): if value is None: return None if isinstance(value, list): return [prepare_file_or_directory_type(file_or_directory, v) for v in value] return { "class": file_or_directory, "path": path.join(cwd, value) }""" for out in self.outputs(): st = ( out.outtype.fundamental_type() if out.outtype.is_array() else out.outtype ) if not isinstance(st, (File, Directory)): continue file_or_directory = "Directory" if isinstance(st, Directory) else "File" extra_param_parsing += f'\nresult["{out.id()}"] = prepare_file_or_directory_type("{file_or_directory}", result["{out.id()}"])' extra_param_parsing = indent(extra_param_parsing, 4 * " ") return f""" import argparse, json, sys from typing import Optional, List, Dict, Any cli = argparse.ArgumentParser("Argument parser for Janis PythonTool") {nl.join(self.generate_cli_binding_for_input(inp) for inp in ins)} {type_annotation_declarations} {pt_decl} {codeblock_without_static} try: args = cli.parse_args() result = code_block({argkwargs}) {extra_param_parsing} print(json.dumps(result)) except Exception as e: print(str(e), file=sys.stderr) raise """ @staticmethod def generate_cli_binding_for_input(inp: TInput): params = [f'"--{inp.id()}"'] intype = None bintype = inp.intype required = not inp.intype.optional if bintype.is_array(): bintype = bintype.fundamental_type() params.append("nargs='+'") if required: params.append("default=[]") required = False if isinstance(bintype, Int): intype = "int" elif isinstance(bintype, Float): intype = "float" elif isinstance(bintype, (String, File, Directory)): intype = "str" elif isinstance(bintype, Boolean): params.append("action='store_true'") required = False if intype: params.append("type=" + intype) if required: params.append("required=True") if inp.doc and inp.doc.doc: escaped = inp.doc.doc.replace("'", "\\'").replace("\n", "\\n") params.append(f"help='{escaped}'") joined = ", ".join(params) return f"cli.add_argument({joined})"