Source code for snowdrop.src.numeric.filters.utils

"""Compute initial error covariance matrices Pstar and Pinf.

The initial values of error covariance matrix P are assumed diffusive.
During the first d diffusive periods matrix P is decomposed into Pstar and Pinf
and separate equations for these matrices are applied.
After the d diffusive periods the standard Kalman filter is used and only one error covariance
matrix P is used.  The initial condition for P(d) is Pstar(d).
 
Translated from Dynare Matlab code to Python by A.G.
"""

import numpy as np
from scipy import linalg as la
#from warnings import warn 
itr = 0

[docs] def sorter(x): """Sort eigen values.""" return abs(x)>=1
[docs] def compute_Pinf_Pstar(T,R,Q,N=0,n_shocks=-1,mf=None,qz_criterium=1.000001): """ Compute of Pinf and Pstar for Durbin and Koopman diffuse filter. Kitagawa transformation of state space system with a quasi-triangular transition matrix with unit roots at the top, but excluding zero columns of the transition matrix. For references please see G. Kitagawa, "An algorithm for solving the matrix equation", International Journal of Control, Volume 25, 1977 - Issue 5 The transformed state space is y = [ss z x], were: s: static variables (zero columns of T) z: unit roots x: stable roots ss: s - z = stationarized static variables Args: T: double matrix of transition equations R: double matrix of structural shock effects Q: double matrix of covariance of structural shocks N: int shocks maximum lead number minus minimum lag number n_shocks: int number pf shocks mf: integer vector of indices of observed variables in state vector qz_criterium: double numerical criterium for unit roots Returns: Pinf: double matrix of covariance initialization for nonstationary part Pstar: double matrix of covariance of stationary part Algorithm: Real Schur transformation of transition equation .. note:: Translated from Dynare version 4.5.1 to Python by AG """ fn,_ = R.shape nplus,_ = T.shape ST,QT,sdim = la.schur(T,output='real',sort = lambda x: abs(x) > 2-qz_criterium) #ST,QT = la.schur(T,output='complex') # Check correctness of Schur decomposition err1 = la.norm(QT@ST@QT.T - T) / la.norm(T) if err1 > 1.e-10: raise Exception("compute_Pinf_Pstar: Schur decomposition error. \n Inconsistency of T matrix decomposition of {0}".format(round(err1,4))) # Re-arrange matrices to make unstable generalized eigenvalues appear in the upper left corner of matrices: s,t # from snowdrop.src.utils.sortSchur import sort_schur_decomposition # QT,ST,ap = sort_schur_decomposition(Q=QT,R=ST,z=2-qz_criterium,b=0) # Check correctness of Schur decomposition # err2 = la.norm(QT@ST@QT.T - T) / la.norm(T) # if err2 > 1.e-10: # raise Exception("compute_Pinf_Pstar: Schur decomposition error. \n Inconsistency of T matrix decomposition of {0}".format(round(err2,4))) nk = sum(abs(np.diag(ST))>2-qz_criterium) nk1 = nk+1 Pstar = np.zeros((nplus,nplus)) if n_shocks == -1: R1 = QT.T @ R B = np.real(R1 @ Q @ R1.T) else: B = 0 for i in range(1+N): R1 = QT.T @ R[:,i*n_shocks:(1+i)*n_shocks] B += np.real(R1 @ Q @ R1.T) i = nplus-1 while i >= nk1: if ST[i,i-1] == 0: if i == nplus-1: c = np.zeros(nplus-nk) else: c = ST[nk:i+1,:]@(Pstar[:,i:]@ST[i,i:].T) \ + ST[i,i]*ST[nk:i+1,i:]@Pstar[i:,i] qq = np.eye(i+1-nk)-ST[nk:i+1,nk:i+1]*ST[i,i] Pstar[nk:i+1,i] = la.solve(qq, B[nk:i+1,i]+c) Pstar[i,nk:i] = Pstar[nk:i,i].T i -= 1 else: if i == nplus-1: c = np.zeros(nplus-nk) c1 = np.zeros(nplus-nk) else: c = ST[nk:i+1,:]@(Pstar[:,i:]@ST[i,i:].T) \ + ST[i,i]*ST[nk:i+1,i:]@Pstar[i:,i] \ + ST[i,i-1]*ST[nk:i+1,i:]@Pstar[i:,i-1] c1 = ST[nk:i+1,:]@(Pstar[:,i:]@ST[i-1,i:].T) \ + ST[i-1,i-1]*ST[nk:i+1,i:]@Pstar[i:,i-1] \ + ST[i-1,i]*ST[nk:i+1,i:]@Pstar[i:,i] t11 = np.eye(i-nk+1)-ST[nk:i+1,nk:i+1]*ST[i,i] t12 = -ST[nk:i+1,nk:i+1]*ST[i,i-1] t1 = np.concatenate((t11,t12),axis=1) t21 = -ST[nk:i+1,nk:i+1]*ST[i-1,i] t22 = np.eye(i-nk+1)-ST[nk:i+1,nk:i+1]*ST[i-1,i-1] t2 = np.concatenate((t21,t22),axis=1) qq = np.concatenate((t1,t2), axis=0) t = np.concatenate((B[nk:i+1,i]+c, B[nk:i+1,i-1]+c1),axis=0) z = la.solve(qq, t) Pstar[nk:i+1,i] = z[:i-nk+1] Pstar[nk:i+1,i-1] = z[i-nk+1:] Pstar[i,nk:i] = Pstar[nk:i,i].T Pstar[i-1,nk:i-1] = Pstar[nk:i-1,i-1].T i -= 2 if i == nk: c = ST[nk,:]@(Pstar[:,nk1:]@ST[nk,nk1:].T)+ST[nk,nk]*ST[nk,nk1:]@Pstar[nk1:,nk] Pstar[nk,nk]=(B[nk,nk]+c)/(1-ST[nk,nk]*ST[nk,nk]) # stochastic trends with no influence on observed variables are arbitrarily initialized to zero Pinf = np.zeros((nplus,nplus)) Pinf[:nk,:nk] = np.eye(nk) if not mf is None: for k in range(nk): if la.norm(QT[mf,:]@ST[:,k]) < 1e-8: Pinf[k,k] = 0 P1inf = QT@Pinf@QT.T P1star = QT@Pstar@QT.T return P1inf,P1star
[docs] def getTimeShift(model): """Get time shift between start of simulation and start of filtration.""" import pandas as pd from pandas.tseries.offsets import DateOffset import datetime as dt from snowdrop.src.utils.util import getDate n = -1 simulation_range = model.options['range'] start = getDate(simulation_range[0]) filter_range = model.options['filter_range'] end = getDate(filter_range[0]) frequency = model.options['frequency'] if start == end: # Starting simulation and filtration ranges are the same... Skip n = 0 else: if frequency == 0: freq = DateOffset(months=12) elif frequency == 1: freq = DateOffset(months=3) elif frequency == 2: freq = DateOffset(months=1) elif frequency == 3: freq = DateOffset(weeks=1) elif frequency == 4: freq = DateOffset(days=1) rng = pd.date_range(start,end,freq=freq) n = len(rng) return n-1
[docs] def getSteadyStateCovarianceMatrix(T,R,Qm,Hm,Z,n,Nd,n_shocks): r"""Solve Lyapunov equation for stable part of error covariance matrix. Predict: .. math:: P_{t|t-1} = T_{t} * P_{t-1|t-1} * F'_{t} + Q_{t} For details, see https://en.wikipedia.org/wiki/Kalman_filter Args: T: is the state-transition matrix, R: is the shock matrix, Qm: is the covariance matrix of state variables (endogenous variables), Hm: is the covariance matrix of space variables (measurement variables) Z: is the observation matrix, n: is the number of endogenous variables, Nd: is the shocks maximum lead number minus minimum lag number, n_shocks: is the number of shocks, iterate: is the boolean variable. If True then iterative method is used, otherwise Lyapunov equation solver is applied. For details, see https://en.wikipedia.org/wiki/Kalman_filter """ from scipy import linalg as la ITER = 1000 EPS = 1.e-4 n = len(T) I = np.eye(n) Q = np.real(R @ Qm @ R.T) P = np.copy(Q) prev = np.copy(P) for i in range(ITER): P = T @ P @ T.T + Q # predicted error covariance F = Hm + Z @ P @ Z.T # pre-fit residual covariance iF = la.pinv(F) # matrix inverse K = P @ Z.T @ iF # optimal Kalman gain P = (I-K@Z)@P # update (a posteriori) estimate covariance. This formula is valid for normal distribution. err = la.norm(P-prev)/la.norm(P) prev = np.copy(P) if err < EPS: print(f"Iter = {i}, error = {err:.3e}") break return P
[docs] def getMissingInitialVariables(model,ind,x0): """ Return initial values of endogenous variables satisfying model equations. Parameters: :param model: Model object. :type model: `Model'. :param ind: Indices of variables with fixed values. :type ind: list. :param x0: Initial guess. :type x0: numpy ndarray. """ from scipy.optimize import root from snowdrop.src.numeric.solver.util import getParameters from snowdrop.src.numeric.solver.util import getExogenousData from snowdrop.src.misc.termcolor import cprint #global TOLERANCE, NITERATIONS TOLERANCE = 1.e-6; NITERATIONS = 1000 f_static = model.functions["f_steady"] bHasAttrStatic = hasattr(f_static,"py_func") # Define objective function def fobj(x): global itr itr += 1 x[ind] = z[ind] try: y = np.concatenate([x,e]) if bHasAttrStatic: func = f_static.py_func(y,p,exog) else: func = f_static(y,p,exog) except ValueError: func = np.zeros(len(x)) + 1.e10 + itr return func p = getParameters(model=model) exog = getExogenousData(model,t=0) n_shk = len(model.symbols['shocks']) e = np.zeros(n_shk) z = np.copy(x0) sol = root(fobj,x0=z,method='lm',tol=TOLERANCE,options={"maxiter":NITERATIONS}) x = sol.x if not sol.success: ind_min = sol.fun.argmin() ind_max = sol.fun.argmax() ind = ind_min if abs(sol.fun[ind_min]) > abs(sol.fun[ind_max]) else ind_max err = la.norm(sol.fun) cprint(f"filters.itils.getInitialVariables:\n Root solver failed: Number of iterations {itr}, Error {err:.3e}","yellow") cprint(f" The following equation has the largest residual of {sol.fun[ind]:.2e},","yellow") cprint(" "+model.symbolic.equations[ind],"yellow") print() return x
[docs] def getCovarianceMatrix(Qm,shocks,equations,n): """ Return error covariance matrix of endogenous variables. Parameters ---------- Qm : numpy 2D ndarray. Error covariance matrix. shocks : list. Shocks names. equations : list. Measuremen equations. n : int. Nimber of variables. Returns ------- Q : numpy 2D array. Full error covariance matrix of endogenous variables. """ import re delimiters = " ", ",", ";", "*", "/", ":", "+", "-", "^", "{", "}", "(", ")", "=" regexPattern = '|'.join(map(re.escape, delimiters)) Q = 1.e-10*np.eye(n) for i,eq in enumerate(equations): arr = re.split(regexPattern,eq) arr = list(filter(None,arr)) for v in arr: if v in shocks: ind = shocks.index(v) Q[i,i] = Qm[ind,ind] return Q
[docs] def getEndogVarInMeasEqs(variables, measurement_equations): """ Retrun list of endogenous variables that are present in measurement equations. Parameters ---------- variables : list. Names of variables. measurement_equations : list. Measurement equations. Returns ------- ind_var : list. Indices of endogenous variables. var_meas : list. Names of endogenous variables. """ import re # Get list of endogenous variables present in measurement equations delimiters = " ", ",", ";", "*", "/", ":", "+", "-", "^", "{", "}", "(", ")", "=" regexPattern = '|'.join(map(re.escape, delimiters)) var_meas = [] for eq in measurement_equations: arr = re.split(regexPattern,eq) arr = list(filter(None,arr)) for v in arr: if v in variables and not v in var_meas: var_meas.append(v) ind_var = [i for i,v in enumerate(variables) if v in var_meas] return ind_var,var_meas
if __name__ == "__main__": """Main entry point.""" from mat4py import loadmat import os path = os.path.dirname(os.path.abspath(__file__)) working_dir = os.path.abspath(os.path.join(os.path.abspath(path + "../../../.."))) data = loadmat(working_dir + "/data/toy/data.mat") mf = np.array(data['mf']) mf = np.squeeze(mf) - 1 T = np.array(data['T']) R = np.array(data['R']) Q = np.array(data['Q']) Pstar = np.array(data['Pstar']) Pinf = np.array(data['Pinf']) P1inf,P1star = compute_Pinf_Pstar(mf=mf,T=T,R=R,Q=Q) # Test of P1star, P1inf calculations #mf = [1, 2] # T = np.array( [[ 0, 0, 0, 0.75, 0.9, 1, 0], # [0, 0, 0, 0.75, 0.9, 1, 0], # [0, 0, 0, 0.333, 0, 0, 1], # [0, 0, 0, 0.75, 0, 0, 0], # [0, 0, 0, 0, 0.9, 0, 0], # [0, 0, 0, 0, 0.9, 1, 0], # [0, 0, 0, 0.333, 0, 0, 1]]) # R = np.array( [[ 1, 1, 1, 0, 0, 0], # [1, 1, 1, 0, 1, 0], # [0, 0, 0.444, 1.333, 0, 1], # [0, 0, 1, 0, 0, 0], # [0, 1, 0, 0, 0, 0], # [1, 1, 0, 0, 0, 0], # [0, 0, 0.444, 1.333, 0, 0]]) # Q = np.array([[ 1, 0, 0, 0, 0, 0], # [0, 1, 0, 0, 0, 0], # [0, 0, 1, 0, 0, 0], # [0, 0, 0, 1, 0, 0], # [0, 0, 0, 0, 0, 0], # [0, 0, 0, 0, 0, 0]]) # static_rows = [0] print('Pinf, Pstar difference:') print(np.max(abs(P1star-Pstar))) print(np.max(abs(P1inf-Pinf))) # print() # print("P1star:") # print(np.round(P1star,4)) # print() # print("P1inf:") # print(np.round(P1inf,4))