###################
# Implementations #
###################
import operator
import os.path
from inspect import isclass
from typing import Dict, Any, Set, List, Optional
from janis_core.deps import cwlgen, wdlgen
from janis_core.tool.test_classes import TTestExpectedOutput, TTestPreprocessor
from janis_core.utils.logger import Logger
from janis_core.__meta__ import GITHUB_URL
from janis_core.types.data_types import DataType, NativeTypes, NativeType, ParseableType
from janis_core.utils.generics_util import is_generic, is_qualified_generic
class UnionType(DataType):
    """
    Data type that is satisfied by any one of several subtypes.

    Every subtype is resolved through ``get_instantiated_type``. File types
    that report secondary files are rejected at construction time.
    """

    def __init__(self, *subtypes: ParseableType, optional=False):
        """
        :param subtypes: the types making up the union (anything
            ``get_instantiated_type`` can resolve)
        :param optional: forwarded to ``DataType.__init__``
        :raises Exception: if any subtype has secondary files, any subtype
            does not resolve to a ``DataType``, or no valid subtype remains
        """
        self._initial_subtypes = list(subtypes)

        invalid_types = []
        valid_types = []
        types_with_secondaries = []

        for subtype in subtypes:
            resolvedtype = get_instantiated_type(subtype)
            if not isinstance(resolvedtype, DataType):
                invalid_types.append(resolvedtype)
            elif isinstance(resolvedtype, File) and resolvedtype.secondary_files():
                # Record the offending type itself so the error below can name it.
                types_with_secondaries.append(resolvedtype)
            else:
                valid_types.append(resolvedtype)

        if types_with_secondaries:
            raise Exception(
                "UnionType doesn't accept data types with secondary files (yet), affected types: "
                + ", ".join(str(t) for t in types_with_secondaries)
            )

        if invalid_types:
            raise Exception(
                "UnionType contained invalid types "
                + ", ".join(str(t) for t in invalid_types)
            )

        # NOTE(review): the check only rejects an empty union, while the message
        # asks for at least 2 types -- confirm which of the two is intended.
        if len(valid_types) < 1:
            raise Exception("UnionType is expecting at least 2 data type arguments")

        self.subtypes = valid_types

        super().__init__(optional)

    def is_base_type(self, base_type):
        """Return True only if every subtype is of the given base type."""
        return all(s.is_base_type(base_type) for s in self.subtypes)

    def get_extensions(self):
        """Return the de-duplicated extensions of all subtypes that expose them."""
        s = set()
        for subtype in self.subtypes:
            if hasattr(subtype, "get_extensions"):
                s = s.union(subtype.get_extensions() or [])
        return list(s)

    def is_array(self):
        """Return True only if every subtype is an array."""
        return all(s.is_array() for s in self.subtypes)

    def id(self):
        return "Union<" + ", ".join(s.id() for s in self.subtypes) + ">"

    @staticmethod
    def name() -> str:
        return "Union"

    @staticmethod
    def primitive() -> NativeType:
        return None

    @staticmethod
    def doc() -> str:
        return "Union datatype"

    def validate_value(self, *args, **kwargs) -> bool:
        """A value is valid if at least one subtype accepts it."""
        return any(t.validate_value(*args, **kwargs) for t in self.subtypes)

    def invalid_value_hint(self, *args, **kwargs):
        """Join the non-empty hints produced by the subtypes."""
        hints = [t.invalid_value_hint(*args, **kwargs) for t in self.subtypes]
        return ", ".join(t for t in hints if t)

    def can_receive_from(self, other, *args, **kwargs):
        """
        Decide whether this union can receive from ``other``.

        A source union must have *all* of its subtypes receivable here; any
        other source only needs to be receivable by one of our subtypes.
        """
        if isinstance(other, UnionType):
            # we'll require all elements in the source to be received by this type
            return all(
                self.can_receive_from(t, *args, **kwargs) for t in other.subtypes
            )
        return any(t.can_receive_from(other, *args, **kwargs) for t in self.subtypes)

    def wdl(self, has_default=False) -> wdlgen.WdlType:
        """
        Return the single WDL type shared by all subtypes.

        :raises Exception: if the subtypes produce more than one distinct WDL
            type signature
        """
        wdl_data_types = [a.wdl() for a in self.subtypes]
        # we require the WDL to be identical for WDL to work
        if len(set(a.get_string() for a in wdl_data_types)) > 1:
            resulting_signatures = ", ".join(
                f"{a.id()}: {a.wdl().get_string()}" for a in self.subtypes
            )
            raise Exception(
                "Janis doesn't support UnionTypes in WDL where there is more than 1 WDL type signatures. "
                f"Please raise an issue on GitHub ({GITHUB_URL}) if this is a blocker. Resulting signatures: "
                + resulting_signatures
            )

        return wdl_data_types[0]

    def cwl_type(self, has_default=False):
        """
        Return the CWL types of the subtypes.

        Duplicates are removed when the types can be placed in a set. A single
        remaining type is returned bare, otherwise the list is returned.
        """
        inner_types = [a.cwl_type(has_default=has_default) for a in self.subtypes]
        try:
            inner_types = list(set(inner_types))
        except Exception as e:
            # Best effort only: fall back to the un-deduplicated list.
            Logger.debug(f"Error creating set from ({inner_types}): {e}")

        if len(inner_types) == 1:
            return inner_types[0]
        return inner_types
class String(DataType):
    """Data type for string values."""

    @staticmethod
    def name():
        return "String"

    @staticmethod
    def primitive():
        return NativeTypes.kStr

    @staticmethod
    def doc():
        return "A string"

    @classmethod
    def schema(cls) -> Dict:
        return {"type": "string", "required": True}

    def input_field_from_input(self, meta):
        """Return the first value of the ``meta`` mapping."""
        return next(iter(meta.values()))

    def can_receive_from(self, other, source_has_default=False):
        """A String accepts any Filename; otherwise defer to the base rules."""
        if isinstance(other, Filename):
            return True
        return super().can_receive_from(other, source_has_default=source_has_default)

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool):
        """
        Accept str, float and int values.

        ``None`` is accepted when the type is optional or when
        ``allow_null_if_not_optional`` is set.
        """
        if meta is None:
            return self.optional or allow_null_if_not_optional
        return isinstance(meta, (str, float, int))

    def coerce_value_if_possible(self, value):
        return str(value)

    def invalid_value_hint(self, meta):
        """Return a description of why ``meta`` is invalid, or None if it is valid."""
        if meta is None:
            return "value was null"
        if self.validate_value(meta, True):
            return None
        return f"Value was of type {type(meta)}, expected string"
class Filename(String):
    """String type standing in for a filename that is generated rather than supplied."""

    def __init__(
        self, prefix="generated", suffix=None, extension: str = None, optional=None
    ):
        """
        :param prefix: leading part of the generated filename
        :param suffix: appended after the prefix; joined with '-' unless it
            already starts with '.'
        :param extension: appended verbatim after the suffix (no '.' is
            inserted, so include it if one is wanted)
        :param optional: IGNORED (legacy), the type is always constructed as optional
        """
        self.prefix = prefix
        self.extension = extension
        self.suffix = suffix
        super().__init__(optional=True)

    @staticmethod
    def name() -> str:
        return "Filename"

    @staticmethod
    def primitive() -> NativeType:
        return NativeTypes.kStr

    def cwl_type(self, has_default=False):
        # has_default is deliberately not forwarded here (original behaviour).
        return super().cwl_type()

    @staticmethod
    def doc() -> str:
        return (
            "This class is a placeholder for generated filenames, by default it is optional and CAN be overridden,\n"
            "however the program has been structured in a way such that these names will be generated based on the step label.\n"
            "These should only be used when the tool _requires_ a filename to output and you aren't\n"
            "concerned what the filename should be. The Filename DataType should NOT be used as an output."
        )

    @classmethod
    def schema(cls) -> Dict:
        # No schema is defined for Filename.
        return None

    def map_cwl_type(self, parameter: cwlgen.Parameter):
        """Apply the base mapping, then default the parameter to the generated name."""
        super().map_cwl_type(parameter)
        parameter.default = self.generated_filenamecwl()

    def generated_filename(self, replacements: Dict = None) -> str:
        """
        Build the filename as ``prefix + suffix-part + extension``.

        :param replacements: optional overrides; only the keys "prefix" and
            "suffix" are consulted
        """
        repl = replacements or {}
        prefix = repl.get("prefix", self.prefix)
        suffix = repl.get("suffix", self.suffix)

        suf = ""
        if suffix:
            # A suffix that already starts with '.' is attached directly,
            # anything else is separated from the prefix with a dash.
            if str(suffix).startswith("."):
                suf = str(suffix)
            else:
                suf = "-" + str(suffix)

        ex = "" if self.extension is None else self.extension
        return prefix + suf + ex

    def generated_filenamecwl(self) -> str:
        """Return the generated filename wrapped in double quotes."""
        return f'"{self.generated_filename()}"'
        # code = "Math.random().toString(16).substring(2, 8)"
        # pf = (self.prefix + "-") if self.prefix else ""
        # sf = self.suffix if self.suffix else ""
        # ext = self.extension if self.extension else ""
        # return f'"{pf}generated-" + {code} + "{sf + ext}"'

    def can_receive_from(self, other: DataType, source_has_default=False):
        # Specific override because Filename should be able to receive from string
        if isinstance(other, String):
            return True  # Always provides default, and is always optional
        return super().can_receive_from(other, source_has_default=source_has_default)

    def wdl(self, has_default=True):
        return super().wdl(has_default=has_default)

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool):
        """Every value is accepted."""
        return True

    def invalid_value_hint(self, meta):
        return None
class Int(DataType):
    """Data type for integer values."""

    @staticmethod
    def name():
        return "Integer"

    @staticmethod
    def primitive():
        return NativeTypes.kInt

    def doc(self):
        return "An integer"

    @classmethod
    def schema(cls) -> Dict:
        return {"type": "number", "required": True}

    def input_field_from_input(self, meta):
        """Return the first value of the ``meta`` mapping."""
        return next(iter(meta.values()))

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool):
        """
        Accept int instances.

        ``None`` is accepted when the type is optional or when
        ``allow_null_if_not_optional`` is set.
        """
        if meta is None:
            return self.optional or allow_null_if_not_optional
        return isinstance(meta, int)

    def coerce_value_if_possible(self, value):
        """
        Convert ``value`` with ``int()``.

        :raises Exception: if the conversion fails (the original error is chained)
        """
        try:
            return int(value)
        except Exception as e:
            # 'except Exception' rather than a bare except, so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            raise Exception(f"Value '{value}' cannot be coerced to an integer") from e

    def invalid_value_hint(self, meta):
        """Return a description of why ``meta`` is invalid, or None if it is valid."""
        if meta is None:
            return "value was null"
        if self.validate_value(meta, True):
            return None
        return f"Value was of type {type(meta)}, expected int"
class Float(DataType):
    """Data type for floating point values (ints are accepted as well)."""

    @staticmethod
    def name():
        return "Float"

    @staticmethod
    def primitive():
        return NativeTypes.kFloat

    def doc(self):
        return "A float"

    def input_field_from_input(self, meta: Dict):
        """Return the first value of the ``meta`` mapping."""
        return next(iter(meta.values()))

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """
        Accept float and int instances.

        ``None`` is accepted when the type is optional or when
        ``allow_null_if_not_optional`` is set.
        """
        if meta is None:
            return self.optional or allow_null_if_not_optional
        return isinstance(meta, float) or isinstance(meta, int)

    def coerce_value_if_possible(self, value):
        """
        Convert ``value`` with ``float()``.

        :raises Exception: if the conversion fails (the original error is chained)
        """
        try:
            return float(value)
        except Exception as e:
            # 'except Exception' rather than a bare except, so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            raise Exception(f"Value '{value}' cannot be coerced to a float") from e

    def invalid_value_hint(self, meta):
        """Return a description of why ``meta`` is invalid, or None if it is valid."""
        if meta is None:
            return "value was null"
        if self.validate_value(meta, True):
            return None
        return f"Value was of type {type(meta)}, expected float | int"
class Double(Float):
    """Float subtype whose primitive is ``NativeTypes.kDouble``."""

    @staticmethod
    def name():
        return "Double"

    @staticmethod
    def primitive():
        return NativeTypes.kDouble

    def doc(self):
        # Previously returned "An integer", copied over from Int.
        return "A double"

    @classmethod
    def schema(cls) -> Dict:
        return {"type": "number", "required": True}

    def input_field_from_input(self, meta):
        """Return the first value of the ``meta`` mapping."""
        return next(iter(meta.values()))

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """
        Accept float and int instances.

        ``None`` is accepted when the type is optional or when
        ``allow_null_if_not_optional`` is set.
        """
        if meta is None:
            return self.optional or allow_null_if_not_optional
        return isinstance(meta, float) or isinstance(meta, int)

    def invalid_value_hint(self, meta):
        """Return a description of why ``meta`` is invalid, or None if it is valid."""
        if meta is None:
            return "value was null"
        if self.validate_value(meta, True):
            return None
        return f"Value was of type {type(meta)}, expected float | int"

    def can_receive_from(self, other, *args, **kwargs) -> bool:
        """A Double accepts any non-optional Float; otherwise defer to the base rules."""
        if not other.optional and isinstance(other, Float):
            return True
        return super().can_receive_from(other, *args, **kwargs)
class Boolean(DataType):
    """Data type for boolean values."""

    @staticmethod
    def name():
        return "Boolean"

    @staticmethod
    def primitive():
        return NativeTypes.kBool

    def doc(self):
        return "A boolean"

    def input_field_from_input(self, meta):
        """Return the first value of the ``meta`` mapping."""
        return next(iter(meta.values()))

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """
        Accept bools, the strings "true"/"false" (any case) and the ints 0/1.

        ``None`` is accepted when the type is optional or when
        ``allow_null_if_not_optional`` is set.
        """
        if meta is None:
            return self.optional or allow_null_if_not_optional
        # bool is a subclass of int, so it must be tested before the int branch
        # for this check to be reachable; the outcome for bools is unchanged.
        if isinstance(meta, bool):
            return True
        if isinstance(meta, str):
            return meta.lower() == "true" or meta.lower() == "false"
        if isinstance(meta, int):
            return meta == 0 or meta == 1
        return False

    def coerce_value_if_possible(self, value):
        """
        Convert a bool, str or int to a bool.

        Strings are True only when they equal "true" (case-insensitive); ints
        are True when non-zero.

        :raises Exception: for any other value type
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() == "true"
        if isinstance(value, int):
            return value != 0
        raise Exception(f"Value {value} could not be coerced to boolean type")

    def invalid_value_hint(self, meta):
        """Return a description of why ``meta`` is invalid, or None if it is valid."""
        if meta is None:
            return "value was null"
        if self.validate_value(meta, True):
            return None
        return f"Value was of type {type(meta)}, expected bool"
class File(DataType):
    """Data type for a file, optionally tagged with one or more extensions."""

    def __init__(
        self, optional=False, extension=None, alternate_extensions: Set[str] = None
    ):
        """
        :param optional: forwarded to ``DataType.__init__``
        :param extension: Used in CWL to try and guess the file extension where it's not available otherwise
        :param alternate_extensions: further extensions, reported by
            ``get_extensions`` after ``extension``
        """
        super(File, self).__init__(optional=optional)
        self.extension = extension
        self.alternate_extensions = alternate_extensions

    def get_extensions(self):
        """Return the primary extension (if set) followed by any alternates."""
        exts = []
        if self.extension:
            exts.append(self.extension)
        if self.alternate_extensions:
            exts.extend(self.alternate_extensions)
        return exts

    @staticmethod
    def name():
        return "File"

    @staticmethod
    def primitive():
        return NativeTypes.kFile

    def doc(self):
        return "A local file"

    @classmethod
    def schema(cls) -> Dict:
        return {"path": {"type": "string", "required": True}}

    def get_value_from_meta(self, meta):
        return meta.get("path")

    def cwl_input(self, value: Any):
        return {"class": "File", "path": value}

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """
        Accept str values (paths).

        ``None`` is accepted when the type is optional or when
        ``allow_null_if_not_optional`` is set.
        """
        if meta is None:
            return self.optional or allow_null_if_not_optional
        return isinstance(meta, str)

    def invalid_value_hint(self, meta):
        """Return a description of why ``meta`` is invalid, or None if it is valid."""
        if meta is None:
            return "value was null"
        if self.validate_value(meta, True):
            return None
        return f"Value was of type {type(meta)}, expected string (path)"

    def can_receive_from(self, other, source_has_default=False) -> bool:
        """
        Decide whether this type can receive from ``other``.

        A plain ``File`` (exactly this class, not a subclass) accepts any
        File-derived source. Everything else is decided by the base class.
        """
        o = get_instantiated_type(other).received_type()
        # Exact type comparison on purpose: subclasses must not take this shortcut.
        if type(self) == File and isinstance(o, File):
            return True
        # Forward source_has_default; it was previously dropped here.
        return super().can_receive_from(o, source_has_default=source_has_default)

    # def cwl_type(self, has_default=False):
    #     secs = self.secondary_files()
    #     if secs:
    #         tp = cwlgen.File(secondaryFiles=self.secondary_files())
    #         return [tp, "null"] if self.optional and not has_default else tp
    #     return super().cwl_type(has_default=has_default)

    @classmethod
    def basic_test(
        cls,
        tag: str,
        min_size: int,
        md5: Optional[str] = None,
    ) -> List[TTestExpectedOutput]:
        """
        Build the standard expected-output checks for a file output.

        :param tag: output tag the checks apply to
        :param min_size: the file size must be >= this value
        :param md5: if given, an additional check that the file md5 equals it
        :return: one or two ``TTestExpectedOutput`` entries
        """
        outcome = [
            TTestExpectedOutput(
                tag=tag,
                preprocessor=TTestPreprocessor.FileSize,
                operator=operator.ge,
                expected_value=min_size,
            ),
        ]
        if md5 is not None:
            outcome.append(
                TTestExpectedOutput(
                    tag=tag,
                    preprocessor=TTestPreprocessor.FileMd5,
                    operator=operator.eq,
                    expected_value=md5,
                )
            )
        return outcome
class Directory(DataType):
    """Data type for a directory, identified by its path."""

    def __init__(self, optional=False):
        """
        Specifically exclude default
        """
        super().__init__(optional=optional)

    @staticmethod
    def name():
        return "Directory"

    @staticmethod
    def primitive():
        return NativeTypes.kDirectory

    def doc(self):
        return "A directory of files"

    @classmethod
    def schema(cls) -> Dict:
        path_spec = {"type": "string", "required": True}
        return {"path": path_spec}

    def get_value_from_meta(self, meta):
        return meta["path"]

    def input_field_from_input(self, meta):
        return meta["path"]

    def cwl_input(self, value: Any):
        # WDL: "{workflowName}.label" = meta["path"}
        return {"class": "Directory", "path": value}

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """Paths must be strings; null is allowed only when optional or explicitly permitted."""
        if meta is not None:
            return isinstance(meta, str)
        return self.optional or allow_null_if_not_optional

    def invalid_value_hint(self, meta):
        """Explain why ``meta`` was rejected, or return None when it is acceptable."""
        if meta is None:
            return "value was null"
        if not self.validate_value(meta, True):
            return f"Value was of type {type(meta)}, expected string (path)"
        return None
class Array(DataType):
    """Data type for a list whose elements all share one inner data type."""

    init_key_map = {"t": "_t"}

    def __init__(self, t: ParseableType, optional=False):
        """
        :param t: the element type (anything ``get_instantiated_type`` can resolve)
        :param optional: forwarded to ``DataType.__init__``
        :raises Exception: if ``t`` does not resolve to a ``DataType``
        """
        inner = get_instantiated_type(t)
        if not isinstance(inner, DataType):
            raise Exception(f"Type t ({type(t)}) must be an instance of 'DataType'")

        self._t = inner
        super().__init__(optional)

    def is_array(self):
        return True

    def subtype(self):
        """Return the element type."""
        return self._t

    @staticmethod
    def name():
        return "Array"

    @staticmethod
    def primitive():
        return NativeTypes.kArray

    def id(self):
        """Return ``Array<inner>``, wrapped in ``Optional<...>`` when optional."""
        if self._t is None:
            return super().id()
        typed = f"Array<{self._t.id()}>"
        return f"Optional<{typed}>" if self.optional else typed

    def doc(self):
        return "An array"

    @classmethod
    def schema(cls) -> Dict:
        return {"type": "array"}

    def cwl_type(self, has_default=False):
        """Return the CWL array schema, paired with "null" when optional without a default."""
        inp = cwlgen.CommandInputArraySchema(
            items=self._t.cwl_type(),
            type="array"
            # label=None,
            # input_binding=None
        )
        if self.optional and not has_default:
            return [inp, "null"]
        return inp

    def map_cwl_type(self, parameter: cwlgen.Parameter) -> cwlgen.Parameter:
        parameter.type = cwlgen.CommandInputArraySchema(items=None, type="array")
        return parameter

    def cwl_input(self, value: Any):
        """
        Convert each element with the element type's ``cwl_input``.

        :raises Exception: if ``value`` is neither a list nor None
        """
        if value is None:
            return None
        if not isinstance(value, list):
            raise Exception(f"Input value for input '{self.id()}' was not an array")
        return [self._t.cwl_input(element) for element in value]

    def wdl(self, has_default=False) -> wdlgen.WdlType:
        inner_wdl = self._t.wdl(has_default=False)
        ar = wdlgen.ArrayType(inner_wdl, requires_multiple=False)
        return wdlgen.WdlType(ar, optional=self.optional or has_default)

    def can_receive_from(self, other, source_has_default=False):
        """
        Array sources are compared element type to element type; any other
        source must satisfy both the element type and the base-class rules.
        """
        if other.is_array():
            return self._t.can_receive_from(other._t)
        if self._t.can_receive_from(other):
            return super().can_receive_from(
                other, source_has_default=source_has_default
            )
        return False

    def input_field_from_input(self, meta):
        return next(iter(meta.values()))

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """A list is valid when every element is valid for the element type."""
        if meta is None:
            return self.optional or allow_null_if_not_optional
        if not isinstance(meta, list):
            return False
        element_type = self.subtype()
        return all(
            element_type.validate_value(q, allow_null_if_not_optional) for q in meta
        )

    def invalid_value_hint(self, meta):
        """Return None for a valid value, otherwise a description of the problem(s)."""
        if meta is None:
            return "value was null"
        if self.validate_value(meta, True):
            return None
        if not isinstance(meta, list):
            return f"Value was of type {type(meta)}, expected type Array<{self.subtype().id()}>"

        st = self.subtype()
        hints = []
        for position, element in enumerate(meta):
            hint = st.invalid_value_hint(element)
            if hint:
                hints.append(f"{position}. {hint}")
        return str(hints)

    def parse_value(self, valuetoparse):
        """Parse each element; a non-list value is treated as a one-element list."""
        values = valuetoparse if isinstance(valuetoparse, list) else [valuetoparse]
        return [self.subtype().parse_value(v) for v in values]

    def fundamental_type(self) -> DataType:
        """Return the innermost non-array element type (as its received type)."""
        st = self.subtype()
        return st.fundamental_type() if st.is_array() else st.received_type()

    def received_type(self):
        return Array(self._t.received_type(), optional=self.optional)

    @classmethod
    def array_wrapper(cls, expected_outputs: List[List[TTestExpectedOutput]]):
        """
        Flatten per-element expected outputs into one list, setting each
        entry's ``array_index`` to the position of the group it came from.
        """
        result = []
        for index, group in enumerate(expected_outputs):
            for expected_output in group:
                expected_output.array_index = index
                result.append(expected_output)
        return result
class Stdout(File):
    """File type representing a tool's standard output, wrapping a File subtype."""

    @staticmethod
    def name():
        return "Stdout"

    def __init__(self, subtype=None, optional=None):
        """
        :param subtype: File-derived type of the captured output (defaults to ``File()``)
        :param optional: if not None, written onto the subtype's ``optional`` attribute
        :raises Exception: if the subtype is not a File, or has secondary files
        """
        super().__init__(optional=False)

        subtype = get_instantiated_type(subtype) if subtype is not None else File()
        if optional is not None:
            subtype.optional = optional

        if subtype and not isinstance(subtype, File):
            raise Exception(
                "Janis does not currently support non-File stdout annotations"
            )

        self.subtype = subtype

        if self.subtype.secondary_files():
            # self.subtype is an instance, so the class name has to be read
            # from its type; instance.__name__ would raise AttributeError.
            raise Exception(
                f"The subtype '{type(self.subtype).__name__}' has secondary files, "
                f"but stdout does not have the ability to collect files"
            )

    @staticmethod
    def primitive():
        return NativeTypes.kStdout

    def id(self):
        return f"stdout<{self.subtype.id()}>"

    def received_type(self):
        """Return the wrapped subtype, after copying this type's ``optional`` onto it."""
        st = self.subtype
        # NOTE(review): self.optional comes from __init__(optional=False); if the
        # base class never stores None, this always overwrites the subtype's
        # optional flag, including one set from the constructor -- confirm intent.
        if self.optional is not None:
            st.optional = self.optional
        return st

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """
        Will always toss away the value
        """
        return True

    def invalid_value_hint(self, meta):
        return None
class Stderr(File):
    """File type representing a tool's standard error, wrapping a File subtype."""

    @staticmethod
    def name():
        return "Stderr"

    def __init__(self, subtype=None, stderrname=None, optional=None):
        """
        :param subtype: File-derived type of the captured output (defaults to ``File()``)
        :param stderrname: stored as ``self.stderrname``
        :param optional: if not None, written onto the subtype's ``optional`` attribute
        :raises Exception: if the subtype is not a File, or has secondary files
        """
        super().__init__(optional=False)

        subtype = get_instantiated_type(subtype) if subtype is not None else File()
        if optional is not None:
            subtype.optional = optional

        if subtype and not isinstance(subtype, File):
            raise Exception(
                "Janis does not currently support non-File stderr annotations"
            )

        self.stderrname = stderrname
        self.subtype = subtype

        if self.subtype.secondary_files():
            # self.subtype is an instance, so the class name has to be read
            # from its type; instance.__name__ would raise AttributeError.
            raise Exception(
                f"The subtype '{type(self.subtype).__name__}' has secondary files, "
                f"but stderr does not have the ability to collect files"
            )

    @staticmethod
    def primitive():
        return NativeTypes.kStderr

    def id(self):
        return f"stderr<{self.subtype.id()}>"

    def received_type(self):
        """Return the wrapped subtype, after copying this type's ``optional`` onto it."""
        st = self.subtype
        # NOTE(review): self.optional comes from __init__(optional=False); if the
        # base class never stores None, this always overwrites the subtype's
        # optional flag, including one set from the constructor -- confirm intent.
        if self.optional is not None:
            st.optional = self.optional
        return st

    def validate_value(self, meta: Any, allow_null_if_not_optional: bool) -> bool:
        """
        Will always toss away the value
        """
        return True

    def invalid_value_hint(self, meta):
        return None
class GenericFileWithSecondaries(File):
    """File type carrying an explicit list of secondary-file patterns."""

    def __init__(
        self, optional=False, secondaries: List[str] = None, extension: str = None
    ):
        """
        :param optional: forwarded to ``File.__init__``
        :param secondaries: secondary-file entries; a single non-list value is
            wrapped in a list, and None means no secondaries
        :param extension: forwarded to ``File.__init__``
        """
        super().__init__(optional=optional, extension=extension)
        if secondaries is None:
            # Previously None was wrapped as [None], which made
            # secondary_files() truthy and broke the join in id().
            secondaries = []
        elif not isinstance(secondaries, list):
            secondaries = [secondaries]
        self.secondaries = secondaries

    def secondary_files(self) -> Optional[List[str]]:
        return self.secondaries

    def id(self):
        return f"{super().id()} [{', '.join(self.secondaries)}]"

    @staticmethod
    def name():
        return "GenericFileWithSecondaries"
# Data type classes defined in this module, collected in one list.
# Note: UnionType and GenericFileWithSecondaries are not included here.
all_types = [
String,
Filename,
Int,
Float,
Double,
Boolean,
File,
Directory,
Stdout,
Stderr,
Array,
]
def get_from_python_type(dt, optional: bool = None, overrider=None):
    """
    Translate a native Python type, value or ``typing`` generic into a data type.

    Handles ``str`` / ``bool`` / ``int`` / ``float`` (either the class itself
    or an instance of it), ``typing.List[...]``, ``typing.Union[...]`` and
    other qualified generics with at most two arguments, one of which may be
    ``NoneType``.

    :param dt: the Python type or value to translate; ``None`` yields an
        optional Boolean
    :param optional: optionality passed to the created type; for Union
        generics it is recomputed from the presence of ``NoneType``
    :param overrider: callable used to resolve nested types, defaults to
        ``get_instantiated_type``
    :return: the resolved data type, or None when ``dt`` matches no rule
    :raises TypeError: for a Union with no usable members
    :raises Exception: for unqualified generics, or qualified generics with
        more than two arguments
    """
    if dt is None:
        return Boolean(optional=True)

    bc = overrider or get_instantiated_type

    # dtt is dt when dt is itself a plain class; typedt covers dt being a value.
    dtt = dt if type(dt) == type else None
    typedt = type(dt)

    try:
        if dtt == str or typedt == str:
            return String(optional=optional)
    except Exception as e:
        # Best effort: a failing comparison is reported and the next rules are tried.
        print(e)

    # bool must be tested before int, matching the original order.
    if dtt == bool or typedt == bool:
        return Boolean(optional=optional)

    if dtt == int or typedt == int:
        return Int(optional=optional)

    if dtt == float or typedt == float:
        return Float(optional=optional)

    if is_qualified_generic(dt):
        if str(dt).startswith("typing.List"):
            nt = bc(dt.__args__[0], overrider=bc)
            return Array(nt, optional=optional)

        elif str(dt).startswith("typing.Union"):
            subtypes = dt.__args__
            # Filter out None or NoneType
            try:
                new_subtypes = [
                    t for t in subtypes if t is not None and type(None) != t
                ]
            except Exception as e:
                Logger.critical(
                    f"Couldn't determine the appropriate internal types from {str(dt)}, failed with error: {str(e)}"
                )
                raise

            # The union is optional exactly when a None member was removed.
            optional = len(subtypes) != len(new_subtypes)

            if len(new_subtypes) == 0:
                # Now an f-string: the placeholder used to be emitted literally.
                raise TypeError(
                    f"Unsure how to parse generic: '{str(dt)}', please raise an issue if you think this is in error"
                )

            if len(new_subtypes) == 1:
                return get_instantiated_type(
                    new_subtypes[0], optional=optional, overrider=bc
                )

            nts = [bc(n, overrider=bc) for n in new_subtypes]
            return UnionType(*nts, optional=optional)

        # Any other qualified generic: at most two arguments, one of which may
        # be NoneType to mark the type as optional.
        args = dt.__args__
        if len(args) > 2:
            raise Exception(f"Janis is unsure how to parse qualfied generic '{dt}'")

        aridxofnonetype = [
            i for i, val in enumerate(a == type(None) for a in args) if val
        ]

        optional = len(aridxofnonetype) > 0

        # NOTE(review): this cannot trigger, since optional is True whenever
        # any NoneType index was found; kept as in the original.
        if len(aridxofnonetype) > 1 and optional is False:
            raise Exception("Janis cannot accept union ")

        # With two arguments this picks the one that is not the first NoneType.
        idxofsubtype = (len(args) - 1 - aridxofnonetype[0]) if optional else 0
        subtype = args[idxofsubtype]

        nt = bc(subtype, optional=optional)
        return nt

    elif is_generic(dt):
        raise Exception(f"Generic {dt} was generic typing, but unqualified")
def get_instantiated_type(datatype: ParseableType, optional=None, overrider=None):
    """
    Resolve ``datatype`` into a ``DataType`` instance.

    Resolution order: a list becomes an ``Array`` of its first element's
    type, an existing ``DataType`` instance is returned untouched, a
    ``DataType`` subclass is instantiated with ``optional``, and anything
    else is handed to ``get_from_python_type``.

    :param datatype: list, DataType instance, DataType subclass, or Python type
    :param optional: passed to the constructed type where one is constructed
    :param overrider: callable used for nested resolution (defaults to this function)
    :raises TypeError: for an empty list, or when nothing can parse ``datatype``
    """
    resolver = overrider if overrider else get_instantiated_type

    if isinstance(datatype, list):
        if len(datatype) < 1:
            raise TypeError("Couldn't determine type of array with length 0")
        first_element_type = resolver(datatype[0])
        return Array(first_element_type)

    if isinstance(datatype, DataType):
        return datatype

    is_datatype_class = isclass(datatype) and issubclass(datatype, DataType)
    if is_datatype_class:
        return datatype(optional=optional)

    parsed = get_from_python_type(datatype, optional=optional, overrider=resolver)
    if not parsed:
        raise TypeError(f"Unable to parse type '{str(datatype)}'")
    return parsed
# Module-level UnionType instances, constructed once at import time.
# NumericType: union of the numeric types Int, Double and Float.
NumericType = UnionType(Int, Double, Float)
# AnyType: union of the listed types (Array, Filename, Stdout and Stderr are not included).
AnyType = UnionType(String, Boolean, Int, Double, Float, File, Directory)