"""
Markov switching regression models
Author: Chad Fulton
License: BSD-3
"""
from __future__ import division, absolute_import, print_function
import numpy as np
import statsmodels.base.wrapper as wrap
from statsmodels.tsa.regime_switching import markov_switching
[docs]class MarkovRegression(markov_switching.MarkovSwitching):
    r"""
    First-order k-regime Markov switching regression model
    Parameters
    ----------
    endog : array_like
        The endogenous variable.
    k_regimes : integer
        The number of regimes.
    trend : {'nc', 'c', 't', 'ct'}
        Whether or not to include a trend. To include an intercept, time trend,
        or both, set `trend='c'`, `trend='t'`, or `trend='ct'`. For no trend,
        set `trend='nc'`. Default is an intercept.
    exog : array_like, optional
        Array of exogenous regressors, shaped nobs x k.
    order : integer, optional
        The order of the model describes the dependence of the likelihood on
        previous regimes. This depends on the model in question and should be
        set appropriately by subclasses.
    exog_tvtp : array_like, optional
        Array of exogenous or lagged variables to use in calculating
        time-varying transition probabilities (TVTP). TVTP is only used if this
        variable is provided. If an intercept is desired, a column of ones must
        be explicitly included in this array.
    switching_trend : boolean or iterable, optional
        If a boolean, sets whether or not all trend coefficients are
        switching across regimes. If an iterable, should be of length equal
        to the number of trend variables, where each element is
        a boolean describing whether the corresponding coefficient is
        switching. Default is True.
    switching_exog : boolean or iterable, optional
        If a boolean, sets whether or not all regression coefficients are
        switching across regimes. If an iterable, should be of length equal
        to the number of exogenous variables, where each element is
        a boolean describing whether the corresponding coefficient is
        switching. Default is True.
    switching_variance : boolean, optional
        Whether or not there is regime-specific heteroskedasticity, i.e.
        whether or not the error term has a switching variance. Default is
        False.
    Notes
    -----
    This model is new and API stability is not guaranteed, although changes
    will be made in a backwards compatible way if possible.
    The model can be written as:
    .. math::
        y_t = a_{S_t} + x_t' \beta_{S_t} + \varepsilon_t \\
        \varepsilon_t \sim N(0, \sigma_{S_t}^2)
    i.e. the model is a dynamic linear regression where the coefficients and
    the variance of the error term may be switching across regimes.
    The `trend` is accomodated by prepending columns to the `exog` array. Thus
    if `trend='c'`, the passed `exog` array should not already have a column of
    ones.
    References
    ----------
    Kim, Chang-Jin, and Charles R. Nelson. 1999.
    "State-Space Models with Regime Switching:
    Classical and Gibbs-Sampling Approaches with Applications".
    MIT Press Books. The MIT Press.
    """
    def __init__(self, endog, k_regimes, trend='c', exog=None, order=0,
                 exog_tvtp=None, switching_trend=True, switching_exog=True,
                 switching_variance=False, dates=None, freq=None,
                 missing='none'):
        # Properties
        self.trend = trend
        self.switching_trend = switching_trend
        self.switching_exog = switching_exog
        self.switching_variance = switching_variance
        # Exogenous data
        self.k_exog, exog = markov_switching.prepare_exog(exog)
        # Trend
        nobs = len(endog)
        self.k_trend = 0
        self._k_exog = self.k_exog
        trend_exog = None
        if trend == 'c':
            trend_exog = np.ones((nobs, 1))
            self.k_trend = 1
        elif trend == 't':
            trend_exog = (np.arange(nobs) + 1)[:, np.newaxis]
            self.k_trend = 1
        elif trend == 'ct':
            trend_exog = np.c_[np.ones((nobs, 1)),
                               (np.arange(nobs) + 1)[:, np.newaxis]]
            self.k_trend = 2
        if trend_exog is not None:
            exog = trend_exog if exog is None else np.c_[trend_exog, exog]
            self._k_exog += self.k_trend
        # Initialize the base model
        super(MarkovRegression, self).__init__(
            endog, k_regimes, order=order, exog_tvtp=exog_tvtp, exog=exog,
            dates=dates, freq=freq, missing=missing)
        # Switching options
        if self.switching_trend is True or self.switching_trend is False:
            self.switching_trend = [self.switching_trend] * self.k_trend
        elif not len(self.switching_trend) == self.k_trend:
            raise ValueError('Invalid iterable passed to `switching_trend`.')
        if self.switching_exog is True or self.switching_exog is False:
            self.switching_exog = [self.switching_exog] * self.k_exog
        elif not len(self.switching_exog) == self.k_exog:
            raise ValueError('Invalid iterable passed to `switching_exog`.')
        self.switching_coeffs = (
            np.r_[self.switching_trend,
                  self.switching_exog].astype(bool).tolist())
        # Parameters
        self.parameters['exog'] = self.switching_coeffs
        self.parameters['variance'] = [1] if self.switching_variance else [0]
[docs]    def predict_conditional(self, params):
        """
        In-sample prediction, conditional on the current regime
        Parameters
        ----------
        params : array_like
            Array of parameters at which to perform prediction.
        Returns
        -------
        predict : array_like
            Array of predictions conditional on current, and possibly past,
            regimes
        """
        params = np.array(params, ndmin=1)
        # Since in the base model the values are the same across columns, we
        # only compute a single column, and then expand it below.
        predict = np.zeros((self.k_regimes, self.nobs), dtype=params.dtype)
        for i in range(self.k_regimes):
            # Predict
            if self._k_exog > 0:
                coeffs = params[self.parameters[i, 'exog']]
                predict[i] = np.dot(self.exog, coeffs)
        return predict[:, None, :] 
    def _resid(self, params):
        predict = np.repeat(self.predict_conditional(params),
                            self.k_regimes, axis=1)
        return self.endog - predict
    def _conditional_likelihoods(self, params):
        """
        Compute likelihoods conditional on the current period's regime
        """
        # Get residuals
        resid = self._resid(params)
        # Compute the conditional likelihoods
        variance = params[self.parameters['variance']].squeeze()
        if self.switching_variance:
            variance = np.reshape(variance, (self.k_regimes, 1, 1))
        conditional_likelihoods = (
            np.exp(-0.5 * resid**2 / variance) / np.sqrt(2 * np.pi * variance))
        return conditional_likelihoods
    @property
    def _res_classes(self):
        return {'fit': (MarkovRegressionResults,
                        MarkovRegressionResultsWrapper)}
    def _em_iteration(self, params0):
        """
        EM iteration
        Notes
        -----
        This uses the inherited _em_iteration method for computing the
        non-TVTP transition probabilities and then performs the EM step for
        regression coefficients and variances.
        """
        # Inherited parameters
        result, params1 = super(MarkovRegression, self)._em_iteration(params0)
        tmp = np.sqrt(result.smoothed_marginal_probabilities)
        # Regression coefficients
        coeffs = None
        if self._k_exog > 0:
            coeffs = self._em_exog(result, self.endog, self.exog,
                                   self.parameters.switching['exog'], tmp)
            for i in range(self.k_regimes):
                params1[self.parameters[i, 'exog']] = coeffs[i]
        # Variances
        params1[self.parameters['variance']] = self._em_variance(
            result, self.endog, self.exog, coeffs, tmp)
        # params1[self.parameters['variance']] = 0.33282116
        return result, params1
    def _em_exog(self, result, endog, exog, switching, tmp=None):
        """
        EM step for regression coefficients
        """
        k_exog = exog.shape[1]
        coeffs = np.zeros((self.k_regimes, k_exog))
        # First, estimate non-switching coefficients
        if not np.all(switching):
            nonswitching_exog = exog[:, ~switching]
            nonswitching_coeffs = (
                np.dot(np.linalg.pinv(nonswitching_exog), endog))
            coeffs[:, ~switching] = nonswitching_coeffs
            endog = endog - np.dot(nonswitching_exog, nonswitching_coeffs)
        # Next, get switching coefficients
        if np.any(switching):
            switching_exog = exog[:, switching]
            if tmp is None:
                tmp = np.sqrt(result.smoothed_marginal_probabilities)
            for i in range(self.k_regimes):
                tmp_endog = tmp[i] * endog
                tmp_exog = tmp[i][:, np.newaxis] * switching_exog
                coeffs[i, switching] = (
                    np.dot(np.linalg.pinv(tmp_exog), tmp_endog))
        return coeffs
    def _em_variance(self, result, endog, exog, betas, tmp=None):
        """
        EM step for variances
        """
        k_exog = 0 if exog is None else exog.shape[1]
        if self.switching_variance:
            variance = np.zeros(self.k_regimes)
            for i in range(self.k_regimes):
                if k_exog > 0:
                    resid = endog - np.dot(exog, betas[i])
                else:
                    resid = endog
                variance[i] = (
                    np.sum(resid**2 *
                           result.smoothed_marginal_probabilities[i]) /
                    np.sum(result.smoothed_marginal_probabilities[i]))
        else:
            variance = 0
            if tmp is None:
                tmp = np.sqrt(result.smoothed_marginal_probabilities)
            for i in range(self.k_regimes):
                tmp_endog = tmp[i] * endog
                if k_exog > 0:
                    tmp_exog = tmp[i][:, np.newaxis] * exog
                    resid = tmp_endog - np.dot(tmp_exog, betas[i])
                else:
                    resid = tmp_endog
                variance += np.sum(resid**2)
            variance /= self.nobs
        return variance
    @property
    def start_params(self):
        """
        (array) Starting parameters for maximum likelihood estimation.
        Notes
        -----
        These are not very sophisticated and / or good. We set equal transition
        probabilities and interpolate regression coefficients between zero and
        the OLS estimates, where the interpolation is based on the regime
        number. We rely heavily on the EM algorithm to quickly find much better
        starting parameters, which are then used by the typical scoring
        approach.
        """
        # Inherited parameters
        params = markov_switching.MarkovSwitching.start_params.fget(self)
        # Regression coefficients
        if self._k_exog > 0:
            beta = np.dot(np.linalg.pinv(self.exog), self.endog)
            variance = np.var(self.endog - np.dot(self.exog, beta))
            if np.any(self.switching_coeffs):
                for i in range(self.k_regimes):
                    params[self.parameters[i, 'exog']] = (
                        beta * (i / self.k_regimes))
            else:
                params[self.parameters['exog']] = beta
        else:
            variance = np.var(self.endog)
        # Variances
        if self.switching_variance:
            params[self.parameters['variance']] = (
                np.linspace(variance / 10., variance, num=self.k_regimes))
        else:
            params[self.parameters['variance']] = variance
        return params
    @property
    def param_names(self):
        """
        (list of str) List of human readable parameter names (for parameters
        actually included in the model).
        """
        # Inherited parameters
        param_names = np.array(
            markov_switching.MarkovSwitching.param_names.fget(self),
            dtype=object)
        # Regression coefficients
        if np.any(self.switching_coeffs):
            for i in range(self.k_regimes):
                param_names[self.parameters[i, 'exog']] = [
                    '%s[%d]' % (exog_name, i) for exog_name in self.exog_names]
        else:
            param_names[self.parameters['exog']] = self.exog_names
        # Variances
        if self.switching_variance:
            for i in range(self.k_regimes):
                param_names[self.parameters[i, 'variance']] = 'sigma2[%d]' % i
        else:
            param_names[self.parameters['variance']] = 'sigma2'
        return param_names.tolist()
 
class MarkovRegressionResults(markov_switching.MarkovSwitchingResults):
    r"""
    Class to hold results from fitting a Markov switching regression model
    Parameters
    ----------
    model : MarkovRegression instance
        The fitted model instance
    params : array
        Fitted parameters
    filter_results : HamiltonFilterResults or KimSmootherResults instance
        The underlying filter and, optionally, smoother output
    cov_type : string
        The type of covariance matrix estimator to use. Can be one of 'approx',
        'opg', 'robust', or 'none'.
    Attributes
    ----------
    model : Model instance
        A reference to the model that was fit.
    filter_results : HamiltonFilterResults or KimSmootherResults instance
        The underlying filter and, optionally, smoother output
    nobs : float
        The number of observations used to fit the model.
    params : array
        The parameters of the model.
    scale : float
        This is currently set to 1.0 and not used by the model or its results.
    """
    pass
class MarkovRegressionResultsWrapper(
        markov_switching.MarkovSwitchingResultsWrapper):
    pass
wrap.populate_wrapper(MarkovRegressionResultsWrapper,  # noqa:E305
                      MarkovRegressionResults)