Files
2025-08-01 04:33:03 -04:00

340 lines
14 KiB
Python

from statsmodels.compat.pandas import assert_frame_equal, assert_series_equal
import numpy as np
from numpy.testing import assert_equal
import pandas as pd
import pytest
from scipy import sparse
from statsmodels.tools.grouputils import (dummy_sparse, Grouping, Group,
combine_indices, group_sums)
from statsmodels.datasets import grunfeld, anes96
class CheckGrouping:
@pytest.mark.smoke
def test_reindex(self):
self.grouping.reindex(self.grouping.index)
def test_count_categories(self):
self.grouping.count_categories(level=0)
np.testing.assert_equal(self.grouping.counts, self.expected_counts)
def test_sort(self):
# data frame
sorted_data, index = self.grouping.sort(self.data)
expected_sorted_data = self.data.sort_index()
assert_frame_equal(sorted_data, expected_sorted_data)
np.testing.assert_(isinstance(sorted_data, pd.DataFrame))
np.testing.assert_(not index.equals(self.grouping.index))
# make sure it copied
if hasattr(sorted_data, 'equals'): # newer pandas
np.testing.assert_(not sorted_data.equals(self.data))
# 2d arrays
sorted_data, index = self.grouping.sort(self.data.values)
np.testing.assert_array_equal(sorted_data,
expected_sorted_data.values)
np.testing.assert_(isinstance(sorted_data, np.ndarray))
# 1d series
series = self.data[self.data.columns[0]]
sorted_data, index = self.grouping.sort(series)
expected_sorted_data = series.sort_index()
assert_series_equal(sorted_data, expected_sorted_data)
np.testing.assert_(isinstance(sorted_data, pd.Series))
if hasattr(sorted_data, 'equals'):
np.testing.assert_(not sorted_data.equals(series))
# 1d array
array = series.values
sorted_data, index = self.grouping.sort(array)
expected_sorted_data = series.sort_index().values
np.testing.assert_array_equal(sorted_data, expected_sorted_data)
np.testing.assert_(isinstance(sorted_data, np.ndarray))
def test_transform_dataframe(self):
names = self.data.index.names
transformed_dataframe = self.grouping.transform_dataframe(
self.data,
lambda x : x.mean(),
level=0)
cols = [names[0]] + list(self.data.columns)
df = self.data.reset_index()[cols].set_index(names[0])
grouped = df[self.data.columns].groupby(level=0)
expected = grouped.apply(lambda x : x.mean())
np.testing.assert_allclose(transformed_dataframe,
expected.values)
if len(names) > 1:
transformed_dataframe = self.grouping.transform_dataframe(
self.data, lambda x : x.mean(),
level=1)
cols = [names[1]] + list(self.data.columns)
df = self.data.reset_index()[cols].set_index(names[1])
grouped = df.groupby(level=0)
expected = grouped.apply(lambda x: x.mean())[self.data.columns]
np.testing.assert_allclose(transformed_dataframe,
expected.values)
def test_transform_array(self):
names = self.data.index.names
transformed_array = self.grouping.transform_array(
self.data.values,
lambda x : x.mean(),
level=0)
cols = [names[0]] + list(self.data.columns)
df = self.data.reset_index()[cols].set_index(names[0])
grouped = df[self.data.columns].groupby(level=0)
expected = grouped.apply(lambda x: x.mean())
np.testing.assert_allclose(transformed_array,
expected.values)
if len(names) > 1:
transformed_array = self.grouping.transform_array(
self.data.values,
lambda x : x.mean(), level=1)
cols = [names[1]] + list(self.data.columns)
df = self.data.reset_index()[cols].set_index(names[1])
grouped = df[self.data.columns].groupby(level=0)
expected = grouped.apply(lambda x: x.mean())[self.data.columns]
np.testing.assert_allclose(transformed_array,
expected.values)
def test_transform_slices(self):
names = self.data.index.names
transformed_slices = self.grouping.transform_slices(
self.data.values,
lambda x, idx : x.mean(0), # noqa
level=0)
expected = self.data.reset_index().groupby(
names[0])[self.data.columns].mean()
np.testing.assert_allclose(transformed_slices, expected.values,
rtol=1e-12, atol=1e-25)
if len(names) > 1:
transformed_slices = self.grouping.transform_slices(
self.data.values,
lambda x, idx : x.mean(0), # noqa
level=1)
expected = self.data.reset_index().groupby(
names[1])[self.data.columns].mean()
np.testing.assert_allclose(transformed_slices, expected.values,
rtol=1e-12, atol=1e-25)
@pytest.mark.smoke
def test_dummies_groups(self):
# calls dummy_sparse under the hood
self.grouping.dummies_groups()
if len(self.grouping.group_names) > 1:
self.grouping.dummies_groups(level=1)
def test_dummy_sparse(self):
data = self.data
self.grouping.dummy_sparse()
values = data.index.get_level_values(0).values
expected = pd.get_dummies(pd.Series(values, dtype="category"),
drop_first=False)
np.testing.assert_equal(self.grouping._dummies.toarray(), expected)
if len(self.grouping.group_names) > 1:
self.grouping.dummy_sparse(level=1)
values = data.index.get_level_values(1).values
expected = pd.get_dummies(pd.Series(values, dtype="category"),
drop_first=False)
np.testing.assert_equal(self.grouping._dummies.toarray(),
expected)
class TestMultiIndexGrouping(CheckGrouping):
@classmethod
def setup_class(cls):
grun_data = grunfeld.load_pandas().data
multi_index_data = grun_data.set_index(['firm', 'year'])
multi_index_panel = multi_index_data.index
cls.grouping = Grouping(multi_index_panel)
cls.data = multi_index_data
cls.expected_counts = [20] * 11
class TestIndexGrouping(CheckGrouping):
@classmethod
def setup_class(cls):
grun_data = grunfeld.load_pandas().data
index_data = grun_data.set_index(['firm'])
index_group = index_data.index
cls.grouping = Grouping(index_group)
cls.data = index_data
cls.expected_counts = [20] * 11
def test_init_api():
# make a multi-index panel
grun_data = grunfeld.load_pandas().data
multi_index_panel = grun_data.set_index(['firm', 'year']).index
grouping = Grouping(multi_index_panel)
# check group_names
np.testing.assert_array_equal(grouping.group_names, ['firm', 'year'])
# check shape
np.testing.assert_array_equal(grouping.index_shape, (11, 20))
# check index_int
np.testing.assert_array_equal(grouping.labels,
[[ 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5, 5, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8,
8, 8, 8, 8, 8, 8, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 7, 7,
7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
7, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9,
9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10,
10, 10, 10, 10, 10, 10, 10, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4,
5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6,
7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3,
4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]])
grouping = Grouping(multi_index_panel, names=['firms', 'year'])
np.testing.assert_array_equal(grouping.group_names, ['firms', 'year'])
# make a multi-index grouping
anes_data = anes96.load_pandas().data
multi_index_groups = anes_data.set_index(['educ', 'income',
'TVnews']).index
grouping = Grouping(multi_index_groups)
np.testing.assert_array_equal(grouping.group_names,
['educ', 'income', 'TVnews'])
np.testing.assert_array_equal(grouping.index_shape, (7, 24, 8))
# make a list multi-index panel
list_panel = multi_index_panel.tolist()
grouping = Grouping(list_panel, names=['firms', 'year'])
np.testing.assert_array_equal(grouping.group_names, ['firms', 'year'])
np.testing.assert_array_equal(grouping.index_shape, (11, 20))
# make a list multi-index grouping
list_groups = multi_index_groups.tolist()
grouping = Grouping(list_groups, names=['educ', 'income', 'TVnews'])
np.testing.assert_array_equal(grouping.group_names,
['educ', 'income', 'TVnews'])
np.testing.assert_array_equal(grouping.index_shape, (7, 24, 8))
# single-variable index grouping
index_group = multi_index_panel.get_level_values(0)
grouping = Grouping(index_group)
# the original multi_index_panel had it's name changed inplace above
np.testing.assert_array_equal(grouping.group_names, ['firms'])
np.testing.assert_array_equal(grouping.index_shape, (220,))
# single variable list grouping
list_group = multi_index_panel.get_level_values(0).tolist()
grouping = Grouping(list_group)
np.testing.assert_array_equal(grouping.group_names, ["group0"])
np.testing.assert_array_equal(grouping.index_shape, 11*20)
# test generic group names
grouping = Grouping(list_groups)
np.testing.assert_array_equal(grouping.group_names,
['group0', 'group1', 'group2'])
def test_combine_indices():
# Moved from grouputils __main__ section
np.random.seed(985367)
groups = np.random.randint(0, 2, size=(10, 2))
uv, ux, u, label = combine_indices(groups, return_labels=True)
uv, ux, u, label = combine_indices(groups, prefix='g1,g2=', sep=',',
return_labels=True)
group0 = np.array(['sector0', 'sector1'])[groups[:, 0]]
group1 = np.array(['region0', 'region1'])[groups[:, 1]]
uv, ux, u, label = combine_indices((group0, group1),
prefix='sector,region=',
sep=',',
return_labels=True)
uv, ux, u, label = combine_indices((group0, group1), prefix='', sep='.',
return_labels=True)
group_joint = np.array(label)[uv.flat]
group_joint_expected = np.array(['sector1.region0', 'sector0.region1',
'sector0.region0', 'sector0.region1',
'sector1.region1', 'sector0.region0',
'sector1.region0', 'sector1.region0',
'sector0.region1', 'sector0.region0'],
dtype='|U15')
assert_equal(group_joint, group_joint_expected)
@pytest.mark.smoke
def test_group_sums():
# Moved from grouputils __main__ section
g = np.array([0, 0, 1, 2, 1, 1, 2, 0])
group_sums(np.arange(len(g)*3*2).reshape(len(g), 3, 2), g,
use_bincount=False).T
group_sums(np.arange(len(g)*3*2).reshape(len(g), 3, 2)[:, :, 0], g)
group_sums(np.arange(len(g)*3*2).reshape(len(g), 3, 2)[:, :, 1], g)
@pytest.mark.smoke
def test_group_class():
# Moved from grouputils __main__ section
g = np.array([0, 0, 1, 2, 1, 1, 2, 0])
x = np.arange(len(g)*3).reshape(len(g), 3, order='F')
mygroup = Group(g)
mygroup.group_int
mygroup.group_sums(x)
mygroup.labels()
def test_dummy_sparse():
# See GH#5687
g = np.array([0, 0, 2, 1, 1, 2, 0])
indi = dummy_sparse(g)
assert isinstance(indi, sparse.csr_matrix)
result = indi.todense()
expected = np.matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[0, 1, 0],
[0, 1, 0],
[0, 0, 1],
[1, 0, 0]], dtype=np.int8)
assert_equal(result, expected)
# current behavior with missing groups
g = np.array([0, 0, 2, 0, 2, 0])
indi = dummy_sparse(g)
result = indi.todense()
expected = np.matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0]], dtype=np.int8)
assert_equal(result, expected)