"""
State Space Representation, Kalman Filter, Smoother, and Simulation Smoother
Author: Chad Fulton
License: Simplified-BSD
"""
import numpy as np
from .kalman_smoother import KalmanSmoother
from . import tools
SIMULATION_STATE = 0x01
SIMULATION_DISTURBANCE = 0x04
SIMULATION_ALL = (
SIMULATION_STATE | SIMULATION_DISTURBANCE
)
[docs]class SimulationSmoother(KalmanSmoother):
r"""
State space representation of a time series process, with Kalman filter
and smoother, and with simulation smoother.
Parameters
----------
k_endog : {array_like, int}
The observed time-series process :math:`y` if array like or the
number of variables in the process if an integer.
k_states : int
The dimension of the unobserved state process.
k_posdef : int, optional
The dimension of a guaranteed positive definite covariance matrix
describing the shocks in the measurement equation. Must be less than
or equal to `k_states`. Default is `k_states`.
simulation_smooth_results_class : class, optional
Default results class to use to save output of simulation smoothing.
Default is `SimulationSmoothResults`. If specified, class must extend
from `SimulationSmoothResults`.
simulation_smoother_classes : dict, optional
Dictionary with BLAS prefixes as keys and classes as values.
**kwargs
Keyword arguments may be used to provide default values for state space
matrices, for Kalman filtering options, for Kalman smoothing
options, or for Simulation smoothing options.
See `Representation`, `KalmanFilter`, and `KalmanSmoother` for more
details.
"""
simulation_outputs = [
'simulate_state', 'simulate_disturbance', 'simulate_all'
]
def __init__(self, k_endog, k_states, k_posdef=None,
simulation_smooth_results_class=None,
simulation_smoother_classes=None, **kwargs):
super(SimulationSmoother, self).__init__(
k_endog, k_states, k_posdef, **kwargs
)
if simulation_smooth_results_class is None:
simulation_smooth_results_class = SimulationSmoothResults
self.simulation_smooth_results_class = simulation_smooth_results_class
self.prefix_simulation_smoother_map = (
simulation_smoother_classes
if simulation_smoother_classes is not None
else tools.prefix_simulation_smoother_map.copy())
# Holder for an model-level simulation smoother objects, to use in
# simulating new time series.
self._simulators = {}
def get_simulation_output(self, simulation_output=None,
simulate_state=None, simulate_disturbance=None,
simulate_all=None, **kwargs):
r"""
Get simulation output bitmask
Helper method to get final simulation output bitmask from a set of
optional arguments including the bitmask itself and possibly boolean
flags.
Parameters
----------
simulation_output : int, optional
Simulation output bitmask. If this is specified, it is simply
returned and the other arguments are ignored.
simulate_state : bool, optional
Whether or not to include the state in the simulation output.
simulate_disturbance : bool, optional
Whether or not to include the state and observation disturbances
in the simulation output.
simulate_all : bool, optional
Whether or not to include all simulation output.
\*\*kwargs
Additional keyword arguments. Present so that calls to this method
can use \*\*kwargs without clearing out additional arguments.
"""
# If we do not explicitly have simulation_output, try to get it from
# kwargs
if simulation_output is None:
simulation_output = 0
if simulate_state:
simulation_output |= SIMULATION_STATE
if simulate_disturbance:
simulation_output |= SIMULATION_DISTURBANCE
if simulate_all:
simulation_output |= SIMULATION_ALL
# Handle case of no information in kwargs
if simulation_output == 0:
# If some arguments were passed, but we still do not have any
# simulation output, raise an exception
argument_set = not all([
simulate_state is None, simulate_disturbance is None,
simulate_all is None
])
if argument_set:
raise ValueError("Invalid simulation output options:"
" given options would result in no"
" output.")
# Otherwise set simulation output to be the same as smoother
# output
simulation_output = self.smoother_output
return simulation_output
def _simulate(self, nsimulations, measurement_shocks, state_shocks,
initial_state):
# Initialize the filter and representation
prefix, dtype, create_smoother, create_filter, create_statespace = (
self._initialize_smoother())
# Initialize the state
self._initialize_state(prefix=prefix)
# Create the simulator if necessary
if (prefix not in self._simulators or
not nsimulations == self._simulators[prefix].nobs):
simulation_output = 0
# Kalman smoother parameters
smoother_output = -1
# Kalman filter parameters
filter_method = self.filter_method
inversion_method = self.inversion_method
stability_method = self.stability_method
conserve_memory = self.conserve_memory
filter_timing = self.filter_timing
loglikelihood_burn = self.loglikelihood_burn
tolerance = self.tolerance
# Create a new simulation smoother object
cls = self.prefix_simulation_smoother_map[prefix]
self._simulators[prefix] = cls(
self._statespaces[prefix],
filter_method, inversion_method, stability_method,
conserve_memory, filter_timing, tolerance, loglikelihood_burn,
smoother_output, simulation_output, nsimulations
)
simulator = self._simulators[prefix]
# Set the disturbance variates
if measurement_shocks is not None and state_shocks is not None:
disturbance_variates = np.atleast_1d(np.array(
np.r_[measurement_shocks.ravel(), state_shocks.ravel()],
dtype=self.dtype
).squeeze())
simulator.set_disturbance_variates(disturbance_variates,
pretransformed=True)
elif measurement_shocks is None and state_shocks is None:
pass
elif measurement_shocks is not None:
raise ValueError('Must set `state_shocks` if `measurement_shocks`'
' is set.')
elif state_shocks is not None:
raise ValueError('Must set `measurement_shocks` if `state_shocks`'
' is set.')
# Set the intial state vector
initial_state = np.atleast_1d(np.array(
initial_state, dtype=self.dtype
).squeeze())
simulator.set_initial_state(initial_state)
# Perform simulation smoothing
# Note: simulation_output=-1 corresponds to whatever was setup when
# the simulation smoother was constructed
simulator.simulate(-1)
simulated_obs = np.array(simulator.generated_obs, copy=True)
simulated_state = np.array(simulator.generated_state, copy=True)
return (
simulated_obs[:, :nsimulations].T,
simulated_state[:, :nsimulations].T
)
[docs] def simulation_smoother(self, simulation_output=None,
results_class=None, prefix=None, **kwargs):
r"""
Retrieve a simulation smoother for the statespace model.
Parameters
----------
simulation_output : int, optional
Determines which simulation smoother output is calculated.
Default is all (including state and disturbances).
simulation_smooth_results_class : class, optional
Default results class to use to save output of simulation
smoothing. Default is `SimulationSmoothResults`. If specified,
class must extend from `SimulationSmoothResults`.
prefix : str
The prefix of the datatype. Usually only used internally.
**kwargs
Additional keyword arguments, used to set the simulation output.
See `set_simulation_output` for more details.
Returns
-------
SimulationSmoothResults
"""
# Set the class to be the default results class, if None provided
if results_class is None:
results_class = self.simulation_smooth_results_class
# Instantiate a new results object
if not issubclass(results_class, SimulationSmoothResults):
raise ValueError('Invalid results class provided.')
# Make sure we have the required Statespace representation
prefix, dtype, create_smoother, create_filter, create_statespace = (
self._initialize_smoother())
# Simulation smoother parameters
simulation_output = self.get_simulation_output(simulation_output,
**kwargs)
# Kalman smoother parameters
smoother_output = kwargs.get('smoother_output', simulation_output)
# Kalman filter parameters
filter_method = kwargs.get('filter_method', self.filter_method)
inversion_method = kwargs.get('inversion_method',
self.inversion_method)
stability_method = kwargs.get('stability_method',
self.stability_method)
conserve_memory = kwargs.get('conserve_memory',
self.conserve_memory)
filter_timing = kwargs.get('filter_timing',
self.filter_timing)
loglikelihood_burn = kwargs.get('loglikelihood_burn',
self.loglikelihood_burn)
tolerance = kwargs.get('tolerance', self.tolerance)
# Create a new simulation smoother object
cls = self.prefix_simulation_smoother_map[prefix]
simulation_smoother = cls(
self._statespaces[prefix],
filter_method, inversion_method, stability_method, conserve_memory,
filter_timing, tolerance, loglikelihood_burn, smoother_output,
simulation_output
)
# Create results object
results = results_class(self, simulation_smoother)
return results
[docs]class SimulationSmoothResults(object):
r"""
Results from applying the Kalman smoother and/or filter to a state space
model.
Parameters
----------
model : Representation
A Statespace representation
simulation_smoother : {{prefix}}SimulationSmoother object
The Cython simulation smoother object with which to simulation smooth.
Attributes
----------
model : Representation
A Statespace representation
dtype : dtype
Datatype of representation matrices
prefix : str
BLAS prefix of representation matrices
simulation_output : int
Bitmask controlling simulation output.
simulate_state : bool
Flag for if the state is included in simulation output.
simulate_disturbance : bool
Flag for if the state and observation disturbances are included in
simulation output.
simulate_all : bool
Flag for if simulation output should include everything.
generated_measurement_disturbance : array
Measurement disturbance variates used to genereate the observation
vector.
generated_state_disturbance : array
State disturbance variates used to genereate the state and
observation vectors.
generated_obs : array
Generated observation vector produced as a byproduct of simulation
smoothing.
generated_state : array
Generated state vector produced as a byproduct of simulation smoothing.
simulated_state : array
Simulated state.
simulated_measurement_disturbance : array
Simulated measurement disturbance.
simulated_state_disturbance : array
Simulated state disturbance.
"""
def __init__(self, model, simulation_smoother):
self.model = model
self.prefix = model.prefix
self.dtype = model.dtype
self._simulation_smoother = simulation_smoother
# Output
self._generated_measurement_disturbance = None
self._generated_state_disturbance = None
self._generated_obs = None
self._generated_state = None
self._simulated_state = None
self._simulated_measurement_disturbance = None
self._simulated_state_disturbance = None
@property
def simulation_output(self):
return self._simulation_smoother.simulation_output
@simulation_output.setter
def simulation_output(self, value):
self._simulation_smoother.simulation_output = value
@property
def simulate_state(self):
return bool(self.simulation_output & SIMULATION_STATE)
@simulate_state.setter
def simulate_state(self, value):
if bool(value):
self.simulation_output = self.simulation_output | SIMULATION_STATE
else:
self.simulation_output = self.simulation_output & ~SIMULATION_STATE
@property
def simulate_disturbance(self):
return bool(self.simulation_output & SIMULATION_DISTURBANCE)
@simulate_disturbance.setter
def simulate_disturbance(self, value):
if bool(value):
self.simulation_output = (
self.simulation_output | SIMULATION_DISTURBANCE)
else:
self.simulation_output = (
self.simulation_output & ~SIMULATION_DISTURBANCE)
@property
def simulate_all(self):
return bool(self.simulation_output & SIMULATION_ALL)
@simulate_all.setter
def simulate_all(self, value):
if bool(value):
self.simulation_output = self.simulation_output | SIMULATION_ALL
else:
self.simulation_output = self.simulation_output & ~SIMULATION_ALL
@property
def generated_measurement_disturbance(self):
r"""
Randomly drawn measurement disturbance variates
Used to construct `generated_obs`.
Notes
-----
.. math::
\varepsilon_t^+ ~ N(0, H_t)
If `disturbance_variates` were provided to the `simulate()` method,
then this returns those variates (which were N(0,1)) transformed to the
distribution above.
"""
if self._generated_measurement_disturbance is None:
end = self.model.nobs * self.model.k_endog
self._generated_measurement_disturbance = np.array(
self._simulation_smoother.disturbance_variates[:end],
copy=True).reshape(self.model.nobs, self.model.k_endog)
return self._generated_measurement_disturbance
@property
def generated_state_disturbance(self):
r"""
Randomly drawn state disturbance variates, used to construct
`generated_state` and `generated_obs`.
Notes
-----
.. math::
\eta_t^+ ~ N(0, Q_t)
If `disturbance_variates` were provided to the `simulate()` method,
then this returns those variates (which were N(0,1)) transformed to the
distribution above.
"""
if self._generated_state_disturbance is None:
start = self.model.nobs * self.model.k_endog
self._generated_state_disturbance = np.array(
self._simulation_smoother.disturbance_variates[start:],
copy=True).reshape(self.model.nobs, self.model.k_posdef)
return self._generated_state_disturbance
@property
def generated_obs(self):
r"""
Generated vector of observations by iterating on the observation and
transition equations, given a random initial state draw and random
disturbance draws.
Notes
-----
.. math::
y_t^+ = d_t + Z_t \alpha_t^+ + \varepsilon_t^+
"""
if self._generated_obs is None:
self._generated_obs = np.array(
self._simulation_smoother.generated_obs, copy=True
)
return self._generated_obs
@property
def generated_state(self):
r"""
Generated vector of states by iterating on the transition equation,
given a random initial state draw and random disturbance draws.
Notes
-----
.. math::
\alpha_{t+1}^+ = c_t + T_t \alpha_t^+ + \eta_t^+
"""
if self._generated_state is None:
self._generated_state = np.array(
self._simulation_smoother.generated_state, copy=True
)
return self._generated_state
@property
def simulated_state(self):
r"""
Random draw of the state vector from its conditional distribution.
Notes
-----
.. math::
\alpha ~ p(\alpha \mid Y_n)
"""
if self._simulated_state is None:
self._simulated_state = np.array(
self._simulation_smoother.simulated_state, copy=True
)
return self._simulated_state
@property
def simulated_measurement_disturbance(self):
r"""
Random draw of the measurement disturbance vector from its conditional
distribution.
Notes
-----
.. math::
\varepsilon ~ N(\hat \varepsilon, Var(\hat \varepsilon \mid Y_n))
"""
if self._simulated_measurement_disturbance is None:
self._simulated_measurement_disturbance = np.array(
self._simulation_smoother.simulated_measurement_disturbance,
copy=True
)
return self._simulated_measurement_disturbance
@property
def simulated_state_disturbance(self):
r"""
Random draw of the state disturbance vector from its conditional
distribution.
Notes
-----
.. math::
\eta ~ N(\hat \eta, Var(\hat \eta \mid Y_n))
"""
if self._simulated_state_disturbance is None:
self._simulated_state_disturbance = np.array(
self._simulation_smoother.simulated_state_disturbance,
copy=True
)
return self._simulated_state_disturbance
def simulate(self, simulation_output=-1, disturbance_variates=None,
initial_state_variates=None, pretransformed_variates=False):
r"""
Perform simulation smoothing
Does not return anything, but populates the object's `simulated_*`
attributes, as specified by simulation output.
Parameters
----------
simulation_output : int, optional
Bitmask controlling simulation output. Default is to use the
simulation output defined in object initialization.
disturbance_variates : array_likes, optional
Random values to use as disturbance variates, distributed standard
Normal. Usually only specified if results are to be replicated
(e.g. to enforce a seed) or for testing. If not specified, random
variates are drawn.
initial_state_variates : array_likes, optional
Random values to use as initial state variates. Usually only
specified if results are to be replicated (e.g. to enforce a seed)
or for testing. If not specified, random variates are drawn.
"""
# Clear any previous output
self._generated_measurement_disturbance = None
self._generated_state_disturbance = None
self._generated_state = None
self._generated_obs = None
self._generated_state = None
self._simulated_state = None
self._simulated_measurement_disturbance = None
self._simulated_state_disturbance = None
# Re-initialize the _statespace representation
prefix, dtype, create_smoother, create_filter, create_statespace = (
self.model._initialize_smoother())
# Initialize the state
self.model._initialize_state(prefix=prefix)
# Draw the (independent) random variates for disturbances in the
# simulation
if disturbance_variates is not None:
self._simulation_smoother.set_disturbance_variates(
np.array(disturbance_variates, dtype=self.dtype),
pretransformed=pretransformed_variates
)
else:
self._simulation_smoother.draw_disturbance_variates()
# Draw the (independent) random variates for the initial states in the
# simulation
if initial_state_variates is not None:
self._simulation_smoother.set_initial_state_variates(
np.array(initial_state_variates, dtype=self.dtype),
pretransformed=pretransformed_variates
)
else:
self._simulation_smoother.draw_initial_state_variates()
# Perform simulation smoothing
# Note: simulation_output=-1 corresponds to whatever was setup when
# the simulation smoother was constructed
self._simulation_smoother.simulate(simulation_output)