Source code for statsmodels.tsa.varma_process

# -*- coding: utf-8 -*-
""" Helper and filter functions for VAR and VARMA, and basic VAR class

Created on Mon Jan 11 11:04:23 2010
Author: josef-pktd
License: BSD

This is a new version, I didn't look at the old version again, but similar
ideas.

not copied/cleaned yet:
 * fftn based filtering, creating samples with fft
 * Tests: I ran examples but did not convert them to tests
   examples look good for parameter estimate and forecast, and filter functions

main TODOs:
* result statistics
* see whether Bayesian dummy observation can be included without changing
  the single call to linalg.lstsq
* impulse response function does not treat correlation, see Hamilton and jplv

Extensions
* constraints, Bayesian priors/penalization
* Error Correction Form and Cointegration
* Factor Models Stock-Watson,  ???


see also VAR section in Notes.txt

"""
from __future__ import print_function
import numpy as np
from numpy.testing import assert_equal
from scipy import signal
#might not (yet) need the following
from scipy.signal.signaltools import _centered as trim_centered

from statsmodels.tsa.tsatools import lagmat


[docs]def varfilter(x, a): '''apply an autoregressive filter to a series x Warning: I just found out that convolve doesn't work as I thought, this likely doesn't work correctly for nvars>3 x can be 2d, a can be 1d, 2d, or 3d Parameters ---------- x : array_like data array, 1d or 2d, if 2d then observations in rows a : array_like autoregressive filter coefficients, ar lag polynomial see Notes Returns ------- y : ndarray, 2d filtered array, number of columns determined by x and a Notes ----- In general form this uses the linear filter :: y = a(L)x where x : nobs, nvars a : nlags, nvars, npoly Depending on the shape and dimension of a this uses different Lag polynomial arrays case 1 : a is 1d or (nlags,1) one lag polynomial is applied to all variables (columns of x) case 2 : a is 2d, (nlags, nvars) each series is independently filtered with its own lag polynomial, uses loop over nvar case 3 : a is 3d, (nlags, nvars, npoly) the ith column of the output array is given by the linear filter defined by the 2d array a[:,:,i], i.e. :: y[:,i] = a(.,.,i)(L) * x y[t,i] = sum_p sum_j a(p,j,i)*x(t-p,j) for p = 0,...nlags-1, j = 0,...nvars-1, for all t >= nlags Note: maybe convert to axis=1, Not TODO: initial conditions ''' x = np.asarray(x) a = np.asarray(a) if x.ndim == 1: x = x[:,None] if x.ndim > 2: raise ValueError('x array has to be 1d or 2d') nvar = x.shape[1] nlags = a.shape[0] ntrim = nlags//2 # for x is 2d with ncols >1 if a.ndim == 1: # case: identical ar filter (lag polynomial) return signal.convolve(x, a[:,None], mode='valid') # alternative: #return signal.lfilter(a,[1],x.astype(float),axis=0) elif a.ndim == 2: if min(a.shape) == 1: # case: identical ar filter (lag polynomial) return signal.convolve(x, a, mode='valid') # case: independent ar #(a bit like recserar in gauss, but no x yet) #(no, reserar is inverse filter) result = np.zeros((x.shape[0]-nlags+1, nvar)) for i in range(nvar): # could also use np.convolve, but easier for swiching to fft result[:,i] = signal.convolve(x[:,i], a[:,i], mode='valid') return result elif a.ndim == 3: # case: vector autoregressive with lag matrices # #not necessary: # if np.any(a.shape[1:] != nvar): # raise ValueError('if 3d shape of a has to be (nobs,nvar,nvar)') yf = signal.convolve(x[:,:,None], a) yvalid = yf[ntrim:-ntrim, yf.shape[1]//2,:] return yvalid
[docs]def varinversefilter(ar, nobs, version=1): '''creates inverse ar filter (MA representation) recursively The VAR lag polynomial is defined by :: ar(L) y_t = u_t or y_t = -ar_{-1}(L) y_{t-1} + u_t the returned lagpolynomial is arinv(L)=ar^{-1}(L) in :: y_t = arinv(L) u_t Parameters ---------- ar : array, (nlags,nvars,nvars) matrix lagpolynomial, currently no exog first row should be identity Returns ------- arinv : array, (nobs,nvars,nvars) Notes ----- ''' nlags, nvars, nvarsex = ar.shape if nvars != nvarsex: print('exogenous variables not implemented not tested') arinv = np.zeros((nobs+1, nvarsex, nvars)) arinv[0,:,:] = ar[0] arinv[1:nlags,:,:] = -ar[1:] if version == 1: for i in range(2,nobs+1): tmp = np.zeros((nvars,nvars)) for p in range(1,nlags): tmp += np.dot(-ar[p],arinv[i-p,:,:]) arinv[i,:,:] = tmp if version == 0: for i in range(nlags+1,nobs+1): print(ar[1:].shape, arinv[i-1:i-nlags:-1,:,:].shape) #arinv[i,:,:] = np.dot(-ar[1:],arinv[i-1:i-nlags:-1,:,:]) #print(np.tensordot(-ar[1:],arinv[i-1:i-nlags:-1,:,:],axes=([2],[1])).shape #arinv[i,:,:] = np.tensordot(-ar[1:],arinv[i-1:i-nlags:-1,:,:],axes=([2],[1])) raise NotImplementedError('waiting for generalized ufuncs or something') return arinv
[docs]def vargenerate(ar, u, initvalues=None): '''generate an VAR process with errors u similar to gauss uses loop Parameters ---------- ar : array (nlags,nvars,nvars) matrix lagpolynomial u : array (nobs,nvars) exogenous variable, error term for VAR Returns ------- sar : array (1+nobs,nvars) sample of var process, inverse filtered u does not trim initial condition y_0 = 0 Examples -------- # generate random sample of VAR nobs, nvars = 10, 2 u = numpy.random.randn(nobs,nvars) a21 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.8, 0. ], [ 0., -0.6]]]) vargenerate(a21,u) # Impulse Response to an initial shock to the first variable imp = np.zeros((nobs, nvars)) imp[0,0] = 1 vargenerate(a21,imp) ''' nlags, nvars, nvarsex = ar.shape nlagsm1 = nlags - 1 nobs = u.shape[0] if nvars != nvarsex: print('exogenous variables not implemented not tested') if u.shape[1] != nvars: raise ValueError('u needs to have nvars columns') if initvalues is None: sar = np.zeros((nobs+nlagsm1, nvars)) start = nlagsm1 else: start = max(nlagsm1, initvalues.shape[0]) sar = np.zeros((nobs+start, nvars)) sar[start-initvalues.shape[0]:start] = initvalues #sar[nlagsm1:] = u sar[start:] = u #if version == 1: for i in range(start,start+nobs): for p in range(1,nlags): sar[i] += np.dot(sar[i-p,:],-ar[p]) return sar
[docs]def padone(x, front=0, back=0, axis=0, fillvalue=0): '''pad with zeros along one axis, currently only axis=0 can be used sequentially to pad several axis Examples -------- >>> padone(np.ones((2,3)),1,3,axis=1) array([[ 0., 1., 1., 1., 0., 0., 0.], [ 0., 1., 1., 1., 0., 0., 0.]]) >>> padone(np.ones((2,3)),1,1, fillvalue=np.nan) array([[ NaN, NaN, NaN], [ 1., 1., 1.], [ 1., 1., 1.], [ NaN, NaN, NaN]]) ''' #primitive version shape = np.array(x.shape) shape[axis] += (front + back) shapearr = np.array(x.shape) out = np.empty(shape) out.fill(fillvalue) startind = np.zeros(x.ndim) startind[axis] = front endind = startind + shapearr myslice = [slice(startind[k], endind[k]) for k in range(len(endind))] #print(myslice #print(out.shape #print(out[tuple(myslice)].shape out[tuple(myslice)] = x return out
[docs]def trimone(x, front=0, back=0, axis=0): '''trim number of array elements along one axis Examples -------- >>> xp = padone(np.ones((2,3)),1,3,axis=1) >>> xp array([[ 0., 1., 1., 1., 0., 0., 0.], [ 0., 1., 1., 1., 0., 0., 0.]]) >>> trimone(xp,1,3,1) array([[ 1., 1., 1.], [ 1., 1., 1.]]) ''' shape = np.array(x.shape) shape[axis] -= (front + back) #print(shape, front, back shapearr = np.array(x.shape) startind = np.zeros(x.ndim) startind[axis] = front endind = startind + shape myslice = [slice(startind[k], endind[k]) for k in range(len(endind))] #print(myslice #print(shape, endind #print(x[tuple(myslice)].shape return x[tuple(myslice)]
[docs]def ar2full(ar): '''make reduced lagpolynomial into a right side lagpoly array ''' nlags, nvar,nvarex = ar.shape return np.r_[np.eye(nvar,nvarex)[None,:,:],-ar]
[docs]def ar2lhs(ar): '''convert full (rhs) lagpolynomial into a reduced, left side lagpoly array this is mainly a reminder about the definition ''' return -ar[1:]
class _Var(object): '''obsolete VAR class, use tsa.VAR instead, for internal use only Examples -------- >>> v = Var(ar2s) >>> v.fit(1) >>> v.arhat array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.77784898, 0.01726193], [ 0.10733009, -0.78665335]]]) ''' def __init__(self, y): self.y = y self.nobs, self.nvars = y.shape def fit(self, nlags): '''estimate parameters using ols Parameters ---------- nlags : integer number of lags to include in regression, same for all variables Returns ------- None, but attaches arhat : array (nlags, nvar, nvar) full lag polynomial array arlhs : array (nlags-1, nvar, nvar) reduced lag polynomial for left hand side other statistics as returned by linalg.lstsq : need to be completed This currently assumes all parameters are estimated without restrictions. In this case SUR is identical to OLS estimation results are attached to the class instance ''' self.nlags = nlags # without current period nvars = self.nvars #TODO: ar2s looks like a module variable, bug? #lmat = lagmat(ar2s, nlags, trim='both', original='in') lmat = lagmat(self.y, nlags, trim='both', original='in') self.yred = lmat[:,:nvars] self.xred = lmat[:,nvars:] res = np.linalg.lstsq(self.xred, self.yred) self.estresults = res self.arlhs = res[0].reshape(nlags, nvars, nvars) self.arhat = ar2full(self.arlhs) self.rss = res[1] self.xredrank = res[2] def predict(self): '''calculate estimated timeseries (yhat) for sample ''' if not hasattr(self, 'yhat'): self.yhat = varfilter(self.y, self.arhat) return self.yhat def covmat(self): ''' covariance matrix of estimate # not sure it's correct, need to check orientation everywhere # looks ok, display needs getting used to >>> v.rss[None,None,:]*np.linalg.inv(np.dot(v.xred.T,v.xred))[:,:,None] array([[[ 0.37247445, 0.32210609], [ 0.1002642 , 0.08670584]], [[ 0.1002642 , 0.08670584], [ 0.45903637, 0.39696255]]]) >>> >>> v.rss[0]*np.linalg.inv(np.dot(v.xred.T,v.xred)) array([[ 0.37247445, 0.1002642 ], [ 0.1002642 , 0.45903637]]) >>> v.rss[1]*np.linalg.inv(np.dot(v.xred.T,v.xred)) array([[ 0.32210609, 0.08670584], [ 0.08670584, 0.39696255]]) ''' #check if orientation is same as self.arhat self.paramcov = (self.rss[None,None,:] * np.linalg.inv(np.dot(self.xred.T, self.xred))[:,:,None]) def forecast(self, horiz=1, u=None): '''calculates forcast for horiz number of periods at end of sample Parameters ---------- horiz : int (optional, default=1) forecast horizon u : array (horiz, nvars) error term for forecast periods. If None, then u is zero. Returns ------- yforecast : array (nobs+horiz, nvars) this includes the sample and the forecasts ''' if u is None: u = np.zeros((horiz, self.nvars)) return vargenerate(self.arhat, u, initvalues=self.y)
[docs]class VarmaPoly(object): '''class to keep track of Varma polynomial format Examples -------- ar23 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.6, 0. ], [ 0.2, -0.6]], [[-0.1, 0. ], [ 0.1, -0.1]]]) ma22 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[ 0.4, 0. ], [ 0.2, 0.3]]]) '''
[docs] def __init__(self, ar, ma=None): self.ar = ar self.ma = ma nlags, nvarall, nvars = ar.shape self.nlags, self.nvarall, self.nvars = nlags, nvarall, nvars self.isstructured = not (ar[0,:nvars] == np.eye(nvars)).all() if self.ma is None: self.ma = np.eye(nvars)[None,...] self.isindependent = True else: self.isindependent = not (ma[0] == np.eye(nvars)).all() self.malags = ar.shape[0] self.hasexog = nvarall > nvars self.arm1 = -ar[1:]
#@property
[docs] def vstack(self, a=None, name='ar'): '''stack lagpolynomial vertically in 2d array ''' if not a is None: a = a elif name == 'ar': a = self.ar elif name == 'ma': a = self.ma else: raise ValueError('no array or name given') return a.reshape(-1, self.nvarall)
#@property
[docs] def hstack(self, a=None, name='ar'): '''stack lagpolynomial horizontally in 2d array ''' if not a is None: a = a elif name == 'ar': a = self.ar elif name == 'ma': a = self.ma else: raise ValueError('no array or name given') return a.swapaxes(1,2).reshape(-1, self.nvarall).T
#@property
[docs] def stacksquare(self, a=None, name='ar', orientation='vertical'): '''stack lagpolynomial vertically in 2d square array with eye ''' if not a is None: a = a elif name == 'ar': a = self.ar elif name == 'ma': a = self.ma else: raise ValueError('no array or name given') astacked = a.reshape(-1, self.nvarall) lenpk, nvars = astacked.shape #[0] amat = np.eye(lenpk, k=nvars) amat[:,:nvars] = astacked return amat
#@property
[docs] def vstackarma_minus1(self): '''stack ar and lagpolynomial vertically in 2d array ''' a = np.concatenate((self.ar[1:], self.ma[1:]),0) return a.reshape(-1, self.nvarall)
#@property
[docs] def hstackarma_minus1(self): '''stack ar and lagpolynomial vertically in 2d array this is the Kalman Filter representation, I think ''' a = np.concatenate((self.ar[1:], self.ma[1:]),0) return a.swapaxes(1,2).reshape(-1, self.nvarall)
[docs] def getisstationary(self, a=None): '''check whether the auto-regressive lag-polynomial is stationary Returns ------- isstationary : boolean *attaches* areigenvalues : complex array eigenvalues sorted by absolute value References ---------- formula taken from NAG manual ''' if not a is None: a = a else: if self.isstructured: a = -self.reduceform(self.ar)[1:] else: a = -self.ar[1:] amat = self.stacksquare(a) ev = np.sort(np.linalg.eigvals(amat))[::-1] self.areigenvalues = ev return (np.abs(ev) < 1).all()
[docs] def getisinvertible(self, a=None): '''check whether the auto-regressive lag-polynomial is stationary Returns ------- isinvertible : boolean *attaches* maeigenvalues : complex array eigenvalues sorted by absolute value References ---------- formula taken from NAG manual ''' if not a is None: a = a else: if self.isindependent: a = self.reduceform(self.ma)[1:] else: a = self.ma[1:] if a.shape[0] == 0: # no ma lags self.maeigenvalues = np.array([], np.complex) return True amat = self.stacksquare(a) ev = np.sort(np.linalg.eigvals(amat))[::-1] self.maeigenvalues = ev return (np.abs(ev) < 1).all()
[docs] def reduceform(self, apoly): ''' this assumes no exog, todo ''' if apoly.ndim != 3: raise ValueError('apoly needs to be 3d') nlags, nvarsex, nvars = apoly.shape a = np.empty_like(apoly) try: a0inv = np.linalg.inv(a[0,:nvars, :]) except np.linalg.LinAlgError: raise ValueError('matrix not invertible', 'ask for implementation of pinv') for lag in range(nlags): a[lag] = np.dot(a0inv, apoly[lag]) return a
if __name__ == "__main__": # some example lag polynomials a21 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.8, 0. ], [ 0., -0.6]]]) a22 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.8, 0. ], [ 0.1, -0.8]]]) a23 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.8, 0.2], [ 0.1, -0.6]]]) a24 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.6, 0. ], [ 0.2, -0.6]], [[-0.1, 0. ], [ 0.1, -0.1]]]) a31 = np.r_[np.eye(3)[None,:,:], 0.8*np.eye(3)[None,:,:]] a32 = np.array([[[ 1. , 0. , 0. ], [ 0. , 1. , 0. ], [ 0. , 0. , 1. ]], [[ 0.8, 0. , 0. ], [ 0.1, 0.6, 0. ], [ 0. , 0. , 0.9]]]) ######## ut = np.random.randn(1000,2) ar2s = vargenerate(a22,ut) #res = np.linalg.lstsq(lagmat(ar2s,1)[:,1:], ar2s) res = np.linalg.lstsq(lagmat(ar2s,1), ar2s) bhat = res[0].reshape(1,2,2) arhat = ar2full(bhat) #print(maxabs(arhat - a22) v = _Var(ar2s) v.fit(1) v.forecast() v.forecast(25)[-30:] ar23 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-0.6, 0. ], [ 0.2, -0.6]], [[-0.1, 0. ], [ 0.1, -0.1]]]) ma22 = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[ 0.4, 0. ], [ 0.2, 0.3]]]) ar23ns = np.array([[[ 1. , 0. ], [ 0. , 1. ]], [[-1.9, 0. ], [ 0.4, -0.6]], [[ 0.3, 0. ], [ 0.1, -0.1]]]) vp = VarmaPoly(ar23, ma22) print(vars(vp)) print(vp.vstack()) print(vp.vstack(a24)) print(vp.hstackarma_minus1()) print(vp.getisstationary()) print(vp.getisinvertible()) vp2 = VarmaPoly(ar23ns) print(vp2.getisstationary()) print(vp2.getisinvertible()) # no ma lags