Source code for snowdrop.src.numeric.solver.linear_solver

""" 
The linear solver. It uses direct and iterative solvers that are 
applicable to equations with maximum of one lead and one lag variables.

.. note::
    This solver was designed to solve system of linear equations with shocks.
    To include exogenous processes in addition, please use Villemot solver.
""" 

from __future__ import division

import numpy as np
from time import time
import scipy.linalg as la
from snowdrop.src.model.settings import SolverMethod
from snowdrop.src.numeric.solver.util import getAllShocks
from snowdrop.src.numeric.solver.util import getParameters
from snowdrop.src.numeric.solver.Klein import solve_klein
from snowdrop.src.numeric.solver.Villemot import solve_villemot
from snowdrop.src.numeric.solver.AndersonMoore import solve_am
from snowdrop.src.numeric.solver.BinderPesaran import solve_quadratic_determinantal_equation

it = 0

[docs] def solve(model,p=None,steady_state=None,suppress_warnings=False): """ Select solver. This is a convenience method to choose model equations solver. """ global it if steady_state is None: n = len(model.symbols['variables']) steady_state = np.zeros(n) if not model.solved: if model.SOLVER.value == SolverMethod.Villemot.value: model = solve_villemot(model=model,steady_state=steady_state,p=p,suppress_warnings=suppress_warnings) elif model.SOLVER.value == SolverMethod.Klein.value: model = solve_klein(model=model,steady_state=steady_state,p=p,suppress_warnings=suppress_warnings) elif model.SOLVER.value == SolverMethod.AndersonMoore.value: model = solve_am(model=model,steady_state=steady_state,p=p,suppress_warnings=suppress_warnings) elif model.SOLVER.value == SolverMethod.BinderPesaran.value: model = solve_quadratic_determinantal_equation(model=model,steady_state=steady_state,p=p,suppress_warnings=suppress_warnings) else: model = solve_villemot(model=model,steady_state=steady_state,p=p,suppress_warnings=suppress_warnings) # if it == 0: # it += 1 # from snowdrop.src.misc.termcolor import cprint # cprint("Linear models solver was not defined: using Binder and Pesaran solver ...","green") # model = solve_quadratic_determinantal_equation(model=model,steady_state=steady_state,p=p,suppress_warnings=suppress_warnings) model.solved = True return model
[docs] def simulate(model,T,periods,y0,steady_state,parameters=None,Npaths=1): """ Find first-order accuracy approximation solution. For details on an algorithm to select anticipated and un-anticipated shocks to bring the level of endogenous variables to the desired path please see: https://michalandrle.weebly.com/uploads/1/3/9/2/13921270/iris_simulate.pdf Parameters: :param model: Model object. :type model: `Model'. :param T: Time span. :type T: int. :param periods: Simulation periods. :type periods: numpy.ndarray. :param y0: Starting values of endogenous variables. :type y0: numpy.ndarray :param steady_state: Steady state values of endogenous variables. :type steady_state: numpy.ndarray :param parameters: Values of parameters. :type parameters: list. :param Npaths: Number of simulation paths. This is the number of paths of stochastic shocks. :type Npaths: int. :returns: Numerical solution. """ from snowdrop.src.numeric.solver.tunes import hasImaginaryShocks,hasImaginaryValues,forecast_with_tunes t0 = time() n = len(y0) M = None shock_var = model.symbols['shocks'] n_shocks = len(shock_var) parameters = getParameters(parameters=parameters,model=model) all_shocks = getAllShocks(model,periods,n_shocks,Npaths,T) #print(all_shocks) # Solve linear model at steady state solve(model=model,p=parameters,steady_state=np.zeros(n)) # State transition matrix F = model.linear_model["A"] # Array of constants C = model.linear_model["C"] # Matrix of coefficients of shocks R = model.linear_model["R"] U = model.linear_model.get("U",None) Z = model.linear_model["Z"] N = len(F) # Auxiliary matrices if model.SOLVER.value in [SolverMethod.Klein.value,SolverMethod.Villemot.value]: Xa = -model.linear_model["Xa"] Ru = model.linear_model["Ru"] J = model.linear_model["J"] M = [np.copy(R)] M.append(U @ Xa @ Ru) temp = J @ Ru for t in range(T+2): M.append(U @ Xa @ temp) temp = J @ temp elif model.SOLVER.value in [SolverMethod.BinderPesaran.value,SolverMethod.AndersonMoore.value]: M = [R] if not Z is None: temp = np.copy(R) for t in range(T+2): temp = Z @ temp M.append(temp) lst_exog = [] # Compute contribution of exogenous terms exogenous = model.symbols.get("exogenous",[]) if bool(exogenous) and not Z is None: from snowdrop.src.numeric.solver.util import getExogenousData from snowdrop.src.utils.equations import getExogMatrix eqs = model.symbolic.equations exog = model.symbols["exogenous"] W = getExogMatrix(eqs,exog) for t in range(T+2): exo = getExogenousData(model,1+t) x = Z @ W @ np.array(exo) lst_exog.append(x) ### Find solution. y = np.zeros((T+2,N)) for path in range(Npaths): yIter = [] shocks = np.array(all_shocks[path]) temp = np.copy(y0) y[0] = temp ### Correct solution for a 'fixed path' of endogenous variables. imaginary_shocks = hasImaginaryShocks(shocks) if bool(model.mapSwap): imaginary_values = hasImaginaryValues(model.mapSwap) else: imaginary_values = False if bool(model.mapSwap) or bool(model.condShocks) or imaginary_shocks or imaginary_values: y,adjusted_shocks,model = forecast_with_tunes(model=model,Nperiods=T,y=y,T=F,Re=M,C=C,shocks=shocks, has_imaginary_shocks=imaginary_shocks,has_imaginary_values=imaginary_values) # Add padding to shocks since we removed shocks at t=0. This is the starting values of shocks and can be disregarded. new_shocks = np.zeros((len(adjusted_shocks)+1,n_shocks)) new_shocks[1:] = adjusted_shocks model.calibration["adjusted_shocks"] = new_shocks else: for t in range(T+1): # Solution for un-anticipated shock. y[t+1] = F @ y[t] + C + R @ shocks[t] if bool(lst_exog): y[t+1] += lst_exog[t] # These are extra terms that arise when future shocks are anticipated. if model.anticipate: for j in range(t+1,T+1): y[t+1] += M[j-t] @ shocks[j] sol = y[:,:n] y = sol model.y = y; yIter.append(np.copy(y)) elapsed = time() - t0 count = 1 max_f = 0 return (count,y,yIter,max_f,elapsed)
[docs] def find_steady_state(model,method="iterative",debug=False): """ Find steady state solution. Parameters: :param model: Model object. :type model: `Model'. :param method: Find steady state solution by iterations, by minimizing square of equations error, etc... :type model: str. :returns: Array of endogenous variables steady states and their growth. """ #from snowdrop.src.numeric.solver.util import getStableUnstableRowsColumns from snowdrop.src.numeric.solver.util import getStableUnstableRowsColumns var = model.symbols["variables"] n = len(var) I = np.eye(n) steady_state = np.zeros(n) growth = np.zeros(n) if not "A" in model.linear_model: solve(model) # State transition matrix T = model.linear_model["A"][:n,:n] # Array of constants K = model.linear_model["C"][:n] ev,ew = la.eig(T) nev = sum(abs(e) > 1 for e in ev) rowStable,colStable,rowUnstable,colUnstable = getStableUnstableRowsColumns(model) n_stable = len(colStable) if nev == 0: # Model is stationary... # So, apply direct method to find steady state solution. steady_state = la.pinv(I-T) @ K else: # stable = [True]*n # for i in range(n): # for j in range(n): # if abs(ev[j]) > 1 and abs(ew[i,j] > 1.e-12): # stable[i] = False # break if method=="iterative": z = zprev = np.zeros(n) for j in range(1000): zn = T @ z + K #Stop if growth of endogenous variables is constant. if abs(la.norm(zn-2*z+zprev)) < 1.e-15: break zprev = np.copy(z) z = np.copy(zn) growth = z - zprev growth = [growth[i] if abs(growth[i])>1.e-10 else 0 for i in range(n)] steady_state = [zn[i] if growth[i]==0 else 0 for i in range(n)] steady_state = np.round(steady_state,10) elif method=="root": from scipy.optimize import root T_stable = T[np.ix_(rowStable,colStable)] # Define objective function. def fobj(x): y = np.zeros(n) y[rowStable] = x func = f_dynamic(np.concatenate([y,y,y,e]),p,order=0) return func[rowStable] # Define jacobian. def fjac(x): # Jacobian is constant for linear models. return T_stable f_dynamic = model.functions["f_dynamic"] x0 = np.zeros(len(colStable)) p = getParameters(model=model) e = np.zeros(n) sol = root(fobj,x0,method='lm',tol=1e-12,options={"maxiter":1000}) print(f"Number of function evaluations: {sol.nfev}") steady_state[rowStable] = -sol.x growth = (T-I)@steady_state + K else: T_stable = T[np.ix_(rowStable,colStable)] K_stable = K[rowStable] sol = la.pinv(np.eye(n_stable)-T_stable) @ K_stable steady_state[rowStable] = sol growth = (T-I)@steady_state + K # # Write the steady-state equations at two different times: t and t+d. # d = 10 # Ta = T[np.ix_(rowStable,colStable)] # Ka = K[rowStable] # tmp1 = np.concatenate((np.eye(n_stable), np.zeros((n_stable,n_stable))),axis=1) # tmp2 = np.concatenate((np.eye(n_stable), d*np.eye(n_stable)),axis=1) # E1 = np.concatenate((tmp1,tmp2),axis=0) # tmp1 = np.concatenate((Ta, -Ta),axis=1) # tmp2 = np.concatenate((Ta, (d-1)*Ta),axis=1) # E2 = np.concatenate((tmp1,tmp2),axis=0) # tmp = np.concatenate((Ka,Ka),axis=0) # dx = la.pinv(E1-E2) @ tmp # growth[rowUnstable] = T[np.ix_(rowUnstable,colStable)] @ dx[:n_stable] if debug: if model.count == -1: print('Steady-State Solution:') sv = [var[i]+"="+str(round(steady_state[i],4)) for i in colStable if not "_minus_" in var[i] and not "_plus_" in var[i]] sv.sort() print(sv) print() for k,g,ss in zip(var,growth,steady_state): if k.endswith("_GAP"): print(f"{k}: \t{g:.2f} , \t{ss:.2f}") # Sanity check for i,v in enumerate(var): if "_minus_" in v: ind = v.index("_minus_") var_name = v[:ind] elif "_plus_" in v: ind = v.index("_plus_") var_name = v[:ind] else: var_name = None if var_name in var: k = var.index(var_name) steady_state[i] = steady_state[k] return steady_state, growth