Source code for statsmodels.tsa.vector_ar.dynamic

# pylint: disable=W0201

from statsmodels.compat.python import iteritems, string_types, range
import numpy as np
from statsmodels.tools.decorators import cache_readonly
import pandas as pd

from . import var_model as _model
from . import util
from . import plotting

FULL_SAMPLE = 0
ROLLING = 1
EXPANDING = 2


def _get_window_type(window_type):
    if window_type in (FULL_SAMPLE, ROLLING, EXPANDING):
        return window_type
    elif isinstance(window_type, string_types):
        window_type_up = window_type.upper()

        if window_type_up in ('FULL SAMPLE', 'FULL_SAMPLE'):
            return FULL_SAMPLE
        elif window_type_up == 'ROLLING':
            return ROLLING
        elif window_type_up == 'EXPANDING':
            return EXPANDING

    raise Exception('Unrecognized window type: %s' % window_type)


[docs]class DynamicVAR(object): """ Estimates time-varying vector autoregression (VAR(p)) using equation-by-equation least squares Parameters ---------- data : pandas.DataFrame lag_order : int, default 1 window : int window_type : {'expanding', 'rolling'} min_periods : int or None Minimum number of observations to require in window, defaults to window size if None specified trend : {'c', 'nc', 'ct', 'ctt'} TODO Returns ------- **Attributes**: coefs : Panel items : coefficient names major_axis : dates minor_axis : VAR equation names """ def __init__(self, data, lag_order=1, window=None, window_type='expanding', trend='c', min_periods=None): self.lag_order = lag_order self.names = list(data.columns) self.neqs = len(self.names) self._y_orig = data # TODO: deal with trend self._x_orig = _make_lag_matrix(data, lag_order) self._x_orig['intercept'] = 1 (self.y, self.x, self.x_filtered, self._index, self._time_has_obs) = _filter_data(self._y_orig, self._x_orig) self.lag_order = lag_order self.trendorder = util.get_trendorder(trend) self._set_window(window_type, window, min_periods) def _set_window(self, window_type, window, min_periods): self._window_type = _get_window_type(window_type) if self._is_rolling: if window is None: raise Exception('Must pass window when doing rolling ' 'regression') if min_periods is None: min_periods = window else: window = len(self.x) if min_periods is None: min_periods = 1 self._window = int(window) self._min_periods = min_periods @cache_readonly
[docs] def T(self): """ Number of time periods in results """ return len(self.result_index)
@property def nobs(self): # Stub, do I need this? data = dict((eq, r.nobs) for eq, r in iteritems(self.equations)) return pd.DataFrame(data) @cache_readonly
[docs] def equations(self): eqs = {} for col, ts in iteritems(self.y): # TODO: Remove in favor of statsmodels implemetation model = pd.ols(y=ts, x=self.x, window=self._window, window_type=self._window_type, min_periods=self._min_periods) eqs[col] = model return eqs
@cache_readonly
[docs] def coefs(self): """ Return dynamic regression coefficients as Panel """ data = {} for eq, result in iteritems(self.equations): data[eq] = result.beta panel = pd.Panel.fromDict(data) # Coefficient names become items return panel.swapaxes('items', 'minor')
@property def result_index(self): return self.coefs.major_axis @cache_readonly def _coefs_raw(self): """ Reshape coefficients to be more amenable to dynamic calculations Returns ------- coefs : (time_periods x lag_order x neqs x neqs) """ coef_panel = self.coefs.copy() del coef_panel['intercept'] coef_values = coef_panel.swapaxes('items', 'major').values coef_values = coef_values.reshape((len(coef_values), self.lag_order, self.neqs, self.neqs)) return coef_values @cache_readonly def _intercepts_raw(self): """ Similar to _coefs_raw, return intercept values in easy-to-use matrix form Returns ------- intercepts : (T x K) """ return self.coefs['intercept'].values @cache_readonly
[docs] def resid(self): data = {} for eq, result in iteritems(self.equations): data[eq] = result.resid return pd.DataFrame(data)
[docs] def forecast(self, steps=1): """ Produce dynamic forecast Parameters ---------- steps Returns ------- forecasts : pandas.DataFrame """ output = np.empty((self.T - steps, self.neqs)) y_values = self.y.values y_index_map = dict((d, idx) for idx, d in enumerate(self.y.index)) result_index_map = dict((d, idx) for idx, d in enumerate(self.result_index)) coefs = self._coefs_raw intercepts = self._intercepts_raw # can only produce this many forecasts forc_index = self.result_index[steps:] for i, date in enumerate(forc_index): # TODO: check that this does the right thing in weird cases... idx = y_index_map[date] - steps result_idx = result_index_map[date] - steps y_slice = y_values[:idx] forcs = _model.forecast(y_slice, coefs[result_idx], intercepts[result_idx], steps) output[i] = forcs[-1] return pd.DataFrame(output, index=forc_index, columns=self.names)
[docs] def plot_forecast(self, steps=1, figsize=(10, 10)): """ Plot h-step ahead forecasts against actual realizations of time series. Note that forecasts are lined up with their respective realizations. Parameters ---------- steps : """ import matplotlib.pyplot as plt fig, axes = plt.subplots(figsize=figsize, nrows=self.neqs, sharex=True) forc = self.forecast(steps=steps) dates = forc.index y_overlay = self.y.reindex(dates) for i, col in enumerate(forc.columns): ax = axes[i] y_ts = y_overlay[col] forc_ts = forc[col] y_handle = ax.plot(dates, y_ts.values, 'k.', ms=2) forc_handle = ax.plot(dates, forc_ts.values, 'k-') lines = (y_handle[0], forc_handle[0]) labels = ('Y', 'Forecast') fig.legend(lines,labels) fig.autofmt_xdate() fig.suptitle('Dynamic %d-step forecast' % steps) # pretty things up a bit plotting.adjust_subplots(bottom=0.15, left=0.10) plt.draw_if_interactive()
@property def _is_rolling(self): return self._window_type == ROLLING @cache_readonly
[docs] def r2(self): """Returns the r-squared values.""" data = dict((eq, r.r2) for eq, r in iteritems(self.equations)) return pd.DataFrame(data)
class DynamicPanelVAR(DynamicVAR): """ Dynamic (time-varying) panel vector autoregression using panel ordinary least squares Parameters ---------- """ def __init__(self, data, lag_order=1, window=None, window_type='expanding', trend='c', min_periods=None): self.lag_order = lag_order self.neqs = len(data.columns) self._y_orig = data # TODO: deal with trend self._x_orig = _make_lag_matrix(data, lag_order) self._x_orig['intercept'] = 1 (self.y, self.x, self.x_filtered, self._index, self._time_has_obs) = _filter_data(self._y_orig, self._x_orig) self.lag_order = lag_order self.trendorder = util.get_trendorder(trend) self._set_window(window_type, window, min_periods) def _filter_data(lhs, rhs): """ Data filtering routine for dynamic VAR lhs : DataFrame original data rhs : DataFrame lagged variables Returns ------- """ def _has_all_columns(df): return np.isfinite(df.values).sum(1) == len(df.columns) rhs_valid = _has_all_columns(rhs) if not rhs_valid.all(): pre_filtered_rhs = rhs[rhs_valid] else: pre_filtered_rhs = rhs index = lhs.index.union(rhs.index) if not index.equals(rhs.index) or not index.equals(lhs.index): rhs = rhs.reindex(index) lhs = lhs.reindex(index) rhs_valid = _has_all_columns(rhs) lhs_valid = _has_all_columns(lhs) valid = rhs_valid & lhs_valid if not valid.all(): filt_index = rhs.index[valid] filtered_rhs = rhs.reindex(filt_index) filtered_lhs = lhs.reindex(filt_index) else: filtered_rhs, filtered_lhs = rhs, lhs return filtered_lhs, filtered_rhs, pre_filtered_rhs, index, valid def _make_lag_matrix(x, lags): data = {} columns = [] for i in range(1, 1 + lags): lagstr = 'L%d.'% i lag = x.shift(i).rename(columns=lambda c: lagstr + c) data.update(lag._series) columns.extend(lag.columns) return pd.DataFrame(data, columns=columns) class Equation(object): """ Stub, estimate one equation """ def __init__(self, y, x): pass if __name__ == '__main__': import pandas.util.testing as ptest ptest.N = 500 data = ptest.makeTimeDataFrame().cumsum(0) var = DynamicVAR(data, lag_order=2, window_type='expanding') var2 = DynamicVAR(data, lag_order=2, window=10, window_type='rolling')