# Source code for scenario

#      ConCERO - a program to automate data format conversion and the execution of economic modelling software.
#      Copyright (C) 2018  CSIRO Energy Business Unit
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Created on Feb 05 09:44:50 2018

.. sectionauthor:: Lyle Collins <Lyle.Collins@csiro.au>
.. codeauthor:: Lyle Collins <Lyle.Collins@csiro.au>
"""
import os

import datetime as dt

import concero.conf as conf
from concero.format_convert_tools import read_yaml
from concero.model import Model
from concero.to_cero import ToCERO
from concero.from_cero import FromCERO
from concero.cero import CERO

class Scenario(dict):
    """
    A ``dict`` subclass describing a single scenario: its name, run number, the models to run, and the
    configuration objects that convert inputs to a CERO (``ToCERO``) and export outputs from a CERO (``FromCERO``).
    """

    _logger = conf.setup_logger(__name__)

    def __init__(self, sc_def: dict, *args, parent: dict=None, **kwargs):
        """
        :param sc_def: A scenario definition object.
        :param args: Passed to the superclass (dict) as positional arguments at initialisation.
        :param parent: Optional `dict` of inherited options (e.g. ``search_paths``, ``ref_dir``). Values in \
        ``sc_def`` take precedence over values in ``parent``.
        :param kwargs: Passed to the superclass (dict) as keyword arguments at initialisation.
        :raises TypeError: If ``sc_def`` is not a `dict`, or if the resulting scenario fails :meth:`is_valid`.
        :raises FileNotFoundError: If an input/output configuration file cannot be located (see :meth:`find_file`).
        """

        defaults = {"name": None,
                    "run_no": None,
                    "search_paths": [],
                    "ref_dir": None,
                    "models": [],
                    "input_conf": [],
                    "output_conf": []}

        if parent is None:
            parent = {}

        defaults.update(parent)

        # Explicit check rather than ``assert`` - assertions are stripped when Python runs with ``-O``.
        if not isinstance(sc_def, dict):
            raise TypeError("Scenario definition provided in incorrect format - type %s instead of dict." % type(sc_def))

        defaults.update(sc_def)

        super().__init__(defaults, *args, **kwargs)

        if not self.get("name"):
            self["name"] = "scenario_unnamed"
            self._logger.warning("Scenario name has not been specified - scenario named '%s'." % self["name"])

        if not isinstance(self.get("run_no"), int):
            self["run_no"] = 1
            self._logger.info("Scenario run_no (run number) has not been specified (or is not of integer type) - defaults to %s."
                              % self["run_no"])

        # Normalise search_paths into a list owned by this object, so that appending a default path does not
        # alter a list shared with ``parent``, ``sc_def`` or sibling scenarios.
        if isinstance(self["search_paths"], str):
            search_paths = [os.path.abspath(self["search_paths"])]
        else:
            search_paths = self._as_list(self["search_paths"])
            if not search_paths:
                search_paths.append(os.path.abspath("."))
        self["search_paths"] = search_paths

        if self["ref_dir"] is None:
            self["ref_dir"] = os.path.abspath(".")

        model_parent = {"search_paths": self["search_paths"], "ref_dir": self["ref_dir"]}
        self["models"] = [Model(m, parent=model_parent) for m in (self.get("models") or [])]

        # Build new lists of configuration objects instead of overwriting the entries of the caller's lists,
        # so that the same definition dict can be used to construct another Scenario.

        # Load ToCERO conf
        par_dict = {"search_paths": self["search_paths"]}
        self["input_conf"] = [ToCERO(self.find_file(ic), parent=par_dict)
                              for ic in self._as_list(self["input_conf"])]

        # Load FromCERO conf
        par_dict = {"ref_dir": self["ref_dir"]}
        self["output_conf"] = [FromCERO(self.find_file(oc), parent=par_dict)
                               for oc in self._as_list(self["output_conf"])]

        self.is_valid()  # Check Scenario is valid

    @staticmethod
    def _as_list(value) -> list:
        """Return a new `list` from ``value``: ``None`` -> ``[]``, list/tuple -> shallow copy, other -> ``[value]``."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _invalid(msg: str, raise_exception: bool) -> bool:
        """Report a failed static check: raise ``TypeError(msg)`` if ``raise_exception``, otherwise return ``False``."""
        if raise_exception:
            raise TypeError(msg)
        return False

    def run(self) -> None:
        """
        Execute a scenario run.

        Inputs are loaded and combined into ``self.cero``, each model is run in order with its output merged \
        into ``self.cero``, and finally the procedures of every output configuration are executed. Intermediate \
        results are exported to xlsx files in the process.

        :raises TypeError: If a model returns an object that is neither ``None`` nor a CERO.
        """

        self.cero = CERO.create_empty()

        ceros = [in_conf.create_cero() for in_conf in self["input_conf"]]
        if ceros:
            self.cero = CERO.combine_ceros(ceros)
            print("Successfully loaded scenario inputs as CERO.")

        # Step 0 is the CERO before any model has been run.
        FromCERO.dataframe_out(self.cero, (self.get_name() + "_%03d_step_%02d.xlsx" % (self["run_no"], 0)), "xlsx")

        for idx, model in enumerate(self["models"]):
            m_cero = model.run(self.cero)
            print("Completed run of model (%s) at %s." % (model["name"], dt.datetime.now().strftime('%Y-%m-%d %H:%M')))

            # If output_conf is not defined for a model, then None is returned...
            if m_cero is None:
                continue

            if not CERO.is_cero(m_cero):
                raise TypeError("Object returned from model run is *not* of CERO format.")

            if model.get("export_mod_xlsx", self.get("export_mod_xlsx", True)):
                # By default, export model outputs automatically to xlsx files
                model_out_file = (self.get_name() + "_%03d_%s.xlsx" % (self["run_no"], model["name"]))
                print("Exporting output of %s to %s." % (model["name"], model_out_file))
                m_cero.to_excel(model_out_file)

            self.cero = CERO.combine_ceros([self.cero, m_cero])

            if self.get("export_int_xlsx", True):
                # If true (default), export the intermediate steps to xlsx files
                isfn = (self.get_name() + "_%03d_step_%02d.xlsx" % (self["run_no"], idx + 1))
                print("Exporting updated CERO to %s." % (isfn))
                self.cero.to_excel(isfn)

        for out_conf in self["output_conf"]:
            out_conf.exec_procedures(self.cero)

        # Always reached (the loop above contains no ``break``).
        print("Completed generation of scenario outputs.")

    def is_valid(self, raise_exception=True) -> bool:
        """
        Performs static checks on ``self`` to ensure it is a valid Scenario object.

        :param bool raise_exception: If ``True`` (default), a failed check raises an exception. If ``False``, \
        a failed check results in ``False`` being returned.
        :return: ``True`` if all checks pass.
        :raises TypeError: If a structural check fails and ``raise_exception`` is ``True``.
        """

        req_keys = ["name", "models", "input_conf", "output_conf"]

        if not all(k in self for k in req_keys):
            return self._invalid(("Not all required key-value pairs have been defined. " +
                                  "It is necessary to define all of %s.") % req_keys, raise_exception)

        if not isinstance(self["models"], list):
            return self._invalid("Scenario property \'models\' must be defined as a list.", raise_exception)

        for model in self["models"]:
            if not isinstance(model, Model):
                return self._invalid("Object '%s' is of type '%s', not 'Model'." % (model, type(model)),
                                     raise_exception)

            if not model.check_config(raise_exception=raise_exception, runtime=False):
                return False

        for ic in self["input_conf"]:
            if not ToCERO.check_config(ic, raise_exception=raise_exception, runtime=False):
                return False

        for oc in self["output_conf"]:
            if not FromCERO.check_config(oc, raise_exception=raise_exception, runtime=False):
                return False

        return True

    def run_checks(self, raise_exception=True):
        """
        Performs runtime checks on ``self`` to ensure it is a valid Scenario object. Failure of runtime checks \
        indicates that the scenario is not ready to run.

        Only the input configurations are checked, by calling ``ToCERO.check_config`` with ``runtime=True``.

        :param bool raise_exception: Passed through to ``ToCERO.check_config``.
        :return: ``None`` - the results of the individual checks are not collected.
        """

        for ic in self["input_conf"]:
            ToCERO.check_config(ic, raise_exception=raise_exception, runtime=True)

    def get_name(self, long_form: bool=True, raise_exception=False) -> str:
        """
        Returns the name of the ``Scenario``, which is dependent on the first linked ``ScenariosSet`` object.

        :param long_form: If ``True`` (default) return a long-form of the name. If ``False``, return a short form.
        :param raise_exception: If ``True``, raise an exception when no name can be determined. If ``False`` \
        (default), an empty `str` is returned in that case.
        :return: The name of the ``Scenario``.
        :raises TypeError: If no name can be determined and ``raise_exception`` is ``True``.
        """

        # A missing *or empty* collection of linked sets falls through to the scenario's own name.
        linked = getattr(self, "_linked_scenariosets", None)
        if linked:
            return linked[0].get_scenario_name(self, long_form=long_form)

        if self.get("name"):
            return self["name"]

        if raise_exception:
            raise TypeError("Must either link ScenariosSet, or give Scenario a 'name' before a name can be generated.")
        return ""

    def find_file(self, filename):
        """
        Locate ``filename`` by testing each of the directories in ``self["search_paths"]`` in order.

        :param filename: The file to find. An absolute path to an existing file is returned unchanged.
        :return: The path of the first match.
        :raises FileNotFoundError: If the file is not found on any of the search paths.
        """

        # An absolute path identifies the file by itself - converting it to a path relative to the working
        # directory and joining that onto a search path can point at an unrelated location.
        if os.path.isabs(filename) and os.path.isfile(filename):
            return filename

        rel_filename = os.path.relpath(filename)

        for sp in self["search_paths"]:
            candidate = os.path.join(sp, rel_filename)
            Scenario._logger.debug("Scenario.find_file(): testing path: %s" % candidate)
            if os.path.isfile(candidate):
                return candidate

        msg = "File '%s' not found on any of the paths %s." % (filename, self["search_paths"])
        Scenario._logger.error(msg)
        raise FileNotFoundError(msg)

    def get_linked_scenarios(self):
        """
        :return "List['ScenariosSet']": A list of linked ``ScenariosSet``, or ``None`` if there are none.
        """
        # NOTE(review): get_name() reads ``_linked_scenariosets`` as an attribute whereas this method originally
        # only consulted the dict key of the same name. Both locations are checked - confirm which one
        # ScenariosSet actually uses.
        linked = self.get("_linked_scenariosets")
        if linked is None:
            linked = getattr(self, "_linked_scenariosets", None)
        return linked

    @staticmethod
    def load_scenarios(scen_def: str, parent=None):
        """Load one or more scenarios from a file.

        :param scen_def: The file containing one or more scenario definitions. An already-loaded scenario \
        definition `dict`, or `list` of such definitions, is also accepted.
        :param parent: Optional `dict` of inherited options, passed on to each ``Scenario``.
        :return "List['Scenario']": A `list` of ``Scenario`` s (of length 1 if a single scenario is defined).
        :raises TypeError: If the definition is neither a `dict` nor a `list`.
        """

        defaults = {"search_paths": []}

        if parent is None:
            parent = {}

        defaults.update(parent)
        # Work on a copy so that the caller's list is not modified by the append below.
        defaults["search_paths"] = Scenario._as_list(defaults["search_paths"])

        if isinstance(scen_def, str):
            scen_def = os.path.abspath(os.path.normpath(scen_def))
            defaults["search_paths"].append(os.path.dirname(scen_def))
            scen_def = read_yaml(scen_def)

        msg = "defaults: %s" % (defaults)
        Scenario._logger.debug(msg)

        if isinstance(scen_def, dict):
            # single scenario in file
            return [Scenario.load_scenario(scen_def, parent=defaults)]

        if isinstance(scen_def, list):
            # List of scenarios
            return [Scenario.load_scenario(scd, parent=defaults) for scd in scen_def]

        raise TypeError("'scen_def' must be either a str to a file containing one or more scenario definitions, "
                        "a scenario definition dict, or a list of scenario definition dicts - not type %s."
                        % type(scen_def))

    @staticmethod
    def load_scenario(scen_def: str, parent=None):
        """
        :param scen_def: The file containing a single scenario definition, or a scenario definition `dict` .
        :param parent: Optional `dict` of inherited options, passed on to the ``Scenario``.
        :return 'Scenario': A single ``Scenario`` object.
        :raises TypeError: If the definition is not (or does not load as) a `dict`.
        """

        defaults = {"search_paths": [],
                    "ref_dir": os.path.abspath(os.getcwd())}

        if parent is None:
            parent = {}

        defaults.update(parent)
        # Work on a copy so that the caller's list is not modified by the append below.
        defaults["search_paths"] = Scenario._as_list(defaults["search_paths"])

        if isinstance(scen_def, str):
            scen_def = os.path.abspath(scen_def)
            defaults["search_paths"].append(os.path.dirname(scen_def))
            scen_def = read_yaml(scen_def)

        if not isinstance(scen_def, dict):
            raise TypeError(
                "'scen_def' must be either a str to a file containing a scenario definition or a scenario definition dict.")

        return Scenario(scen_def, parent=defaults)