Vector Autoregressive Moving Average with eXogenous regressors model
Author: Chad Fulton
License: Simplified-BSD
from __future__ import division, absolute_import, print_function
from warnings import warn
from statsmodels.compat.collections import OrderedDict
import pandas as pd
import numpy as np
from .kalman_filter import (
KalmanFilter, FilterResults, INVERT_UNIVARIATE, SOLVE_LU
from .mlemodel import MLEModel, MLEResults, MLEResultsWrapper
from .tools import (
companion_matrix, diff, is_invertible,
constrain_stationary_multivariate, unconstrain_stationary_multivariate
from statsmodels.tools.tools import Bunch
from statsmodels.tools.data import _is_using_pandas
from statsmodels.tsa.vector_ar import var_model
import statsmodels.base.wrapper as wrap
from statsmodels.tools.sm_exceptions import (EstimationWarning, ValueWarning)
[docs]class VARMAX(MLEModel):
Vector Autoregressive Moving Average with eXogenous regressors model
endog : array_like
The observed time-series process :math:`y`, , shaped nobs x k_endog.
exog : array_like, optional
Array of exogenous regressors, shaped nobs x k.
order : iterable
The (p,q) order of the model for the number of AR and MA parameters to
trend : {'nc', 'c'}, optional
Parameter controlling the deterministic trend polynomial.
Can be specified as a string where 'c' indicates a constant intercept
and 'nc' indicates no intercept term.
error_cov_type : {'diagonal', 'unstructured'}, optional
The structure of the covariance matrix of the error term, where
"unstructured" puts no restrictions on the matrix and "diagonal"
requires it to be a diagonal matrix (uncorrelated errors). Default is
measurement_error : boolean, optional
Whether or not to assume the endogenous observations `endog` were
measured with error. Default is False.
enforce_stationarity : boolean, optional
Whether or not to transform the AR parameters to enforce stationarity
in the autoregressive component of the model. Default is True.
enforce_invertibility : boolean, optional
Whether or not to transform the MA parameters to enforce invertibility
in the moving average component of the model. Default is True.
Keyword arguments may be used to provide default values for state space
matrices or for Kalman filtering options. See `Representation`, and
`KalmanFilter` for more details.
Generically, the VARMAX model is specified (see for example chapter 18 of
.. math::
y_t = \nu + A_1 y_{t-1} + \dots + A_p y_{t-p} + B x_t + \epsilon_t +
M_1 \epsilon_{t-1} + \dots M_q \epsilon_{t-q}
where :math:`\epsilon_t \sim N(0, \Omega)`, and where :math:`y_t` is a
`k_endog x 1` vector. Additionally, this model allows considering the case
where the variables are measured with error.
Note that in the full VARMA(p,q) case there is a fundamental identification
problem in that the coefficient matrices :math:`\{A_i, M_j\}` are not
generally unique, meaning that for a given time series process there may
be multiple sets of matrices that equivalently represent it. See Chapter 12
of [1]_ for more informationl. Although this class can be used to estimate
VARMA(p,q) models, a warning is issued to remind users that no steps have
been taken to ensure identification in this case.
.. [1] Lutkepohl, Helmut. 2007.
New Introduction to Multiple Time Series Analysis.
Berlin: Springer.
def __init__(self, endog, exog=None, order=(1, 0), trend='c',
error_cov_type='unstructured', measurement_error=False,
enforce_stationarity=True, enforce_invertibility=True,
# Model parameters
self.error_cov_type = error_cov_type
self.measurement_error = measurement_error
self.enforce_stationarity = enforce_stationarity
self.enforce_invertibility = enforce_invertibility
# Save the given orders
self.order = order
self.trend = trend
# Model orders
self.k_ar = int(order[0])
self.k_ma = int(order[1])
self.k_trend = int(self.trend == 'c')
# Check for valid model
if trend not in ['c', 'nc']:
raise ValueError('Invalid trend specification.')
if error_cov_type not in ['diagonal', 'unstructured']:
raise ValueError('Invalid error covariance matrix type'
' specification.')
if self.k_ar == 0 and self.k_ma == 0:
raise ValueError('Invalid VARMAX(p,q) specification; at least one'
' p,q must be greater than zero.')
# Warn for VARMA model
if self.k_ar > 0 and self.k_ma > 0:
warn('Estimation of VARMA(p,q) models is not generically robust,'
' due especially to identification issues.',
# Exogenous data
self.k_exog = 0
if exog is not None:
exog_is_using_pandas = _is_using_pandas(exog, None)
if not exog_is_using_pandas:
exog = np.asarray(exog)
# Make sure we have 2-dimensional array
if exog.ndim == 1:
if not exog_is_using_pandas:
exog = exog[:, None]
exog = pd.DataFrame(exog)
self.k_exog = exog.shape[1]
# Note: at some point in the future might add state regression, as in
self.mle_regression = self.k_exog > 0
# We need to have an array or pandas at this point
if not _is_using_pandas(endog, None):
endog = np.asanyarray(endog)
# Model order
# Used internally in various places
_min_k_ar = max(self.k_ar, 1)
self._k_order = _min_k_ar + self.k_ma
# Number of states
k_endog = endog.shape[1]
k_posdef = k_endog
k_states = k_endog * self._k_order
# By default, initialize as stationary
kwargs.setdefault('initialization', 'stationary')
# By default, use LU decomposition
kwargs.setdefault('inversion_method', INVERT_UNIVARIATE | SOLVE_LU)
# Initialize the state space model
super(VARMAX, self).__init__(
endog, exog=exog, k_states=k_states, k_posdef=k_posdef, **kwargs
# Set as time-varying model if we have time-trend or exog
if self.k_exog > 0 or self.k_trend > 1:
self.ssm._time_invariant = False
# Initialize the parameters
self.parameters = OrderedDict()
self.parameters['trend'] = self.k_endog * self.k_trend
self.parameters['ar'] = self.k_endog**2 * self.k_ar
self.parameters['ma'] = self.k_endog**2 * self.k_ma
self.parameters['regression'] = self.k_endog * self.k_exog
if self.error_cov_type == 'diagonal':
self.parameters['state_cov'] = self.k_endog
# These parameters fill in a lower-triangular matrix which is then
# dotted with itself to get a positive definite matrix.
elif self.error_cov_type == 'unstructured':
self.parameters['state_cov'] = (
int(self.k_endog * (self.k_endog + 1) / 2)
self.parameters['obs_cov'] = self.k_endog * self.measurement_error
self.k_params = sum(self.parameters.values())
# Initialize known elements of the state space matrices
# If we have exog effects, then the state intercept needs to be
# time-varying
if self.k_exog > 0:
self.ssm['state_intercept'] = np.zeros((self.k_states, self.nobs))
# The design matrix is just an identity for the first k_endog states
idx = np.diag_indices(self.k_endog)
self.ssm[('design',) + idx] = 1
# The transition matrix is described in four blocks, where the upper
# left block is in companion form with the autoregressive coefficient
# matrices (so it is shaped k_endog * k_ar x k_endog * k_ar) ...
if self.k_ar > 0:
idx = np.diag_indices((self.k_ar - 1) * self.k_endog)
idx = idx[0] + self.k_endog, idx[1]
self.ssm[('transition',) + idx] = 1
# ... and the lower right block is in companion form with zeros as the
# coefficient matrices (it is shaped k_endog * k_ma x k_endog * k_ma).
idx = np.diag_indices((self.k_ma - 1) * self.k_endog)
idx = (idx[0] + (_min_k_ar + 1) * self.k_endog,
idx[1] + _min_k_ar * self.k_endog)
self.ssm[('transition',) + idx] = 1
# The selection matrix is described in two blocks, where the upper
# block selects the all k_posdef errors in the first k_endog rows
# (the upper block is shaped k_endog * k_ar x k) and the lower block
# also selects all k_posdef errors in the first k_endog rows (the lower
# block is shaped k_endog * k_ma x k).
idx = np.diag_indices(self.k_endog)
self.ssm[('selection',) + idx] = 1
idx = idx[0] + _min_k_ar * self.k_endog, idx[1]
if self.k_ma > 0:
self.ssm[('selection',) + idx] = 1
# Cache some indices
if self.trend == 'c' and self.k_exog == 0:
self._idx_state_intercept = np.s_['state_intercept', :k_endog]
elif self.k_exog > 0:
self._idx_state_intercept = np.s_['state_intercept', :k_endog, :]
if self.k_ar > 0:
self._idx_transition = np.s_['transition', :k_endog, :]
self._idx_transition = np.s_['transition', :k_endog, k_endog:]
if self.error_cov_type == 'diagonal':
self._idx_state_cov = (
('state_cov',) + np.diag_indices(self.k_endog))
elif self.error_cov_type == 'unstructured':
self._idx_lower_state_cov = np.tril_indices(self.k_endog)
if self.measurement_error:
self._idx_obs_cov = ('obs_cov',) + np.diag_indices(self.k_endog)
# Cache some slices
def _slice(key, offset):
length = self.parameters[key]
param_slice = np.s_[offset:offset + length]
offset += length
return param_slice, offset
offset = 0
self._params_trend, offset = _slice('trend', offset)
self._params_ar, offset = _slice('ar', offset)
self._params_ma, offset = _slice('ma', offset)
self._params_regression, offset = _slice('regression', offset)
self._params_state_cov, offset = _slice('state_cov', offset)
self._params_obs_cov, offset = _slice('obs_cov', offset)
[docs] def filter(self, params, **kwargs):
kwargs.setdefault('results_class', VARMAXResults)
kwargs.setdefault('results_wrapper_class', VARMAXResultsWrapper)
return super(VARMAX, self).filter(params, **kwargs)
[docs] def smooth(self, params, **kwargs):
kwargs.setdefault('results_class', VARMAXResults)
kwargs.setdefault('results_wrapper_class', VARMAXResultsWrapper)
return super(VARMAX, self).smooth(params, **kwargs)
def start_params(self):
params = np.zeros(self.k_params, dtype=np.float64)
# A. Run a multivariate regression to get beta estimates
endog = self.endog.copy()
exog = self.exog.copy() if self.k_exog > 0 else None
# Although the Kalman filter can deal with missing values in endog,
# conditional sum of squares cannot
if np.any(np.isnan(endog)):
mask = ~np.any(np.isnan(endog), axis=1)
endog = endog[mask]
if exog is not None:
exog = exog[mask]
# Regression effects via OLS
exog_params = np.zeros(0)
if self.k_exog > 0:
exog_params = np.linalg.pinv(exog).dot(endog).T
endog -= np.dot(exog, exog_params.T)
# B. Run a VAR model on endog to get trend, AR parameters
ar_params = []
k_ar = self.k_ar if self.k_ar > 0 else 1
mod_ar = var_model.VAR(endog)
res_ar = mod_ar.fit(maxlags=k_ar, ic=None, trend=self.trend)
ar_params = np.array(res_ar.params.T)
if self.trend == 'c':
trend_params = ar_params[:, 0]
if self.k_ar > 0:
ar_params = ar_params[:, 1:].ravel()
ar_params = []
elif self.k_ar > 0:
ar_params = ar_params.ravel()
ar_params = []
endog = res_ar.resid
# Test for stationarity
if self.k_ar > 0 and self.enforce_stationarity:
coefficient_matrices = (
self.k_endog * self.k_ar, self.k_endog
).reshape(self.k_endog, self.k_endog, self.k_ar).T
stationary = is_invertible([1] + list(-coefficient_matrices))
if not stationary:
raise ValueError('Non-stationary starting autoregressive'
' parameters found with `enforce_stationarity`'
' set to True.')
# C. Run a VAR model on the residuals to get MA parameters
ma_params = []
if self.k_ma > 0:
mod_ma = var_model.VAR(endog)
res_ma = mod_ma.fit(maxlags=self.k_ma, ic=None, trend='nc')
ma_params = np.array(res_ma.params.T).ravel()
# Test for invertibility
if self.enforce_invertibility:
coefficient_matrices = (
self.k_endog * self.k_ma, self.k_endog
).reshape(self.k_endog, self.k_endog, self.k_ma).T
invertible = is_invertible([1] + list(-coefficient_matrices))
if not invertible:
raise ValueError('Non-invertible starting moving-average'
' parameters found with `enforce_stationarity`'
' set to True.')
# 1. Intercept terms
if self.trend == 'c':
params[self._params_trend] = trend_params
# 2. AR terms
params[self._params_ar] = ar_params
# 3. MA terms
params[self._params_ma] = ma_params
# 4. Regression terms
if self.mle_regression:
params[self._params_regression] = exog_params.ravel()
# 5. State covariance terms
if self.error_cov_type == 'diagonal':
params[self._params_state_cov] = res_ar.sigma_u.diagonal()
elif self.error_cov_type == 'unstructured':
cov_factor = np.linalg.cholesky(res_ar.sigma_u)
params[self._params_state_cov] = (
# 5. Measurement error variance terms
if self.measurement_error:
if self.k_ma > 0:
params[self._params_obs_cov] = res_ma.sigma_u.diagonal()
params[self._params_obs_cov] = res_ar.sigma_u.diagonal()
return params
def param_names(self):
param_names = []
# 1. Intercept terms
if self.trend == 'c':
param_names += [
'const.%s' % self.endog_names[i]
for i in range(self.k_endog)
# 2. AR terms
param_names += [
'L%d.%s.%s' % (i+1, self.endog_names[k], self.endog_names[j])
for j in range(self.k_endog)
for i in range(self.k_ar)
for k in range(self.k_endog)
# 3. MA terms
param_names += [
'L%d.e(%s).%s' % (i+1, self.endog_names[k], self.endog_names[j])
for j in range(self.k_endog)
for i in range(self.k_ma)
for k in range(self.k_endog)
# 4. Regression terms
param_names += [
'beta.%s.%s' % (self.exog_names[j], self.endog_names[i])
for i in range(self.k_endog)
for j in range(self.k_exog)
# 5. State covariance terms
if self.error_cov_type == 'diagonal':
param_names += [
'sigma2.%s' % self.endog_names[i]
for i in range(self.k_endog)
elif self.error_cov_type == 'unstructured':
param_names += [
('sqrt.var.%s' % self.endog_names[i] if i == j else
'sqrt.cov.%s.%s' % (self.endog_names[j], self.endog_names[i]))
for i in range(self.k_endog)
for j in range(i+1)
# 5. Measurement error variance terms
if self.measurement_error:
param_names += [
'measurement_variance.%s' % self.endog_names[i]
for i in range(self.k_endog)
return param_names
[docs] def update(self, params, **kwargs):
params = super(VARMAX, self).update(params, **kwargs)
# 1. State intercept
if self.mle_regression:
exog_params = params[self._params_regression].reshape(
self.k_endog, self.k_exog).T
intercept = np.dot(self.exog, exog_params)
if self.trend == 'c':
intercept += params[self._params_trend]
self.ssm[self._idx_state_intercept] = intercept.T
elif self.trend == 'c':
self.ssm[self._idx_state_intercept] = params[self._params_trend]
# 2. Transition
ar = params[self._params_ar].reshape(
self.k_endog, self.k_endog * self.k_ar)
ma = params[self._params_ma].reshape(
self.k_endog, self.k_endog * self.k_ma)
self.ssm[self._idx_transition] = np.c_[ar, ma]
# 3. State covariance
if self.error_cov_type == 'diagonal':
self.ssm[self._idx_state_cov] = (
elif self.error_cov_type == 'unstructured':
state_cov_lower = np.zeros(self.ssm['state_cov'].shape,
state_cov_lower[self._idx_lower_state_cov] = (
self.ssm['state_cov'] = np.dot(state_cov_lower, state_cov_lower.T)
# 4. Observation covariance
if self.measurement_error:
self.ssm[self._idx_obs_cov] = params[self._params_obs_cov]
[docs]class VARMAXResults(MLEResults):
Class to hold results from fitting an VARMAX model.
model : VARMAX instance
The fitted model instance
specification : dictionary
Dictionary including all attributes from the VARMAX model instance.
coefficient_matrices_var : array
Array containing autoregressive lag polynomial coefficient matrices,
ordered from lowest degree to highest.
coefficient_matrices_vma : array
Array containing moving average lag polynomial coefficients,
ordered from lowest degree to highest.
See Also
def __init__(self, model, params, filter_results, cov_type='opg',
super(VARMAXResults, self).__init__(model, params, filter_results,
cov_type, **kwargs)
self.df_resid = np.inf # attribute required for wald tests
self.specification = Bunch(**{
# Set additional model parameters
'error_cov_type': self.model.error_cov_type,
'measurement_error': self.model.measurement_error,
'enforce_stationarity': self.model.enforce_stationarity,
'enforce_invertibility': self.model.enforce_invertibility,
'order': self.model.order,
# Model order
'k_ar': self.model.k_ar,
'k_ma': self.model.k_ma,
# Trend / Regression
'trend': self.model.trend,
'k_trend': self.model.k_trend,
'k_exog': self.model.k_exog,
# Polynomials / coefficient matrices
self.coefficient_matrices_var = None
self.coefficient_matrices_vma = None
if self.model.k_ar > 0:
ar_params = np.array(self.params[self.model._params_ar])
k_endog = self.model.k_endog
k_ar = self.model.k_ar
self.coefficient_matrices_var = (
ar_params.reshape(k_endog * k_ar, k_endog).T
).reshape(k_endog, k_endog, k_ar).T
if self.model.k_ma > 0:
ma_params = np.array(self.params[self.model._params_ma])
k_endog = self.model.k_endog
k_ma = self.model.k_ma
self.coefficient_matrices_vma = (
ma_params.reshape(k_endog * k_ma, k_endog).T
).reshape(k_endog, k_endog, k_ma).T
[docs] def get_prediction(self, start=None, end=None, dynamic=False, exog=None,
In-sample prediction and out-of-sample forecasting
start : int, str, or datetime, optional
Zero-indexed observation number at which to start forecasting, ie.,
the first forecast is start. Can also be a date string to
parse or a datetime type. Default is the the zeroth observation.
end : int, str, or datetime, optional
Zero-indexed observation number at which to end forecasting, ie.,
the first forecast is start. Can also be a date string to
parse or a datetime type. However, if the dates index does not
have a fixed frequency, end must be an integer index if you
want out of sample prediction. Default is the last observation in
the sample.
exog : array_like, optional
If the model includes exogenous regressors, you must provide
exactly enough out-of-sample values for the exogenous variables if
end is beyond the last observation in the sample.
dynamic : boolean, int, str, or datetime, optional
Integer offset relative to `start` at which to begin dynamic
prediction. Can also be an absolute date string to parse or a
datetime type (these are not interpreted as offsets).
Prior to this observation, true endogenous values will be used for
prediction; starting with this observation and continuing through
the end of prediction, forecasted endogenous values will be used
Additional arguments may required for forecasting beyond the end
of the sample. See `FilterResults.predict` for more details.
forecast : array
Array of out of sample forecasts.
if start is None:
start = 0
# Handle end (e.g. date)
_start = self.model._get_predict_start(start)
_end, _out_of_sample = self.model._get_predict_end(end)
# Handle exogenous parameters
if _out_of_sample and (self.model.k_exog + self.model.k_trend > 0):
# Create a new faux VARMAX model for the extended dataset
nobs = self.model.data.orig_endog.shape[0] + _out_of_sample
endog = np.zeros((nobs, self.model.k_endog))
if self.model.k_exog > 0:
if exog is None:
raise ValueError('Out-of-sample forecasting in a model'
' with a regression component requires'
' additional exogenous values via the'
' `exog` argument.')
exog = np.array(exog)
required_exog_shape = (_out_of_sample, self.model.k_exog)
if not exog.shape == required_exog_shape:
raise ValueError('Provided exogenous values are not of the'
' appropriate shape. Required %s, got %s.'
% (str(required_exog_shape),
exog = np.c_[self.model.data.orig_exog.T, exog.T].T
# TODO replace with init_kwds or specification or similar
model = VARMAX(
# Set the kwargs with the update time-varying state space
# representation matrices
for name in self.filter_results.shapes.keys():
if name == 'obs':
mat = getattr(model.ssm, name)
if mat.shape[-1] > 1:
if len(mat.shape) == 2:
kwargs[name] = mat[:, -_out_of_sample:]
kwargs[name] = mat[:, :, -_out_of_sample:]
elif self.model.k_exog == 0 and exog is not None:
warn('Exogenous array provided to predict, but additional data not'
' required. `exog` argument ignored.', ValueWarning)
return super(VARMAXResults, self).get_prediction(
start=start, end=end, dynamic=dynamic, exog=exog, **kwargs
[docs] def summary(self, alpha=.05, start=None, separate_params=True):
from statsmodels.iolib.summary import summary_params
# Create the model name
spec = self.specification
if spec.k_ar > 0 and spec.k_ma > 0:
model_name = 'VARMA'
order = '(%s,%s)' % (spec.k_ar, spec.k_ma)
elif spec.k_ar > 0:
model_name = 'VAR'
order = '(%s)' % (spec.k_ar)
model_name = 'VMA'
order = '(%s)' % (spec.k_ma)
if spec.k_exog > 0:
model_name += 'X'
model_name = [model_name + order]
if spec.trend == 'c':
if spec.measurement_error:
model_name.append('measurement error')
summary = super(VARMAXResults, self).summary(
alpha=alpha, start=start, model_name=model_name,
display_params=not separate_params
if separate_params:
indices = np.arange(len(self.params))
def make_table(self, mask, title, strip_end=True):
res = (self, self.params[mask], self.bse[mask],
self.zvalues[mask], self.pvalues[mask],
param_names = [
'.'.join(name.split('.')[:-1]) if strip_end else name
for name in
return summary_params(res, yname=None, xname=param_names,
alpha=alpha, use_t=False, title=title)
# Add parameter tables for each endogenous variable
k_endog = self.model.k_endog
k_ar = self.model.k_ar
k_ma = self.model.k_ma
k_exog = self.model.k_exog
endog_masks = []
for i in range(k_endog):
masks = []
offset = 0
# 1. Intercept terms
if self.model.trend == 'c':
masks.append(np.array(i, ndmin=1))
offset += k_endog
# 2. AR terms
if k_ar > 0:
start = i * k_endog * k_ar
end = (i + 1) * k_endog * k_ar
offset + np.arange(start, end))
offset += k_ar * k_endog**2
# 3. MA terms
if k_ma > 0:
start = i * k_endog * k_ma
end = (i + 1) * k_endog * k_ma
offset + np.arange(start, end))
offset += k_ma * k_endog**2
# 4. Regression terms
if k_exog > 0:
offset + np.arange(i * k_exog, (i + 1) * k_exog))
offset += k_endog * k_exog
# 5. Measurement error variance terms
if self.model.measurement_error:
masks.append(np.array(self.model.k_params - i - 1, ndmin=1))
# Create the table
mask = np.concatenate(masks)
title = "Results for equation %s" % self.model.endog_names[i]
table = make_table(self, mask, title)
# State covariance terms
state_cov_mask = (
table = make_table(self, state_cov_mask, "Error covariance matrix",
# Add a table for all other parameters
masks = []
for m in (endog_masks, [state_cov_mask]):
m = np.array(m).flatten()
if len(m) > 0:
masks = np.concatenate(masks)
inverse_mask = np.array(list(set(indices).difference(set(masks))))
if len(inverse_mask) > 0:
table = make_table(self, inverse_mask, "Other parameters",
return summary
summary.__doc__ = MLEResults.summary.__doc__
class VARMAXResultsWrapper(MLEResultsWrapper):
_attrs = {}
_wrap_attrs = wrap.union_dicts(MLEResultsWrapper._wrap_attrs,
_methods = {}
_wrap_methods = wrap.union_dicts(MLEResultsWrapper._wrap_methods,
wrap.populate_wrapper(VARMAXResultsWrapper, VARMAXResults)