"""Model class."""
import os
import numpy as np
import pandas as pd
import datetime as dt
from snowdrop.src.model.settings import SolverMethod
from snowdrop.src.model.util import setValues
from snowdrop.src.utils.equations import getMaxLeadsLags
from snowdrop.src.misc.termcolor import cprint, colored
from snowdrop.src.preprocessor.function import get_function_and_jacobian
path = os.path.dirname(os.path.abspath(__file__))
lst1 = []; lst2 = []; lst3 = []
[docs]
class Model:
def __init__(self,interface,anticipate=None,options=None,infos=None,bCompileAll=True):
"""
Construct Model class.
Parameters:
:param self: Model object.
:type self: `Model'.
:param interface: Interface object.
:type interface: `Interface'.
:param anticipate: If True future shocks are anticipated.
:type anticipate: bool.
:param options: Model options.
:type options: dict.
:param infos: Model Info.
:type infos: dict.
:param bCompileAll: If True compiles all functions source code.
:type bCompileAll: bool, optional.
:returns: Instance of a `Model' class.
"""
self.symbolic = interface
self.symbols = interface.symbols
self.calibration = interface.calibration_dict
self.eq_vars = interface.eq_vars
self.steady_state = interface.steady_state
self.steady_growth = None
self.ss = interface.ss
self.eqLabels = interface.eqLabels
self.numberOfNewEqs = interface.numberOfNewEqs
self.terminal_values = interface.terminal_values
self.autodiff = interface.autodiff
self.jaxdiff = interface.jaxdiff
self.SOLVER = interface.SOLVER
self.order = 1 if interface.order is None else interface.order
#self.variables = sum( [tuple(e) for k,e in self.symbols.items() if k not in ('parameters','shocks','exogenous', 'values','equations_labels')], ())
self.options = options if options is not None else {}
self.data_sources = interface.data_sources if interface.data_sources is not None else {}
self.infos = infos
self.name = "model" if not bool(self.infos) else self.infos['name']
self.priors = interface.priors
self.isLinear = False
self.solved = False
self.ev = None
self.nUnit = None
self.anticipate = None
self.mapSwap = None
self.condShocks = None
self.date_range = None
self.count = 0
self.T = 100
self.FILTER = None
self.SMOOTHER = None
self.INITIAL_CONDITION = None
self.INIT_COND_CORRECTION = None
self.PRIOR = None
self.SAMPLING_ALGORITHM = None
self.total_nmbr_shocks = 0
self.estimate = False
self.Topology = None
self.stable = None
self.unstable = None
self.max_lead_shock = 0
self.min_lag_shock = 0
self.max_lead_exog = 0
self.min_lag_exog = 0
self.LOG_LINEARIZE_EQS = bool(self.symbols["log_variables"]) if "log_variables" in self.symbols else False
self.COMPLEMENTARITY_CONDITIONS = interface.COMPLEMENTARITY_CONDITIONS
self.flippedEndogVariables = None
self.flippedExogVariables = None
self.indexOfFlippedEndogVariables = None
self.indexOfFlippedExogVariables = None
self.y = None
self.functions = {}
self.bCompileAll = bCompileAll
# Use sparse algebra matrix calculations
self.bSparse = interface.bSparse or interface.SOLVER == SolverMethod.ABLR
self.anticipate = None
self.lead_lag_incidence = None
self.var_rows_incidence = None
self.linear_model = {}
self.FILTER = None
self.SMOOTHER = None
self.INITIAL_CONDITION = None
self.INIT_COND_CORRECTION = None
self.PRIOR = None
self.state_vars = interface.state_vars
self.eqs_number = interface.eqs_number
# Update calibration parameters
self.__update_from_symbolic__()
# Save exogenous series
from snowdrop.src.utils.util import getExogenousSeries
self.calibration["exogenous"] = getExogenousSeries(self)
if bCompileAll:
self.__compile_functions__()
# Check if model is linear
var,p,e = self.calibration['variables','parameters','shocks']
if p.ndim == 2:
p = np.copy(p[:,0])
z = np.vstack((var,var,var))
jacob1 = get_function_and_jacobian(self,params=p,y=z,order=1)[1]
jacob2 = get_function_and_jacobian(self,params=p,y=0.001+z,order=1)[1]
isLinear = np.all(jacob1 == jacob2)
self.isLinear = isLinear
if self.SOLVER is None:
if isLinear:
self.SOLVER = SolverMethod.AndersonMoore
else:
self.SOLVER = SolverMethod.LBJ
if self.isLinear:
from snowdrop.src.utils.equations import getLeadLagIncidence
from snowdrop.src.utils.equations import topology
# from snowdrop.src.utils.equations import getVarRowsIncidence
# Get Lead Lag Incidence matrix
lli = getLeadLagIncidence(self)
# getVarRowsIncidence(self)
# Get variables topology. Find variables position in equations.
if not (interface.bellman) and not (interface.objective_function):
topology(self)
else:
lli = None
def __update_from_symbolic__(self):
"""
Update calibration dictionary according to the symbols definitions.
Parameters:
:param self: Model object.
:type self: `Model'.
"""
from snowdrop.src.preprocessor.misc import CalibrationDict, calibration_to_vector
# Check if calibration dictionary or options contain symbolic expressions
b = False
distribution = self.symbolic.options.get('distribution')
if not distribution is None:
b = True
else:
for k in self.calibration:
val = self.calibration[k]
if type(val) is str:
b = True
break
shock_values = self.symbolic.options.get('shock_values')
if not shock_values is None:
for val in shock_values:
if type(val) is str:
b = True
break
elif type(val) is list:
for v in val:
if type(v) is str:
b = True
break
if b:
from snowdrop.src.preprocessor.eval_solver import evaluate
from snowdrop.src.preprocessor.symbolic_eval import NumericEval
# x=dict(self.symbolic.calibration_dict)
self.calibration_dict = evaluate(self.symbolic.calibration_dict)
calib = calibration_to_vector(self.symbols,self.calibration_dict)
self.calibration = CalibrationDict(self.symbols, calib)
# Read symbolic structure
evaluator = NumericEval(self.calibration_dict)
self.options = evaluator.eval(self.symbolic.options)
self.distribution = evaluator.eval(distribution)
if self.distribution is None:
self.covariances = None
else:
self.covariances = np.atleast_2d(np.array(self.distribution.cov))
else:
calib = calibration_to_vector(self.symbols, self.calibration)
self.calibration = CalibrationDict(self.symbols, calib)
self.options = self.symbolic.options
[docs]
def get_calibration(self, pname, *args):
"""
Return calibration value.
Parameters:
:param self: Model object.
:type self: `Model'.
:param pname: name of the variable.
:type pname: str
:param args: Variable number of arguments.
:type args: Tuple
:returns: Calibration value.
"""
if isinstance(pname, list):
return [self.get_calibration(p) for p in pname]
elif isinstance(pname, tuple):
return tuple([self.get_calibration(p) for p in pname])
elif len(args)>0:
pnames = (pname,) + args
return self.get_calibration(pnames)
group = [g for g in self.symbols.keys() if pname in self.symbols[g]]
try:
group = group[0]
except Exception:
raise Exception('Unknown symbol {}.'.format(pname))
i = self.symbols[group].index(pname)
v = self.calibration[group][i]
return v
[docs]
def set_calibration(self, *args, **kwargs):
"""
Set the calibration value.
.. note::
PLEASE USE THIS FUNCTION ONLY WHEN VALUE IS A SCALAR!!!
IN OTHER WORDS THIS VALUE SHOULD NOT BE TIME DEPENDENT.
Parameters:
:param self: Model object.
:type self: `Model'
:param args: Variable number of arguments.
:type args: Tuple
:param kwargs: Arbitrary number of keyword arguments
:type kwargs: dict
"""
# raise exception if unknown symbol ?
if len(args)==2:
pname, pvalue = args
if isinstance(pname, str):
self.set_calibration(**{pname:pvalue})
else:
# else ignore pname and pvalue
calib = self.symbolic.calibration_dict
calib.update(kwargs)
self.__update_from_symbolic__()
def __compile_functions__(self,order=1):
"""
Compile model equations function and compute Jacobian matrix up to the third order.
Parameters:
:param self: Parameter.
:type self: Model.
:param order: Maximum order of partial equations derivatives.
:type order: int.
:returns: List of functions.
"""
from snowdrop.src.preprocessor.function_compiler_sympy import compile_higher_order_function
from snowdrop.src.preprocessor.function_compiler_sympy import compile_function
from snowdrop.src.preprocessor.function_compiler_sympy import compile_jacobian
from snowdrop.src.utils.equations import getRHS
import snowdrop.src.preprocessor.function_compiler_sympy as fc
func = f_jacob = f_hessian = f_tensor = src_dynamic = src_sparse = None
shocks = self.symbols.get('shocks',[])
measurement_shocks = self.symbols.get('measurement_shocks',[])
vv = self.symbols['variables']
logv = self.symbols.get('log_variables',[])
ss_vv = self.symbols['endogenous']
# Gets maximum of leads and minimum of lags of equations variables
max_lead,min_lag,n_fwd_looking_var,n_bkwd_looking_var,var_lead,var_lag = \
getMaxLeadsLags(eqs=self.symbolic.equations,variables=vv)
self.max_lead = max_lead
self.min_lag = min_lag
self.n_fwd_looking_var = n_fwd_looking_var
self.n_bkwd_looking_var = n_bkwd_looking_var
self.var_lead = var_lead
self.var_lag = var_lag
# Gets maximum of leads and minimum of lags of equations shock variables
max_lead_shock,min_lag_shock,n_fwd_looking_shocks,n_bkwd_looking_shocks,shock_lead,shock_lag = \
getMaxLeadsLags(eqs=self.symbolic.equations,variables=shocks)
self.max_lead_shock = max_lead_shock
self.min_lag_shock = min_lag_shock
self.n_fwd_looking_shocks = n_fwd_looking_shocks
self.n_bkwd_looking_shocks = n_bkwd_looking_shocks
self.var_lead = shock_lead
self.var_lag = shock_lag
# Constructs arguments of function f([y(1),y,y(-1),e],p)
syms = [(v,1) for v in vv] + [(v,0) for v in vv] + [(v,-1) for v in vv]
syms_shock = []
for i in range(min_lag_shock,1+max_lead_shock):
syms_shock += [(s,i) for s in shocks]
syms += syms_shock
syms_ss = [(v,0) for v in ss_vv] + [(v,0) for v in shocks]
exog = self.symbols.get('exogenous',None)
if exog is None:
syms_exog = []
syms_exog_ss = []
else:
# Get maximum of leads and minimum of lags of equations exogenous variables
max_lead_exog,min_lag_exog,n_fwd_looking_exog,n_bkwd_looking_exog,exog_lead,exoglag = \
getMaxLeadsLags(eqs=self.symbolic.equations,variables=exog)
self.max_lead_exog = max_lead_exog
self.min_lag_exog = min_lag_exog
syms_exog = []
for i in range(min_lag_exog,1+max_lead_exog):
syms_exog += [(s,i) for s in exog]
syms_exog_ss = [(v,0) for v in exog]
# Measurement variables
if 'measurement_variables' in self.symbols:
vm = self.symbols['measurement_variables']
else:
vm = []
# Constructs arguments of function f(y,e,p)
meas_syms = [(v,0) for v in vv]
meas_syms += [(v,0) for v in vm]
if bool(measurement_shocks):
meas_syms += [(s,0) for s in measurement_shocks]
params = self.symbols['parameters']
### Construct list of equations to differentiate
# Transient equations
eqs = []
for eq in self.symbolic.equations:
arr = str.split(eq,'=')
if len(arr) == 2:
s = '{} - ({})'.format(arr[0].strip(),arr[1].strip())
s = str.strip(s)
else:
s = eq
eqs.append(s)
steady_state_eqs = []
for eq in self.symbolic.ss_equations:
arr = str.split(eq,'=')
if len(arr) == 2:
s = '{} - ({})'.format(arr[0].strip(),arr[1].strip())
s = str.strip(s)
else:
s = eq
steady_state_eqs.append(s)
if bool(self.symbolic.measurement_equations):
# Measurement equations
meas_eqs = []
for eq in self.symbolic.measurement_equations:
arr = str.split(eq,'=')
if len(arr) == 2:
s = '{} - ({})'.format(arr[0].strip(),arr[1].strip())
s = str.strip(s)
else:
s = eq
meas_eqs.append(s)
# Compile measurement function and compute its partial derivatives
fc.skip = True
meas_params = self.symbols['measurement_parameters']
f_measurement = compile_higher_order_function(equations=meas_eqs,syms=meas_syms,
params=meas_params,syms_exog=syms_exog,order=order,
function_name='f_measurement',out='f_measurement',
model_name=self.infos["name"],log_variables=logv)
f_measurement = f_measurement[0]
fc.skip = False
else:
f_measurement = None
# Right-hand-side of equations
b = None
# Compile right-hand-side equations
rhs_eqs,_ = getRHS(eqs=self.symbolic.equations,variables=vv,eqLabels=self.eqLabels,b=b)
f_rhs,txt_rhs,_,_,_,src_rhs = compile_higher_order_function(equations=rhs_eqs,syms=syms,params=params,syms_exog=syms_exog,
eq_vars=self.eq_vars,order=0,function_name='f_rhs',
out='f_rhs',b=b,model_name=self.infos["name"],log_variables=[])
if self.autodiff or self.jaxdiff:
# Constructs arguments of function f(y(1),y,y(-1),p)
endog = [(v,1) for v in vv] + [(v,0) for v in vv] + [(v,-1) for v in vv]
exog = self.symbols['parameters']
for i in range(min_lag_shock,1+max_lead_shock):
endog += [(s,i) for s in shocks]
func,txt_dynamic = compile_function(eqs,syms=endog,params=exog,syms_exog=syms_exog,log_variables=logv,b=True)
f_dynamic = f_steady = f_sparse = txt_sparse = src_ss = src_ss_jacob = None
if self.bSparse:
from snowdrop.src.preprocessor.function_compiler_sympy import get_indices
from snowdrop.src.numeric.solver import LBJax
#ind = get_indices(eqs,syms=endog)
LBJax.row_ind, LBJax.col_ind = get_indices(eqs,syms=endog)
if self.autodiff:
from autograd import jacobian,hessian
if order <= 1:
f_jacob = jacobian(func)
elif order <= 2:
f_hessian = hessian(func)
elif order <= 3:
f_tensor = jacobian(hessian)
elif self.jaxdiff:
from jax import jacobian,hessian,jit
if order <= 1:
f_jacob = jit(jacobian(func),backend='cpu')
elif order <= 2:
f_hessian = jit(hessian(func),backend='cpu')
elif order <= 3:
f_tensor = jit(jacobian(hessian),backend='cpu')
func = jit(func,backend='cpu')
else:
if True or self.bSparse:
# Compile dynamic function and compute its partial derivatives
f_sparse,txt_sparse,txt_der,txt_der2,txt_der3,src_sparse = compile_higher_order_function(
equations=eqs,syms=syms,params=params,syms_exog=syms_exog,eq_vars=self.eq_vars,order=order,
function_name='f_sparse',out='f_sparse',bSparse=True,b=b,model_name=self.infos["name"],log_variables=logv)
else:
f_sparse = txt_sparse = src_sparse = None
# Compile dynamic function and compute its partial derivatives
f_dynamic,txt_dynamic,txt_der,txt_der2,txt_der3,src_dynamic = compile_higher_order_function(
equations=eqs,syms=syms,params=params,syms_exog=syms_exog,eq_vars=self.eq_vars,
order=order,function_name='f_dynamic',out='f_dynamic',b=b,
model_name=self.infos["name"],log_variables=logv)
# Compile steady-state function
f_steady,src_ss = compile_function(
equations=steady_state_eqs,syms=syms_ss,params=params,syms_exog=syms_exog_ss,
eq_vars=self.eq_vars,function_name='f_steady',out='f_steady',
b=b,model_name=self.infos["name"],log_variables=logv)
# Compile partial derivatives of steady-state function
f_jacob,src_ss_jacob = compile_jacobian(
equations=steady_state_eqs,syms=syms_ss,params=params,syms_exog=syms_exog,
eq_vars=self.eq_vars,function_name='f_jacob',out='f_jacob',
b=b,model_name=self.infos["name"],log_variables=logv)
def f_static(y,p,e):
from snowdrop.src.numeric.solver.util import getExogenousData
if np.ndim(p) == 2:
p = p[:,-1]
x = list(e)
for i in range(min_lag_shock,max_lead_shock):
x += x
z = np.concatenate([y,y,y,x])
# Get exogenous time series
exog = getExogenousData(self)
bHasAttr = hasattr(f_dynamic,"py_func")
if bHasAttr:
f = f_dynamic.py_func(z,p,exog=exog,order=0)
else:
f = f_dynamic(z,p,exog=exog,order=0)
return f
functions = {
'f_static': f_static,
'f_dynamic': f_dynamic,
'f_sparse': f_sparse,
'f_steady': f_steady,
'f_jacob': f_jacob,
'f_hessian': f_hessian,
'f_tensor': f_tensor,
'f_rhs': f_rhs,
'f_measurement': f_measurement,
'func': func
}
functions_src = {
'f_dynamic_txt': txt_dynamic,
'f_dynamic_src': src_dynamic,
'f_sparse_txt': txt_sparse,
'f_sparse_src': src_sparse,
'f_steady_src': src_ss,
'f_jacob_src': src_ss_jacob,
'f_rhs_src': src_rhs
}
self.functions = functions
self.functions_src = functions_src
return
[docs]
def condition(self,mapCond,reset=False):
"""
Set conditional shocks.
Parameters:
:param self: Parameter.
:type self: `Model'.
:param mapCond: Dictionary of conditional shocks.
:type mapCond: dict.
:param reset: Flag to reset model swap dictionary.
:type reset: bool.
"""
if reset or self.condShocks is None:
self.condShocks = {}
m = self.condShocks
n_range = len(self.date_range)
var_names = self.symbols['variables']
for k in mapCond:
if k in var_names:
ind = var_names.index(k)
ts = mapCond[k]
for x in ts.items():
t,v = x
b = False
for i in range(n_range-1):
x1 = self.date_range[i]
x2 = self.date_range[1+i]
if t >= x1 and t < x2:
b = True
period = i
break
if b:
if t in m:
values = m[t] + [(ind,v)]
else:
values = [(ind,v)]
m[period] = values
self.condShocks = m
[docs]
def swap(self,var1,var2,val1=None,val2=None,delta1=None,delta2=None,rng=None,reset=False):
"""
Swap variables, i.e. exogenize variable var1 and endogenize variable var2.
Parameters:
:param self: Parameter.
:type self: `Model'.
:param var1: Endogenous variable.
:type var1: str or list of str or dict.
:param var2: Exogenous variable.
:type var2: str or list of str.
:param val1: New value of exogenized variable.
:type val1: float or pandas.Series.
:param val2: New value of endogenized variable.
:type val2: float.
:param delta1: New value of exogenized variable is the value of edogenized variable plus delta1.
:type delta1: float.
:param delta2: New value of endogenized variable is the value of exogenized variable plus delta2.
:type delta2: float.
:param rng: Simulation range or a date.
:type rng: list of datetime.
:param reset: Flag to reset model swap dictionary.
:type reset: bool.
"""
from snowdrop.src.utils.util import simulationRange
simulationRange(self)
if reset:
self.mapSwap = None
m = self.__swap__(var1,var2,val1,val2,delta1,delta2,rng)
size = 0
for k in m:
size += len(m[k])
self.total_nmbr_shocks = size
return
def __swap__(self,var1,var2,val1=None,val2=None,delta1=None,delta2=None,rng=None):
"""
Swap variables with their shocks (a.k.a. exogenizes variable var1 and endogenizes variable var2).
Parameters:
:param self: Parameter.
:type self: `Model'.
:param var1: Endogenous variable.
:type var1: str or list of str or dict.
:param var2: Exogenous variable.
:type var2: str or list of str.
:param val1: New value of exogenized variable.
:type val1: float or pandas.Series.
:param val2: New value of endogenized variable.
:type val2: float.
:param delta1: New value of exogenized variable is the value of edogenized variable plus delta1.
:type delta1: float.
:param delta2: New value of endogenized variable is the value of exogenized variable plus delta2.
:type delta2: float.
:param rng: Simulation range or a date.
:type rng: list of datetime.
"""
if isinstance(var1,dict) and isinstance(var2,list):
sz1 = len(var1)
sz2 = len(var2)
if len(var1) != len(var2):
cprint("Length of exogenized/endogenized variables {0} - {1} and {2} - {3} is different... Skipping swap!".format(list(var1),sz1,list(var2),sz2),"red")
return
i = 0
for v1 in var1:
val1 = var1[v1]
v2 = var2[i]
self.__swap__(var1=v1,var2=v2,val1=val1,val2=v2)
i += 1
elif isinstance(val1,pd.Series) and isinstance(var2,str):
self.__swap__(var1=var1,var2=var2,val1=val1.values,rng=val1.index)
elif isinstance(var1,list) and isinstance(var2,list):
sz1 = len(var1)
sz2 = len(var2)
if len(var1) != len(var2) and len(var2) > 1:
cprint("Length of exogenized/endogenized variables {0}:{1} and {2}:{3} is different... Skipping swap!".format(var1,sz1,var2,sz2),"red")
return
for i,x1 in enumerate(var1):
x2 = var2[min(len(var2)-1,i)]
if not val1 is None and isinstance(val1,list) and i<len(val1):
v1 = val1[i]
else:
v1 = None
if not val2 is None and isinstance(val2,list) and i<len(val2):
v2 = val2[i]
else:
v2 = None
if not delta1 is None and isinstance(delta1,list) and i<len(delta1):
d1 = delta1[i]
else:
d1 = None
if not delta2 is None and isinstance(delta2,list) and i<len(delta2):
d2 = delta2[i]
else:
d2 = None
self.__swap__(var1=x1,var2=x2,val1=v1,val2=v2,delta1=d1,delta2=d2,rng=rng)
elif isinstance(var1,str) and isinstance(var2,str) and isinstance(val1,np.ndarray):
for i,v1 in enumerate(val1):
self.__swap__(var1=var1,var2=var2,val1=v1,rng=rng[i])
else:
variables = self.symbols["variables"]
if var1 in variables:
ind0 = variables.index(var1)
else:
if not var1 in lst1:
cprint("Swap: Variable '{}' is not in the list of variables".format(var1),"red")
lst1.append(var1)
labels = self.eqLabels
if var1 in labels:
ind1 = labels.index(var1)
else:
ind1 = None
if not var1 in lst2:
cprint("Swap: Variable '{}' is not in the list of equations labels".format(var1),"red")
lst2.append(var1)
shocks = self.symbols["shocks"]
if var2 in shocks:
ind2 = shocks.index(var2)
else:
ind2 = None
if not var1 in lst3:
cprint("Swap: Variable '{}' is not in the list of shocks".format(var2),"red")
lst3.append(var1)
if not delta1 is None:
val = self.calibration[var2]
val1 = val+delta1
if np.isnan(val1):
return
if not delta2 is None:
val = self.calibration[var1]
val2 = val+delta2
if self.mapSwap is None:
self.mapSwap = {}
if isinstance(rng,dt.datetime):
period = 0; b = False
n_range = len(self.date_range)
if rng == self.date_range[-1]:
b = True
period = n_range-1
else:
for i in range(n_range-1):
x1 = self.date_range[i]
x2 = self.date_range[1+i]
if rng >= x1 and rng < x2:
b = True
period = i
break
if not b:
return
elif isinstance(rng,int):
period = rng
else:
period = rng
if period in self.mapSwap:
if self.mapSwap[period] != [(ind0,ind1,val1,ind2,val2)]:
self.mapSwap[period] += [(ind0,ind1,val1,ind2,val2)]
else:
self.mapSwap[period] = [(ind0,ind1,val1,ind2,val2)]
return self.mapSwap
[docs]
def flip(self,endogVariables,exogVariables,bParams=True):
"""
Flip endogenous variables with their exogenous counterparts.
.. note:: This method is in DEVELOPMENT stage and needs testing.
Parameters:
:param self: instance of Model.
:type self: `Model'.
:param endogVariables: Endogenous variables to be flipped.
:type endogVariables: list.
:param exogVariables: Exogenous (shock) variables or parameters to be flipped.
:type exogVariables: list.
"""
if isinstance(endogVariables,str):
endogVariables = [endogVariables]
if isinstance(exogVariables,str):
exogVariables = [exogVariables]
assert len(endogVariables) == len(exogVariables), f"Number of endogenous variables {len(endogVariables)} and exogenous variables {len(exogVariables)} is different!"
if bParams:
variables_names = self.symbols['endogenous'].copy()
exog_names = self.symbols['parameters'].copy()
else:
variables_names = self.symbols['variables'].copy()
exog_names = self.symbols['shocks'].copy()
nv = len(variables_names)
ns = len(exog_names)
# Treat wildcards - replace astericks with the names of endogenous variables
arr_variables = []; arr1 = []; swapped_endog = []
for x in endogVariables:
if x.endswith("*"):
arr_variables += [v for v in variables_names if v.startswith(x[:-1]) and not v in arr_variables]
swapped_endog.extend(arr_variables)
arr1.extend(arr_variables)
else:
swapped_endog.append(x)
arr1.append(x)
swapped_endog = set(swapped_endog)
# Treat wildcards - replace astericks with the names of exoogenous variables or parameters names
arr_shocks = []; arr2 = []; swapped_exog = []
for x in exogVariables:
if x.endswith("*"):
arr_shocks += [v for v in exog_names if v.startswith(x[:-1]) and not v in arr_shocks]
swapped_exog.extend(arr_shocks)
arr2.extend(arr_shocks)
else:
swapped_exog.append(x)
arr2.append(x)
swapped_exog = set(swapped_exog)
assert len(swapped_endog) == len(swapped_exog), f"Number of swapped endogenous {swapped_endog} and exogenous variables {swapped_exog} have different number of elements and cannot be swapped!"
self.flippedEndogVariables = swapped_endog
self.flippedExogVariables = swapped_exog
# Order of variables in partial derivatives matrix: lead, current, lag variables (i+nv,i,i-nv)
# order of shocks in partial derivatives matrix: lag, current, lead shocks (i-ns,i,i+ns)
# Mapping: i-nv -> i+ns; i -> i; i+nv -> i-ns
if bParams:
self.indexOfFlippedEndogVariables = [i for i,x in enumerate(variables_names) if x in swapped_endog]
self.indexOfFlippedExogVariables = [i for i,x in enumerate(exog_names) if x in swapped_exog]
else:
if self.max_lead_shock == 0 and self.min_lag_shock == 0:
self.indexOfFlippedEndogVariables = [i+nv for i,x in enumerate(variables_names) if x in swapped_endog]
self.indexOfFlippedExogVariables = [i for i,x in enumerate(exog_names) if x in swapped_exog]
elif self.max_lead_shock < 0 and self.min_lag_shock == 0:
self.indexOfFlippedEndogVariables = [i+2*nv for i,x in enumerate(variables_names) if x in swapped_endog] \
+ [i+nv for i,x in enumerate(variables_names) if x in swapped_endog]
self.indexOfFlippedExogVariables = [i for i,x in enumerate(exog_names) if x in swapped_exog] \
+ [i+ns for i,x in enumerate(exog_names) if x in swapped_exog]
elif self.max_lead_shock == 0 and self.min_lag_shock > 0:
self.indexOfFlippedEndogVariables = [i for i,x in enumerate(variables_names) if x in swapped_endog] \
+ [i+nv for i,x in enumerate(variables_names) if x in swapped_endog]
self.indexOfFlippedExogVariables = [i+ns for i,x in enumerate(exog_names) if x in swapped_exog] \
+ [i for i,x in enumerate(exog_names) if x in swapped_exog]
else:
self.indexOfFlippedEndogVariables = [i for i,x in enumerate(variables_names) if x in swapped_endog] \
+ [i+nv for i,x in enumerate(variables_names) if x in swapped_endog] \
+ [i+2*nv for i,x in enumerate(variables_names) if x in swapped_endog]
self.indexOfFlippedExogVariables = [i+2*ns for i,x in enumerate(exog_names) if x in swapped_exog] \
+ [i+ns for i,x in enumerate(exog_names) if x in swapped_exog] \
+ [i for i,x in enumerate(exog_names) if x in swapped_exog]
#cprint(f"Endogenus variable(s)\n{','.join(swapped_endog)} \nwere flipped with exogenous variable(s)\n{','.join(swapped_exog)}\n","blue")
return
def __str__(self):
"""
Represent Model class object as a string.
Parameters:
:param self: Model object.
:type self: `Model'.
"""
from snowdrop.src.utils.util import findVariableLead,findVariableLag
from numpy import zeros
import re
delimiters = " ", ",", ";", "*", "/", ":", "^", "=", "+", "-", "(", ")"
regexPattern = '|'.join(map(re.escape, delimiters))
endog = self.symbols["variables"]
s = u'''
Model:
------
name: "{name}"
file: "{filename}\n'''.format(**self.infos)
b = [True for x in self.calibration['variables'] if np.isnan(x)]
if not any(b):
if self.isLinear:
ss = colored('\nLinear Model\n','blue',attrs=['underline'])
else:
ss = colored('\nNon-Linear Model\n','blue',attrs=['underline'])
else:
ss = ""
meas_eqs = self.symbolic.measurement_equations
if meas_eqs is None:
meas_eqs = []
numberOfNewEqs = self.numberOfNewEqs
ss += '\nTransition Equations:\n---------------------\n\n'
res = self.residuals()
res.update({'definitions': zeros(1)})
equations = {'equations': self.symbolic.equations.copy()}
definitions = self.symbolic.definitions
tmp = []
for deftype in definitions:
tmp.append(deftype + ' = ' + definitions[deftype])
definitions = {'definitions': tmp}
equations.update(definitions)
# for eqgroup, eqlist in self.symbolic.equations.items():
for eqgroup in res.keys():
if eqgroup == 'equations':
eqlist = equations
if eqgroup == 'definitions':
eqlist = equations[eqgroup]
# Update the residuals section with the right number of empty
# values. Note: adding 'zeros' was easiest (rather than empty
# cells), since other variable types have arrays of zeros.
res.update({'definitions': [None for i in range(len(eqlist))]})
else:
eqlist = equations[eqgroup]
#ss += u"{}\n".format(eqgroup)
eqLabels = self.eqLabels
neq = len(eqlist) - numberOfNewEqs
for i, eq in enumerate(eqlist):
if i >= neq:
continue
if not eqLabels is None and i < len(eqLabels):
label = eqLabels[i]
else:
label = str(i+1)
if not bool(label):
label = str(i+1)
val = res[eqgroup][i]
arr = re.split(regexPattern,eq)
arr = list(filter(None,arr))
e = eq
for v in arr:
if v in endog:
ind = e.find(v)
e = e[ind+len(v):].strip()
if e.startswith("(") and ")" in e:
ind = e.find(")")
vv = v + e[:1+ind]
if "_plus_" in vv:
lead = findVariableLead(vv.strip())
ind = vv.index("_plus")
name = vv[:ind] + "(" + str(lead) + ")"
eq = eq.replace(vv,name)
elif "_minus_" in vv:
lag = findVariableLag(vv.strip())
ind = vv.index("_minus")
name = vv[:ind] + "(" + str(lag) + ")"
eq = eq.replace(vv,name)
if self.symbolic.objective_function is not None or val is None or np.isnan(val):
ss += u" {label:4} : {eqn}\n".format(label=label, eqn=eq)
else:
if abs(val) < 1e-8:
val = 0
vals = '{:6.3f}'.format(val)
if abs(val) > 1e-8:
vals = colored(vals, 'red')
else:
vals = colored(vals, 'green')
if not str(label).isdigit():
lbl = str(1+i)
ss += u" {lbl:2}\t{vals:6} : {eqn}\n".format(lbl=lbl,vals=vals,eqn=eq)
else:
ss += u" {label:2}\t{vals:6} : {eqn}\n".format(label=label,vals=vals,eqn=eq)
ss += "\n"
if bool(self.symbolic.measurement_equations):
ss += '\nMeasurement Equations:\n----------------------\n\n'
for i,eq in enumerate(self.symbolic.measurement_equations):
ss += u" {eqn:2} : {eqs}\n".format(eqn=i+1, eqs=eq)
ss += "\n"
if not self.FILTER is None and not self.symbolic.measurement_equations is None:
ss += "Kalman Filter: {Filter}\n".format(Filter=colored(self.FILTER.name,"blue"))
if not self.SMOOTHER is None:
ss += "Kalman Smoother: {Smoother}\n".format(Smoother=colored(self.SMOOTHER.name,"blue"))
else:
ss += "No Kalman Smoother.\n"
if not self.INIT_COND_CORRECTION is None:
ss += "Method to initialize the error covariance matrix: {Prior}.\n".format(Prior=colored(self.INIT_COND_CORRECTION.name,"blue"))
if not self.PRIOR is None:
ss += "Method to initialize the error covariance matrix: {Prior}.\n".format(Prior=colored(self.PRIOR.name,"blue"))
else:
ss += "No transformation of the error covariance matrix.\n"
if not self.INITIAL_CONDITION is None:
ss += "Initial condition of endogenous variables: {initCond}\n".format(initCond=colored(self.INITIAL_CONDITION.name,"blue"))
else:
ss += "Initial values of endogenous variables are set in a model file.\n\n"
print ()
if self.estimate and not self.priors is None:
ss += colored("Prior distribution of parameters:\n","blue")
for k in self.priors:
distr = self.priors[k]['distribution']
params = self.priors[k]['parameters']
init_val = params[0]
lb = params[1]
ub = params[2]
prior_params = str(params[3:])
ss += f" {k} - {distr}, val={init_val}, lb={lb}, ub={ub}, prior parameters={prior_params} \n"
if not self.SAMPLING_ALGORITHM is None:
ss += colored(f"\nSampling algorithm: {self.SAMPLING_ALGORITHM.name}\n","blue")
if bool(self.symbolic.objective_function):
ss += colored("\nObjective Function: \n","blue")
ss += f"func = {self.symbolic.objective_function}\n"
if not self.symbolic.constraints is None:
ss += colored("\nConstraints:\n","blue")
#ss += "-"*12 + "\n"
for c in self.symbolic.constraints:
c = c.replace(".lt.", " < ")
c = c.replace(".le.", " <= ")
c = c.replace(".gt.", " > ")
c = c.replace(".ge.", " >= ")
ss += f" {c}\n"
s += ss
return s
def __repr__(self):
"""
Official representation of Model class object.
Parameters:
:param self: Model object.
:type self: `Model'.
"""
return self.__str__()
def _repr_html_(self):
"""
HTML representation of Model class object.
Parameters:
:param self: Model object.
:type self: `Model'.
"""
from snowdrop.src.misc.latex import eq2tex
# general informations
infos = self.infos
table_infos = """
<table>
<td><b>Model</b></td>
<tr>
<td>name</td>
<td>{name}</td>
</tr>
<tr>
<td>filename</td>
<td>{filename}</td>
</tr>
</table>""".format(name=infos['name'],filename=infos['filename'].replace("<","<").replace(">",">"))
# Equations and residuals
resids = self.residuals()
equations = {"equations": self.symbolic.equations.copy()}
# Create definitions equations and append to equations dictionary
definitions = self.symbolic.definitions
tmp = []
for deftype in definitions:
tmp.append(deftype + ' = ' + definitions[deftype])
definitions = {'definitions': tmp}
equations.update(definitions)
variables = self.symbols["variables"]
table = "<tr><td><b>Type</b></td><td><b>Equation</b></td><td><b>Residual</b></td></tr>\n"
for eq_type in equations:
eq_lines = []
for i in range(len(equations[eq_type])):
eq = equations[eq_type][i]
val = resids[eq_type][i]
if abs(val) > 1e-8:
vals = '<span style="color: red;">{:.4f}</span>'.format(val)
else:
vals = '{:.3f}'.format(val)
if '|' in eq:
# keep only lhs for now
eq, comp = str.split(eq,'|')
lat = eq2tex(variables, eq)
lat = '${}$'.format(lat)
line = [lat, vals]
h = eq_type if i==0 else ''
fmt_line = '<tr><td>{}</td><td>{}</td><td>{}</td></tr>'.format(h, *line)
# print(fmt_line)
eq_lines.append(fmt_line)
table += str.join("\n", eq_lines)
table = "<table>{}</table>".format(table)
return table_infos + table
[docs]
def residuals(self):
"""
Compute residuals of model equations.
Parameters:
:param self: Model object.
:type self: `Model'.
"""
from snowdrop.src.numeric.solver.util import getParameters
y = self.calibration['variables']
p = getParameters(model=self)
e = self.calibration['shocks']
if np.ndim(e) == 2:
e = e[-2]
if self.autodiff or self.jaxdiff:
func = self.functions['func']
n = 1+self.max_lead_shock-self.min_lag_shock
if n > 1:
e = np.repeat(e,n)
x = np.concatenate([y,y,y,e])
bHasAttr = hasattr(func,'py_func')
if bHasAttr:
res = func.py_func(x,p)
else:
res = func(x,p)
else:
func = self.functions['f_static']
res = func(y,p,e)
return {'equations': res}
[docs]
def setStartingValues(self,hist,skip_rows=0,bTreatMissingObs=True,debug=False):
"""
Set starting values for current and lagged endogenous variables.
Parameters:
:param self: Model object.
:type self: `Model'.
:param hist: Path to historical data file or dictionary.
:type hist: str or dict.
:param skip_rows: The number of rows to skip.
:type skip_rows: int.
:param bTreatMissingObs: If True find missing variables values by minimizing residuals of steady state equations given the observations.
:type bTreatMissingObs: bool.
:param debug: If True print missing variables information.
:type debug: bool.
"""
global itr
from snowdrop.src.model.util import getStartingValues
var_values = self.calibration["variables"]
var_names = self.symbols["variables"]
var_values,calib,missing = getStartingValues(hist=hist,var_names=var_names,orig_var_values=var_values,options=self.options,skip_rows=skip_rows,debug=debug)
if bTreatMissingObs and len(missing)>0:
from snowdrop.src.numeric.filters.utils import getMissingInitialVariables
var = [x for i,x in enumerate(var_names) if not "_plus_" in x and not "_minus_" in x]
ind1 = [i for i,x in enumerate(var_names) if not "_plus_" in x and not "_minus_" in x]
x0 = [var_values[i] for i,x in enumerate(var_names) if not "_plus_" in x and not "_minus_" in x]
ind2 = [i for i,x in enumerate(var) if not x in missing]
values = getMissingInitialVariables(self,ind=ind2,x0=x0)
var_values[ind1] = values
self.calibration["variables"] = var_values
#x = dict(zip(self.symbols["variables"],var_values ))
return
[docs]
def setShocks(self,d,start=None,reset=False):
"""
Set shocks values given the time of their appearance.
Args:
model : self
self object.
d : dict
Map of shock name and shock values.
start : datetime.
Start date of simulations. Default is None.
Returns:
None.
"""
shock_names = self.symbols["shocks"]
if reset:
shock_values = np.zeros(len(shock_names))
else:
if "shock_values" in self.options:
shock_values = self.options["shock_values"]
else:
shock_values = self.calibration["shocks"]
calib,startingDate,interval = setValues(model=self,d=d,names=shock_names,values=shock_values,start=start,isShock=True)
self.calibration["shocks"] = calib.T
self.options["shock_values"] = calib.T
n_shk,n_t = calib.shape
return
[docs]
def setParameters(self,d,start=None):
"""
Set parameters values given the time of their appearance.
Args:
self : Model
self object.
d : dict
Map of parameters name and parameters values.
start : datetime.
Start date of simulations. Default is None.
Returns:
None.
"""
param_names = self.symbols["parameters"]
param_values = self.calibration["parameters"]
calib,_,_ = setValues(model=self,d=d,names=param_names,values=param_values,start=start,isShock=False)
self.calibration["parameters"] = calib
[docs]
def setCalibration(self,param_name,param_value):
"""
Set calibration dictionary values given the time of their appearance.
Args:
self : Model
self object.
param_name : str
Parameter name.
param_value : numeric.
Parameter value.
Returns:
None.
"""
param_names = self.symbols["parameters"]
param_values = self.calibration["parameters"].copy()
ind = param_names.index(param_name)
value = param_values[ind]
if np.isscalar(value):
param_values[ind] = param_value
else:
param_values[ind] = np.zeros(len(value)) + param_value
self.calibration["parameters"] = param_values
### End of Model definition
if __name__ == '__main__':
"""Main entry point."""
import os
from snowdrop.src.model.factory import import_model
path = os.path.dirname(os.path.abspath(__file__))
fname = "../../models/Toy/JLMP98.yaml"
fpath = os.path.abspath(os.path.join(path,fname))
interface = import_model(fpath, return_interface=True)
infos = {
'name' : 'JLMP98',
'filename' : fpath
}
model = Model(interface, infos=infos)
print(model)
model.set_calibration(g=0.5)
# model.setCalibration('g', 0.5)
# model.set_calibration({'g': 0.5})
# model.set_calibration({'g': '0.5'})
# model.set_calibration(g=0.5)
# model.setParameters(dict) - set time changing parameters
p = model.calibration['parameters']
print('Parameters:')
print(p)
y = model.calibration['variables']
print('Variables values:')
print(y)