Source code for janis_assistant.management.configuration

import os.path
from copy import copy
from enum import Enum
from typing import Optional, List, Union, Type
import ruamel.yaml

from janis_assistant.containers import get_container_by_name
from janis_assistant.containers.base import Container

from janis_assistant.data.models.util import Serializable
from janis_core.utils.logger import Logger

from janis_assistant.utils import fully_qualify_filename

from janis_assistant.engines.enginetypes import EngineType
from janis_assistant.data.enums.dbtype import DatabaseTypeToUse

from janis_assistant.management.envvariables import EnvVariables, HashableEnum
from janis_assistant.utils.callprogram import collect_output_from_command


def parse_if_dict(T: Type, val, path: str, skip_if_empty=True):
    """
    Coerce a configuration value into an instance of ``T``.

    :param T: Class to construct; called as ``T(**val)`` when ``val`` is a dict
    :param val: ``None``, an existing instance of ``T``, or a dict of keyword arguments for ``T``
    :param path: Name of the configuration key being parsed (only used in the error log)
    :param skip_if_empty: When True, an empty dict yields ``None`` instead of ``T()``
    :return: ``None``, ``val`` itself (if already a ``T``), or a newly built ``T``
    :raises SystemExit: with code 1 when ``T(**val)`` raises a TypeError
        (for example an unrecognised keyword)
    :raises Exception: when ``val`` is not None, not a ``T`` and not a dict
    """
    import sys

    if val is None:
        return None
    if isinstance(val, T):
        return val
    if isinstance(val, dict):
        if skip_if_empty and len(val) == 0:
            return None
        try:
            return T(**val)
        except TypeError as e:
            Logger.critical(
                f"Couldn't initialise key {path} of type '{T.__name__}', {str(e)}"
            )
            # sys.exit instead of the bare ``exit`` builtin: ``exit`` is injected by
            # the ``site`` module and does not exist under ``python -S`` or in
            # frozen interpreters.
            sys.exit(1)
    raise Exception(
        f"Couldn't parse for type '{T.__name__}', "
        f"expected dict but received '{val.__class__.__name__}' for {str(val)}"
    )


class NoAttributeErrors:
    """
    Mixin that makes unknown attributes evaluate to ``None`` rather than
    raising AttributeError.
    """

    def __getattr__(self, item):
        # Python only calls __getattr__ after the regular lookup has failed;
        # the lookup is retried once and any AttributeError is swallowed.
        try:
            resolved = self.__getattribute__(item)
        except AttributeError:
            # Give None to support backwards compatible JanisConfigurations
            resolved = None
        return resolved

    # NOTE(review): presumably defined explicitly so pickling/copying never
    # receives ``None`` from __getattr__ for these hooks -- confirm.
    def __getstate__(self):
        return vars(self)

    def __setstate__(self, d):
        vars(self).update(d)


class JanisConfigurationTemplate(Serializable):
    """Holds a template identifier, its options and the template built from them."""

    def __init__(self, id: str = None, **d):
        """
        :param id: The identifier of the template. Falls back to the value resolved
            from ``EnvVariables.default_template`` and finally to ``"local"``.
        :type id: str
        :param d: Remaining keyword arguments, stored as ``templateconfig`` and
            handed to ``from_template``.
        """
        from janis_assistant.templates import from_template

        chosen_id = id
        if not chosen_id:
            chosen_id = EnvVariables.default_template.resolve()
        if not chosen_id:
            chosen_id = "local"  # change default here
        self.id = chosen_id

        self.templateconfig = dict(d)
        Logger.log("Got template ID: " + str(self.id))
        self.template = from_template(self.id, self.templateconfig)

    def to_dict(self):
        """Return the template id followed by every template option."""
        serialised = {"id": self.id}
        serialised.update(self.templateconfig)
        return serialised
class JanisConfigurationEnvironment(NoAttributeErrors, Serializable):
    def __init__(
        self, max_cores: int = None, max_memory: int = None, max_duration: int = None
    ):
        """
        Additional settings to configure a Janis environment. Currently this mostly
        means restricting resources (cores, memory, duration) so that jobs fit
        within specific compute requirements.

        Notably, these values cap a requested value only when that value is a
        number; a value determined via an operator is not currently limited.

        :param max_cores: Limit the number of CPUs a job can request
        :type max_cores: int
        :param max_memory: Limit the amount of memory (in GB) a job can request
        :type max_memory: int
        :param max_duration: Limit the amount of time (in seconds) a job can request.
            (Documented default: 86400; this constructor itself defaults to None.)
        :type max_duration: int
        """
        self.max_cores, self.max_memory, self.max_duration = (
            max_cores,
            max_memory,
            max_duration,
        )
class MySqlInstanceConfig(Serializable):
    def __init__(self, url, username, password, dbname="cromwell"):
        """
        Configuration options for a MySQL instance.

        :param url: URL of the MySQL instance (include the port if it is not 3306)
        :param username: Username
        :param password: Password. Note that this is embedded into the Cromwell
            configuration (<output-dir>/janis/configuration/cromwell.conf)
        :param dbname: Database name to use, default 'cromwell'
        """
        self.url, self.username, self.password, self.dbname = (
            url,
            username,
            password,
            dbname,
        )
class JanisConfigurationCromwell(Serializable):
    def __init__(
        self,
        jar: str = None,
        config_path: str = None,
        url: str = None,
        memory_mb: int = None,
        call_caching_method: str = "fingerprint",
        timeout: int = 10,
        polling_interval=None,
        db_type: DatabaseTypeToUse = DatabaseTypeToUse.filebased,
        mysql_credentials: Union[dict, MySqlInstanceConfig] = None,
        additional_config_lines: str = None,
    ):
        """
        Options controlling how Janis starts, or connects to, Cromwell.

        :param jar: Specific Cromwell JAR to use (prioritised over ``$JANIS_CROMWELLJAR``)
        :param config_path: Use a supplied config path when running a Cromwell
            instance. Also see ``additional_config_lines`` for including specific
            Cromwell options.
        :param url: Use an existing Cromwell instance with this URL (with port).
            Use the BASE url, do NOT include http.
        :type url: str
        :param memory_mb: Amount of memory (in MB) to give the Cromwell instance
            via Java's maximum-memory option
        :param call_caching_method: (Default: "fingerprint") Cromwell caching strategy
            to use, see `Call cache strategy options for local filesystem
            <https://cromwell.readthedocs.io/en/stable/Configuring/#call-cache-strategy-options-for-local-filesystem>`_
            for more information.
        :param timeout: Suspend a Janis workflow if unable to contact Cromwell for
            <timeout> minutes.
        :param polling_interval: How often to poll Cromwell. By default this starts
            at 5 seconds and gradually falls to 60 seconds over 30 minutes. For more
            information, see the ``janis_assistant.Cromwell.get_poll_interval`` `method
            <https://github.com/PMCC-BioinformaticsCore/janis-assistant/blob/master/janis_assistant/engines/cromwell/main.py#L179>`_
        :param db_type: (Default: filebased) DB type to use for Janis.
            "none" -> no database;
            "existing" -> use mysql credentials from ``cromwell.mysql_credentials``;
            "managed" -> Janis will start and manage a containerised MySQL instance;
            "filebased" -> use the HSQLDB filebased db through Cromwell, for SMALL
            workflows only (NB: this can produce large files, and time out for large
            workflows);
            "from_script" -> call the script ``$JANIS_DBCREDENTIALSGENERATOR`` for
            credentials. See `get_config_from_script
            <https://github.com/PMCC-BioinformaticsCore/janis-assistant/blob/master/janis_assistant/management/configuration.py#L621>`_
            for more information.
        :type db_type: "none" | "existing" | "managed" | "filebased" | "from_script"
        :param mysql_credentials: A dictionary of MySQL credentials
        :type mysql_credentials: MySqlInstanceConfig
        :param additional_config_lines: A string to add to the bottom of a generated
            Cromwell configuration. This is NOT used for an existing Cromwell
            instance, or when a config is supplied. Stored as ``additional_params``.
        :type additional_config_lines: str
        """
        self.jar = jar
        self.config_path = config_path
        self.url = url
        self.memory_mb = memory_mb
        self.call_caching_method = call_caching_method
        self.timeout = timeout
        self.polling_interval = polling_interval

        # A falsy db_type (e.g. None from a config file) means "filebased"
        if db_type:
            self.db_type = DatabaseTypeToUse(db_type)
        else:
            self.db_type = DatabaseTypeToUse.filebased

        self.mysql_credentials = (
            parse_if_dict(
                MySqlInstanceConfig, mysql_credentials, "cromwell.mysql_credentials"
            )
            if mysql_credentials
            else None
        )

        self.additional_params = additional_config_lines

    def get_database_config_helper(self):
        """Build a database helper from this object's ``db_type`` and ``mysql_credentials``."""
        helper = JanisDatabaseConfigurationHelper(
            db_type=self.db_type, mysql_credentials=self.mysql_credentials
        )
        return helper
class JanisConfigurationRecipes(Serializable):
    """
    Collects "recipes" (named dictionaries of input values) from an inline dict,
    from YAML files, and from directories of YAML files.
    """

    VALID_YAML_EXTENSIONS = {
        "yaml",
        "yml",
        "json",
    }  # json can be parsed by ruamel.yaml with no extra config

    def __init__(
        self,
        recipes: dict = None,
        paths: Union[str, List[str]] = None,
        directories: Union[str, List[str]] = None,
    ):
        """
        :param recipes: A dictionary of input values, keyed by the recipe name.
        :type recipes: dict
        :param paths: a path, or list of paths, to ``*.yaml`` files where each file
            contains a dictionary of input values keyed by the recipe name (the
            same shape as ``recipes``).
        :type paths: List[str]
        :param directories: a directory, or list of directories, of ``*.yaml``
            files where the ``*`` is the recipe name.
        :type directories: List[str]
        """
        self.recipes = recipes or {}
        self.paths: Optional[List[str]] = None
        self.directories: Optional[List[str]] = None
        self._files_by_key = None
        self._loaded_recipes = False

        if paths:
            self.paths = self._fully_qualified_list(paths)
        if directories:
            self.directories = self._fully_qualified_list(directories)

    @staticmethod
    def _fully_qualified_list(locations):
        """Fully qualify one location or a list of locations; always returns a list."""
        if not isinstance(locations, list):
            # A lone string must be wrapped: load_recipes() feeds these values to
            # list.extend(), which would otherwise add the string one character
            # at a time.
            locations = [locations]
        return [fully_qualify_filename(loc) for loc in locations]

    @staticmethod
    def parseable_yaml_filename_if_valid(path):
        """
        Decide whether ``path`` looks like a recipe file.

        :return: ``(recipe_name, path)`` when the file has no extension or one of
            ``VALID_YAML_EXTENSIONS`` (recipe_name is the basename without the
            extension), otherwise ``None``.
        """
        base, ext = os.path.splitext(path)

        if len(ext) == 0:
            return os.path.basename(path), path

        if ext[1:] in JanisConfigurationRecipes.VALID_YAML_EXTENSIONS:
            return os.path.basename(path)[: -len(ext)], path

        return None

    def load_recipes(self, force=False):
        """
        Load recipes from the configured and environment-provided locations.

        Files listed in ``paths`` are parsed immediately and merged into
        ``self.recipes``. Files found in ``directories`` are only indexed by
        recipe name in ``self._files_by_key``; they are parsed when requested
        through :meth:`get_recipe_for_keys`.

        :param force: Reload even if recipes have already been loaded
        """
        # Guarantee a dict for get_recipe_for_keys() even if nothing gets loaded
        if self._files_by_key is None:
            self._files_by_key = {}

        dirs: List[str] = []
        paths: List[str] = []

        paths_from_env = EnvVariables.recipe_paths.resolve(True)
        dirs_from_env = EnvVariables.recipe_directory.resolve(True) or []

        if paths_from_env:
            paths.extend(paths_from_env)
        if self.paths:
            paths.extend(self.paths)
        if dirs_from_env:
            dirs.extend(dirs_from_env)
        if self.directories:
            dirs.extend(self.directories)

        # Do if: force or (we haven't loaded recipes and we have recipes to load)
        if not (force or not self._loaded_recipes and (paths or dirs)):
            return

        # Reset the index only when actually (re)loading. Resetting before the
        # guard above would wipe the index on every call after the first.
        self._files_by_key = {}

        # Do the env first, then ones from the config can cascade over them
        for recipe_location in paths:
            try:
                with open(recipe_location) as rl:
                    # NOTE(security): ruamel.yaml.Loader can construct arbitrary
                    # Python objects; only point this at trusted recipe files.
                    adr = ruamel.yaml.load(rl, Loader=ruamel.yaml.Loader)
                    self.recipes.update(adr)
            except Exception as e:
                Logger.critical(f"Couldn't load recipe '{recipe_location}': {e}")

        for d in dirs:
            if not os.path.exists(d):
                Logger.critical(f"Couldn't find recipe directory: '{d}', skipping")
                continue
            if not os.path.isdir(d):
                Logger.critical(
                    f"The path listed as a recipe directory was not a directory: '{d}', skipping"
                )
                continue

            for f in os.listdir(d):
                fpath = os.path.join(d, f)
                parsed = self.parseable_yaml_filename_if_valid(fpath)
                if not parsed:
                    Logger.warn(
                        f"Skipping file within recipe directory '{fpath}' as it contained "
                        f"an unrecognised extension: '{os.path.splitext(fpath)[1]}'"
                    )
                    continue

                key, value = parsed
                self._files_by_key.setdefault(key, []).append(value)

        self._loaded_recipes = True

    @staticmethod
    def get_cascaded_dict_from_yamls(files):
        """
        Parse each file in order and merge the results into one dictionary, later
        files overriding earlier ones. Files that fail to parse are logged and
        skipped.
        """
        d = {}
        for f in files:
            try:
                with open(f) as rl:
                    # NOTE(security): see load_recipes -- unsafe YAML loader.
                    adr = ruamel.yaml.load(rl, Loader=ruamel.yaml.Loader)
                    d.update(adr)
            except Exception as e:
                Logger.critical(f"Couldn't parse file '{f}': {e}")
        return d

    def get_recipe_for_keys(self, keys: List[str]):
        """
        Merge the input values of every named recipe, in the order given.

        For each key, values from directory files are applied first, then values
        from ``self.recipes``. Unknown keys are logged and skipped.

        :param keys: Recipe names to combine
        :return: The merged dictionary (empty if ``keys`` is empty)
        """
        self.load_recipes()

        if not keys:
            return {}

        rec = {}
        for key in keys:
            found_key = False

            if key in self._files_by_key:
                found_key = True
                rec.update(self.get_cascaded_dict_from_yamls(self._files_by_key[key]))

            if key in self.recipes:
                found_key = True
                rec.update(self.recipes[key] or {})

            if not found_key:
                Logger.critical(f"Couldn't find '{key}' in known recipes")

        return rec
class JanisConfigurationNotifications(Serializable):
    def __init__(
        self,
        email: str = None,
        from_email: str = "janis-noreply@petermac.org",
        mail_program: str = None,
    ):
        """
        Settings for status-update emails.

        :param email: Email address to send status updates to
        :param from_email: Sender address (Default: janis-noreply@petermac.org)
        :param mail_program: Which mail program to use to send emails. A fully
            formatted email will be directed to it as stdin (eg: sendmail -t)
        """
        self.email, self.from_email, self.mail_program = (
            email,
            from_email,
            mail_program,
        )
class JanisConfiguration(NoAttributeErrors, Serializable):
    """Top-level Janis configuration, usually built from a YAML config file."""

    _configpath = None

    @staticmethod
    def initial_configuration(
        path: Optional[str], potential_paths: Optional[Union[str, List[str]]] = None
    ):
        """
        Build a configuration from the first config file that exists.

        :param path: Explicit config path. When given it must exist and is the only
            location considered.
        :param potential_paths: Candidate path(s) tried (in order) when ``path`` is
            None, before the location resolved from ``EnvVariables.config_path`` and
            its default.
        :return: A JanisConfiguration built from the first existing file, or a
            default JanisConfiguration when none exists.
        :raises Exception: if ``path`` is given but does not exist
        """
        paths_to_check = []
        if path is not None:
            # Expand '~' before checking, as the loading loop below does
            if not os.path.exists(os.path.expanduser(path)):
                raise Exception(f"Couldn't find Janis configuration at path: {path}")
            paths_to_check = [path]
        else:
            if potential_paths:
                if isinstance(potential_paths, list):
                    paths_to_check.extend(potential_paths)
                else:
                    paths_to_check.append(potential_paths)
            default_path = EnvVariables.config_path.resolve(False)
            if default_path:
                paths_to_check.append(default_path)
            paths_to_check.append(EnvVariables.config_path.default())

        for p in paths_to_check:
            if not p:
                # os.path.exists(None) raises TypeError, so skip empty candidates
                continue
            p = os.path.expanduser(p)
            if not os.path.exists(p):
                continue

            with open(p) as cp:
                y = ruamel.yaml.safe_load(cp)
            # An empty file parses to None; treat it as "no overrides"
            return JanisConfiguration(**(y or {}))

        return JanisConfiguration()

    # NOTE: the output_dir / execution_dir defaults below are resolved once, when
    # the class body is executed, not on every call.
    def __init__(
        self,
        output_dir: str = EnvVariables.output_dir.resolve(False),
        execution_dir: str = EnvVariables.exec_dir.resolve(False),
        call_caching_enabled: bool = True,
        engine: str = EngineType.cromwell.value,
        cromwell: Union[JanisConfigurationCromwell, dict] = None,
        template: Union[JanisConfigurationTemplate, dict] = None,
        recipes: Union[JanisConfigurationRecipes, dict] = None,
        notifications: Union[JanisConfigurationNotifications, dict] = None,
        environment: Union[JanisConfigurationEnvironment, dict] = None,
        run_in_background: bool = None,
        digest_cache_location: str = None,
        container: Union[str, Container] = None,
        search_paths: List[str] = None,
    ):
        """
        :param engine: Default engine to use
        :type engine: "cromwell" | "cwltool"
        :param template: Specify options for a Janis template for configuring an
            execution environment
        :type template: JanisConfigurationTemplate
        :param cromwell: A dictionary for how to configure Cromwell for Janis
        :type cromwell: JanisConfigurationCromwell
        :param recipes: Configure recipes in Janis
        :type recipes: JanisConfigurationRecipes
        :param notifications: Configure Janis notifications
        :type notifications: JanisConfigurationNotifications
        :param environment: Additional ways to configure the execution environment
            for Janis
        :type environment: JanisConfigurationEnvironment
        :param output_dir: A directory that Janis will use to generate a new output
            directory for each janis-run
        :param execution_dir: Move all execution to a static directory outside the
            regular output directory.
        :param call_caching_enabled: (default: true) call-caching is enabled for
            subsequent runs, on the SAME output directory
        :param run_in_background: By default, run workflows as a background process.
            In a SLURM environment, this might submit Janis as a SLURM job.
        :type run_in_background: bool
        :param digest_cache_location: Location of a cache mapping docker tags to
            their digests; Janis replaces a docker tag with its `digest
            <https://docs.docker.com/engine/reference/commandline/pull/#pull-an-image-by-digest-immutable-identifier>`_.
            Defaults to ``<config_dir>/digest_cache``.
        :type digest_cache_location: str
        :param container: Container technology to use, important for checking if
            the container environment is available and for running a mysql instance.
        :type container: "docker" | "singularity"
        :param search_paths: A list of paths to check when looking for python files
            and input files (default: ``~/janis/``)
        :type search_paths: List[str]
        """
        self.config_dir = EnvVariables.config_dir.resolve(True)
        self.db_path = fully_qualify_filename(os.path.join(self.config_dir, "janis.db"))
        self.digest_cache_location = digest_cache_location
        if not digest_cache_location:
            self.digest_cache_location = os.path.join(self.config_dir, "digest_cache")

        self.output_dir = output_dir
        self.execution_dir = execution_dir
        self.search_paths = search_paths or [os.path.expanduser("~/janis/")]

        self.engine = engine
        self.call_caching_enabled = call_caching_enabled
        self.run_in_background = run_in_background

        # Each section accepts an instance or a dict; a missing section is built
        # from an empty dict so that it receives its defaults.
        self.recipes = parse_if_dict(
            JanisConfigurationRecipes, recipes or {}, "recipes", skip_if_empty=False,
        )
        self.template = parse_if_dict(
            JanisConfigurationTemplate, template or {}, "template", skip_if_empty=False
        )
        self.cromwell: JanisConfigurationCromwell = parse_if_dict(
            JanisConfigurationCromwell, cromwell or {}, "cromwell", skip_if_empty=False
        )
        self.notifications: JanisConfigurationNotifications = parse_if_dict(
            JanisConfigurationNotifications,
            notifications or {},
            "notifications",
            skip_if_empty=False,
        )
        self.environment: JanisConfigurationEnvironment = parse_if_dict(
            JanisConfigurationEnvironment,
            environment or {},
            "environment",
            skip_if_empty=False,
        )

        # Gets set by the template for now, but eventually we should be able to look it up
        self.container = None
        if container:
            if isinstance(container, Container):
                self.container = container
            else:
                self.container = get_container_by_name(container)

        # Let the template adjust the finished configuration
        if self.template and self.template.template:
            self.template.template.post_configuration_hook(self)

    @staticmethod
    def get_value_for_key(d, key, default):
        """
        Return ``d[key]`` when present and not None; otherwise fall back to
        ``default[key]`` (or None when ``default`` is empty).
        """
        val = d.get(key)
        if val is None:
            return default.get(key) if default else None

        Logger.log(f"Got value '{val}' for key '{key}'")
        return val

    @staticmethod
    def from_path(path: Optional[str]):
        """
        Build a configuration from ``path``, or from the location resolved from
        ``EnvVariables.config_path`` when ``path`` is empty.

        :return: The parsed JanisConfiguration, or a default one when the file
            does not exist.
        """
        p = path or EnvVariables.config_path.resolve(True)
        if p:
            p = os.path.expanduser(p)
        if p and os.path.exists(p):
            with open(p) as cp:
                # NOTE(security): ruamel.yaml.Loader can construct arbitrary Python
                # objects; initial_configuration() uses safe_load for the same file.
                y = ruamel.yaml.load(cp, Loader=ruamel.yaml.Loader)
            # The parsed mapping supplies keyword arguments. Passing it positionally
            # would bind the whole dict to ``output_dir``.
            return JanisConfiguration(**(y or {}))

        return JanisConfiguration()

    @staticmethod
    def default():
        """
        The defaults listed are provided to the user on init; they should be
        pared back to the options that a user may often want to configure.

        These are not defaults during execution.
        """
        deflt = {
            "engine": EngineType.cromwell.value,
            "notifications": {"email": None},
        }
        return stringify_dict_keys_or_return_value(deflt)
class JanisDatabaseConfigurationHelper(Serializable):
    """Produces the Cromwell database configuration for the selected database type."""

    def __init__(
        self,
        db_type: DatabaseTypeToUse,
        mysql_credentials: Optional[Union[MySqlInstanceConfig, dict]],
    ):
        """
        :param db_type: Which kind of database to use
        :param mysql_credentials: Credentials for an existing MySQL instance, as a
            MySqlInstanceConfig or a dict of its constructor arguments (may be None)
        """
        self.db_type = db_type
        self.mysql_config = parse_if_dict(
            MySqlInstanceConfig, mysql_credentials, "mysql_config"
        )

    def which_db_to_use(self) -> DatabaseTypeToUse:
        """Return the configured database type."""
        return self.db_type

    def _require_db_type(self, expected: DatabaseTypeToUse):
        """Raise if the helper is not configured for the ``expected`` database type."""
        t = self.which_db_to_use()
        if t != expected:
            raise Exception(
                f"Attempted to request database config for {expected.value} config, "
                f"but the database helper wants to use {t.value}"
            )

    def get_config_for_existing_config(self):
        """
        Build the Cromwell MySQL config from the stored credentials.

        :raises Exception: if the database type is not "existing", or no MySQL
            credentials were supplied
        """
        self._require_db_type(DatabaseTypeToUse.existing)

        from janis_assistant.engines.cromwell.cromwellconfiguration import (
            CromwellConfiguration,
        )

        config = self.mysql_config
        if config is None:
            # Fail with a clear message rather than an AttributeError on None
            raise Exception(
                f"Database type is {DatabaseTypeToUse.existing.value}, "
                f"but no MySQL credentials were provided"
            )

        return CromwellConfiguration.Database.mysql(
            url=config.url,
            username=config.username,
            password=config.password,
            database=config.dbname,
        )

    def get_config_for_filebased_db(self, path):
        """
        Build the Cromwell config for a file-based database stored at ``path``.

        :raises Exception: if the database type is not "filebased"
        """
        self._require_db_type(DatabaseTypeToUse.filebased)

        from janis_assistant.engines.cromwell.cromwellconfiguration import (
            CromwellConfiguration,
        )

        return CromwellConfiguration.Database.filebased_db(location=path)

    def get_config_for_managed_mysql(self, url):
        """
        Build the Cromwell MySQL config for a Janis-managed instance at ``url``
        (no username or password is passed).

        :raises Exception: if the database type is not "managed"
        """
        self._require_db_type(DatabaseTypeToUse.managed)

        from janis_assistant.engines.cromwell.cromwellconfiguration import (
            CromwellConfiguration,
        )

        return CromwellConfiguration.Database.mysql(
            username=None, password=None, url=url
        )

    def get_config_from_script(self, execution_dir: str):
        """
        Run the credentials-generator script named by the
        ``EnvVariables.db_script_generator`` environment variable, passing
        ``execution_dir`` as its argument, and build a Cromwell MySQL config from
        the JSON it prints.

        The JSON must contain the keys ``username``, ``password``, ``database``
        and ``host``.

        :raises Exception: if the env var is unset, the script fails, or keys are
            missing; every failure is logged before being re-raised
        """
        try:
            import json
            import shlex
            from janis_assistant.engines.cromwell.cromwellconfiguration import (
                CromwellConfiguration,
            )

            file_path = os.getenv(EnvVariables.db_script_generator)
            if file_path is None:
                raise Exception(
                    f"Couldn't get database credentials as couldn't find value in env var '{EnvVariables.db_script_generator}'"
                )
            Logger.debug(
                f"Found path '{EnvVariables.db_script_generator}' to generate database credentials"
            )

            try:
                # Quote the directory (POSIX shell quoting) so spaces or shell
                # metacharacters in the path cannot split or alter the command.
                val = collect_output_from_command(
                    f"{file_path} {shlex.quote(str(execution_dir))}",
                    stderr=Logger.guess_log,
                    shell=True,
                )
            except Exception as e:
                Logger.critical(f"Failed to generate database credentials ({repr(e)})")
                raise

            d = json.loads(val)
            Logger.debug(
                "Received keys from database credentials script: " + ", ".join(d.keys())
            )

            keys = {"username", "password", "database", "host"}
            # sorted() keeps the error message deterministic
            missing_keys = sorted(k for k in keys if k not in d)
            if missing_keys:
                raise Exception(
                    "The script to generate database credentials was missing the keys: "
                    + ", ".join(missing_keys)
                )

            return CromwellConfiguration.Database.mysql(
                username=d["username"],
                password=d["password"],
                database=d["database"],
                url=d["host"],
            )
        except Exception as e:
            Logger.critical(
                "Failed to get database configuration details from script: " + repr(e)
            )
            raise

    def run_delete_database_script(self, execution_dir: str):
        """
        Run the cleanup script named by the
        ``EnvVariables.db_script_generator_cleanup`` environment variable, passing
        ``execution_dir`` as its argument.

        Best effort: any failure is logged as a warning and not re-raised.
        """
        try:
            import shlex

            file_path = os.getenv(EnvVariables.db_script_generator_cleanup)
            if file_path is None:
                raise Exception(
                    f"Couldn't delete generated database credentials as couldn't find value in env var '{EnvVariables.db_script_generator_cleanup}'"
                )
            Logger.debug(
                f"Found path '{EnvVariables.db_script_generator_cleanup}' to delete database credentials"
            )

            # Quote the directory so spaces or shell metacharacters are safe
            val = collect_output_from_command(
                f"{file_path} {shlex.quote(str(execution_dir))}",
                stderr=Logger.guess_log,
                shell=True,
            )
            if val is not None and len(val) > 0:
                Logger.info(
                    f"Successfully deleted DB credentials and received message: {val}"
                )
            else:
                Logger.info("Deleted credentials with rc=0")
        except Exception as e:
            Logger.warn(
                f"Failed to delete database configuration details for execution directory '{execution_dir}': "
                + repr(e)
            )


def stringify_dict_keys_or_return_value(d):
    """
    Recursively convert a value into plain data with string dictionary keys.

    None, ints, floats and bools are returned unchanged; lists and dicts are
    processed element by element (dict keys become ``str``); anything else is
    converted with ``str()``.
    """
    if d is None:
        return d
    if isinstance(d, list):
        return [stringify_dict_keys_or_return_value(dd) for dd in d]
    if isinstance(d, (int, float)):  # bool is a subclass of int
        return d
    if not isinstance(d, dict):
        return str(d)

    return {str(k): stringify_dict_keys_or_return_value(v) for k, v in d.items()}