Source code for statsmodels.tsa.statespace.exponential_smoothing

"""
Linear exponential smoothing models

Author: Chad Fulton
License: BSD-3
"""

import numpy as np
import pandas as pd
from statsmodels.base.data import PandasData

from statsmodels.genmod.generalized_linear_model import GLM
from statsmodels.tools.validation import (array_like, bool_like, float_like,
                                          string_like, int_like)

from statsmodels.tsa.exponential_smoothing import initialization as es_init
from statsmodels.tsa.statespace import initialization as ss_init
from statsmodels.tsa.statespace.kalman_filter import (
    MEMORY_CONSERVE, MEMORY_NO_FORECAST)

from statsmodels.compat.pandas import Appender
import statsmodels.base.wrapper as wrap

from statsmodels.iolib.summary import forg
from statsmodels.iolib.table import SimpleTable
from statsmodels.iolib.tableformatting import fmt_params

from .mlemodel import MLEModel, MLEResults, MLEResultsWrapper


[docs]class ExponentialSmoothing(MLEModel): """ Linear exponential smoothing models Parameters ---------- endog : array_like The observed time-series process :math:`y` trend : bool, optional Whether or not to include a trend component. Default is False. damped_trend : bool, optional Whether or not an included trend component is damped. Default is False. seasonal : int, optional The number of periods in a complete seasonal cycle for seasonal (Holt-Winters) models. For example, 4 for quarterly data with an annual cycle or 7 for daily data with a weekly cycle. Default is no seasonal effects. initialization_method : str, optional Method for initialize the recursions. One of: * 'estimated' * 'concentrated' * 'heuristic' * 'known' If 'known' initialization is used, then `initial_level` must be passed, as well as `initial_slope` and `initial_seasonal` if applicable. Default is 'estimated'. initial_level : float, optional The initial level component. Only used if initialization is 'known'. initial_trend : float, optional The initial trend component. Only used if initialization is 'known'. initial_seasonal : array_like, optional The initial seasonal component. An array of length `seasonal` or length `seasonal - 1` (in which case the last initial value is computed to make the average effect zero). Only used if initialization is 'known'. bounds : iterable[tuple], optional An iterable containing bounds for the parameters. Must contain four elements, where each element is a tuple of the form (lower, upper). Default is (0.0001, 0.9999) for the level, trend, and seasonal smoothing parameters and (0.8, 0.98) for the trend damping parameter. concentrate_scale : bool, optional Whether or not to concentrate the scale (variance of the error term) out of the likelihood. Notes ----- The parameters and states of this model are estimated by setting up the exponential smoothing equations as a special case of a linear Gaussian state space model and applying the Kalman filter. As such, it has slightly worse performance than the dedicated exponential smoothing model, `sm.tsa.ExponentialSmoothing`, and it does not support multiplicative (nonlinear) exponential smoothing models. However, as a subclass of the state space models, this model class shares a consistent set of functionality with those models, which can make it easier to work with. In addition, it supports computing confidence intervals for forecasts and it supports concentrating the initial state out of the likelihood function. References ---------- [1] Hyndman, Rob, Anne B. Koehler, J. Keith Ord, and Ralph D. Snyder. Forecasting with exponential smoothing: the state space approach. Springer Science & Business Media, 2008. """ def __init__(self, endog, trend=False, damped_trend=False, seasonal=None, initialization_method='estimated', initial_level=None, initial_trend=None, initial_seasonal=None, bounds=None, concentrate_scale=True, dates=None, freq=None, missing='none'): # Model definition self.trend = bool_like(trend, 'trend') self.damped_trend = bool_like(damped_trend, 'damped_trend') self.seasonal_periods = int_like(seasonal, 'seasonal', optional=True) self.seasonal = self.seasonal_periods is not None self.initialization_method = string_like( initialization_method, 'initialization_method').lower() self.concentrate_scale = bool_like(concentrate_scale, 'concentrate_scale') # TODO: add validation for bounds (e.g. have all bounds, upper > lower) # TODO: add `bounds_method` argument to choose between "usual" and # "admissible" as in Hyndman et al. (2008) self.bounds = bounds if self.bounds is None: self.bounds = [(1e-4, 1-1e-4)] * 3 + [(0.8, 0.98)] # Validation if self.seasonal_periods == 1: raise ValueError('Cannot have a seasonal period of 1.') if self.seasonal and self.seasonal_periods is None: raise NotImplementedError('Unable to detect season automatically;' ' please specify `seasonal_periods`.') if self.initialization_method not in ['concentrated', 'estimated', 'simple', 'heuristic', 'known']: raise ValueError('Invalid initialization method "%s".' % initialization_method) if self.initialization_method == 'known': if initial_level is None: raise ValueError('`initial_level` argument must be provided' ' when initialization method is set to' ' "known".') if initial_trend is None and self.trend: raise ValueError('`initial_trend` argument must be provided' ' for models with a trend component when' ' initialization method is set to "known".') if initial_seasonal is None and self.seasonal: raise ValueError('`initial_seasonal` argument must be provided' ' for models with a seasonal component when' ' initialization method is set to "known".') # Initialize the state space model if not self.seasonal or self.seasonal_periods is None: self._seasonal_periods = 0 else: self._seasonal_periods = self.seasonal_periods k_states = 2 + int(self.trend) + self._seasonal_periods k_posdef = 1 init = ss_init.Initialization(k_states, 'known', constant=[0] * k_states) super(ExponentialSmoothing, self).__init__( endog, k_states=k_states, k_posdef=k_posdef, initialization=init, dates=dates, freq=freq, missing=missing) # Concentrate the scale out of the likelihood function if self.concentrate_scale: self.ssm.filter_concentrated = True # Setup fixed elements of the system matrices # Observation error self.ssm['design', 0, 0] = 1. self.ssm['selection', 0, 0] = 1. self.ssm['state_cov', 0, 0] = 1. # Level self.ssm['design', 0, 1] = 1. self.ssm['transition', 1, 1] = 1. # Trend if self.trend: self.ssm['transition', 1:3, 2] = 1. # Seasonal if self.seasonal: k = 2 + int(self.trend) self.ssm['design', 0, k] = 1. self.ssm['transition', k, -1] = 1. self.ssm['transition', k + 1:k_states, k:k_states - 1] = ( np.eye(self.seasonal_periods - 1)) # Initialization of the states if self.initialization_method != 'known': msg = ('Cannot give `%%s` argument when initialization is "%s"' % initialization_method) if initial_level is not None: raise ValueError(msg % 'initial_level') if initial_trend is not None: raise ValueError(msg % 'initial_trend') if initial_seasonal is not None: raise ValueError(msg % 'initial_seasonal') if self.initialization_method == 'simple': initial_level, initial_trend, initial_seasonal = ( es_init._initialization_simple( self.endog[:, 0], trend='add' if self.trend else None, seasonal='add' if self.seasonal else None, seasonal_periods=self.seasonal_periods)) elif self.initialization_method == 'heuristic': initial_level, initial_trend, initial_seasonal = ( es_init._initialization_heuristic( self.endog[:, 0], trend='add' if self.trend else None, seasonal='add' if self.seasonal else None, seasonal_periods=self.seasonal_periods)) elif self.initialization_method == 'known': initial_level = float_like(initial_level, 'initial_level') if self.trend: initial_trend = float_like(initial_trend, 'initial_trend') if self.seasonal: initial_seasonal = array_like(initial_seasonal, 'initial_seasonal') if len(initial_seasonal) == self.seasonal_periods - 1: initial_seasonal = np.r_[initial_seasonal, 0 - np.sum(initial_seasonal)] if len(initial_seasonal) != self.seasonal_periods: raise ValueError( 'Invalid length of initial seasonal values. Must be' ' one of s or s-1, where s is the number of seasonal' ' periods.') self._initial_level = initial_level self._initial_trend = initial_trend self._initial_seasonal = initial_seasonal self._initial_state = None # Initialize now if possible (if we have a damped trend, then # initialization will depend on the phi parameter, and so has to be # done at each `update`) methods = ['simple', 'heuristic', 'known'] if not self.damped_trend and self.initialization_method in methods: self._initialize_constant_statespace(initial_level, initial_trend, initial_seasonal) # Save keys for kwarg initialization self._init_keys += ['trend', 'damped_trend', 'seasonal', 'initialization_method', 'initial_level', 'initial_trend', 'initial_seasonal', 'bounds', 'concentrate_scale', 'dates', 'freq', 'missing'] def _get_init_kwds(self): kwds = super()._get_init_kwds() kwds['seasonal'] = self.seasonal_periods return kwds @property def _res_classes(self): return {'fit': (ExponentialSmoothingResults, ExponentialSmoothingResultsWrapper)} def clone(self, endog, exog=None, **kwargs): if exog is not None: raise NotImplementedError( 'ExponentialSmoothing does not support `exog`.') return self._clone_from_init_kwds(endog, **kwargs) @property def state_names(self): state_names = ['error', 'level'] if self.trend: state_names += ['trend'] if self.seasonal: state_names += ['seasonal.%d' % i for i in range(self.seasonal_periods)] return state_names @property def param_names(self): param_names = ['smoothing_level'] if self.trend: param_names += ['smoothing_trend'] if self.seasonal: param_names += ['smoothing_seasonal'] if self.damped_trend: param_names += ['damping_trend'] if not self.concentrate_scale: param_names += ['sigma2'] # Initialization if self.initialization_method == 'estimated': param_names += ['initial_level'] if self.trend: param_names += ['initial_trend'] if self.seasonal: param_names += ['initial_seasonal.%d' % i for i in range(self.seasonal_periods - 1)] return param_names @property def start_params(self): # Make sure starting parameters aren't beyond or right on the bounds bounds = [(x[0] + 1e-3, x[1] - 1e-3) for x in self.bounds] # See Hyndman p.24 start_params = [np.clip(0.1, *bounds[0])] if self.trend: start_params += [np.clip(0.01, *bounds[1])] if self.seasonal: start_params += [np.clip(0.01, *bounds[2])] if self.damped_trend: start_params += [np.clip(0.98, *bounds[3])] if not self.concentrate_scale: start_params += [np.var(self.endog)] # Initialization if self.initialization_method == 'estimated': initial_level, initial_trend, initial_seasonal = ( es_init._initialization_simple( self.endog[:, 0], trend='add' if self.trend else None, seasonal='add' if self.seasonal else None, seasonal_periods=self.seasonal_periods)) start_params += [initial_level] if self.trend: start_params += [initial_trend] if self.seasonal: start_params += initial_seasonal.tolist()[:-1] return np.array(start_params) @property def k_params(self): k_params = ( 1 + int(self.trend) + int(self.seasonal) + int(not self.concentrate_scale) + int(self.damped_trend)) if self.initialization_method == 'estimated': k_params += ( 1 + int(self.trend) + int(self.seasonal) * (self._seasonal_periods - 1)) return k_params def transform_params(self, unconstrained): unconstrained = np.array(unconstrained, ndmin=1) constrained = np.zeros_like(unconstrained) # Alpha in (0, 1) low, high = self.bounds[0] constrained[0] = ( 1 / (1 + np.exp(-unconstrained[0])) * (high - low) + low) i = 1 # Beta in (0, alpha) if self.trend: low, high = self.bounds[1] high = min(high, constrained[0]) constrained[i] = ( 1 / (1 + np.exp(-unconstrained[i])) * (high - low) + low) i += 1 # Gamma in (0, 1 - alpha) if self.seasonal: low, high = self.bounds[2] high = min(high, 1 - constrained[0]) constrained[i] = ( 1 / (1 + np.exp(-unconstrained[i])) * (high - low) + low) i += 1 # Phi in bounds (e.g. default is [0.8, 0.98]) if self.damped_trend: low, high = self.bounds[3] constrained[i] = ( 1 / (1 + np.exp(-unconstrained[i])) * (high - low) + low) i += 1 # sigma^2 positive if not self.concentrate_scale: constrained[i] = unconstrained[i]**2 i += 1 # Initial parameters are as-is if self.initialization_method == 'estimated': constrained[i:] = unconstrained[i:] return constrained def untransform_params(self, constrained): constrained = np.array(constrained, ndmin=1) unconstrained = np.zeros_like(constrained) # Alpha in (0, 1) low, high = self.bounds[0] tmp = (constrained[0] - low) / (high - low) unconstrained[0] = np.log(tmp / (1 - tmp)) i = 1 # Beta in (0, alpha) if self.trend: low, high = self.bounds[1] high = min(high, constrained[0]) tmp = (constrained[i] - low) / (high - low) unconstrained[i] = np.log(tmp / (1 - tmp)) i += 1 # Gamma in (0, 1 - alpha) if self.seasonal: low, high = self.bounds[2] high = min(high, 1 - constrained[0]) tmp = (constrained[i] - low) / (high - low) unconstrained[i] = np.log(tmp / (1 - tmp)) i += 1 # Phi in bounds (e.g. default is [0.8, 0.98]) if self.damped_trend: low, high = self.bounds[3] tmp = (constrained[i] - low) / (high - low) unconstrained[i] = np.log(tmp / (1 - tmp)) i += 1 # sigma^2 positive if not self.concentrate_scale: unconstrained[i] = constrained[i]**0.5 i += 1 # Initial parameters are as-is if self.initialization_method == 'estimated': unconstrained[i:] = constrained[i:] return unconstrained def _initialize_constant_statespace(self, initial_level, initial_trend=None, initial_seasonal=None): # Note: this should be run after `update` has already put any new # parameters into the transition matrix, since it uses the transition # matrix explicitly. # Due to timing differences, the state space representation integrates # the trend into the level in the "predicted_state" (only the # "filtered_state" corresponds to the timing of the exponential # smoothing models) # Initial values are interpreted as "filtered" values constant = np.array([0., initial_level]) if self.trend and initial_trend is not None: constant = np.r_[constant, initial_trend] if self.seasonal and initial_seasonal is not None: constant = np.r_[constant, initial_seasonal] self._initial_state = constant[1:] # Apply the prediction step to get to what we need for our Kalman # filter implementation constant = np.dot(self.ssm['transition'], constant) self.initialization.constant = constant def _initialize_stationary_cov_statespace(self): R = self.ssm['selection'] Q = self.ssm['state_cov'] self.initialization.stationary_cov = R.dot(Q).dot(R.T) def update(self, params, transformed=True, includes_fixed=False, complex_step=False): params = self.handle_params(params, transformed=transformed, includes_fixed=includes_fixed) # State space system matrices self.ssm['selection', 0, 0] = 1 - params[0] self.ssm['selection', 1, 0] = params[0] i = 1 if self.trend: self.ssm['selection', 2, 0] = params[i] i += 1 if self.seasonal: self.ssm['selection', 0, 0] -= params[i] self.ssm['selection', i + 1, 0] = params[i] i += 1 if self.damped_trend: self.ssm['transition', 1:3, 2] = params[i] i += 1 if not self.concentrate_scale: self.ssm['state_cov', 0, 0] = params[i] i += 1 # State initialization if self.initialization_method == 'estimated': initial_level = params[i] i += 1 initial_trend = None initial_seasonal = None if self.trend: initial_trend = params[i] i += 1 if self.seasonal: initial_seasonal = params[i: i + self.seasonal_periods - 1] initial_seasonal = np.r_[initial_seasonal, 0 - np.sum(initial_seasonal)] self._initialize_constant_statespace(initial_level, initial_trend, initial_seasonal) methods = ['simple', 'heuristic', 'known'] if self.damped_trend and self.initialization_method in methods: self._initialize_constant_statespace( self._initial_level, self._initial_trend, self._initial_seasonal) self._initialize_stationary_cov_statespace() def _compute_concentrated_states(self, params, *args, **kwargs): # Apply the usual filter, but keep forecasts kwargs['conserve_memory'] = MEMORY_CONSERVE & ~MEMORY_NO_FORECAST super().loglike(params, *args, **kwargs) # Compute the initial state vector y_tilde = np.array(self.ssm._kalman_filter.forecast_error[0], copy=True) # Need to modify our state space system matrices slightly to get them # back into the form of the innovations framework of # De Livera et al. (2011) T = self['transition', 1:, 1:] R = self['selection', 1:] Z = self['design', :, 1:].copy() i = 1 if self.trend: Z[0, i] = 1. i += 1 if self.seasonal: Z[0, i] = 0. Z[0, -1] = 1. # Now compute the regression components as described in # De Livera et al. (2011), equation (10). D = T - R.dot(Z) w = np.zeros((self.nobs, self.k_states - 1), dtype=D.dtype) w[0] = Z for i in range(self.nobs - 1): w[i + 1] = w[i].dot(D) mod_ols = GLM(y_tilde, w) # If we have seasonal parameters, constrain them to sum to zero # (otherwise the initial level gets confounded with the sum of the # seasonals). if self.seasonal: R = np.zeros_like(Z) R[0, -self.seasonal_periods:] = 1. q = np.zeros((1, 1)) res_ols = mod_ols.fit_constrained((R, q)) else: res_ols = mod_ols.fit() # Separate into individual components initial_level = res_ols.params[0] initial_trend = res_ols.params[1] if self.trend else None initial_seasonal = ( res_ols.params[-self.seasonal_periods:] if self.seasonal else None) return initial_level, initial_trend, initial_seasonal @Appender(MLEModel.loglike.__doc__) def loglike(self, params, *args, **kwargs): if self.initialization_method == 'concentrated': self._initialize_constant_statespace( *self._compute_concentrated_states(params, *args, **kwargs)) llf = self.ssm.loglike() self.ssm.initialization.constant = np.zeros(self.k_states) else: llf = super().loglike(params, *args, **kwargs) return llf @Appender(MLEModel.filter.__doc__) def filter(self, params, cov_type=None, cov_kwds=None, return_ssm=False, results_class=None, results_wrapper_class=None, *args, **kwargs): if self.initialization_method == 'concentrated': self._initialize_constant_statespace( *self._compute_concentrated_states(params, *args, **kwargs)) results = super().filter( params, cov_type=cov_type, cov_kwds=cov_kwds, return_ssm=return_ssm, results_class=results_class, results_wrapper_class=results_wrapper_class, *args, **kwargs) if self.initialization_method == 'concentrated': self.ssm.initialization.constant = np.zeros(self.k_states) return results @Appender(MLEModel.smooth.__doc__) def smooth(self, params, cov_type=None, cov_kwds=None, return_ssm=False, results_class=None, results_wrapper_class=None, *args, **kwargs): if self.initialization_method == 'concentrated': self._initialize_constant_statespace( *self._compute_concentrated_states(params, *args, **kwargs)) results = super().smooth( params, cov_type=cov_type, cov_kwds=cov_kwds, return_ssm=return_ssm, results_class=results_class, results_wrapper_class=results_wrapper_class, *args, **kwargs) if self.initialization_method == 'concentrated': self.ssm.initialization.constant = np.zeros(self.k_states) return results
[docs]class ExponentialSmoothingResults(MLEResults): def __init__(self, model, params, filter_results, cov_type=None, **kwargs): super().__init__(model, params, filter_results, cov_type, **kwargs) # Save the states self.initial_state = model._initial_state if isinstance(self.data, PandasData): index = self.data.row_labels self.initial_state = pd.DataFrame( [model._initial_state], columns=model.state_names[1:]) if model._index_dates and model._index_freq is not None: self.initial_state.index = index.shift(-1)[:1] @Appender(MLEResults.summary.__doc__) def summary(self, alpha=.05, start=None): specification = ['A'] if self.model.trend and self.model.damped_trend: specification.append('Ad') elif self.model.trend: specification.append('A') else: specification.append('N') if self.model.seasonal: specification.append('A') else: specification.append('N') model_name = 'ETS(' + ', '.join(specification) + ')' summary = super(ExponentialSmoothingResults, self).summary( alpha=alpha, start=start, title='Exponential Smoothing Results', model_name=model_name) if self.model.initialization_method != 'estimated': params = np.array(self.initial_state) if params.ndim > 1: params = params[0] names = self.model.state_names param_header = ['initialization method: %s' % self.model.initialization_method] params_stubs = names params_data = [[forg(params[i], prec=4)] for i in range(len(params))] initial_state_table = SimpleTable(params_data, param_header, params_stubs, txt_fmt=fmt_params) summary.tables.insert(-1, initial_state_table) return summary
class ExponentialSmoothingResultsWrapper(MLEResultsWrapper): _attrs = {} _wrap_attrs = wrap.union_dicts(MLEResultsWrapper._wrap_attrs, _attrs) _methods = {} _wrap_methods = wrap.union_dicts(MLEResultsWrapper._wrap_methods, _methods) wrap.populate_wrapper(ExponentialSmoothingResultsWrapper, # noqa:E305 ExponentialSmoothingResults)