# -*- coding: utf-8 -*-
"""Tools for working with groups
This provides several functions to work with groups and a Group class that
keeps track of the different representations and has methods to work more
easily with groups.
Author: Josef Perktold,
Author: Nathaniel Smith, recipe for sparse_dummies on scipy user mailing list
Created on Tue Nov 29 15:44:53 2011 : sparse_dummies
Created on Wed Nov 30 14:28:24 2011 : combine_indices
changes: add Group class
Notes
~~~~~
This reverses the class I used before, where the class was for the data and
the group was auxiliary. Here, it is only the group, no data is kept.
sparse_dummies needs checking for corner cases, e.g.
what if a category level has zero elements? This can happen with subset
selection even if the original groups where defined as arange.
Not all methods and options have been tried out yet after refactoring
need more efficient loop if groups are sorted -> see GroupSorted.group_iter
"""
from __future__ import print_function
from statsmodels.compat.python import lrange, lzip, range
import numpy as np
import pandas as pd
from statsmodels.compat.numpy import npc_unique
import statsmodels.tools.data as data_util
from statsmodels.tools.decorators import cache_readonly
from pandas.core.index import Index, MultiIndex
[docs]def combine_indices(groups, prefix='', sep='.', return_labels=False):
'''use np.unique to get integer group indices for product, intersection
'''
if isinstance(groups, tuple):
groups = np.column_stack(groups)
else:
groups = np.asarray(groups)
dt = groups.dtype
#print(dt)
is2d = (groups.ndim == 2) # need to store
if is2d:
ncols = groups.shape[1]
if not groups.flags.c_contiguous:
groups = np.array(groups, order='C')
groups_ = groups.view([('', groups.dtype)] * groups.shape[1])
else:
groups_ = groups
uni, uni_idx, uni_inv = npc_unique(groups_, return_index=True,
return_inverse=True)
if is2d:
uni = uni.view(dt).reshape(-1, ncols)
#avoiding a view would be
#for t in uni.dtype.fields.values():
# assert (t[0] == dt)
#
#uni.dtype = dt
#uni.shape = (uni.size//ncols, ncols)
if return_labels:
label = [(prefix+sep.join(['%s']*len(uni[0]))) % tuple(ii)
for ii in uni]
return uni_inv, uni_idx, uni, label
else:
return uni_inv, uni_idx, uni
#written for and used in try_covariance_grouploop.py
[docs]def group_sums(x, group, use_bincount=True):
'''simple bincount version, again
group : array, integer
assumed to be consecutive integers
no dtype checking because I want to raise in that case
uses loop over columns of x
for comparison, simple python loop
'''
x = np.asarray(x)
if x.ndim == 1:
x = x[:, None]
elif x.ndim > 2 and use_bincount:
raise ValueError('not implemented yet')
if use_bincount:
return np.array([np.bincount(group, weights=x[:, col])
for col in range(x.shape[1])])
else:
uniques = np.unique(group)
result = np.zeros([len(uniques)] + list(x.shape[1:]))
for ii, cat in enumerate(uniques):
result[ii] = x[g == cat].sum(0)
return result
[docs]def group_sums_dummy(x, group_dummy):
'''sum by groups given group dummy variable
group_dummy can be either ndarray or sparse matrix
'''
if data_util._is_using_ndarray_type(group_dummy, None):
return np.dot(x.T, group_dummy)
else: # check for sparse
return x.T * group_dummy
[docs]def dummy_sparse(groups):
'''create a sparse indicator from a group array with integer labels
Parameters
----------
groups: ndarray, int, 1d (nobs,)
an array of group indicators for each observation. Group levels are
assumed to be defined as consecutive integers, i.e. range(n_groups)
where n_groups is the number of group levels. A group level with no
observations for it will still produce a column of zeros.
Returns
-------
indi : ndarray, int8, 2d (nobs, n_groups)
an indicator array with one row per observation, that has 1 in the
column of the group level for that observation
Examples
--------
>>> g = np.array([0, 0, 2, 1, 1, 2, 0])
>>> indi = dummy_sparse(g)
>>> indi
<7x3 sparse matrix of type '<type 'numpy.int8'>'
with 7 stored elements in Compressed Sparse Row format>
>>> indi.todense()
matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[0, 1, 0],
[0, 1, 0],
[0, 0, 1],
[1, 0, 0]], dtype=int8)
current behavior with missing groups
>>> g = np.array([0, 0, 2, 0, 2, 0])
>>> indi = dummy_sparse(g)
>>> indi.todense()
matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0]], dtype=int8)
'''
from scipy import sparse
indptr = np.arange(len(groups)+1)
data = np.ones(len(groups), dtype=np.int8)
indi = sparse.csr_matrix((data, g, indptr))
return indi
[docs]class Group(object):
[docs] def __init__(self, group, name=''):
#self.group = np.asarray(group) #TODO: use checks in combine_indices
self.name = name
uni, uni_idx, uni_inv = combine_indices(group)
#TODO: rename these to something easier to remember
self.group_int, self.uni_idx, self.uni = uni, uni_idx, uni_inv
self.n_groups = len(self.uni)
#put this here so they can be overwritten before calling labels
self.separator = '.'
self.prefix = self.name
if self.prefix:
self.prefix = self.prefix + '='
#cache decorator
[docs] def counts(self):
return np.bincount(self.group_int)
#cache_decorator
[docs] def labels(self):
#is this only needed for product of groups (intersection)?
prefix = self.prefix
uni = self.uni
sep = self.separator
if uni.ndim > 1:
label = [(prefix+sep.join(['%s']*len(uni[0]))) % tuple(ii)
for ii in uni]
else:
label = [prefix + '%s' % ii for ii in uni]
return label
[docs] def dummy(self, drop_idx=None, sparse=False, dtype=int):
'''
drop_idx is only available if sparse=False
drop_idx is supposed to index into uni
'''
uni = self.uni
if drop_idx is not None:
idx = lrange(len(uni))
del idx[drop_idx]
uni = uni[idx]
group = self.group
if not sparse:
return (group[:, None] == uni[None, :]).astype(dtype)
else:
return dummy_sparse(self.group_int)
[docs] def interaction(self, other):
if isinstance(other, self.__class__):
other = other.group
return self.__class__((self, other))
[docs] def group_sums(self, x, use_bincount=True):
return group_sums(x, self.group_int, use_bincount=use_bincount)
[docs] def group_demean(self, x, use_bincount=True):
nobs = float(len(x))
means_g = group_sums(x / nobs, self.group_int,
use_bincount=use_bincount)
x_demeaned = x - means_g[self.group_int] # check reverse_index?
return x_demeaned, means_g
[docs]class GroupSorted(Group):
[docs] def __init__(self, group, name=''):
super(self.__class__, self).__init__(group, name=name)
idx = (np.nonzero(np.diff(group))[0]+1).tolist()
self.groupidx = groupidx = lzip([0]+idx, idx+[len(group)])
ngroups = len(groupidx)
[docs] def group_iter(self):
for low, upp in self.groupidx:
yield slice(low, upp)
[docs] def lag_indices(self, lag):
'''return the index array for lagged values
Warning: if k is larger then the number of observations for an
individual, then no values for that individual are returned.
TODO: for the unbalanced case, I should get the same truncation for
the array with lag=0. From the return of lag_idx we wouldn't know
which individual is missing.
TODO: do I want the full equivalent of lagmat in tsa?
maxlag or lag or lags.
not tested yet
'''
lag_idx = np.asarray(self.groupidx)[:, 1] - lag # asarray or already?
mask_ok = (lag <= lag_idx)
#still an observation that belongs to the same individual
return lag_idx[mask_ok]
def _is_hierarchical(x):
"""
Checks if the first item of an array-like object is also array-like
If so, we have a MultiIndex and returns True. Else returns False.
"""
item = x[0]
# is there a better way to do this?
if isinstance(item, (list, tuple, np.ndarray, pd.Series, pd.DataFrame)):
return True
else:
return False
def _make_hierarchical_index(index, names):
return MultiIndex.from_tuples(*[index], names=names)
def _make_generic_names(index):
n_names = len(index.names)
pad = str(len(str(n_names))) # number of digits
return [("group{0:0"+pad+"}").format(i) for i in range(n_names)]
[docs]class Grouping(object):
[docs] def __init__(self, index, names=None):
'''
index : index-like
Can be pandas MultiIndex or Index or array-like. If array-like
and is a MultipleIndex (more than one grouping variable),
groups are expected to be in each row. E.g., [('red', 1),
('red', 2), ('green', 1), ('green', 2)]
names : list or str, optional
The names to use for the groups. Should be a str if only
one grouping variable is used.
Notes
-----
If index is already a pandas Index then there is no copy.
'''
if isinstance(index, (Index, MultiIndex)):
if names is not None:
if hasattr(index, 'set_names'): # newer pandas
index.set_names(names, inplace=True)
else:
index.names = names
self.index = index
else: # array-like
if _is_hierarchical(index):
self.index = _make_hierarchical_index(index, names)
else:
self.index = Index(index, name=names)
if names is None:
names = _make_generic_names(self.index)
if hasattr(self.index, 'set_names'):
self.index.set_names(names, inplace=True)
else:
self.index.names = names
self.nobs = len(self.index)
self.nlevels = len(self.index.names)
self.slices = None
@property
def index_shape(self):
if hasattr(self.index, 'levshape'):
return self.index.levshape
else:
return self.index.shape
@property
def levels(self):
if hasattr(self.index, 'levels'):
return self.index.levels
else:
return pd.Categorical(self.index).levels
@property
def labels(self):
# this was index_int, but that's not a very good name...
if hasattr(self.index, 'labels'):
return self.index.labels
else: # pandas version issue here
# Compat code for the labels -> codes change in pandas 0.15
# FIXME: use .codes directly when we don't want to support pandas < 0.15
tmp = pd.Categorical(self.index)
try:
labl = tmp.labels
except AttributeError:
labl = tmp.codes
return labl[None]
@property
def group_names(self):
return self.index.names
[docs] def reindex(self, index=None, names=None):
"""
Resets the index in-place.
"""
#NOTE: this isn't of much use if the rest of the data doesn't change
#This needs to reset cache
if names is None:
names = self.group_names
self = Grouping(index, names)
[docs] def get_slices(self, level=0):
'''
Sets the slices attribute to be a list of indices of the sorted
groups for the first index level. I.e., self.slices[0] is the
index where each observation is in the first (sorted) group.
'''
#TODO: refactor this
groups = self.index.get_level_values(level).unique()
groups.sort()
if isinstance(self.index, MultiIndex):
self.slices = [self.index.get_loc_level(x, level=level)[0]
for x in groups]
else:
self.slices = [self.index.get_loc(x) for x in groups]
[docs] def count_categories(self, level=0):
"""
Sets the attribute counts to equal the bincount of the (integer-valued)
labels.
"""
#TODO: refactor this not to set an attribute. Why would we do this?
self.counts = np.bincount(self.labels[level])
[docs] def check_index(self, is_sorted=True, unique=True, index=None):
'''Sanity checks'''
if not index:
index = self.index
if is_sorted:
test = pd.DataFrame(lrange(len(index)), index=index)
test_sorted = test.sort()
if not test.index.equals(test_sorted.index):
raise Exception('Data is not be sorted')
if unique:
if len(index) != len(index.unique()):
raise Exception('Duplicate index entries')
[docs] def sort(self, data, index=None):
'''Applies a (potentially hierarchical) sort operation on a numpy array
or pandas series/dataframe based on the grouping index or a
user-supplied index. Returns an object of the same type as the
original data as well as the matching (sorted) Pandas index.
'''
if index is None:
index = self.index
if data_util._is_using_ndarray_type(data, None):
if data.ndim == 1:
out = pd.Series(data, index=index, copy=True)
out = out.sort_index()
else:
out = pd.DataFrame(data, index=index)
out = out.sort(inplace=False) # copies
return np.array(out), out.index
elif data_util._is_using_pandas(data, None):
out = data
out = out.reindex(index) # copies?
out = out.sort_index()
return out, out.index
else:
msg = 'data must be a Numpy array or a Pandas Series/DataFrame'
raise ValueError(msg)
#TODO: this isn't general needs to be a PanelGrouping object
[docs] def dummies_time(self):
self.dummy_sparse(level=1)
return self._dummies
[docs] def dummies_groups(self, level=0):
self.dummy_sparse(level=level)
return self._dummies
[docs] def dummy_sparse(self, level=0):
'''create a sparse indicator from a group array with integer labels
Parameters
----------
groups: ndarray, int, 1d (nobs,) an array of group indicators for each
observation. Group levels are assumed to be defined as consecutive
integers, i.e. range(n_groups) where n_groups is the number of
group levels. A group level with no observations for it will still
produce a column of zeros.
Returns
-------
indi : ndarray, int8, 2d (nobs, n_groups)
an indicator array with one row per observation, that has 1 in the
column of the group level for that observation
Examples
--------
>>> g = np.array([0, 0, 2, 1, 1, 2, 0])
>>> indi = dummy_sparse(g)
>>> indi
<7x3 sparse matrix of type '<type 'numpy.int8'>'
with 7 stored elements in Compressed Sparse Row format>
>>> indi.todense()
matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[0, 1, 0],
[0, 1, 0],
[0, 0, 1],
[1, 0, 0]], dtype=int8)
current behavior with missing groups
>>> g = np.array([0, 0, 2, 0, 2, 0])
>>> indi = dummy_sparse(g)
>>> indi.todense()
matrix([[1, 0, 0],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0],
[0, 0, 1],
[1, 0, 0]], dtype=int8)
'''
from scipy import sparse
groups = self.labels[level]
indptr = np.arange(len(groups)+1)
data = np.ones(len(groups), dtype=np.int8)
self._dummies = sparse.csr_matrix((data, groups, indptr))
if __name__ == '__main__':
#---------- examples combine_indices
from numpy.testing import assert_equal
np.random.seed(985367)
groups = np.random.randint(0, 2, size=(10, 2))
uv, ux, u, label = combine_indices(groups, return_labels=True)
uv, ux, u, label = combine_indices(groups, prefix='g1,g2=', sep=',',
return_labels=True)
group0 = np.array(['sector0', 'sector1'])[groups[:, 0]]
group1 = np.array(['region0', 'region1'])[groups[:, 1]]
uv, ux, u, label = combine_indices((group0, group1),
prefix='sector,region=',
sep=',',
return_labels=True)
uv, ux, u, label = combine_indices((group0, group1), prefix='', sep='.',
return_labels=True)
group_joint = np.array(label)[uv]
group_joint_expected = np.array(
['sector1.region0', 'sector0.region1', 'sector0.region0',
'sector0.region1', 'sector1.region1', 'sector0.region0',
'sector1.region0', 'sector1.region0', 'sector0.region1',
'sector0.region0'],
dtype='|S15')
assert_equal(group_joint, group_joint_expected)
'''
>>> uv
array([2, 1, 0, 0, 1, 0, 2, 0, 1, 0])
>>> label
['sector0.region0', 'sector1.region0', 'sector1.region1']
>>> np.array(label)[uv]
array(['sector1.region1', 'sector1.region0', 'sector0.region0',
'sector0.region0', 'sector1.region0', 'sector0.region0',
'sector1.region1', 'sector0.region0', 'sector1.region0',
'sector0.region0'],
dtype='|S15')
>>> np.column_stack((group0, group1))
array([['sector1', 'region1'],
['sector1', 'region0'],
['sector0', 'region0'],
['sector0', 'region0'],
['sector1', 'region0'],
['sector0', 'region0'],
['sector1', 'region1'],
['sector0', 'region0'],
['sector1', 'region0'],
['sector0', 'region0']],
dtype='|S7')
'''
#------------- examples sparse_dummies
from scipy import sparse
g = np.array([0, 0, 1, 2, 1, 1, 2, 0])
u = lrange(3)
indptr = np.arange(len(g)+1)
data = np.ones(len(g), dtype=np.int8)
a = sparse.csr_matrix((data, g, indptr))
print(a.todense())
print(np.all(a.todense() == (g[:, None] == np.arange(3)).astype(int)))
x = np.arange(len(g)*3).reshape(len(g), 3, order='F')
print('group means')
print(x.T * a)
print(np.dot(x.T, g[:, None] == np.arange(3)))
print(np.array([np.bincount(g, weights=x[:, col]) for col in range(3)]))
for cat in u:
print(x[g == cat].sum(0))
for cat in u:
x[g == cat].sum(0)
cc = sparse.csr_matrix([[0, 1, 0, 1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 1, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 1, 0, 0, 0],
[1, 0, 0, 0, 1, 0, 1, 0, 0],
[0, 1, 0, 1, 0, 1, 0, 1, 0],
[0, 0, 1, 0, 1, 0, 0, 0, 1],
[0, 0, 0, 1, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 1, 0, 1, 0, 1],
[0, 0, 0, 0, 0, 1, 0, 1, 0]])
#------------- groupsums
print(group_sums(np.arange(len(g)*3*2).reshape(len(g), 3, 2), g,
use_bincount=False).T)
print(group_sums(np.arange(len(g)*3*2).reshape(len(g), 3, 2)[:, :, 0], g))
print(group_sums(np.arange(len(g)*3*2).reshape(len(g), 3, 2)[:, :, 1], g))
#------------- examples class
x = np.arange(len(g)*3).reshape(len(g), 3, order='F')
mygroup = Group(g)
print(mygroup.group_int)
print(mygroup.group_sums(x))
print(mygroup.labels())