Files
2025-08-01 04:33:03 -04:00

585 lines
20 KiB
Python

# -*- coding: utf-8 -*-
#
# Methods for optimizing auto-arima
from numpy.linalg import LinAlgError
import numpy as np
from datetime import datetime
from joblib import Parallel, delayed
from sklearn.utils import check_random_state
import abc
import functools
import time
import warnings
import traceback
from .arima import ARIMA
from ..warnings import ModelFitWarning
from ._context import ContextType, ContextStore
from . import _validation
from ..compat import statsmodels as sm_compat
def _root_test(model, ic, trace):
"""
Check the roots of the new model, and set IC to inf if the roots are
near non-invertible. This is a little bit different than how Rob does it:
https://github.com/robjhyndman/forecast/blob/master/R/newarima2.R#L780
In our test, we look directly at the inverse roots to see if they come
anywhere near the unit circle border
"""
max_invroot = 0
p, d, q = model.order
P, D, Q, m = model.seasonal_order
if p + P > 0:
max_invroot = max(0, *np.abs(1 / model.arroots()))
if q + Q > 0 and np.isfinite(ic):
max_invroot = max(0, *np.abs(1 / model.maroots()))
if max_invroot > 1 - 1e-2:
ic = np.inf
if trace > 1:
print(
"Near non-invertible roots for order "
"(%i, %i, %i)(%i, %i, %i, %i); setting score to inf (at "
"least one inverse root too close to the border of the "
"unit circle: %.3f)"
% (p, d, q, P, D, Q, m, max_invroot))
return ic
class _SolverMixin(metaclass=abc.ABCMeta):
"""The solver interface implemented by wrapper classes"""
@abc.abstractmethod
def solve(self):
"""Must be implemented by subclasses"""
class _RandomFitWrapper(_SolverMixin):
    """Searches for the best model using a random search"""

    def __init__(self, y, X, fit_partial, d, D, m, max_order,
                 max_p, max_q, max_P, max_Q, random, random_state,
                 n_fits, n_jobs, seasonal, trace, with_intercept,
                 sarimax_kwargs):
        # NOTE: pre-1.5.2, we started at start_p, start_q, etc. However, when
        # using stepwise=FALSE in R, hyndman starts at 0. He only uses start_*
        # when stepwise=TRUE.

        # Build the full (order, seasonal_order) candidate space up front;
        # it depends on whether the user wants a seasonal ARIMA, which keeps
        # the search space smaller for non-seasonal fits. Ranges run to
        # max_* + 1 because the max bounds are inclusive.
        if seasonal:
            candidates = (
                ((p, d, q), (P, D, Q, m))
                for p in range(max_p + 1)
                for q in range(max_q + 1)
                for P in range(max_P + 1)
                for Q in range(max_Q + 1)
                if p + q + P + Q <= max_order
            )
        else:
            # non-seasonal: pin all seasonal terms to zero
            candidates = (
                ((p, d, q), (0, 0, 0, 0))
                for p in range(max_p + 1)
                for q in range(max_q + 1)
                if p + q <= max_order
            )

        # For a random (rather than exhaustive) search, shuffle the
        # materialized candidate list and keep only the first n_fits.
        # dtype='object' is required since the nested sequence is ragged;
        # numpy >= 1.24 raises a ValueError otherwise.
        if random:
            random_state = check_random_state(random_state)
            candidates = random_state.permutation(
                np.array(list(candidates), dtype='object'))[:n_fits]

        self.gen = candidates
        self.n_jobs = n_jobs
        self.trace = trace

        # Bind the data/intercept kwargs now so solve() only has to supply
        # the orders per candidate.
        self.fit_partial = functools.partial(
            fit_partial,
            y=y,
            X=X,
            with_intercept=with_intercept,
            **sarimax_kwargs,
        )

    def solve(self):
        """Do a random search"""
        # fan the candidate fits out across the workers
        all_res = Parallel(n_jobs=self.n_jobs)(
            delayed(self.fit_partial)(
                order=order,
                seasonal_order=seasonal_order,
            )
            for order, seasonal_order in self.gen
        )

        sorted_fits = _sort_and_filter_fits(all_res)
        if self.trace and sorted_fits:
            print(f"\nBest model: {str(sorted_fits[0])}")
        return sorted_fits
class _StepwiseFitWrapper(_SolverMixin):
    """Searches for the best model using the stepwise algorithm.

    The stepwise algorithm fluctuates the more sensitive pieces of the ARIMA
    (the seasonal components) first, adjusting towards the direction of the
    smaller {A|B|HQ}IC(c), and continues to step down as long as the error
    shrinks. As long as the error term decreases and the best parameters have
    not shifted to a point where they can no longer change, ``k`` will
    increase, and the models will continue to be fit until the ``max_k`` is
    reached.

    References
    ----------
    .. [1] R's auto-arima stepwise source code: https://github.com/robjhyndman/forecast/blob/30308a4e314ff29338291462e81bf68ff0c5f86d/R/newarima2.R#L366
    .. [2] https://robjhyndman.com/hyndsight/arma-roots/
    """  # noqa

    def __init__(self, y, X, start_params, trend, method, maxiter,
                 fit_params, suppress_warnings, trace, error_action,
                 out_of_sample_size, scoring, scoring_args,
                 p, d, q, P, D, Q, m, max_p, max_q, max_P, max_Q, seasonal,
                 information_criterion, with_intercept, **kwargs):
        # validate/normalize the verbosity level; it is also forwarded to
        # each candidate fit via the partial below
        self.trace = _validation.check_trace(trace)

        # Create a partial of the fit call so we don't have arg bloat all over
        self._fit_arima = functools.partial(
            _fit_candidate_model,
            y=y,
            X=X,
            start_params=start_params,
            trend=trend,
            method=method,
            maxiter=maxiter,
            fit_params=fit_params,
            suppress_warnings=suppress_warnings,
            trace=self.trace,
            error_action=error_action,
            out_of_sample_size=out_of_sample_size,
            scoring=scoring,
            scoring_args=scoring_args,
            information_criterion=information_criterion,
            **kwargs)

        self.information_criterion = information_criterion
        self.with_intercept = with_intercept

        # order stuff we will be incrementing
        self.p = p
        self.d = d
        self.q = q
        self.P = P
        self.D = D
        self.Q = Q
        self.m = m
        self.max_p = max_p
        self.max_q = max_q
        self.max_P = max_P
        self.max_Q = max_Q
        self.seasonal = seasonal

        # execution context passed through the context manager
        self.exec_context = ContextStore.get_or_empty(ContextType.STEPWISE)

        # other internal start vars.
        # ``k`` counts candidate fits attempted; ``start_k`` snapshots ``k``
        # at the top of each stepwise pass so solve() can tell whether the
        # last pass produced any new fits.
        self.k = self.start_k = 0
        self.max_k = 100 if self.exec_context.max_steps is None \
            else self.exec_context.max_steps
        self.max_dur = self.exec_context.max_dur

        # results list to store intermittent hashes of orders to determine if
        # we've seen this order before...
        self.results_dict = dict()  # dict[tuple -> ARIMA]
        self.ic_dict = dict()  # dict[tuple -> float]
        self.fit_time_dict = dict()  # dict[tuple -> float]

        # running best model and its (order, seasonal_order, constant) key
        self.bestfit = None
        self.bestfit_key = None  # (order, seasonal_order, constant)

    def _do_fit(self, order, seasonal_order, constant=None):
        """Do a fit and determine whether the model is better.

        Returns True only when this (order, seasonal_order, constant)
        combination has not been tried before AND the resulting fit beats
        the current best information criterion; False when the fit fails,
        scores worse, or the combination was already attempted.
        """
        if not self.seasonal:
            seasonal_order = (0, 0, 0, 0)
        seasonal_order = sm_compat.check_seasonal_order(seasonal_order)

        # we might be fitting without a constant
        if constant is None:
            constant = self.with_intercept

        if (order, seasonal_order, constant) not in self.results_dict:

            # increment the number of fits
            self.k += 1

            fit, fit_time, new_ic = self._fit_arima(
                order=order,
                seasonal_order=seasonal_order,
                with_intercept=constant)

            # use the orders as a key to be hashed for
            # the dictionary (pointing to fit)
            self.results_dict[(order, seasonal_order, constant)] = fit

            # cache this so we can lookup best model IC downstream
            self.ic_dict[(order, seasonal_order, constant)] = new_ic
            self.fit_time_dict[(order, seasonal_order, constant)] = fit_time

            # Determine if the new fit is better than the existing fit
            if fit is None or np.isinf(new_ic):
                return False

            # no benchmark model
            if self.bestfit is None:
                self.bestfit = fit
                self.bestfit_key = (order, seasonal_order, constant)

                if self.trace > 1:
                    print("First viable model found (%.3f)" % new_ic)
                return True

            # otherwise there's a current best
            current_ic = self.ic_dict[self.bestfit_key]
            if new_ic < current_ic:
                if self.trace > 1:
                    print("New best model found (%.3f < %.3f)"
                          % (new_ic, current_ic))
                self.bestfit = fit
                self.bestfit_key = (order, seasonal_order, constant)
                return True

        # we've seen this model before
        return False

    def solve(self):
        """Run the stepwise search and return the sorted, viable fits.

        Seeds the search with a handful of baseline models (the starting
        order, the null model, a basic AR model and a basic MA model), then
        repeatedly perturbs the seasonal (P, Q) and non-seasonal (p, q)
        terms one step at a time, accepting any move that improves the
        information criterion, until a full pass makes no improvement or a
        stopping criterion (``max_k`` fits / ``max_dur`` seconds) is hit.
        """
        start_time = datetime.now()
        p, d, q = self.p, self.d, self.q
        P, D, Q, m = self.P, self.D, self.Q, self.m
        max_p, max_q = self.max_p, self.max_q
        max_P, max_Q = self.max_P, self.max_Q

        if self.trace:
            print("Performing stepwise search to minimize %s"
                  % self.information_criterion)

        # fit a baseline p, d, q model
        self._do_fit((p, d, q), (P, D, Q, m))

        # null model with possible constant
        if self._do_fit((0, d, 0), (0, D, 0, m)):
            p = q = P = Q = 0

        # A basic AR model
        if max_p > 0 or max_P > 0:
            _p = 1 if max_p > 0 else 0
            # only add a seasonal AR term when the data are seasonal (m > 1)
            _P = 1 if (m > 1 and max_P > 0) else 0
            if self._do_fit((_p, d, 0), (_P, D, 0, m)):
                p = _p
                P = _P
                q = Q = 0

        # Basic MA model
        if max_q > 0 or max_Q > 0:
            _q = 1 if max_q > 0 else 0
            _Q = 1 if (m > 1 and max_Q > 0) else 0
            if self._do_fit((0, d, _q), (0, D, _Q, m)):
                p = P = 0
                Q = _Q
                q = _q

        # Null model with NO constant (if we haven't tried it yet)
        if self.with_intercept:
            if self._do_fit((0, d, 0), (0, D, 0, m), constant=False):
                p = q = P = Q = 0

        # loop while the previous pass produced at least one new fit
        # (k advanced past start_k) and the fit budget is not exhausted
        while self.start_k < self.k < self.max_k:
            self.start_k = self.k

            # break loop if execution time exceeds the timeout threshold. needs
            # to be at front of loop, since a single pass may reach termination
            # criteria by end and we only want to warn and break if the loop is
            # continuing again
            dur = (datetime.now() - start_time).total_seconds()
            if self.max_dur and dur > self.max_dur:
                warnings.warn('early termination of stepwise search due to '
                              'max_dur threshold (%.3f > %.3f)'
                              % (dur, self.max_dur))
                break

            # NOTE: k changes for every fit, so we might need to bail halfway
            # through the loop, hence the multiple checks.

            # perturb the seasonal terms first: single steps on P and Q,
            # then the four diagonal (P, Q) moves
            if P > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P - 1, D, Q, m)):
                P -= 1
                continue

            if Q > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P, D, Q - 1, m)):
                Q -= 1
                continue

            if P < max_P and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P + 1, D, Q, m)):
                P += 1
                continue

            if Q < max_Q and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P, D, Q + 1, m)):
                Q += 1
                continue

            if Q > 0 and P > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P - 1, D, Q - 1, m)):
                Q -= 1
                P -= 1
                continue

            if Q < max_Q and P > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P - 1, D, Q + 1, m)):
                Q += 1
                P -= 1
                continue

            if Q > 0 and P < max_P and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P + 1, D, Q - 1, m)):
                Q -= 1
                P += 1
                continue

            if Q < max_Q and P < max_P and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q), (P + 1, D, Q + 1, m)):
                Q += 1
                P += 1
                continue

            # then the non-seasonal terms: single steps on p and q, then
            # the four diagonal (p, q) moves
            if p > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p - 1, d, q), (P, D, Q, m)):
                p -= 1
                continue

            if q > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q - 1), (P, D, Q, m)):
                q -= 1
                continue

            if p < max_p and \
                    self.k < self.max_k and \
                    self._do_fit((p + 1, d, q), (P, D, Q, m)):
                p += 1
                continue

            if q < max_q and \
                    self.k < self.max_k and \
                    self._do_fit((p, d, q + 1), (P, D, Q, m)):
                q += 1
                continue

            if q > 0 and p > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p - 1, d, q - 1), (P, D, Q, m)):
                q -= 1
                p -= 1
                continue

            if q < max_q and p > 0 and \
                    self.k < self.max_k and \
                    self._do_fit((p - 1, d, q + 1), (P, D, Q, m)):
                q += 1
                p -= 1
                continue

            if q > 0 and p < max_p and \
                    self.k < self.max_k and \
                    self._do_fit((p + 1, d, q - 1), (P, D, Q, m)):
                q -= 1
                p += 1
                continue

            if q < max_q and p < max_p and \
                    self.k < self.max_k and \
                    self._do_fit((p + 1, d, q + 1), (P, D, Q, m)):
                q += 1
                p += 1
                continue

            # R: if (allowdrift || allowmean)
            # we don't have these args, so we just default this case to true to
            # evaluate all corners
            if self.k < self.max_k and \
                    self._do_fit((p, d, q),
                                 (P, D, Q, m),
                                 constant=not self.with_intercept):
                self.with_intercept = not self.with_intercept
                continue

        # check if the search has been ended after max_steps
        if self.exec_context.max_steps is not None \
                and self.k >= self.exec_context.max_steps:
            warnings.warn('stepwise search has reached the maximum number '
                          'of tries to find the best fit model')

        # TODO: if (approximation && !is.null(bestfit$arma)) - refit best w MLE
        # NOTE(review): the key here is index 1 of (model, fit_time, ic),
        # i.e. fit time. _sort_and_filter_fits re-sorts (stably) by IC, so
        # this pre-sort only breaks IC ties by fit time — confirm intended.
        filtered_models_ics = sorted(
            [(v, self.fit_time_dict[k], self.ic_dict[k])
             for k, v in self.results_dict.items()
             if v is not None],
            key=(lambda fit_ic: fit_ic[1]),
        )

        sorted_fits = _sort_and_filter_fits(filtered_models_ics)

        if self.trace and sorted_fits:
            print(f"\nBest model: {str(sorted_fits[0])}")

        return sorted_fits
def _fit_candidate_model(y,
                         X,
                         order,
                         seasonal_order,
                         start_params,
                         trend,
                         method,
                         maxiter,
                         fit_params,
                         suppress_warnings,
                         trace,
                         error_action,
                         out_of_sample_size,
                         scoring,
                         scoring_args,
                         with_intercept,
                         information_criterion,
                         **kwargs):
    """Instantiate and fit a single candidate model.

    1. Initialize a model
    2. Fit model
    3. Perform a root test
    4. Return model, fit time and information criterion

    On a failed fit (linear-algebra/value error), the fit time stays NaN
    and the criterion stays ``np.inf`` so the candidate sorts last.
    """
    t0 = time.time()
    fit_time = np.nan
    ic = np.inf

    # Construct outside the try block, so if there is a type error in user
    # input we don't mask it with a warning or worse
    fit = ARIMA(order=order, seasonal_order=seasonal_order,
                start_params=start_params, trend=trend, method=method,
                maxiter=maxiter, suppress_warnings=suppress_warnings,
                out_of_sample_size=out_of_sample_size, scoring=scoring,
                scoring_args=scoring_args,
                with_intercept=with_intercept, **kwargs)

    try:
        fit.fit(y, X=X, **fit_params)
    except (LinAlgError, ValueError) as v:
        # non-stationarity errors or singular matrices land here
        if error_action == "raise":
            raise v
        if error_action in ("warn", "trace"):
            msg = 'Error fitting %s ' \
                  '(if you do not want to see these warnings, run ' \
                  'with error_action="ignore").' \
                  % str(fit)
            if error_action == 'trace':
                msg += "\nTraceback:\n" + traceback.format_exc()
            warnings.warn(msg, ModelFitWarning)
    else:
        fit_time = time.time() - t0
        ic = getattr(fit, information_criterion)()  # aic, bic, aicc, etc.

        # check the roots of the new model, and set IC to inf if the
        # roots are near non-invertible
        ic = _root_test(fit, ic, trace)

        # log the model fit
        if trace:
            print(
                "{model} : {ic_name}={ic:.3f}, Time={time:.2f} sec"
                .format(model=str(fit),
                        ic_name=information_criterion.upper(),
                        ic=ic,
                        time=fit_time)
            )

    return fit, fit_time, ic
def _sort_and_filter_fits(models):
"""Sort the results in ascending order, by information criterion
If there are no suitable models, raise a ValueError.
Otherwise, return ``a``. In the case that ``a`` is an iterable
(i.e., it made it to the end of the function), this method will
filter out the None values and assess whether the list is empty.
Parameters
----------
models : tuple or list
The list or (model, fit_time, information_criterion), or a single tuple
"""
# if it's a result of making it to the end, it will be a list of models
if not isinstance(models, list):
models = [models]
# Filter out the Nones or Infs (the failed models)...
filtered = [(mod, ic) for mod, _, ic in models
if mod is not None and np.isfinite(ic)]
# if the list is empty, or if it was an ARIMA and it's None
if not filtered:
raise ValueError(
"Could not successfully fit a viable ARIMA model "
"to input data.\nSee "
"http://alkaline-ml.com/pmdarima/no-successful-model.html "
"for more information on why this can happen."
)
# sort by the criteria - lower is better for both AIC and BIC
# (https://stats.stackexchange.com/questions/81427/aic-guidelines-in-model-selection) # noqa
sorted_res = sorted(filtered, key=(lambda mod_ic: mod_ic[1]))
# TODO: break ties with fit time?
models, _ = zip(*sorted_res)
return models