Source code for janis_core.operators.stringformatter

from typing import Optional, List, Dict, Tuple

from janis_core.utils import first_value

from janis_core.types import String, AnyType
from janis_core.operators.logical import Operator, AddOperator
from janis_core.utils.bracketmatching import get_keywords_between_braces
from janis_core.utils.errors import (
    TooManyArgsException,
    IncorrectArgsException,
    InvalidByProductException,
    ConflictingArgumentsException,
)
from janis_core.utils.logger import Logger


[docs]class StringFormatter(Operator): def returntype(self): return String() def argtypes(self): return [String, Optional[AnyType]] @staticmethod def friendly_signature(): return "String, **kwargs -> String" def validate(self, perform_typecheck=False): return True def __init__(self, format: str, **kwargs): super().__init__([]) # ignore super().__init__ call self._format: str = format keywords, balance = get_keywords_between_braces(self._format) if balance > 0: Logger.warn( "There was an imbalance of braces in the string _format, this might cause issues with concatenation" ) skwargs = set(kwargs.keys()) if not keywords == skwargs: # what's the differences if not keywords.issubset(skwargs): raise IncorrectArgsException( "The _format required additional arguments to be provided by " "**kwargs, requires the keys:" + ", ".join(keywords - skwargs) ) else: raise TooManyArgsException( "The **kwargs contained unrecognised keys: " + ", ".join(skwargs - keywords) ) self.kwargs = kwargs resolved_types = [str, int, float] def to_cwl(self, unwrap_operator, *args): raise Exception("Don't use this method") def to_wdl(self, unwrap_operator, *args): raise Exception("Don't use this method") def to_python(self, unwrap_operator, *args): f = self._format for k, v in self.kwargs.items(): f = f.replace(f"{{{str(k)}}}", unwrap_operator(v)) return f def evaluate(self, inputs): resolvedvalues = { k: self.evaluate_arg(v, inputs) for k, v in self.kwargs.items() } values_that_are_lists = { k: v for k, v in resolvedvalues.items() if isinstance(v, list) } inp_combinations: List[dict] = [{}] if len(values_that_are_lists) > 0: l = len(first_value(values_that_are_lists)) list_values_that_are_different = sum( 0 if len(v) == l else 1 for v in values_that_are_lists.values() ) if list_values_that_are_different == 0: # dot product inp_combinations = [ {k: v[i] for k, v in values_that_are_lists.items()} for i in range(l) ] elif list_values_that_are_different == 1: # cross product inp_combinations = self.generate_combinations_of_input_dicts( values_that_are_lists=list(values_that_are_lists.items()) ) else: l_lengths = ", ".join( f"{k}={len(v)}" for k, v in values_that_are_lists.items() ) raise Exception( "String Formatter evaluation doesn't support scattering for list of " ) evaluated_combinations = [ self.resolve_with_resolved_values(**{**resolvedvalues, **c}) for c in inp_combinations ] if len(evaluated_combinations) == 0: raise Exception( "Something happened when resolving inputs with input values " + str(inputs) ) elif len(evaluated_combinations) == 1: return evaluated_combinations[0] else: return evaluated_combinations def rewrite_operator(self, args_to_rewrite: dict): return self.__class__( self._format, **self.substitute_arg(args_to_rewrite, self.kwargs) ) @staticmethod def generate_combinations_of_input_dicts( values_that_are_lists: List[Tuple[str, List[any]]] ) -> List[Dict]: if len(values_that_are_lists) == 0: return [] key = values_that_are_lists[0][0] values = values_that_are_lists[0][1] if len(values_that_are_lists) == 1: return [{key: v} for v in values] combinations = [] for v in values: for c in StringFormatter.generate_combinations_of_input_dicts( values_that_are_lists[1:] ): combinations.append({**c, key: v}) return combinations def __repr__(self): val = self._format for k, v in self.kwargs.items(): val = val.replace(f"{{{k}}}", f"{{{str(v)}}}") return val def get_leaves(self): leaves = [] for a in self.kwargs.values(): if isinstance(a, Operator): leaves.extend(a.get_leaves()) else: leaves.append(a) return leaves def resolve_with_resolved_values(self, **resolved_values): s1 = set(self.kwargs.keys()) actual_keys, _ = get_keywords_between_braces(self._format) if s1 != actual_keys: diff = (actual_keys - s1).union(s1 - actual_keys) raise Exception( "The format for the string builder has changed since runtime, or an internal error has" " occurred. The following keys did not appear in both sets: " + ", ".join(diff) ) s2 = set(resolved_values.keys()) missing_keys = s1 - s2 if len(missing_keys) > 0: raise IncorrectArgsException( "There were missing parameters when formatting string: " + ", ".join(missing_keys) ) unresolved_values = [ f"{r} ({type(resolved_values[r]).__name__})" for r in resolved_values if not any( isinstance(resolved_values[r], t) for t in StringFormatter.resolved_types ) ] if len(unresolved_values) > 0: raise ValueError( "There were unresolved parameters when formatting string: " + ", ".join(unresolved_values) ) retval = self._format for k in resolved_values: retval = retval.replace(f"{{{k}}}", str(resolved_values[k])) return retval def __radd__(self, other): return StringFormatter(other) + self def __add__(self, other): from janis_core.operators.selectors import InputSelector if isinstance(other, str): # check if it has args in it keywords = get_keywords_between_braces(other) if len(keywords) > 0: invalidkwargs = [k for k in self.kwargs if k not in self.kwargs] if len(invalidkwargs) > 0: raise InvalidByProductException( f"The string to be concatenated contained placeholder(s) ({', '.join(invalidkwargs)})" f"that were not in the original StringFormatter" ) return self._create_new_formatter_from_strings_and_args( [self._format, other], **self.kwargs ) elif isinstance(other, InputSelector): return self + other.to_string_formatter() elif isinstance(other, StringFormatter): # check if args overlap and they're different s1 = set(self.kwargs.keys()) s2 = set(other.kwargs.keys()) intersection = s1.intersection(s2) if len(intersection) > 0: not_same_args = [ k for k in intersection if self.kwargs[k] != other.kwargs[k] ] if len(not_same_args) > 0: raise ConflictingArgumentsException( f"Couldn't concatenate formats as there keys ({', '.join(not_same_args)}) " f"that were not equal between formatters " ) # yeah we sweet new_args = {**self.kwargs, **other.kwargs} return StringFormatter._create_new_formatter_from_strings_and_args( [self._format, other._format], **new_args ) @staticmethod def _create_new_formatter_from_strings_and_args(strings: [str], **kwargs): new_format = "".join(strings) try: return StringFormatter(new_format, **kwargs) except IncorrectArgsException as e: new_params = set( get_keywords_between_braces(new_format)[0] - set(kwargs.keys()) ) raise InvalidByProductException( "Joining the input files (to '{new_format}') created the new params: " + ", ".join(new_params) ) def to_string_formatter(self): return self