293 lines
8.5 KiB
Python
293 lines
8.5 KiB
Python
|
|
from pmdarima.utils.array import diff, diff_inv, c, is_iterable, as_series, \
|
|
check_exog, check_endog
|
|
from pmdarima.utils import get_callable
|
|
from numpy.testing import assert_array_equal, assert_array_almost_equal
|
|
|
|
import datetime
|
|
import pytest
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
x = np.arange(5)
|
|
m = np.array([10, 5, 12, 23, 18, 3, 2, 0, 12]).reshape(3, 3).T
|
|
X = pd.DataFrame.from_records(
|
|
np.random.RandomState(2).rand(4, 4),
|
|
columns=['a', 'b', 'c', 'd']
|
|
)
|
|
|
|
# need some infinite values in X for testing check_exog
|
|
X_nan = X.copy()
|
|
X_nan.loc[0, 'a'] = np.nan
|
|
|
|
X_inf = X.copy()
|
|
X_inf.loc[0, 'a'] = np.inf
|
|
|
|
# for diffinv
|
|
x_mat = (np.arange(9) + 1).reshape(3, 3).T
|
|
|
|
|
|
def series_with_dt_index(n):
|
|
"""Helper fn to create a monotonic series with Datetime index"""
|
|
time_column = []
|
|
date = datetime.date(2022, 1, 1)
|
|
|
|
for i in range(n):
|
|
time_column.append(date + datetime.timedelta(days=i))
|
|
|
|
return pd.Series(range(n), index=time_column)
|
|
|
|
|
|
def test_diff():
|
|
# test vector for lag = (1, 2), diff = (1, 2)
|
|
assert_array_equal(diff(x, lag=1, differences=1), np.ones(4))
|
|
assert_array_equal(diff(x, lag=1, differences=2), np.zeros(3))
|
|
assert_array_equal(diff(x, lag=2, differences=1), np.ones(3) * 2)
|
|
assert_array_equal(diff(x, lag=2, differences=2), np.zeros(1))
|
|
|
|
# test matrix for lag = (1, 2), diff = (1, 2)
|
|
assert_array_equal(diff(m, lag=1, differences=1),
|
|
np.array([[-5, -5, -2], [7, -15, 12]]))
|
|
assert_array_equal(diff(m, lag=1, differences=2),
|
|
np.array([[12, -10, 14]]))
|
|
assert_array_equal(diff(m, lag=2, differences=1), np.array([[2, -20, 10]]))
|
|
assert diff(m, lag=2, differences=2).shape[0] == 0
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'arr,lag,differences,xi,expected', [
|
|
# VECTORS -------------------------------------------------------------
|
|
# > x = c(0, 1, 2, 3, 4)
|
|
# > diffinv(x, lag=1, differences=1)
|
|
# [1] 0 0 1 3 6 10
|
|
pytest.param(x, 1, 1, None, [0, 0, 1, 3, 6, 10]),
|
|
|
|
# > diffinv(x, lag=1, differences=2)
|
|
# [1] 0 0 0 1 4 10 20
|
|
pytest.param(x, 1, 2, None, [0, 0, 0, 1, 4, 10, 20]),
|
|
|
|
# > diffinv(x, lag=2, differences=1)
|
|
# [1] 0 0 0 1 2 4 6
|
|
pytest.param(x, 2, 1, None, [0, 0, 0, 1, 2, 4, 6]),
|
|
|
|
# > diffinv(x, lag=2, differences=2)
|
|
# [1] 0 0 0 0 0 1 2 5 8
|
|
pytest.param(x, 2, 2, None, [0, 0, 0, 0, 0, 1, 2, 5, 8]),
|
|
|
|
# This is a test of the intermediate stage when x == [1, 0, 3, 2]
|
|
pytest.param([1, 0, 3, 2], 1, 1, [0], [0, 1, 1, 4, 6]),
|
|
|
|
# This is an intermediate stage when x == [0, 1, 2, 3, 4]
|
|
pytest.param(x, 1, 1, [0], [0, 0, 1, 3, 6, 10]),
|
|
|
|
# MATRICES ------------------------------------------------------------
|
|
# > matrix(data=c(1, 2, 3, 4, 5, 6, 7, 8, 9), nrow=3, ncol=3)
|
|
# [,1] [,2] [,3]
|
|
# [1,] 1 4 7
|
|
# [2,] 2 5 8
|
|
# [3,] 3 6 9
|
|
# > diffinv(X, 1, 1)
|
|
# [,1] [,2] [,3]
|
|
# [1,] 0 0 0
|
|
# [2,] 1 4 7
|
|
# [3,] 3 9 15
|
|
# [4,] 6 15 24
|
|
pytest.param(x_mat, 1, 1, None,
|
|
[[0, 0, 0],
|
|
[1, 4, 7],
|
|
[3, 9, 15],
|
|
[6, 15, 24]]),
|
|
|
|
# > diffinv(X, 1, 2)
|
|
# [,1] [,2] [,3]
|
|
# [1,] 0 0 0
|
|
# [2,] 0 0 0
|
|
# [3,] 1 4 7
|
|
# [4,] 4 13 22
|
|
# [5,] 10 28 46
|
|
pytest.param(x_mat, 1, 2, None,
|
|
[[0, 0, 0],
|
|
[0, 0, 0],
|
|
[1, 4, 7],
|
|
[4, 13, 22],
|
|
[10, 28, 46]]),
|
|
|
|
# > diffinv(X, 2, 1)
|
|
# [,1] [,2] [,3]
|
|
# [1,] 0 0 0
|
|
# [2,] 0 0 0
|
|
# [3,] 1 4 7
|
|
# [4,] 2 5 8
|
|
# [5,] 4 10 16
|
|
pytest.param(x_mat, 2, 1, None,
|
|
[[0, 0, 0],
|
|
[0, 0, 0],
|
|
[1, 4, 7],
|
|
[2, 5, 8],
|
|
[4, 10, 16]]),
|
|
|
|
# > diffinv(X, 2, 2)
|
|
# [,1] [,2] [,3]
|
|
# [1,] 0 0 0
|
|
# [2,] 0 0 0
|
|
# [3,] 0 0 0
|
|
# [4,] 0 0 0
|
|
# [5,] 1 4 7
|
|
# [6,] 2 5 8
|
|
# [7,] 5 14 23
|
|
pytest.param(x_mat, 2, 2, None,
|
|
[[0, 0, 0],
|
|
[0, 0, 0],
|
|
[0, 0, 0],
|
|
[0, 0, 0],
|
|
[1, 4, 7],
|
|
[2, 5, 8],
|
|
[5, 14, 23]]),
|
|
]
|
|
)
|
|
def test_diff_inv(arr, lag, differences, xi, expected):
|
|
res = diff_inv(arr, lag=lag, differences=differences, xi=xi)
|
|
expected = np.array(expected, dtype=float)
|
|
assert_array_equal(expected, res)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'y,preserve_series,exp,exp_error',
|
|
[
|
|
# base case, preserve=True, but not a series
|
|
pytest.param(
|
|
np.arange(5),
|
|
True,
|
|
np.arange(5),
|
|
None,
|
|
),
|
|
|
|
# base case, preserve=False, but not a series
|
|
pytest.param(
|
|
np.arange(5),
|
|
False,
|
|
np.arange(5),
|
|
None,
|
|
),
|
|
|
|
# series, with preserve=True
|
|
pytest.param(
|
|
series_with_dt_index(5),
|
|
True,
|
|
series_with_dt_index(5),
|
|
None,
|
|
),
|
|
|
|
# series, with preserve=False
|
|
pytest.param(
|
|
series_with_dt_index(5),
|
|
False,
|
|
np.arange(5),
|
|
None,
|
|
),
|
|
|
|
# dataframe w n_cols>1. assert error
|
|
pytest.param(
|
|
pd.DataFrame([[1, 2, 3], [4, 5, 6]]),
|
|
True,
|
|
None,
|
|
ValueError, # raised by sklearn
|
|
),
|
|
]
|
|
)
|
|
def test_check_endog(y, preserve_series, exp, exp_error):
|
|
if exp_error is not None:
|
|
with pytest.raises(exp_error):
|
|
check_endog(y, preserve_series=preserve_series)
|
|
else:
|
|
got = check_endog(y, preserve_series=preserve_series)
|
|
|
|
# pd.series
|
|
if hasattr(got, "eq"):
|
|
assert isinstance(exp, pd.Series)
|
|
assert exp.eq(got).all()
|
|
else:
|
|
assert_array_equal(exp, got)
|
|
|
|
|
|
def test_concatenate():
|
|
assert_array_equal(c(1, np.zeros(3)), np.array([1.0, 0.0, 0.0, 0.0]))
|
|
assert_array_equal(c([1], np.zeros(3)), np.array([1.0, 0.0, 0.0, 0.0]))
|
|
assert_array_equal(c(1), np.ones(1))
|
|
assert c() is None
|
|
assert_array_equal(c([1]), np.ones(1))
|
|
|
|
|
|
def test_corner_in_callable():
|
|
# test the ValueError in the get-callable method
|
|
with pytest.raises(ValueError):
|
|
get_callable('fake-key', {'a': 1})
|
|
|
|
|
|
def test_corner():
|
|
# fails because lag < 1
|
|
with pytest.raises(ValueError):
|
|
diff(x=x, lag=0)
|
|
with pytest.raises(ValueError):
|
|
diff_inv(x=x, lag=0)
|
|
|
|
# fails because differences < 1
|
|
with pytest.raises(ValueError):
|
|
diff(x=x, differences=0)
|
|
with pytest.raises(ValueError):
|
|
diff_inv(x=x, differences=0)
|
|
|
|
# Passing in xi with the incorrect shape to a 2-d array
|
|
with pytest.raises(IndexError):
|
|
diff_inv(x=np.array([[1, 1], [1, 1]]), xi=np.array([[1]]))
|
|
|
|
|
|
def test_is_iterable():
|
|
assert not is_iterable("this string")
|
|
assert is_iterable(["this", "list"])
|
|
assert not is_iterable(None)
|
|
assert is_iterable(np.array([1, 2]))
|
|
|
|
|
|
def test_as_series():
|
|
assert isinstance(as_series([1, 2, 3]), pd.Series)
|
|
assert isinstance(as_series(np.arange(5)), pd.Series)
|
|
assert isinstance(as_series(pd.Series([1, 2, 3])), pd.Series)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'arr', [
|
|
np.random.rand(5),
|
|
pd.Series(np.random.rand(5)),
|
|
]
|
|
)
|
|
def test_check_exog_ndim_value_err(arr):
|
|
with pytest.raises(ValueError):
|
|
check_exog(arr)
|
|
|
|
|
|
@pytest.mark.parametrize('arr', [X_nan, X_inf])
|
|
def test_check_exog_infinite_value_err(arr):
|
|
with pytest.raises(ValueError):
|
|
check_exog(arr, force_all_finite=True)
|
|
|
|
# show it passes when False
|
|
assert check_exog(
|
|
arr, force_all_finite=False, dtype=None, copy=False) is arr
|
|
|
|
|
|
def test_exog_pd_dataframes():
|
|
# test with copy
|
|
assert check_exog(X, force_all_finite=True, copy=True).equals(X)
|
|
|
|
# test without copy
|
|
assert check_exog(X, force_all_finite=True, copy=False) is X
|
|
|
|
|
|
def test_exog_np_array():
|
|
X_np = np.random.RandomState(1).rand(5, 5)
|
|
|
|
# show works on a list
|
|
assert_array_almost_equal(X_np, check_exog(X_np.tolist()))
|
|
assert_array_almost_equal(X_np, check_exog(X_np))
|