import types
from functools import wraps
import numpy as np
import datetime
import collections
import warnings
import copy
from pandas.compat import(
zip, range, long, lzip,
callable, map
)
from pandas import compat
from pandas.compat.numpy import function as nv
from pandas.compat.numpy import _np_version_under1p8
from pandas.types.common import (_DATELIKE_DTYPES,
is_numeric_dtype,
is_timedelta64_dtype, is_datetime64_dtype,
is_categorical_dtype,
is_datetime_or_timedelta_dtype,
is_bool, is_integer_dtype,
is_complex_dtype,
is_bool_dtype,
is_scalar,
_ensure_float64,
_ensure_platform_int,
_ensure_int64,
_ensure_object,
_ensure_float)
from pandas.types.cast import _possibly_downcast_to_dtype
from pandas.types.missing import isnull, notnull, _maybe_fill
from pandas.core.common import _values_from_object, AbstractMethodError
from pandas.core.base import (PandasObject, SelectionMixin, GroupByError,
DataError, SpecificationError)
from pandas.core.categorical import Categorical
from pandas.core.frame import DataFrame
from pandas.core.generic import NDFrame
from pandas.core.index import (Index, MultiIndex, CategoricalIndex,
_ensure_index)
from pandas.core.internals import BlockManager, make_block
from pandas.core.series import Series
from pandas.core.panel import Panel
from pandas.util.decorators import (cache_readonly, Substitution, Appender,
make_signature, deprecate_kwarg)
from pandas.formats.printing import pprint_thing
from pandas.util.validators import validate_kwargs
import pandas.core.algorithms as algos
import pandas.core.common as com
from pandas.core.config import option_context
import pandas.lib as lib
from pandas.lib import Timestamp
import pandas.tslib as tslib
import pandas.algos as _algos
import pandas.hashtable as _hash
_doc_template = """
See also
--------
pandas.Series.%(name)s
pandas.DataFrame.%(name)s
pandas.Panel.%(name)s
"""
# special case to prevent duplicate plots when catching exceptions when
# forwarding methods from NDFrames
_plotting_methods = frozenset(['plot', 'boxplot', 'hist'])
_common_apply_whitelist = frozenset([
'last', 'first',
'head', 'tail', 'median',
'mean', 'sum', 'min', 'max',
'cumsum', 'cumprod', 'cummin', 'cummax', 'cumcount',
'resample',
'describe',
'rank', 'quantile',
'fillna',
'mad',
'any', 'all',
'take',
'idxmax', 'idxmin',
'shift', 'tshift',
'ffill', 'bfill',
'pct_change', 'skew',
'corr', 'cov', 'diff',
]) | _plotting_methods
_series_apply_whitelist = \
(_common_apply_whitelist - set(['boxplot'])) | \
frozenset(['dtype', 'unique'])
_dataframe_apply_whitelist = \
_common_apply_whitelist | frozenset(['dtypes', 'corrwith'])
_cython_transforms = frozenset(['cumprod', 'cumsum', 'shift'])
def _groupby_function(name, alias, npfunc, numeric_only=True,
_convert=False):
_local_template = "Compute %(f)s of group values"
@Substitution(name='groupby', f=name)
@Appender(_doc_template)
@Appender(_local_template)
def f(self):
self._set_group_selection()
try:
return self._cython_agg_general(alias, numeric_only=numeric_only)
except AssertionError as e:
raise SpecificationError(str(e))
except Exception:
result = self.aggregate(lambda x: npfunc(x, axis=self.axis))
if _convert:
result = result._convert(datetime=True)
return result
f.__name__ = name
return f
def _first_compat(x, axis=0):
def _first(x):
x = np.asarray(x)
x = x[notnull(x)]
if len(x) == 0:
return np.nan
return x[0]
if isinstance(x, DataFrame):
return x.apply(_first, axis=axis)
else:
return _first(x)
def _last_compat(x, axis=0):
def _last(x):
x = np.asarray(x)
x = x[notnull(x)]
if len(x) == 0:
return np.nan
return x[-1]
if isinstance(x, DataFrame):
return x.apply(_last, axis=axis)
else:
return _last(x)
[docs]class Grouper(object):
"""
A Grouper allows the user to specify a groupby instruction for a target
object
This specification will select a column via the key parameter, or if the
level and/or axis parameters are given, a level of the index of the target
object.
These are local specifications and will override 'global' settings,
that is the parameters axis and level which are passed to the groupby
itself.
Parameters
----------
key : string, defaults to None
groupby key, which selects the grouping column of the target
level : name/number, defaults to None
the level for the target index
freq : string / frequency object, defaults to None
This will groupby the specified frequency if the target selection
(via key or level) is a datetime-like object. For full specification
of available frequencies, please see
`here <http://pandas.pydata.org/pandas-docs/stable/timeseries.html>`_.
axis : number/name of the axis, defaults to 0
sort : boolean, default to False
whether to sort the resulting labels
additional kwargs to control time-like groupers (when freq is passed)
closed : closed end of interval; left or right
label : interval boundary to use for labeling; left or right
convention : {'start', 'end', 'e', 's'}
If grouper is PeriodIndex
Returns
-------
A specification for a groupby instruction
Examples
--------
Syntactic sugar for ``df.groupby('A')``
>>> df.groupby(Grouper(key='A'))
Specify a resample operation on the column 'date'
>>> df.groupby(Grouper(key='date', freq='60s'))
Specify a resample operation on the level 'date' on the columns axis
with a frequency of 60s
>>> df.groupby(Grouper(level='date', freq='60s', axis=1))
"""
def __new__(cls, *args, **kwargs):
if kwargs.get('freq') is not None:
from pandas.tseries.resample import TimeGrouper
cls = TimeGrouper
return super(Grouper, cls).__new__(cls)
def __init__(self, key=None, level=None, freq=None, axis=0, sort=False):
self.key = key
self.level = level
self.freq = freq
self.axis = axis
self.sort = sort
self.grouper = None
self.obj = None
self.indexer = None
self.binner = None
@property
def ax(self):
return self.grouper
def _get_grouper(self, obj):
"""
Parameters
----------
obj : the subject object
Returns
-------
a tuple of binner, grouper, obj (possibly sorted)
"""
self._set_grouper(obj)
self.grouper, exclusions, self.obj = _get_grouper(self.obj, [self.key],
axis=self.axis,
level=self.level,
sort=self.sort)
return self.binner, self.grouper, self.obj
def _set_grouper(self, obj, sort=False):
"""
given an object and the specifications, setup the internal grouper
for this particular specification
Parameters
----------
obj : the subject object
"""
if self.key is not None and self.level is not None:
raise ValueError(
"The Grouper cannot specify both a key and a level!")
# the key must be a valid info item
if self.key is not None:
key = self.key
if key not in obj._info_axis:
raise KeyError("The grouper name {0} is not found".format(key))
ax = Index(obj[key], name=key)
else:
ax = obj._get_axis(self.axis)
if self.level is not None:
level = self.level
# if a level is given it must be a mi level or
# equivalent to the axis name
if isinstance(ax, MultiIndex):
level = ax._get_level_number(level)
ax = Index(ax.get_level_values(
level), name=ax.names[level])
else:
if level not in (0, ax.name):
raise ValueError(
"The level {0} is not valid".format(level))
# possibly sort
if (self.sort or sort) and not ax.is_monotonic:
# use stable sort to support first, last, nth
indexer = self.indexer = ax.argsort(kind='mergesort')
ax = ax.take(indexer)
obj = obj.take(indexer, axis=self.axis,
convert=False, is_copy=False)
self.obj = obj
self.grouper = ax
return self.grouper
def _get_binner_for_grouping(self, obj):
""" default to the standard binner here """
group_axis = obj._get_axis(self.axis)
return Grouping(group_axis, None, obj=obj, name=self.key,
level=self.level, sort=self.sort, in_axis=False)
@property
def groups(self):
return self.grouper.groups
class GroupByPlot(PandasObject):
"""
Class implementing the .plot attribute for groupby objects
"""
def __init__(self, groupby):
self._groupby = groupby
def __call__(self, *args, **kwargs):
def f(self):
return self.plot(*args, **kwargs)
f.__name__ = 'plot'
return self._groupby.apply(f)
def __getattr__(self, name):
def attr(*args, **kwargs):
def f(self):
return getattr(self.plot, name)(*args, **kwargs)
return self._groupby.apply(f)
return attr
class _GroupBy(PandasObject, SelectionMixin):
_group_selection = None
_apply_whitelist = frozenset([])
def __init__(self, obj, keys=None, axis=0, level=None,
grouper=None, exclusions=None, selection=None, as_index=True,
sort=True, group_keys=True, squeeze=False, **kwargs):
self._selection = selection
if isinstance(obj, NDFrame):
obj._consolidate_inplace()
self.level = level
if not as_index:
if not isinstance(obj, DataFrame):
raise TypeError('as_index=False only valid with DataFrame')
if axis != 0:
raise ValueError('as_index=False only valid for axis=0')
self.as_index = as_index
self.keys = keys
self.sort = sort
self.group_keys = group_keys
self.squeeze = squeeze
self.mutated = kwargs.pop('mutated', False)
if grouper is None:
grouper, exclusions, obj = _get_grouper(obj, keys,
axis=axis,
level=level,
sort=sort,
mutated=self.mutated)
self.obj = obj
self.axis = obj._get_axis_number(axis)
self.grouper = grouper
self.exclusions = set(exclusions) if exclusions else set()
# we accept no other args
validate_kwargs('group', kwargs, {})
def __len__(self):
return len(self.groups)
def __unicode__(self):
# TODO: Better unicode/repr for GroupBy object
return object.__repr__(self)
def _assure_grouper(self):
"""
we create the grouper on instantiation
sub-classes may have a different policy
"""
pass
@property
def groups(self):
""" dict {group name -> group labels} """
self._assure_grouper()
return self.grouper.groups
@property
def ngroups(self):
self._assure_grouper()
return self.grouper.ngroups
@property
def indices(self):
""" dict {group name -> group indices} """
self._assure_grouper()
return self.grouper.indices
def _get_indices(self, names):
"""
safe get multiple indices, translate keys for
datelike to underlying repr
"""
def get_converter(s):
# possibly convert to the actual key types
# in the indices, could be a Timestamp or a np.datetime64
if isinstance(s, (Timestamp, datetime.datetime)):
return lambda key: Timestamp(key)
elif isinstance(s, np.datetime64):
return lambda key: Timestamp(key).asm8
else:
return lambda key: key
if len(names) == 0:
return []
if len(self.indices) > 0:
index_sample = next(iter(self.indices))
else:
index_sample = None # Dummy sample
name_sample = names[0]
if isinstance(index_sample, tuple):
if not isinstance(name_sample, tuple):
msg = ("must supply a tuple to get_group with multiple"
" grouping keys")
raise ValueError(msg)
if not len(name_sample) == len(index_sample):
try:
# If the original grouper was a tuple
return [self.indices[name] for name in names]
except KeyError:
# turns out it wasn't a tuple
msg = ("must supply a a same-length tuple to get_group"
" with multiple grouping keys")
raise ValueError(msg)
converters = [get_converter(s) for s in index_sample]
names = [tuple([f(n) for f, n in zip(converters, name)])
for name in names]
else:
converter = get_converter(index_sample)
names = [converter(name) for name in names]
return [self.indices.get(name, []) for name in names]
def _get_index(self, name):
""" safe get index, translate keys for datelike to underlying repr """
return self._get_indices([name])[0]
@cache_readonly
def _selected_obj(self):
if self._selection is None or isinstance(self.obj, Series):
if self._group_selection is not None:
return self.obj[self._group_selection]
return self.obj
else:
return self.obj[self._selection]
def _reset_group_selection(self):
"""
Clear group based selection. Used for methods needing to return info on
each group regardless of whether a group selection was previously set.
"""
if self._group_selection is not None:
self._group_selection = None
# GH12839 clear cached selection too when changing group selection
self._reset_cache('_selected_obj')
def _set_group_selection(self):
"""
Create group based selection. Used when selection is not passed
directly but instead via a grouper.
"""
grp = self.grouper
if self.as_index and getattr(grp, 'groupings', None) is not None and \
self.obj.ndim > 1:
ax = self.obj._info_axis
groupers = [g.name for g in grp.groupings
if g.level is None and g.in_axis]
if len(groupers):
self._group_selection = ax.difference(Index(groupers)).tolist()
# GH12839 clear selected obj cache when group selection changes
self._reset_cache('_selected_obj')
def _set_result_index_ordered(self, result):
# set the result index on the passed values object and
# return the new object, xref 8046
# the values/counts are repeated according to the group index
# shortcut if we have an already ordered grouper
if not self.grouper.is_monotonic:
index = Index(np.concatenate(
self._get_indices(self.grouper.result_index)))
result.set_axis(self.axis, index)
result = result.sort_index(axis=self.axis)
result.set_axis(self.axis, self.obj._get_axis(self.axis))
return result
def _dir_additions(self):
return self.obj._dir_additions() | self._apply_whitelist
def __getattr__(self, attr):
if attr in self._internal_names_set:
return object.__getattribute__(self, attr)
if attr in self.obj:
return self[attr]
if hasattr(self.obj, attr):
return self._make_wrapper(attr)
raise AttributeError("%r object has no attribute %r" %
(type(self).__name__, attr))
plot = property(GroupByPlot)
def _make_wrapper(self, name):
if name not in self._apply_whitelist:
is_callable = callable(getattr(self._selected_obj, name, None))
kind = ' callable ' if is_callable else ' '
msg = ("Cannot access{0}attribute {1!r} of {2!r} objects, try "
"using the 'apply' method".format(kind, name,
type(self).__name__))
raise AttributeError(msg)
# need to setup the selection
# as are not passed directly but in the grouper
self._set_group_selection()
f = getattr(self._selected_obj, name)
if not isinstance(f, types.MethodType):
return self.apply(lambda self: getattr(self, name))
f = getattr(type(self._selected_obj), name)
def wrapper(*args, **kwargs):
# a little trickery for aggregation functions that need an axis
# argument
kwargs_with_axis = kwargs.copy()
if 'axis' not in kwargs_with_axis or \
kwargs_with_axis['axis'] is None:
kwargs_with_axis['axis'] = self.axis
def curried_with_axis(x):
return f(x, *args, **kwargs_with_axis)
def curried(x):
return f(x, *args, **kwargs)
# preserve the name so we can detect it when calling plot methods,
# to avoid duplicates
curried.__name__ = curried_with_axis.__name__ = name
# special case otherwise extra plots are created when catching the
# exception below
if name in _plotting_methods:
return self.apply(curried)
try:
return self.apply(curried_with_axis)
except Exception:
try:
return self.apply(curried)
except Exception:
# related to : GH3688
# try item-by-item
# this can be called recursively, so need to raise
# ValueError
# if we don't have this method to indicated to aggregate to
# mark this column as an error
try:
return self._aggregate_item_by_item(name,
*args, **kwargs)
except (AttributeError):
raise ValueError
return wrapper
def get_group(self, name, obj=None):
"""
Constructs NDFrame from group with provided name
Parameters
----------
name : object
the name of the group to get as a DataFrame
obj : NDFrame, default None
the NDFrame to take the DataFrame out of. If
it is None, the object groupby was called on will
be used
Returns
-------
group : type of obj
"""
if obj is None:
obj = self._selected_obj
inds = self._get_index(name)
if not len(inds):
raise KeyError(name)
return obj.take(inds, axis=self.axis, convert=False)
def __iter__(self):
"""
Groupby iterator
Returns
-------
Generator yielding sequence of (name, subsetted object)
for each group
"""
return self.grouper.get_iterator(self.obj, axis=self.axis)
@Substitution(name='groupby')
def apply(self, func, *args, **kwargs):
"""
Apply function and combine results together in an intelligent way. The
split-apply-combine combination rules attempt to be as common sense
based as possible. For example:
case 1:
group DataFrame
apply aggregation function (f(chunk) -> Series)
yield DataFrame, with group axis having group labels
case 2:
group DataFrame
apply transform function ((f(chunk) -> DataFrame with same indexes)
yield DataFrame with resulting chunks glued together
case 3:
group Series
apply function with f(chunk) -> DataFrame
yield DataFrame with result of chunks glued together
Parameters
----------
func : function
Notes
-----
See online documentation for full exposition on how to use apply.
In the current implementation apply calls func twice on the
first group to decide whether it can take a fast or slow code
path. This can lead to unexpected behavior if func has
side-effects, as they will take effect twice for the first
group.
See also
--------
aggregate, transform"""
func = self._is_builtin_func(func)
# this is needed so we don't try and wrap strings. If we could
# resolve functions to their callable functions prior, this
# wouldn't be needed
if args or kwargs:
if callable(func):
@wraps(func)
def f(g):
return func(g, *args, **kwargs)
else:
raise ValueError('func must be a callable if args or '
'kwargs are supplied')
else:
f = func
# ignore SettingWithCopy here in case the user mutates
with option_context('mode.chained_assignment', None):
return self._python_apply_general(f)
def _python_apply_general(self, f):
keys, values, mutated = self.grouper.apply(f, self._selected_obj,
self.axis)
return self._wrap_applied_output(
keys,
values,
not_indexed_same=mutated or self.mutated)
def _iterate_slices(self):
yield self.name, self._selected_obj
def transform(self, func, *args, **kwargs):
raise AbstractMethodError(self)
def _cumcount_array(self, ascending=True):
"""
Parameters
----------
ascending : bool, default True
If False, number in reverse, from length of group - 1 to 0.
Note
----
this is currently implementing sort=False
(though the default is sort=True) for groupby in general
"""
ids, _, ngroups = self.grouper.group_info
sorter = _get_group_index_sorter(ids, ngroups)
ids, count = ids[sorter], len(ids)
if count == 0:
return np.empty(0, dtype=np.int64)
run = np.r_[True, ids[:-1] != ids[1:]]
rep = np.diff(np.r_[np.nonzero(run)[0], count])
out = (~run).cumsum()
if ascending:
out -= np.repeat(out[run], rep)
else:
out = np.repeat(out[np.r_[run[1:], True]], rep) - out
rev = np.empty(count, dtype=np.intp)
rev[sorter] = np.arange(count, dtype=np.intp)
return out[rev].astype(np.int64, copy=False)
def _index_with_as_index(self, b):
"""
Take boolean mask of index to be returned from apply, if as_index=True
"""
# TODO perf, it feels like this should already be somewhere...
from itertools import chain
original = self._selected_obj.index
gp = self.grouper
levels = chain((gp.levels[i][gp.labels[i][b]]
for i in range(len(gp.groupings))),
(original.get_level_values(i)[b]
for i in range(original.nlevels)))
new = MultiIndex.from_arrays(list(levels))
new.names = gp.names + original.names
return new
def _try_cast(self, result, obj):
"""
try to cast the result to our obj original type,
we may have roundtripped thru object in the mean-time
"""
if obj.ndim > 1:
dtype = obj.values.dtype
else:
dtype = obj.dtype
if not is_scalar(result):
result = _possibly_downcast_to_dtype(result, dtype)
return result
def _cython_transform(self, how, numeric_only=True):
output = {}
for name, obj in self._iterate_slices():
is_numeric = is_numeric_dtype(obj.dtype)
if numeric_only and not is_numeric:
continue
try:
result, names = self.grouper.transform(obj.values, how)
except AssertionError as e:
raise GroupByError(str(e))
output[name] = self._try_cast(result, obj)
if len(output) == 0:
raise DataError('No numeric types to aggregate')
return self._wrap_transformed_output(output, names)
def _cython_agg_general(self, how, numeric_only=True):
output = {}
for name, obj in self._iterate_slices():
is_numeric = is_numeric_dtype(obj.dtype)
if numeric_only and not is_numeric:
continue
try:
result, names = self.grouper.aggregate(obj.values, how)
except AssertionError as e:
raise GroupByError(str(e))
output[name] = self._try_cast(result, obj)
if len(output) == 0:
raise DataError('No numeric types to aggregate')
return self._wrap_aggregated_output(output, names)
def _python_agg_general(self, func, *args, **kwargs):
func = self._is_builtin_func(func)
f = lambda x: func(x, *args, **kwargs)
# iterate through "columns" ex exclusions to populate output dict
output = {}
for name, obj in self._iterate_slices():
try:
result, counts = self.grouper.agg_series(obj, f)
output[name] = self._try_cast(result, obj)
except TypeError:
continue
if len(output) == 0:
return self._python_apply_general(f)
if self.grouper._filter_empty_groups:
mask = counts.ravel() > 0
for name, result in compat.iteritems(output):
# since we are masking, make sure that we have a float object
values = result
if is_numeric_dtype(values.dtype):
values = _ensure_float(values)
output[name] = self._try_cast(values[mask], result)
return self._wrap_aggregated_output(output)
def _wrap_applied_output(self, *args, **kwargs):
raise AbstractMethodError(self)
def _concat_objects(self, keys, values, not_indexed_same=False):
from pandas.tools.merge import concat
def reset_identity(values):
# reset the identities of the components
# of the values to prevent aliasing
for v in values:
if v is not None:
ax = v._get_axis(self.axis)
ax._reset_identity()
return values
if not not_indexed_same:
result = concat(values, axis=self.axis)
ax = self._selected_obj._get_axis(self.axis)
if isinstance(result, Series):
result = result.reindex(ax)
else:
result = result.reindex_axis(ax, axis=self.axis)
elif self.group_keys:
values = reset_identity(values)
if self.as_index:
# possible MI return case
group_keys = keys
group_levels = self.grouper.levels
group_names = self.grouper.names
result = concat(values, axis=self.axis, keys=group_keys,
levels=group_levels, names=group_names)
else:
# GH5610, returns a MI, with the first level being a
# range index
keys = list(range(len(values)))
result = concat(values, axis=self.axis, keys=keys)
else:
values = reset_identity(values)
result = concat(values, axis=self.axis)
if (isinstance(result, Series) and
getattr(self, 'name', None) is not None):
result.name = self.name
return result
def _apply_filter(self, indices, dropna):
if len(indices) == 0:
indices = np.array([], dtype='int64')
else:
indices = np.sort(np.concatenate(indices))
if dropna:
filtered = self._selected_obj.take(indices, axis=self.axis)
else:
mask = np.empty(len(self._selected_obj.index), dtype=bool)
mask.fill(False)
mask[indices.astype(int)] = True
# mask fails to broadcast when passed to where; broadcast manually.
mask = np.tile(mask, list(self._selected_obj.shape[1:]) + [1]).T
filtered = self._selected_obj.where(mask) # Fill with NaNs.
return filtered
class GroupBy(_GroupBy):
"""
Class for grouping and aggregating relational data. See aggregate,
transform, and apply functions on this object.
It's easiest to use obj.groupby(...) to use GroupBy, but you can also do:
::
grouped = groupby(obj, ...)
Parameters
----------
obj : pandas object
axis : int, default 0
level : int, default None
Level of MultiIndex
groupings : list of Grouping objects
Most users should ignore this
exclusions : array-like, optional
List of columns to exclude
name : string
Most users should ignore this
Notes
-----
After grouping, see aggregate, apply, and transform functions. Here are
some other brief notes about usage. When grouping by multiple groups, the
result index will be a MultiIndex (hierarchical) by default.
Iteration produces (key, group) tuples, i.e. chunking the data by group. So
you can write code like:
::
grouped = obj.groupby(keys, axis=axis)
for key, group in grouped:
# do something with the data
Function calls on GroupBy, if not specially implemented, "dispatch" to the
grouped data. So if you group a DataFrame and wish to invoke the std()
method on each group, you can simply do:
::
df.groupby(mapper).std()
rather than
::
df.groupby(mapper).aggregate(np.std)
You can pass arguments to these "wrapped" functions, too.
See the online documentation for full exposition on these topics and much
more
Returns
-------
**Attributes**
groups : dict
{group name -> group labels}
len(grouped) : int
Number of groups
"""
_apply_whitelist = _common_apply_whitelist
def irow(self, i):
"""
DEPRECATED. Use ``.nth(i)`` instead
"""
# 10177
warnings.warn("irow(i) is deprecated. Please use .nth(i)",
FutureWarning, stacklevel=2)
return self.nth(i)
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def count(self):
"""Compute count of group, excluding missing values"""
# defined here for API doc
raise NotImplementedError
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def mean(self, *args, **kwargs):
"""
Compute mean of groups, excluding missing values
For multiple groupings, the result index will be a MultiIndex
"""
nv.validate_groupby_func('mean', args, kwargs)
try:
return self._cython_agg_general('mean')
except GroupByError:
raise
except Exception: # pragma: no cover
self._set_group_selection()
f = lambda x: x.mean(axis=self.axis)
return self._python_agg_general(f)
@Substitution(name='groupby')
@Appender(_doc_template)
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def std(self, ddof=1, *args, **kwargs):
"""
Compute standard deviation of groups, excluding missing values
For multiple groupings, the result index will be a MultiIndex
Parameters
----------
ddof : integer, default 1
degrees of freedom
"""
# TODO: implement at Cython level?
nv.validate_groupby_func('std', args, kwargs)
return np.sqrt(self.var(ddof=ddof))
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def var(self, ddof=1, *args, **kwargs):
"""
Compute variance of groups, excluding missing values
For multiple groupings, the result index will be a MultiIndex
Parameters
----------
ddof : integer, default 1
degrees of freedom
"""
nv.validate_groupby_func('var', args, kwargs)
if ddof == 1:
return self._cython_agg_general('var')
else:
self._set_group_selection()
f = lambda x: x.var(ddof=ddof)
return self._python_agg_general(f)
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def sem(self, ddof=1):
"""
Compute standard error of the mean of groups, excluding missing values
For multiple groupings, the result index will be a MultiIndex
Parameters
----------
ddof : integer, default 1
degrees of freedom
"""
return self.std(ddof=ddof) / np.sqrt(self.count())
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def size(self):
"""Compute group sizes"""
return self.grouper.size()
sum = _groupby_function('sum', 'add', np.sum)
prod = _groupby_function('prod', 'prod', np.prod)
min = _groupby_function('min', 'min', np.min, numeric_only=False)
max = _groupby_function('max', 'max', np.max, numeric_only=False)
first = _groupby_function('first', 'first', _first_compat,
numeric_only=False, _convert=True)
last = _groupby_function('last', 'last', _last_compat, numeric_only=False,
_convert=True)
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def ohlc(self):
"""
Compute sum of values, excluding missing values
For multiple groupings, the result index will be a MultiIndex
"""
return self._apply_to_column_groupbys(
lambda x: x._cython_agg_general('ohlc'))
@Substitution(name='groupby')
@Appender(_doc_template)
def resample(self, rule, *args, **kwargs):
"""
Provide resampling when using a TimeGrouper
Return a new grouper with our resampler appended
"""
from pandas.tseries.resample import get_resampler_for_grouping
return get_resampler_for_grouping(self, rule, *args, **kwargs)
@Substitution(name='groupby')
@Appender(_doc_template)
def rolling(self, *args, **kwargs):
"""
Return a rolling grouper, providing rolling
functionaility per group
"""
from pandas.core.window import RollingGroupby
return RollingGroupby(self, *args, **kwargs)
@Substitution(name='groupby')
@Appender(_doc_template)
def expanding(self, *args, **kwargs):
"""
Return an expanding grouper, providing expanding
functionaility per group
"""
from pandas.core.window import ExpandingGroupby
return ExpandingGroupby(self, *args, **kwargs)
@Substitution(name='groupby')
@Appender(_doc_template)
def pad(self, limit=None):
"""
Forward fill the values
Parameters
----------
limit : integer, optional
limit of how many values to fill
See Also
--------
Series.fillna
DataFrame.fillna
"""
return self.apply(lambda x: x.ffill(limit=limit))
ffill = pad
@Substitution(name='groupby')
@Appender(_doc_template)
def backfill(self, limit=None):
"""
Backward fill the values
Parameters
----------
limit : integer, optional
limit of how many values to fill
See Also
--------
Series.fillna
DataFrame.fillna
"""
return self.apply(lambda x: x.bfill(limit=limit))
bfill = backfill
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def nth(self, n, dropna=None):
"""
Take the nth row from each group if n is an int, or a subset of rows
if n is a list of ints.
If dropna, will take the nth non-null row, dropna is either
Truthy (if a Series) or 'all', 'any' (if a DataFrame);
this is equivalent to calling dropna(how=dropna) before the
groupby.
Parameters
----------
n : int or list of ints
a single nth value for the row or a list of nth values
dropna : None or str, optional
apply the specified dropna operation before counting which row is
the nth row. Needs to be None, 'any' or 'all'
Examples
--------
>>> df = pd.DataFrame({'A': [1, 1, 2, 1, 2],
... 'B': [np.nan, 2, 3, 4, 5]}, columns=['A', 'B'])
>>> g = df.groupby('A')
>>> g.nth(0)
B
A
1 NaN
2 3.0
>>> g.nth(1)
B
A
1 2.0
2 5.0
>>> g.nth(-1)
B
A
1 4.0
2 5.0
>>> g.nth([0, 1])
B
A
1 NaN
1 2.0
2 3.0
2 5.0
Specifying ``dropna`` allows count ignoring NaN
>>> g.nth(0, dropna='any')
B
A
1 2.0
2 3.0
NaNs denote group exhausted when using dropna
>>> g.nth(3, dropna='any')
B
A
1 NaN
2 NaN
Specifying ``as_index=False`` in ``groupby`` keeps the original index.
>>> df.groupby('A', as_index=False).nth(1)
A B
1 1 2.0
4 2 5.0
"""
if isinstance(n, int):
nth_values = [n]
elif isinstance(n, (set, list, tuple)):
nth_values = list(set(n))
if dropna is not None:
raise ValueError(
"dropna option with a list of nth values is not supported")
else:
raise TypeError("n needs to be an int or a list/set/tuple of ints")
nth_values = np.array(nth_values, dtype=np.intp)
self._set_group_selection()
if not dropna:
mask = np.in1d(self._cumcount_array(), nth_values) | \
np.in1d(self._cumcount_array(ascending=False) + 1, -nth_values)
out = self._selected_obj[mask]
if not self.as_index:
return out
ids, _, _ = self.grouper.group_info
out.index = self.grouper.result_index[ids[mask]]
return out.sort_index() if self.sort else out
if isinstance(self._selected_obj, DataFrame) and \
dropna not in ['any', 'all']:
# Note: when agg-ing picker doesn't raise this, just returns NaN
raise ValueError("For a DataFrame groupby, dropna must be "
"either None, 'any' or 'all', "
"(was passed %s)." % (dropna),)
# old behaviour, but with all and any support for DataFrames.
# modified in GH 7559 to have better perf
max_len = n if n >= 0 else - 1 - n
dropped = self.obj.dropna(how=dropna, axis=self.axis)
# get a new grouper for our dropped obj
if self.keys is None and self.level is None:
# we don't have the grouper info available
# (e.g. we have selected out
# a column that is not in the current object)
axis = self.grouper.axis
grouper = axis[axis.isin(dropped.index)]
else:
# create a grouper with the original parameters, but on the dropped
# object
grouper, _, _ = _get_grouper(dropped, key=self.keys,
axis=self.axis, level=self.level,
sort=self.sort,
mutated=self.mutated)
grb = dropped.groupby(grouper, as_index=self.as_index, sort=self.sort)
sizes, result = grb.size(), grb.nth(n)
mask = (sizes < max_len).values
# set the results which don't meet the criteria
if len(result) and mask.any():
result.loc[mask] = np.nan
# reset/reindex to the original groups
if len(self.obj) == len(dropped) or \
len(result) == len(self.grouper.result_index):
result.index = self.grouper.result_index
else:
result = result.reindex(self.grouper.result_index)
return result
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def cumcount(self, ascending=True):
"""
Number each item in each group from 0 to the length of that group - 1.
Essentially this is equivalent to
>>> self.apply(lambda x: Series(np.arange(len(x)), x.index))
Parameters
----------
ascending : bool, default True
If False, number in reverse, from length of group - 1 to 0.
Examples
--------
>>> df = pd.DataFrame([['a'], ['a'], ['a'], ['b'], ['b'], ['a']],
... columns=['A'])
>>> df
A
0 a
1 a
2 a
3 b
4 b
5 a
>>> df.groupby('A').cumcount()
0 0
1 1
2 2
3 0
4 1
5 3
dtype: int64
>>> df.groupby('A').cumcount(ascending=False)
0 3
1 2
2 1
3 1
4 0
5 0
dtype: int64
"""
self._set_group_selection()
index = self._selected_obj.index
cumcounts = self._cumcount_array(ascending=ascending)
return Series(cumcounts, index)
@Substitution(name='groupby')
@Appender(_doc_template)
def cumprod(self, axis=0, *args, **kwargs):
"""Cumulative product for each group"""
nv.validate_groupby_func('cumprod', args, kwargs)
if axis != 0:
return self.apply(lambda x: x.cumprod(axis=axis))
return self._cython_transform('cumprod')
@Substitution(name='groupby')
@Appender(_doc_template)
def cumsum(self, axis=0, *args, **kwargs):
"""Cumulative sum for each group"""
nv.validate_groupby_func('cumsum', args, kwargs)
if axis != 0:
return self.apply(lambda x: x.cumprod(axis=axis))
return self._cython_transform('cumsum')
@Substitution(name='groupby')
@Appender(_doc_template)
def shift(self, periods=1, freq=None, axis=0):
"""
Shift each group by periods observations
Parameters
----------
periods : integer, default 1
number of periods to shift
freq : frequency string
axis : axis to shift, default 0
"""
if freq is not None or axis != 0:
return self.apply(lambda x: x.shift(periods, freq, axis))
labels, _, ngroups = self.grouper.group_info
# filled in by Cython
indexer = np.zeros_like(labels)
_algos.group_shift_indexer(indexer, labels, ngroups, periods)
output = {}
for name, obj in self._iterate_slices():
output[name] = algos.take_nd(obj.values, indexer)
return self._wrap_transformed_output(output)
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def head(self, n=5):
"""
Returns first n rows of each group.
Essentially equivalent to ``.apply(lambda x: x.head(n))``,
except ignores as_index flag.
Examples
--------
>>> df = DataFrame([[1, 2], [1, 4], [5, 6]],
columns=['A', 'B'])
>>> df.groupby('A', as_index=False).head(1)
A B
0 1 2
2 5 6
>>> df.groupby('A').head(1)
A B
0 1 2
2 5 6
"""
self._reset_group_selection()
mask = self._cumcount_array() < n
return self._selected_obj[mask]
@Substitution(name='groupby')
@Appender(_doc_template)
[docs] def tail(self, n=5):
"""
Returns last n rows of each group
Essentially equivalent to ``.apply(lambda x: x.tail(n))``,
except ignores as_index flag.
Examples
--------
>>> df = DataFrame([['a', 1], ['a', 2], ['b', 1], ['b', 2]],
columns=['A', 'B'])
>>> df.groupby('A').tail(1)
A B
1 a 2
3 b 2
>>> df.groupby('A').head(1)
A B
0 a 1
2 b 1
"""
self._reset_group_selection()
mask = self._cumcount_array(ascending=False) < n
return self._selected_obj[mask]
@Appender(GroupBy.__doc__)
def groupby(obj, by, **kwds):
if isinstance(obj, Series):
klass = SeriesGroupBy
elif isinstance(obj, DataFrame):
klass = DataFrameGroupBy
else: # pragma: no cover
raise TypeError('invalid type: %s' % type(obj))
return klass(obj, by, **kwds)
def _get_axes(group):
if isinstance(group, Series):
return [group.index]
else:
return group.axes
def _is_indexed_like(obj, axes):
if isinstance(obj, Series):
if len(axes) > 1:
return False
return obj.index.equals(axes[0])
elif isinstance(obj, DataFrame):
return obj.index.equals(axes[0])
return False
class BaseGrouper(object):
"""
This is an internal Grouper class, which actually holds
the generated groups
"""
def __init__(self, axis, groupings, sort=True, group_keys=True,
mutated=False):
self._filter_empty_groups = self.compressed = len(groupings) != 1
self.axis = axis
self.groupings = groupings
self.sort = sort
self.group_keys = group_keys
self.mutated = mutated
@property
def shape(self):
return tuple(ping.ngroups for ping in self.groupings)
def __iter__(self):
return iter(self.indices)
@property
def nkeys(self):
return len(self.groupings)
def get_iterator(self, data, axis=0):
"""
Groupby iterator
Returns
-------
Generator yielding sequence of (name, subsetted object)
for each group
"""
splitter = self._get_splitter(data, axis=axis)
keys = self._get_group_keys()
for key, (i, group) in zip(keys, splitter):
yield key, group
def _get_splitter(self, data, axis=0):
comp_ids, _, ngroups = self.group_info
return get_splitter(data, comp_ids, ngroups, axis=axis)
def _get_group_keys(self):
if len(self.groupings) == 1:
return self.levels[0]
else:
comp_ids, _, ngroups = self.group_info
# provide "flattened" iterator for multi-group setting
mapper = _KeyMapper(comp_ids, ngroups, self.labels, self.levels)
return [mapper.get_key(i) for i in range(ngroups)]
def apply(self, f, data, axis=0):
mutated = self.mutated
splitter = self._get_splitter(data, axis=axis)
group_keys = self._get_group_keys()
# oh boy
f_name = com._get_callable_name(f)
if (f_name not in _plotting_methods and
hasattr(splitter, 'fast_apply') and axis == 0):
try:
values, mutated = splitter.fast_apply(f, group_keys)
return group_keys, values, mutated
except (lib.InvalidApply):
# we detect a mutation of some kind
# so take slow path
pass
except Exception:
# raise this error to the caller
pass
result_values = []
for key, (i, group) in zip(group_keys, splitter):
object.__setattr__(group, 'name', key)
# group might be modified
group_axes = _get_axes(group)
res = f(group)
if not _is_indexed_like(res, group_axes):
mutated = True
result_values.append(res)
return group_keys, result_values, mutated
@cache_readonly
def indices(self):
""" dict {group name -> group indices} """
if len(self.groupings) == 1:
return self.groupings[0].indices
else:
label_list = [ping.labels for ping in self.groupings]
keys = [_values_from_object(ping.group_index)
for ping in self.groupings]
return _get_indices_dict(label_list, keys)
@property
def labels(self):
return [ping.labels for ping in self.groupings]
@property
def levels(self):
return [ping.group_index for ping in self.groupings]
@property
def names(self):
return [ping.name for ping in self.groupings]
def size(self):
"""
Compute group sizes
"""
ids, _, ngroup = self.group_info
ids = _ensure_platform_int(ids)
out = np.bincount(ids[ids != -1], minlength=ngroup or None)
return Series(out, index=self.result_index, dtype='int64')
@cache_readonly
def _max_groupsize(self):
"""
Compute size of largest group
"""
# For many items in each group this is much faster than
# self.size().max(), in worst case marginally slower
if self.indices:
return max(len(v) for v in self.indices.values())
else:
return 0
@cache_readonly
def groups(self):
""" dict {group name -> group labels} """
if len(self.groupings) == 1:
return self.groupings[0].groups
else:
to_groupby = lzip(*(ping.grouper for ping in self.groupings))
to_groupby = Index(to_groupby)
return self.axis.groupby(to_groupby.values)
@cache_readonly
def is_monotonic(self):
# return if my group orderings are monotonic
return Index(self.group_info[0]).is_monotonic
@cache_readonly
def group_info(self):
comp_ids, obs_group_ids = self._get_compressed_labels()
ngroups = len(obs_group_ids)
comp_ids = _ensure_int64(comp_ids)
return comp_ids, obs_group_ids, ngroups
def _get_compressed_labels(self):
all_labels = [ping.labels for ping in self.groupings]
if len(all_labels) > 1:
group_index = get_group_index(all_labels, self.shape,
sort=True, xnull=True)
return _compress_group_index(group_index, sort=self.sort)
ping = self.groupings[0]
return ping.labels, np.arange(len(ping.group_index))
@cache_readonly
def ngroups(self):
return len(self.result_index)
@property
def recons_labels(self):
comp_ids, obs_ids, _ = self.group_info
labels = (ping.labels for ping in self.groupings)
return decons_obs_group_ids(comp_ids,
obs_ids, self.shape, labels, xnull=True)
@cache_readonly
def result_index(self):
if not self.compressed and len(self.groupings) == 1:
return self.groupings[0].group_index.rename(self.names[0])
return MultiIndex(levels=[ping.group_index for ping in self.groupings],
labels=self.recons_labels,
verify_integrity=False,
names=self.names)
def get_group_levels(self):
if not self.compressed and len(self.groupings) == 1:
return [self.groupings[0].group_index]
name_list = []
for ping, labels in zip(self.groupings, self.recons_labels):
labels = _ensure_platform_int(labels)
levels = ping.group_index.take(labels)
name_list.append(levels)
return name_list
# ------------------------------------------------------------
# Aggregation functions
_cython_functions = {
'aggregate': {
'add': 'group_add',
'prod': 'group_prod',
'min': 'group_min',
'max': 'group_max',
'mean': 'group_mean',
'median': {
'name': 'group_median'
},
'var': 'group_var',
'first': {
'name': 'group_nth',
'f': lambda func, a, b, c, d: func(a, b, c, d, 1)
},
'last': 'group_last',
'ohlc': 'group_ohlc',
},
'transform': {
'cumprod': 'group_cumprod',
'cumsum': 'group_cumsum',
}
}
_cython_arity = {
'ohlc': 4, # OHLC
}
_name_functions = {
'ohlc': lambda *args: ['open', 'high', 'low', 'close']
}
def _get_cython_function(self, kind, how, values, is_numeric):
dtype_str = values.dtype.name
def get_func(fname):
# see if there is a fused-type version of function
# only valid for numeric
f = getattr(_algos, fname, None)
if f is not None and is_numeric:
return f
# otherwise find dtype-specific version, falling back to object
for dt in [dtype_str, 'object']:
f = getattr(_algos, "%s_%s" % (fname, dtype_str), None)
if f is not None:
return f
ftype = self._cython_functions[kind][how]
if isinstance(ftype, dict):
func = afunc = get_func(ftype['name'])
# a sub-function
f = ftype.get('f')
if f is not None:
def wrapper(*args, **kwargs):
return f(afunc, *args, **kwargs)
# need to curry our sub-function
func = wrapper
else:
func = get_func(ftype)
if func is None:
raise NotImplementedError("function is not implemented for this"
"dtype: [how->%s,dtype->%s]" %
(how, dtype_str))
return func, dtype_str
def _cython_operation(self, kind, values, how, axis):
assert kind in ['transform', 'aggregate']
arity = self._cython_arity.get(how, 1)
vdim = values.ndim
swapped = False
if vdim == 1:
values = values[:, None]
out_shape = (self.ngroups, arity)
else:
if axis > 0:
swapped = True
values = values.swapaxes(0, axis)
if arity > 1:
raise NotImplementedError("arity of more than 1 is not "
"supported for the 'how' argument")
out_shape = (self.ngroups,) + values.shape[1:]
is_numeric = is_numeric_dtype(values.dtype)
if is_datetime_or_timedelta_dtype(values.dtype):
values = values.view('int64')
is_numeric = True
elif is_bool_dtype(values.dtype):
values = _ensure_float64(values)
elif is_integer_dtype(values):
values = values.astype('int64', copy=False)
elif is_numeric and not is_complex_dtype(values):
values = _ensure_float64(values)
else:
values = values.astype(object)
try:
func, dtype_str = self._get_cython_function(
kind, how, values, is_numeric)
except NotImplementedError:
if is_numeric:
values = _ensure_float64(values)
func, dtype_str = self._get_cython_function(
kind, how, values, is_numeric)
else:
raise
if is_numeric:
out_dtype = '%s%d' % (values.dtype.kind, values.dtype.itemsize)
else:
out_dtype = 'object'
labels, _, _ = self.group_info
if kind == 'aggregate':
result = _maybe_fill(np.empty(out_shape, dtype=out_dtype),
fill_value=np.nan)
counts = np.zeros(self.ngroups, dtype=np.int64)
result = self._aggregate(
result, counts, values, labels, func, is_numeric)
elif kind == 'transform':
result = _maybe_fill(np.empty_like(values, dtype=out_dtype),
fill_value=np.nan)
# temporary storange for running-total type tranforms
accum = np.empty(out_shape, dtype=out_dtype)
result = self._transform(
result, accum, values, labels, func, is_numeric)
if is_integer_dtype(result):
if len(result[result == tslib.iNaT]) > 0:
result = result.astype('float64')
result[result == tslib.iNaT] = np.nan
if kind == 'aggregate' and \
self._filter_empty_groups and not counts.all():
if result.ndim == 2:
try:
result = lib.row_bool_subset(
result, (counts > 0).view(np.uint8))
except ValueError:
result = lib.row_bool_subset_object(
_ensure_object(result),
(counts > 0).view(np.uint8))
else:
result = result[counts > 0]
if vdim == 1 and arity == 1:
result = result[:, 0]
if how in self._name_functions:
# TODO
names = self._name_functions[how]()
else:
names = None
if swapped:
result = result.swapaxes(0, axis)
return result, names
def aggregate(self, values, how, axis=0):
return self._cython_operation('aggregate', values, how, axis)
def transform(self, values, how, axis=0):
return self._cython_operation('transform', values, how, axis)
def _aggregate(self, result, counts, values, comp_ids, agg_func,
is_numeric):
if values.ndim > 3:
# punting for now
raise NotImplementedError("number of dimensions is currently "
"limited to 3")
elif values.ndim > 2:
for i, chunk in enumerate(values.transpose(2, 0, 1)):
chunk = chunk.squeeze()
agg_func(result[:, :, i], counts, chunk, comp_ids)
else:
agg_func(result, counts, values, comp_ids)
return result
def _transform(self, result, accum, values, comp_ids, transform_func,
is_numeric):
comp_ids, _, ngroups = self.group_info
if values.ndim > 3:
# punting for now
raise NotImplementedError("number of dimensions is currently "
"limited to 3")
elif values.ndim > 2:
for i, chunk in enumerate(values.transpose(2, 0, 1)):
chunk = chunk.squeeze()
transform_func(result[:, :, i], values,
comp_ids, accum)
else:
transform_func(result, values, comp_ids, accum)
return result
def agg_series(self, obj, func):
try:
return self._aggregate_series_fast(obj, func)
except Exception:
return self._aggregate_series_pure_python(obj, func)
def _aggregate_series_fast(self, obj, func):
func = self._is_builtin_func(func)
if obj.index._has_complex_internals:
raise TypeError('Incompatible index for Cython grouper')
group_index, _, ngroups = self.group_info
# avoids object / Series creation overhead
dummy = obj._get_values(slice(None, 0)).to_dense()
indexer = _get_group_index_sorter(group_index, ngroups)
obj = obj.take(indexer, convert=False)
group_index = algos.take_nd(group_index, indexer, allow_fill=False)
grouper = lib.SeriesGrouper(obj, func, group_index, ngroups,
dummy)
result, counts = grouper.get_result()
return result, counts
def _aggregate_series_pure_python(self, obj, func):
group_index, _, ngroups = self.group_info
counts = np.zeros(ngroups, dtype=int)
result = None
splitter = get_splitter(obj, group_index, ngroups, axis=self.axis)
for label, group in splitter:
res = func(group)
if result is None:
if (isinstance(res, (Series, Index, np.ndarray)) or
isinstance(res, list)):
raise ValueError('Function does not reduce')
result = np.empty(ngroups, dtype='O')
counts[label] = group.shape[0]
result[label] = res
result = lib.maybe_convert_objects(result, try_float=0)
return result, counts
def generate_bins_generic(values, binner, closed):
"""
Generate bin edge offsets and bin labels for one array using another array
which has bin edge values. Both arrays must be sorted.
Parameters
----------
values : array of values
binner : a comparable array of values representing bins into which to bin
the first array. Note, 'values' end-points must fall within 'binner'
end-points.
closed : which end of bin is closed; left (default), right
Returns
-------
bins : array of offsets (into 'values' argument) of bins.
Zero and last edge are excluded in result, so for instance the first
bin is values[0:bin[0]] and the last is values[bin[-1]:]
"""
lenidx = len(values)
lenbin = len(binner)
if lenidx <= 0 or lenbin <= 0:
raise ValueError("Invalid length for values or for binner")
# check binner fits data
if values[0] < binner[0]:
raise ValueError("Values falls before first bin")
if values[lenidx - 1] > binner[lenbin - 1]:
raise ValueError("Values falls after last bin")
bins = np.empty(lenbin - 1, dtype=np.int64)
j = 0 # index into values
bc = 0 # bin count
# linear scan, presume nothing about values/binner except that it fits ok
for i in range(0, lenbin - 1):
r_bin = binner[i + 1]
# count values in current bin, advance to next bin
while j < lenidx and (values[j] < r_bin or
(closed == 'right' and values[j] == r_bin)):
j += 1
bins[bc] = j
bc += 1
return bins
class BinGrouper(BaseGrouper):
def __init__(self, bins, binlabels, filter_empty=False, mutated=False):
self.bins = _ensure_int64(bins)
self.binlabels = _ensure_index(binlabels)
self._filter_empty_groups = filter_empty
self.mutated = mutated
@cache_readonly
def groups(self):
""" dict {group name -> group labels} """
# this is mainly for compat
# GH 3881
result = {}
for key, value in zip(self.binlabels, self.bins):
if key is not tslib.NaT:
result[key] = value
return result
@property
def nkeys(self):
return 1
def get_iterator(self, data, axis=0):
"""
Groupby iterator
Returns
-------
Generator yielding sequence of (name, subsetted object)
for each group
"""
if isinstance(data, NDFrame):
slicer = lambda start, edge: data._slice(
slice(start, edge), axis=axis)
length = len(data.axes[axis])
else:
slicer = lambda start, edge: data[slice(start, edge)]
length = len(data)
start = 0
for edge, label in zip(self.bins, self.binlabels):
if label is not tslib.NaT:
yield label, slicer(start, edge)
start = edge
if start < length:
yield self.binlabels[-1], slicer(start, None)
@cache_readonly
def indices(self):
indices = collections.defaultdict(list)
i = 0
for label, bin in zip(self.binlabels, self.bins):
if i < bin:
if label is not tslib.NaT:
indices[label] = list(range(i, bin))
i = bin
return indices
@cache_readonly
def group_info(self):
ngroups = self.ngroups
obs_group_ids = np.arange(ngroups)
rep = np.diff(np.r_[0, self.bins])
rep = _ensure_platform_int(rep)
if ngroups == len(self.bins):
comp_ids = np.repeat(np.arange(ngroups), rep)
else:
comp_ids = np.repeat(np.r_[-1, np.arange(ngroups)], rep)
return comp_ids.astype('int64', copy=False), \
obs_group_ids.astype('int64', copy=False), ngroups
@cache_readonly
def ngroups(self):
return len(self.result_index)
@cache_readonly
def result_index(self):
if len(self.binlabels) != 0 and isnull(self.binlabels[0]):
return self.binlabels[1:]
return self.binlabels
@property
def levels(self):
return [self.binlabels]
@property
def names(self):
return [self.binlabels.name]
@property
def groupings(self):
return [Grouping(lvl, lvl, in_axis=False, level=None, name=name)
for lvl, name in zip(self.levels, self.names)]
def agg_series(self, obj, func):
dummy = obj[:0]
grouper = lib.SeriesBinGrouper(obj, func, self.bins, dummy)
return grouper.get_result()
# ----------------------------------------------------------------------
# cython aggregation
_cython_functions = copy.deepcopy(BaseGrouper._cython_functions)
_cython_functions['aggregate'].pop('median')
class Grouping(object):
"""
Holds the grouping information for a single key
Parameters
----------
index : Index
grouper :
obj :
name :
level :
in_axis : if the Grouping is a column in self.obj and hence among
Groupby.exclusions list
Returns
-------
**Attributes**:
* indices : dict of {group -> index_list}
* labels : ndarray, group labels
* ids : mapping of label -> group
* counts : array of group counts
* group_index : unique groups
* groups : dict of {group -> label_list}
"""
def __init__(self, index, grouper=None, obj=None, name=None, level=None,
sort=True, in_axis=False):
self.name = name
self.level = level
self.grouper = _convert_grouper(index, grouper)
self.index = index
self.sort = sort
self.obj = obj
self.in_axis = in_axis
# right place for this?
if isinstance(grouper, (Series, Index)) and name is None:
self.name = grouper.name
if isinstance(grouper, MultiIndex):
self.grouper = grouper.values
# pre-computed
self._should_compress = True
# we have a single grouper which may be a myriad of things,
# some of which are dependent on the passing in level
if level is not None:
if not isinstance(level, int):
if level not in index.names:
raise AssertionError('Level %s not in index' % str(level))
level = index.names.index(level)
inds = index.labels[level]
level_index = index.levels[level]
if self.name is None:
self.name = index.names[level]
# XXX complete hack
if grouper is not None:
level_values = index.levels[level].take(inds)
self.grouper = level_values.map(self.grouper)
else:
# all levels may not be observed
labels, uniques = algos.factorize(inds, sort=True)
if len(uniques) > 0 and uniques[0] == -1:
# handle NAs
mask = inds != -1
ok_labels, uniques = algos.factorize(inds[mask], sort=True)
labels = np.empty(len(inds), dtype=inds.dtype)
labels[mask] = ok_labels
labels[~mask] = -1
if len(uniques) < len(level_index):
level_index = level_index.take(uniques)
self._labels = labels
self._group_index = level_index
self.grouper = level_index.take(labels)
else:
if isinstance(self.grouper, (list, tuple)):
self.grouper = com._asarray_tuplesafe(self.grouper)
# a passed Categorical
elif is_categorical_dtype(self.grouper):
# must have an ordered categorical
if self.sort:
if not self.grouper.ordered:
# technically we cannot group on an unordered
# Categorical
# but this a user convenience to do so; the ordering
# is preserved and if it's a reduction it doesn't make
# any difference
pass
# fix bug #GH8868 sort=False being ignored in categorical
# groupby
else:
cat = self.grouper.unique()
self.grouper = self.grouper.reorder_categories(
cat.categories)
# we make a CategoricalIndex out of the cat grouper
# preserving the categories / ordered attributes
self._labels = self.grouper.codes
c = self.grouper.categories
self._group_index = CategoricalIndex(
Categorical.from_codes(np.arange(len(c)),
categories=c,
ordered=self.grouper.ordered))
# a passed Grouper like
elif isinstance(self.grouper, Grouper):
# get the new grouper
grouper = self.grouper._get_binner_for_grouping(self.obj)
self.obj = self.grouper.obj
self.grouper = grouper
if self.name is None:
self.name = grouper.name
# we are done
if isinstance(self.grouper, Grouping):
self.grouper = self.grouper.grouper
# no level passed
elif not isinstance(self.grouper,
(Series, Index, Categorical, np.ndarray)):
if getattr(self.grouper, 'ndim', 1) != 1:
t = self.name or str(type(self.grouper))
raise ValueError("Grouper for '%s' not 1-dimensional" % t)
self.grouper = self.index.map(self.grouper)
if not (hasattr(self.grouper, "__len__") and
len(self.grouper) == len(self.index)):
errmsg = ('Grouper result violates len(labels) == '
'len(data)\nresult: %s' %
pprint_thing(self.grouper))
self.grouper = None # Try for sanity
raise AssertionError(errmsg)
# if we have a date/time-like grouper, make sure that we have
# Timestamps like
if getattr(self.grouper, 'dtype', None) is not None:
if is_datetime64_dtype(self.grouper):
from pandas import to_datetime
self.grouper = to_datetime(self.grouper)
elif is_timedelta64_dtype(self.grouper):
from pandas import to_timedelta
self.grouper = to_timedelta(self.grouper)
def __repr__(self):
return 'Grouping({0})'.format(self.name)
def __iter__(self):
return iter(self.indices)
_labels = None
_group_index = None
@property
def ngroups(self):
return len(self.group_index)
@cache_readonly
def indices(self):
return _groupby_indices(self.grouper)
@property
def labels(self):
if self._labels is None:
self._make_labels()
return self._labels
@property
def group_index(self):
if self._group_index is None:
self._make_labels()
return self._group_index
def _make_labels(self):
if self._labels is None or self._group_index is None:
labels, uniques = algos.factorize(self.grouper, sort=self.sort)
uniques = Index(uniques, name=self.name)
self._labels = labels
self._group_index = uniques
@cache_readonly
def groups(self):
return self.index.groupby(self.grouper)
def _get_grouper(obj, key=None, axis=0, level=None, sort=True,
mutated=False):
"""
create and return a BaseGrouper, which is an internal
mapping of how to create the grouper indexers.
This may be composed of multiple Grouping objects, indicating
multiple groupers
Groupers are ultimately index mappings. They can originate as:
index mappings, keys to columns, functions, or Groupers
Groupers enable local references to axis,level,sort, while
the passed in axis, level, and sort are 'global'.
This routine tries to figure out what the passing in references
are and then creates a Grouping for each one, combined into
a BaseGrouper.
"""
group_axis = obj._get_axis(axis)
# validate that the passed level is compatible with the passed
# axis of the object
if level is not None:
if not isinstance(group_axis, MultiIndex):
if isinstance(level, compat.string_types):
if obj.index.name != level:
raise ValueError('level name %s is not the name of the '
'index' % level)
elif level > 0:
raise ValueError('level > 0 only valid with MultiIndex')
level = None
key = group_axis
# a passed-in Grouper, directly convert
if isinstance(key, Grouper):
binner, grouper, obj = key._get_grouper(obj)
if key.key is None:
return grouper, [], obj
else:
return grouper, set([key.key]), obj
# already have a BaseGrouper, just return it
elif isinstance(key, BaseGrouper):
return key, [], obj
if not isinstance(key, (tuple, list)):
keys = [key]
match_axis_length = False
else:
keys = key
match_axis_length = len(keys) == len(group_axis)
# what are we after, exactly?
any_callable = any(callable(g) or isinstance(g, dict) for g in keys)
any_groupers = any(isinstance(g, Grouper) for g in keys)
any_arraylike = any(isinstance(g, (list, tuple, Series, Index, np.ndarray))
for g in keys)
try:
if isinstance(obj, DataFrame):
all_in_columns = all(g in obj.columns for g in keys)
else:
all_in_columns = False
except Exception:
all_in_columns = False
if not any_callable and not all_in_columns and \
not any_arraylike and not any_groupers and \
match_axis_length and level is None:
keys = [com._asarray_tuplesafe(keys)]
if isinstance(level, (tuple, list)):
if key is None:
keys = [None] * len(level)
levels = level
else:
levels = [level] * len(keys)
groupings = []
exclusions = []
# if the actual grouper should be obj[key]
def is_in_axis(key):
if not _is_label_like(key):
try:
obj._data.items.get_loc(key)
except Exception:
return False
return True
# if the the grouper is obj[name]
def is_in_obj(gpr):
try:
return id(gpr) == id(obj[gpr.name])
except Exception:
return False
for i, (gpr, level) in enumerate(zip(keys, levels)):
if is_in_obj(gpr): # df.groupby(df['name'])
in_axis, name = True, gpr.name
exclusions.append(name)
elif is_in_axis(gpr): # df.groupby('name')
in_axis, name, gpr = True, gpr, obj[gpr]
exclusions.append(name)
else:
in_axis, name = False, None
if is_categorical_dtype(gpr) and len(gpr) != len(obj):
raise ValueError("Categorical dtype grouper must "
"have len(grouper) == len(data)")
# create the Grouping
# allow us to passing the actual Grouping as the gpr
ping = Grouping(group_axis,
gpr,
obj=obj,
name=name,
level=level,
sort=sort,
in_axis=in_axis) \
if not isinstance(gpr, Grouping) else gpr
groupings.append(ping)
if len(groupings) == 0:
raise ValueError('No group keys passed!')
# create the internals grouper
grouper = BaseGrouper(group_axis, groupings, sort=sort, mutated=mutated)
return grouper, exclusions, obj
def _is_label_like(val):
return (isinstance(val, compat.string_types) or
(val is not None and is_scalar(val)))
def _convert_grouper(axis, grouper):
if isinstance(grouper, dict):
return grouper.get
elif isinstance(grouper, Series):
if grouper.index.equals(axis):
return grouper._values
else:
return grouper.reindex(axis)._values
elif isinstance(grouper, (list, Series, Index, np.ndarray)):
if len(grouper) != len(axis):
raise AssertionError('Grouper and axis must be same length')
return grouper
else:
return grouper
def _whitelist_method_generator(klass, whitelist):
"""
Yields all GroupBy member defs for DataFrame/Series names in _whitelist.
Parameters
----------
klass - class where members are defined. Should be Series or DataFrame
whitelist - list of names of klass methods to be constructed
Returns
-------
The generator yields a sequence of strings, each suitable for exec'ing,
that define implementations of the named methods for DataFrameGroupBy
or SeriesGroupBy.
Since we don't want to override methods explicitly defined in the
base class, any such name is skipped.
"""
method_wrapper_template = \
"""def %(name)s(%(sig)s) :
\"""
%(doc)s
\"""
f = %(self)s.__getattr__('%(name)s')
return f(%(args)s)"""
property_wrapper_template = \
"""@property
def %(name)s(self) :
\"""
%(doc)s
\"""
return self.__getattr__('%(name)s')"""
for name in whitelist:
# don't override anything that was explicitly defined
# in the base class
if hasattr(GroupBy, name):
continue
# ugly, but we need the name string itself in the method.
f = getattr(klass, name)
doc = f.__doc__
doc = doc if type(doc) == str else ''
if isinstance(f, types.MethodType):
wrapper_template = method_wrapper_template
decl, args = make_signature(f)
# pass args by name to f because otherwise
# GroupBy._make_wrapper won't know whether
# we passed in an axis parameter.
args_by_name = ['{0}={0}'.format(arg) for arg in args[1:]]
params = {'name': name,
'doc': doc,
'sig': ','.join(decl),
'self': args[0],
'args': ','.join(args_by_name)}
else:
wrapper_template = property_wrapper_template
params = {'name': name, 'doc': doc}
yield wrapper_template % params
class SeriesGroupBy(GroupBy):
#
# Make class defs of attributes on SeriesGroupBy whitelist
_apply_whitelist = _series_apply_whitelist
for _def_str in _whitelist_method_generator(Series,
_series_apply_whitelist):
exec(_def_str)
@property
def name(self):
"""
since we are a series, we by definition only have
a single name, but may be the result of a selection or
the name of our object
"""
if self._selection is None:
return self.obj.name
else:
return self._selection
def aggregate(self, func_or_funcs, *args, **kwargs):
"""
Apply aggregation function or functions to groups, yielding most likely
Series but in some cases DataFrame depending on the output of the
aggregation function
Parameters
----------
func_or_funcs : function or list / dict of functions
List/dict of functions will produce DataFrame with column names
determined by the function names themselves (list) or the keys in
the dict
Notes
-----
agg is an alias for aggregate. Use it.
Examples
--------
>>> series
bar 1.0
baz 2.0
qot 3.0
qux 4.0
>>> mapper = lambda x: x[0] # first letter
>>> grouped = series.groupby(mapper)
>>> grouped.aggregate(np.sum)
b 3.0
q 7.0
>>> grouped.aggregate([np.sum, np.mean, np.std])
mean std sum
b 1.5 0.5 3
q 3.5 0.5 7
>>> grouped.agg({'result' : lambda x: x.mean() / x.std(),
... 'total' : np.sum})
result total
b 2.121 3
q 4.95 7
See also
--------
apply, transform
Returns
-------
Series or DataFrame
"""
_level = kwargs.pop('_level', None)
if isinstance(func_or_funcs, compat.string_types):
return getattr(self, func_or_funcs)(*args, **kwargs)
if hasattr(func_or_funcs, '__iter__'):
ret = self._aggregate_multiple_funcs(func_or_funcs,
(_level or 0) + 1)
else:
cyfunc = self._is_cython_func(func_or_funcs)
if cyfunc and not args and not kwargs:
return getattr(self, cyfunc)()
if self.grouper.nkeys > 1:
return self._python_agg_general(func_or_funcs, *args, **kwargs)
try:
return self._python_agg_general(func_or_funcs, *args, **kwargs)
except Exception:
result = self._aggregate_named(func_or_funcs, *args, **kwargs)
index = Index(sorted(result), name=self.grouper.names[0])
ret = Series(result, index=index)
if not self.as_index: # pragma: no cover
print('Warning, ignoring as_index=True')
# _level handled at higher
if not _level and isinstance(ret, dict):
from pandas import concat
ret = concat(ret, axis=1)
return ret
agg = aggregate
def _aggregate_multiple_funcs(self, arg, _level):
if isinstance(arg, dict):
columns = list(arg.keys())
arg = list(arg.items())
elif any(isinstance(x, (tuple, list)) for x in arg):
arg = [(x, x) if not isinstance(x, (tuple, list)) else x
for x in arg]
# indicated column order
columns = lzip(*arg)[0]
else:
# list of functions / function names
columns = []
for f in arg:
if isinstance(f, compat.string_types):
columns.append(f)
else:
# protect against callables without names
columns.append(com._get_callable_name(f))
arg = lzip(columns, arg)
results = {}
for name, func in arg:
obj = self
if name in results:
raise SpecificationError('Function names must be unique, '
'found multiple named %s' % name)
# reset the cache so that we
# only include the named selection
if name in self._selected_obj:
obj = copy.copy(obj)
obj._reset_cache()
obj._selection = name
results[name] = obj.aggregate(func)
if isinstance(list(compat.itervalues(results))[0],
DataFrame):
# let higher level handle
if _level:
return results
return list(compat.itervalues(results))[0]
return DataFrame(results, columns=columns)
def _wrap_output(self, output, index, names=None):
""" common agg/transform wrapping logic """
output = output[self.name]
if names is not None:
return DataFrame(output, index=index, columns=names)
else:
name = self.name
if name is None:
name = self._selected_obj.name
return Series(output, index=index, name=name)
def _wrap_aggregated_output(self, output, names=None):
return self._wrap_output(output=output,
index=self.grouper.result_index,
names=names)
def _wrap_transformed_output(self, output, names=None):
return self._wrap_output(output=output,
index=self.obj.index,
names=names)
def _wrap_applied_output(self, keys, values, not_indexed_same=False):
if len(keys) == 0:
# GH #6265
return Series([], name=self.name, index=keys)
def _get_index():
if self.grouper.nkeys > 1:
index = MultiIndex.from_tuples(keys, names=self.grouper.names)
else:
index = Index(keys, name=self.grouper.names[0])
return index
if isinstance(values[0], dict):
# GH #823
index = _get_index()
result = DataFrame(values, index=index).stack()
result.name = self.name
return result
if isinstance(values[0], (Series, dict)):
return self._concat_objects(keys, values,
not_indexed_same=not_indexed_same)
elif isinstance(values[0], DataFrame):
# possible that Series -> DataFrame by applied function
return self._concat_objects(keys, values,
not_indexed_same=not_indexed_same)
else:
# GH #6265
return Series(values, index=_get_index(), name=self.name)
def _aggregate_named(self, func, *args, **kwargs):
result = {}
for name, group in self:
group.name = name
output = func(group, *args, **kwargs)
if isinstance(output, (Series, Index, np.ndarray)):
raise Exception('Must produce aggregated value')
result[name] = self._try_cast(output, group)
return result
def transform(self, func, *args, **kwargs):
"""
Call function producing a like-indexed Series on each group and return
a Series with the transformed values
Parameters
----------
func : function
To apply to each group. Should return a Series with the same index
Examples
--------
>>> grouped.transform(lambda x: (x - x.mean()) / x.std())
Returns
-------
transformed : Series
"""
func = self._is_cython_func(func) or func
# if string function
if isinstance(func, compat.string_types):
if func in _cython_transforms:
# cythonized transform
return getattr(self, func)(*args, **kwargs)
else:
# cythonized aggregation and merge
return self._transform_fast(
lambda: getattr(self, func)(*args, **kwargs))
# reg transform
dtype = self._selected_obj.dtype
result = self._selected_obj.values.copy()
wrapper = lambda x: func(x, *args, **kwargs)
for i, (name, group) in enumerate(self):
object.__setattr__(group, 'name', name)
res = wrapper(group)
if hasattr(res, 'values'):
res = res.values
# may need to astype
try:
common_type = np.common_type(np.array(res), result)
if common_type != result.dtype:
result = result.astype(common_type)
except:
pass
indexer = self._get_index(name)
result[indexer] = res
result = _possibly_downcast_to_dtype(result, dtype)
return self._selected_obj.__class__(result,
index=self._selected_obj.index,
name=self._selected_obj.name)
def _transform_fast(self, func):
"""
fast version of transform, only applicable to
builtin/cythonizable functions
"""
if isinstance(func, compat.string_types):
func = getattr(self, func)
ids, _, ngroup = self.grouper.group_info
cast = (self.size().fillna(0) > 0).any()
out = algos.take_1d(func().values, ids)
if cast:
out = self._try_cast(out, self.obj)
return Series(out, index=self.obj.index, name=self.obj.name)
def filter(self, func, dropna=True, *args, **kwargs): # noqa
"""
Return a copy of a Series excluding elements from groups that
do not satisfy the boolean criterion specified by func.
Parameters
----------
func : function
To apply to each group. Should return True or False.
dropna : Drop groups that do not pass the filter. True by default;
if False, groups that evaluate False are filled with NaNs.
Examples
--------
>>> grouped.filter(lambda x: x.mean() > 0)
Returns
-------
filtered : Series
"""
if isinstance(func, compat.string_types):
wrapper = lambda x: getattr(x, func)(*args, **kwargs)
else:
wrapper = lambda x: func(x, *args, **kwargs)
# Interpret np.nan as False.
def true_and_notnull(x, *args, **kwargs):
b = wrapper(x, *args, **kwargs)
return b and notnull(b)
try:
indices = [self._get_index(name) for name, group in self
if true_and_notnull(group)]
except ValueError:
raise TypeError("the filter must return a boolean result")
except TypeError:
raise TypeError("the filter must return a boolean result")
filtered = self._apply_filter(indices, dropna)
return filtered
[docs] def nunique(self, dropna=True):
""" Returns number of unique elements in the group """
ids, _, _ = self.grouper.group_info
val = self.obj.get_values()
try:
sorter = np.lexsort((val, ids))
except TypeError: # catches object dtypes
assert val.dtype == object, \
'val.dtype must be object, got %s' % val.dtype
val, _ = algos.factorize(val, sort=False)
sorter = np.lexsort((val, ids))
_isnull = lambda a: a == -1
else:
_isnull = isnull
ids, val = ids[sorter], val[sorter]
# group boundaries are where group ids change
# unique observations are where sorted values change
idx = np.r_[0, 1 + np.nonzero(ids[1:] != ids[:-1])[0]]
inc = np.r_[1, val[1:] != val[:-1]]
# 1st item of each group is a new unique observation
mask = _isnull(val)
if dropna:
inc[idx] = 1
inc[mask] = 0
else:
inc[mask & np.r_[False, mask[:-1]]] = 0
inc[idx] = 1
out = np.add.reduceat(inc, idx).astype('int64', copy=False)
res = out if ids[0] != -1 else out[1:]
ri = self.grouper.result_index
# we might have duplications among the bins
if len(res) != len(ri):
res, out = np.zeros(len(ri), dtype=out.dtype), res
res[ids] = out
return Series(res,
index=ri,
name=self.name)
@deprecate_kwarg('take_last', 'keep',
mapping={True: 'last', False: 'first'})
@Appender(Series.nlargest.__doc__)
[docs] def nlargest(self, n=5, keep='first'):
# ToDo: When we remove deprecate_kwargs, we can remote these methods
# and include nlargest and nsmallest to _series_apply_whitelist
return self.apply(lambda x: x.nlargest(n=n, keep=keep))
@deprecate_kwarg('take_last', 'keep',
mapping={True: 'last', False: 'first'})
@Appender(Series.nsmallest.__doc__)
[docs] def nsmallest(self, n=5, keep='first'):
return self.apply(lambda x: x.nsmallest(n=n, keep=keep))
[docs] def value_counts(self, normalize=False, sort=True, ascending=False,
bins=None, dropna=True):
from functools import partial
from pandas.tools.tile import cut
from pandas.tools.merge import _get_join_indexers
if bins is not None and not np.iterable(bins):
# scalar bins cannot be done at top level
# in a backward compatible way
return self.apply(Series.value_counts,
normalize=normalize,
sort=sort,
ascending=ascending,
bins=bins)
ids, _, _ = self.grouper.group_info
val = self.obj.get_values()
# groupby removes null keys from groupings
mask = ids != -1
ids, val = ids[mask], val[mask]
if bins is None:
lab, lev = algos.factorize(val, sort=True)
else:
cat, bins = cut(val, bins, retbins=True)
# bins[:-1] for backward compat;
# o.w. cat.categories could be better
lab, lev, dropna = cat.codes, bins[:-1], False
sorter = np.lexsort((lab, ids))
ids, lab = ids[sorter], lab[sorter]
# group boundaries are where group ids change
idx = np.r_[0, 1 + np.nonzero(ids[1:] != ids[:-1])[0]]
# new values are where sorted labels change
inc = np.r_[True, lab[1:] != lab[:-1]]
inc[idx] = True # group boundaries are also new values
out = np.diff(np.nonzero(np.r_[inc, True])[0]) # value counts
# num. of times each group should be repeated
rep = partial(np.repeat, repeats=np.add.reduceat(inc, idx))
# multi-index components
labels = list(map(rep, self.grouper.recons_labels)) + [lab[inc]]
levels = [ping.group_index for ping in self.grouper.groupings] + [lev]
names = self.grouper.names + [self.name]
if dropna:
mask = labels[-1] != -1
if mask.all():
dropna = False
else:
out, labels = out[mask], [label[mask] for label in labels]
if normalize:
out = out.astype('float')
d = np.diff(np.r_[idx, len(ids)])
if dropna:
m = ids[lab == -1]
if _np_version_under1p8:
mi, ml = algos.factorize(m)
d[ml] = d[ml] - np.bincount(mi)
else:
np.add.at(d, m, -1)
acc = rep(d)[mask]
else:
acc = rep(d)
out /= acc
if sort and bins is None:
cat = ids[inc][mask] if dropna else ids[inc]
sorter = np.lexsort((out if ascending else -out, cat))
out, labels[-1] = out[sorter], labels[-1][sorter]
if bins is None:
mi = MultiIndex(levels=levels, labels=labels, names=names,
verify_integrity=False)
if is_integer_dtype(out):
out = _ensure_int64(out)
return Series(out, index=mi, name=self.name)
# for compat. with algos.value_counts need to ensure every
# bin is present at every index level, null filled with zeros
diff = np.zeros(len(out), dtype='bool')
for lab in labels[:-1]:
diff |= np.r_[True, lab[1:] != lab[:-1]]
ncat, nbin = diff.sum(), len(levels[-1])
left = [np.repeat(np.arange(ncat), nbin),
np.tile(np.arange(nbin), ncat)]
right = [diff.cumsum() - 1, labels[-1]]
_, idx = _get_join_indexers(left, right, sort=False, how='left')
out = np.where(idx != -1, out[idx], 0)
if sort:
sorter = np.lexsort((out if ascending else -out, left[0]))
out, left[-1] = out[sorter], left[-1][sorter]
# build the multi-index w/ full levels
labels = list(map(lambda lab: np.repeat(lab[diff], nbin), labels[:-1]))
labels.append(left[-1])
mi = MultiIndex(levels=levels, labels=labels, names=names,
verify_integrity=False)
if is_integer_dtype(out):
out = _ensure_int64(out)
return Series(out, index=mi, name=self.name)
def count(self):
""" Compute count of group, excluding missing values """
ids, _, ngroups = self.grouper.group_info
val = self.obj.get_values()
mask = (ids != -1) & ~isnull(val)
ids = _ensure_platform_int(ids)
out = np.bincount(ids[mask], minlength=ngroups or None)
return Series(out,
index=self.grouper.result_index,
name=self.name,
dtype='int64')
def _apply_to_column_groupbys(self, func):
""" return a pass thru """
return func(self)
class NDFrameGroupBy(GroupBy):
def _iterate_slices(self):
if self.axis == 0:
# kludge
if self._selection is None:
slice_axis = self.obj.columns
else:
slice_axis = self._selection_list
slicer = lambda x: self.obj[x]
else:
slice_axis = self.obj.index
slicer = self.obj.xs
for val in slice_axis:
if val in self.exclusions:
continue
yield val, slicer(val)
def _cython_agg_general(self, how, numeric_only=True):
new_items, new_blocks = self._cython_agg_blocks(
how, numeric_only=numeric_only)
return self._wrap_agged_blocks(new_items, new_blocks)
def _wrap_agged_blocks(self, items, blocks):
obj = self._obj_with_exclusions
new_axes = list(obj._data.axes)
# more kludge
if self.axis == 0:
new_axes[0], new_axes[1] = new_axes[1], self.grouper.result_index
else:
new_axes[self.axis] = self.grouper.result_index
# Make sure block manager integrity check passes.
assert new_axes[0].equals(items)
new_axes[0] = items
mgr = BlockManager(blocks, new_axes)
new_obj = type(obj)(mgr)
return self._post_process_cython_aggregate(new_obj)
_block_agg_axis = 0
def _cython_agg_blocks(self, how, numeric_only=True):
data, agg_axis = self._get_data_to_aggregate()
new_blocks = []
if numeric_only:
data = data.get_numeric_data(copy=False)
for block in data.blocks:
result, _ = self.grouper.aggregate(
block.values, how, axis=agg_axis)
# see if we can cast the block back to the original dtype
result = block._try_coerce_and_cast_result(result)
newb = make_block(result, placement=block.mgr_locs)
new_blocks.append(newb)
if len(new_blocks) == 0:
raise DataError('No numeric types to aggregate')
return data.items, new_blocks
def _get_data_to_aggregate(self):
obj = self._obj_with_exclusions
if self.axis == 0:
return obj.swapaxes(0, 1)._data, 1
else:
return obj._data, self.axis
def _post_process_cython_aggregate(self, obj):
# undoing kludge from below
if self.axis == 0:
obj = obj.swapaxes(0, 1)
return obj
def aggregate(self, arg, *args, **kwargs):
_level = kwargs.pop('_level', None)
result, how = self._aggregate(arg, _level=_level, *args, **kwargs)
if how is None:
return result
if result is None:
# grouper specific aggregations
if self.grouper.nkeys > 1:
return self._python_agg_general(arg, *args, **kwargs)
else:
# try to treat as if we are passing a list
try:
assert not args and not kwargs
result = self._aggregate_multiple_funcs(
[arg], _level=_level)
result.columns = Index(
result.columns.levels[0],
name=self._selected_obj.columns.name)
except:
result = self._aggregate_generic(arg, *args, **kwargs)
if not self.as_index:
self._insert_inaxis_grouper_inplace(result)
result.index = np.arange(len(result))
return result._convert(datetime=True)
agg = aggregate
def _aggregate_generic(self, func, *args, **kwargs):
if self.grouper.nkeys != 1:
raise AssertionError('Number of keys must be 1')
axis = self.axis
obj = self._obj_with_exclusions
result = {}
if axis != obj._info_axis_number:
try:
for name, data in self:
result[name] = self._try_cast(func(data, *args, **kwargs),
data)
except Exception:
return self._aggregate_item_by_item(func, *args, **kwargs)
else:
for name in self.indices:
try:
data = self.get_group(name, obj=obj)
result[name] = self._try_cast(func(data, *args, **kwargs),
data)
except Exception:
wrapper = lambda x: func(x, *args, **kwargs)
result[name] = data.apply(wrapper, axis=axis)
return self._wrap_generic_output(result, obj)
def _wrap_aggregated_output(self, output, names=None):
raise AbstractMethodError(self)
def _aggregate_item_by_item(self, func, *args, **kwargs):
# only for axis==0
obj = self._obj_with_exclusions
result = {}
cannot_agg = []
errors = None
for item in obj:
try:
data = obj[item]
colg = SeriesGroupBy(data, selection=item,
grouper=self.grouper)
result[item] = self._try_cast(
colg.aggregate(func, *args, **kwargs), data)
except ValueError:
cannot_agg.append(item)
continue
except TypeError as e:
cannot_agg.append(item)
errors = e
continue
result_columns = obj.columns
if cannot_agg:
result_columns = result_columns.drop(cannot_agg)
# GH6337
if not len(result_columns) and errors is not None:
raise errors
return DataFrame(result, columns=result_columns)
def _decide_output_index(self, output, labels):
if len(output) == len(labels):
output_keys = labels
else:
output_keys = sorted(output)
try:
output_keys.sort()
except Exception: # pragma: no cover
pass
if isinstance(labels, MultiIndex):
output_keys = MultiIndex.from_tuples(output_keys,
names=labels.names)
return output_keys
def _wrap_applied_output(self, keys, values, not_indexed_same=False):
from pandas.core.index import _all_indexes_same
if len(keys) == 0:
return DataFrame(index=keys)
key_names = self.grouper.names
# GH12824.
def first_non_None_value(values):
try:
v = next(v for v in values if v is not None)
except StopIteration:
return None
return v
v = first_non_None_value(values)
if v is None:
# GH9684. If all values are None, then this will throw an error.
# We'd prefer it return an empty dataframe.
return DataFrame()
elif isinstance(v, DataFrame):
return self._concat_objects(keys, values,
not_indexed_same=not_indexed_same)
elif self.grouper.groupings is not None:
if len(self.grouper.groupings) > 1:
key_index = MultiIndex.from_tuples(keys, names=key_names)
else:
ping = self.grouper.groupings[0]
if len(keys) == ping.ngroups:
key_index = ping.group_index
key_index.name = key_names[0]
key_lookup = Index(keys)
indexer = key_lookup.get_indexer(key_index)
# reorder the values
values = [values[i] for i in indexer]
else:
key_index = Index(keys, name=key_names[0])
# don't use the key indexer
if not self.as_index:
key_index = None
# make Nones an empty object
v = first_non_None_value(values)
if v is None:
return DataFrame()
elif isinstance(v, NDFrame):
values = [
x if x is not None else
v._constructor(**v._construct_axes_dict())
for x in values
]
v = values[0]
if isinstance(v, (np.ndarray, Index, Series)):
if isinstance(v, Series):
applied_index = self._selected_obj._get_axis(self.axis)
all_indexed_same = _all_indexes_same([
x.index for x in values
])
singular_series = (len(values) == 1 and
applied_index.nlevels == 1)
# GH3596
# provide a reduction (Frame -> Series) if groups are
# unique
if self.squeeze:
# assign the name to this series
if singular_series:
values[0].name = keys[0]
# GH2893
# we have series in the values array, we want to
# produce a series:
# if any of the sub-series are not indexed the same
# OR we don't have a multi-index and we have only a
# single values
return self._concat_objects(
keys, values, not_indexed_same=not_indexed_same
)
# still a series
# path added as of GH 5545
elif all_indexed_same:
from pandas.tools.merge import concat
return concat(values)
if not all_indexed_same:
# GH 8467
return self._concat_objects(
keys, values, not_indexed_same=True,
)
try:
if self.axis == 0:
# GH6124 if the list of Series have a consistent name,
# then propagate that name to the result.
index = v.index.copy()
if index.name is None:
# Only propagate the series name to the result
# if all series have a consistent name. If the
# series do not have a consistent name, do
# nothing.
names = set(v.name for v in values)
if len(names) == 1:
index.name = list(names)[0]
# normally use vstack as its faster than concat
# and if we have mi-columns
if isinstance(v.index,
MultiIndex) or key_index is None:
stacked_values = np.vstack(map(np.asarray, values))
result = DataFrame(stacked_values, index=key_index,
columns=index)
else:
# GH5788 instead of stacking; concat gets the
# dtypes correct
from pandas.tools.merge import concat
result = concat(values, keys=key_index,
names=key_index.names,
axis=self.axis).unstack()
result.columns = index
else:
stacked_values = np.vstack(map(np.asarray, values))
result = DataFrame(stacked_values.T, index=v.index,
columns=key_index)
except (ValueError, AttributeError):
# GH1738: values is list of arrays of unequal lengths fall
# through to the outer else caluse
return Series(values, index=key_index, name=self.name)
# if we have date/time like in the original, then coerce dates
# as we are stacking can easily have object dtypes here
so = self._selected_obj
if (so.ndim == 2 and so.dtypes.isin(_DATELIKE_DTYPES).any()):
result = result._convert(numeric=True)
date_cols = self._selected_obj.select_dtypes(
include=list(_DATELIKE_DTYPES)).columns
date_cols = date_cols.intersection(result.columns)
result[date_cols] = (result[date_cols]
._convert(datetime=True,
coerce=True))
else:
result = result._convert(datetime=True)
return self._reindex_output(result)
# values are not series or array-like but scalars
else:
# only coerce dates if we find at least 1 datetime
coerce = True if any([isinstance(x, Timestamp)
for x in values]) else False
# self.name not passed through to Series as the result
# should not take the name of original selection of columns
return (Series(values, index=key_index)
._convert(datetime=True,
coerce=coerce))
else:
# Handle cases like BinGrouper
return self._concat_objects(keys, values,
not_indexed_same=not_indexed_same)
def _transform_general(self, func, *args, **kwargs):
from pandas.tools.merge import concat
applied = []
obj = self._obj_with_exclusions
gen = self.grouper.get_iterator(obj, axis=self.axis)
fast_path, slow_path = self._define_paths(func, *args, **kwargs)
path = None
for name, group in gen:
object.__setattr__(group, 'name', name)
if path is None:
# Try slow path and fast path.
try:
path, res = self._choose_path(fast_path, slow_path, group)
except TypeError:
return self._transform_item_by_item(obj, fast_path)
except ValueError:
msg = 'transform must return a scalar value for each group'
raise ValueError(msg)
else:
res = path(group)
# broadcasting
if isinstance(res, Series):
if res.index.is_(obj.index):
group.T.values[:] = res
else:
group.values[:] = res
applied.append(group)
else:
applied.append(res)
concat_index = obj.columns if self.axis == 0 else obj.index
concatenated = concat(applied, join_axes=[concat_index],
axis=self.axis, verify_integrity=False)
return self._set_result_index_ordered(concatenated)
def transform(self, func, *args, **kwargs):
"""
Call function producing a like-indexed DataFrame on each group and
return a DataFrame having the same indexes as the original object
filled with the transformed values
Parameters
----------
f : function
Function to apply to each subframe
Notes
-----
Each subframe is endowed the attribute 'name' in case you need to know
which group you are working on.
Examples
--------
>>> grouped = df.groupby(lambda x: mapping[x])
>>> grouped.transform(lambda x: (x - x.mean()) / x.std())
"""
# optimized transforms
func = self._is_cython_func(func) or func
if isinstance(func, compat.string_types):
if func in _cython_transforms:
# cythonized transform
return getattr(self, func)(*args, **kwargs)
else:
# cythonized aggregation and merge
result = getattr(self, func)(*args, **kwargs)
else:
return self._transform_general(func, *args, **kwargs)
# a reduction transform
if not isinstance(result, DataFrame):
return self._transform_general(func, *args, **kwargs)
obj = self._obj_with_exclusions
# nuiscance columns
if not result.columns.equals(obj.columns):
return self._transform_general(func, *args, **kwargs)
return self._transform_fast(result, obj)
def _transform_fast(self, result, obj):
"""
Fast transform path for aggregations
"""
# if there were groups with no observations (Categorical only?)
# try casting data to original dtype
cast = (self.size().fillna(0) > 0).any()
# for each col, reshape to to size of original frame
# by take operation
ids, _, ngroup = self.grouper.group_info
output = []
for i, _ in enumerate(result.columns):
res = algos.take_1d(result.iloc[:, i].values, ids)
if cast:
res = self._try_cast(res, obj.iloc[:, i])
output.append(res)
return DataFrame._from_arrays(output, columns=result.columns,
index=obj.index)
def _define_paths(self, func, *args, **kwargs):
if isinstance(func, compat.string_types):
fast_path = lambda group: getattr(group, func)(*args, **kwargs)
slow_path = lambda group: group.apply(
lambda x: getattr(x, func)(*args, **kwargs), axis=self.axis)
else:
fast_path = lambda group: func(group, *args, **kwargs)
slow_path = lambda group: group.apply(
lambda x: func(x, *args, **kwargs), axis=self.axis)
return fast_path, slow_path
def _choose_path(self, fast_path, slow_path, group):
path = slow_path
res = slow_path(group)
# if we make it here, test if we can use the fast path
try:
res_fast = fast_path(group)
# compare that we get the same results
if res.shape == res_fast.shape:
res_r = res.values.ravel()
res_fast_r = res_fast.values.ravel()
mask = notnull(res_r)
if (res_r[mask] == res_fast_r[mask]).all():
path = fast_path
except:
pass
return path, res
def _transform_item_by_item(self, obj, wrapper):
# iterate through columns
output = {}
inds = []
for i, col in enumerate(obj):
try:
output[col] = self[col].transform(wrapper)
inds.append(i)
except Exception:
pass
if len(output) == 0: # pragma: no cover
raise TypeError('Transform function invalid for data types')
columns = obj.columns
if len(output) < len(obj.columns):
columns = columns.take(inds)
return DataFrame(output, index=obj.index, columns=columns)
def filter(self, func, dropna=True, *args, **kwargs): # noqa
"""
Return a copy of a DataFrame excluding elements from groups that
do not satisfy the boolean criterion specified by func.
Parameters
----------
f : function
Function to apply to each subframe. Should return True or False.
dropna : Drop groups that do not pass the filter. True by default;
if False, groups that evaluate False are filled with NaNs.
Notes
-----
Each subframe is endowed the attribute 'name' in case you need to know
which group you are working on.
Examples
--------
>>> grouped = df.groupby(lambda x: mapping[x])
>>> grouped.filter(lambda x: x['A'].sum() + x['B'].sum() > 0)
"""
indices = []
obj = self._selected_obj
gen = self.grouper.get_iterator(obj, axis=self.axis)
for name, group in gen:
object.__setattr__(group, 'name', name)
res = func(group, *args, **kwargs)
try:
res = res.squeeze()
except AttributeError: # allow e.g., scalars and frames to pass
pass
# interpret the result of the filter
if is_bool(res) or (is_scalar(res) and isnull(res)):
if res and notnull(res):
indices.append(self._get_index(name))
else:
# non scalars aren't allowed
raise TypeError("filter function returned a %s, "
"but expected a scalar bool" %
type(res).__name__)
return self._apply_filter(indices, dropna)
class DataFrameGroupBy(NDFrameGroupBy):
_apply_whitelist = _dataframe_apply_whitelist
#
# Make class defs of attributes on DataFrameGroupBy whitelist.
for _def_str in _whitelist_method_generator(DataFrame, _apply_whitelist):
exec(_def_str)
_block_agg_axis = 1
@Substitution(name='groupby')
@Appender(SelectionMixin._see_also_template)
@Appender(SelectionMixin._agg_doc)
def aggregate(self, arg, *args, **kwargs):
return super(DataFrameGroupBy, self).aggregate(arg, *args, **kwargs)
agg = aggregate
def _gotitem(self, key, ndim, subset=None):
"""
sub-classes to define
return a sliced object
Parameters
----------
key : string / list of selections
ndim : 1,2
requested ndim of result
subset : object, default None
subset to act on
"""
if ndim == 2:
if subset is None:
subset = self.obj
return DataFrameGroupBy(subset, self.grouper, selection=key,
grouper=self.grouper,
exclusions=self.exclusions,
as_index=self.as_index)
elif ndim == 1:
if subset is None:
subset = self.obj[key]
return SeriesGroupBy(subset, selection=key,
grouper=self.grouper)
raise AssertionError("invalid ndim for _gotitem")
def _wrap_generic_output(self, result, obj):
result_index = self.grouper.levels[0]
if self.axis == 0:
return DataFrame(result, index=obj.columns,
columns=result_index).T
else:
return DataFrame(result, index=obj.index,
columns=result_index)
def _get_data_to_aggregate(self):
obj = self._obj_with_exclusions
if self.axis == 1:
return obj.T._data, 1
else:
return obj._data, 1
def _insert_inaxis_grouper_inplace(self, result):
# zip in reverse so we can always insert at loc 0
izip = zip(* map(reversed, (
self.grouper.names,
self.grouper.get_group_levels(),
[grp.in_axis for grp in self.grouper.groupings])))
for name, lev, in_axis in izip:
if in_axis:
result.insert(0, name, lev)
def _wrap_aggregated_output(self, output, names=None):
agg_axis = 0 if self.axis == 1 else 1
agg_labels = self._obj_with_exclusions._get_axis(agg_axis)
output_keys = self._decide_output_index(output, agg_labels)
if not self.as_index:
result = DataFrame(output, columns=output_keys)
self._insert_inaxis_grouper_inplace(result)
result = result.consolidate()
else:
index = self.grouper.result_index
result = DataFrame(output, index=index, columns=output_keys)
if self.axis == 1:
result = result.T
return self._reindex_output(result)._convert(datetime=True)
def _wrap_transformed_output(self, output, names=None):
return DataFrame(output, index=self.obj.index)
def _wrap_agged_blocks(self, items, blocks):
if not self.as_index:
index = np.arange(blocks[0].values.shape[1])
mgr = BlockManager(blocks, [items, index])
result = DataFrame(mgr)
self._insert_inaxis_grouper_inplace(result)
result = result.consolidate()
else:
index = self.grouper.result_index
mgr = BlockManager(blocks, [items, index])
result = DataFrame(mgr)
if self.axis == 1:
result = result.T
return self._reindex_output(result)._convert(datetime=True)
def _reindex_output(self, result):
"""
if we have categorical groupers, then we want to make sure that
we have a fully reindex-output to the levels. These may have not
participated in the groupings (e.g. may have all been
nan groups)
This can re-expand the output space
"""
groupings = self.grouper.groupings
if groupings is None:
return result
elif len(groupings) == 1:
return result
elif not any([isinstance(ping.grouper, (Categorical, CategoricalIndex))
for ping in groupings]):
return result
levels_list = [ping.group_index for ping in groupings]
index, _ = MultiIndex.from_product(
levels_list, names=self.grouper.names).sortlevel()
if self.as_index:
d = {self.obj._get_axis_name(self.axis): index, 'copy': False}
return result.reindex(**d)
# GH 13204
# Here, the categorical in-axis groupers, which need to be fully
# expanded, are columns in `result`. An idea is to do:
# result = result.set_index(self.grouper.names)
# .reindex(index).reset_index()
# but special care has to be taken because of possible not-in-axis
# groupers.
# So, we manually select and drop the in-axis grouper columns,
# reindex `result`, and then reset the in-axis grouper columns.
# Select in-axis groupers
in_axis_grps = [(i, ping.name) for (i, ping)
in enumerate(groupings) if ping.in_axis]
g_nums, g_names = zip(*in_axis_grps)
result = result.drop(labels=list(g_names), axis=1)
# Set a temp index and reindex (possibly expanding)
result = result.set_index(self.grouper.result_index
).reindex(index, copy=False)
# Reset in-axis grouper columns
# (using level numbers `g_nums` because level names may not be unique)
result = result.reset_index(level=g_nums)
return result.reset_index(drop=True)
def _iterate_column_groupbys(self):
for i, colname in enumerate(self._selected_obj.columns):
yield colname, SeriesGroupBy(self._selected_obj.iloc[:, i],
selection=colname,
grouper=self.grouper,
exclusions=self.exclusions)
def _apply_to_column_groupbys(self, func):
from pandas.tools.merge import concat
return concat(
(func(col_groupby) for _, col_groupby
in self._iterate_column_groupbys()),
keys=self._selected_obj.columns, axis=1)
[docs] def count(self):
""" Compute count of group, excluding missing values """
from functools import partial
from pandas.lib import count_level_2d
from pandas.types.missing import _isnull_ndarraylike as isnull
data, _ = self._get_data_to_aggregate()
ids, _, ngroups = self.grouper.group_info
mask = ids != -1
val = ((mask & ~isnull(blk.get_values())) for blk in data.blocks)
loc = (blk.mgr_locs for blk in data.blocks)
counter = partial(count_level_2d, labels=ids, max_bin=ngroups, axis=1)
blk = map(make_block, map(counter, val), loc)
return self._wrap_agged_blocks(data.items, list(blk))
from pandas.tools.plotting import boxplot_frame_groupby # noqa
DataFrameGroupBy.boxplot = boxplot_frame_groupby
class PanelGroupBy(NDFrameGroupBy):
@Substitution(name='groupby')
@Appender(SelectionMixin._see_also_template)
@Appender(SelectionMixin._agg_doc)
def aggregate(self, arg, *args, **kwargs):
return super(PanelGroupBy, self).aggregate(arg, *args, **kwargs)
agg = aggregate
def _iterate_slices(self):
if self.axis == 0:
# kludge
if self._selection is None:
slice_axis = self._selected_obj.items
else:
slice_axis = self._selection_list
slicer = lambda x: self._selected_obj[x]
else:
raise NotImplementedError("axis other than 0 is not supported")
for val in slice_axis:
if val in self.exclusions:
continue
yield val, slicer(val)
def aggregate(self, arg, *args, **kwargs):
"""
Aggregate using input function or dict of {column -> function}
Parameters
----------
arg : function or dict
Function to use for aggregating groups. If a function, must either
work when passed a Panel or when passed to Panel.apply. If
pass a dict, the keys must be DataFrame column names
Returns
-------
aggregated : Panel
"""
if isinstance(arg, compat.string_types):
return getattr(self, arg)(*args, **kwargs)
return self._aggregate_generic(arg, *args, **kwargs)
def _wrap_generic_output(self, result, obj):
if self.axis == 0:
new_axes = list(obj.axes)
new_axes[0] = self.grouper.result_index
elif self.axis == 1:
x, y, z = obj.axes
new_axes = [self.grouper.result_index, z, x]
else:
x, y, z = obj.axes
new_axes = [self.grouper.result_index, y, x]
result = Panel._from_axes(result, new_axes)
if self.axis == 1:
result = result.swapaxes(0, 1).swapaxes(0, 2)
elif self.axis == 2:
result = result.swapaxes(0, 2)
return result
def _aggregate_item_by_item(self, func, *args, **kwargs):
obj = self._obj_with_exclusions
result = {}
if self.axis > 0:
for item in obj:
try:
itemg = DataFrameGroupBy(obj[item],
axis=self.axis - 1,
grouper=self.grouper)
result[item] = itemg.aggregate(func, *args, **kwargs)
except (ValueError, TypeError):
raise
new_axes = list(obj.axes)
new_axes[self.axis] = self.grouper.result_index
return Panel._from_axes(result, new_axes)
else:
raise ValueError("axis value must be greater than 0")
def _wrap_aggregated_output(self, output, names=None):
raise AbstractMethodError(self)
class NDArrayGroupBy(GroupBy):
pass
# ----------------------------------------------------------------------
# Splitting / application
class DataSplitter(object):
def __init__(self, data, labels, ngroups, axis=0):
self.data = data
self.labels = _ensure_int64(labels)
self.ngroups = ngroups
self.axis = axis
@cache_readonly
def slabels(self):
# Sorted labels
return algos.take_nd(self.labels, self.sort_idx, allow_fill=False)
@cache_readonly
def sort_idx(self):
# Counting sort indexer
return _get_group_index_sorter(self.labels, self.ngroups)
def __iter__(self):
sdata = self._get_sorted_data()
if self.ngroups == 0:
raise StopIteration
starts, ends = lib.generate_slices(self.slabels, self.ngroups)
for i, (start, end) in enumerate(zip(starts, ends)):
# Since I'm now compressing the group ids, it's now not "possible"
# to produce empty slices because such groups would not be observed
# in the data
# if start >= end:
# raise AssertionError('Start %s must be less than end %s'
# % (str(start), str(end)))
yield i, self._chop(sdata, slice(start, end))
def _get_sorted_data(self):
return self.data.take(self.sort_idx, axis=self.axis, convert=False)
def _chop(self, sdata, slice_obj):
return sdata.iloc[slice_obj]
def apply(self, f):
raise AbstractMethodError(self)
class ArraySplitter(DataSplitter):
pass
class SeriesSplitter(DataSplitter):
def _chop(self, sdata, slice_obj):
return sdata._get_values(slice_obj).to_dense()
class FrameSplitter(DataSplitter):
def __init__(self, data, labels, ngroups, axis=0):
super(FrameSplitter, self).__init__(data, labels, ngroups, axis=axis)
def fast_apply(self, f, names):
# must return keys::list, values::list, mutated::bool
try:
starts, ends = lib.generate_slices(self.slabels, self.ngroups)
except:
# fails when all -1
return [], True
sdata = self._get_sorted_data()
results, mutated = lib.apply_frame_axis0(sdata, f, names, starts, ends)
return results, mutated
def _chop(self, sdata, slice_obj):
if self.axis == 0:
return sdata.iloc[slice_obj]
else:
return sdata._slice(slice_obj, axis=1) # ix[:, slice_obj]
class NDFrameSplitter(DataSplitter):
def __init__(self, data, labels, ngroups, axis=0):
super(NDFrameSplitter, self).__init__(data, labels, ngroups, axis=axis)
self.factory = data._constructor
def _get_sorted_data(self):
# this is the BlockManager
data = self.data._data
# this is sort of wasteful but...
sorted_axis = data.axes[self.axis].take(self.sort_idx)
sorted_data = data.reindex_axis(sorted_axis, axis=self.axis)
return sorted_data
def _chop(self, sdata, slice_obj):
return self.factory(sdata.get_slice(slice_obj, axis=self.axis))
def get_splitter(data, *args, **kwargs):
if isinstance(data, Series):
klass = SeriesSplitter
elif isinstance(data, DataFrame):
klass = FrameSplitter
else:
klass = NDFrameSplitter
return klass(data, *args, **kwargs)
# ----------------------------------------------------------------------
# Misc utilities
def get_group_index(labels, shape, sort, xnull):
"""
For the particular label_list, gets the offsets into the hypothetical list
representing the totally ordered cartesian product of all possible label
combinations, *as long as* this space fits within int64 bounds;
otherwise, though group indices identify unique combinations of
labels, they cannot be deconstructed.
- If `sort`, rank of returned ids preserve lexical ranks of labels.
i.e. returned id's can be used to do lexical sort on labels;
- If `xnull` nulls (-1 labels) are passed through.
Parameters
----------
labels: sequence of arrays
Integers identifying levels at each location
shape: sequence of ints same length as labels
Number of unique levels at each location
sort: boolean
If the ranks of returned ids should match lexical ranks of labels
xnull: boolean
If true nulls are excluded. i.e. -1 values in the labels are
passed through
Returns
-------
An array of type int64 where two elements are equal if their corresponding
labels are equal at all location.
"""
def _int64_cut_off(shape):
acc = long(1)
for i, mul in enumerate(shape):
acc *= long(mul)
if not acc < _INT64_MAX:
return i
return len(shape)
def loop(labels, shape):
# how many levels can be done without overflow:
nlev = _int64_cut_off(shape)
# compute flat ids for the first `nlev` levels
stride = np.prod(shape[1:nlev], dtype='i8')
out = stride * labels[0].astype('i8', subok=False, copy=False)
for i in range(1, nlev):
stride //= shape[i]
out += labels[i] * stride
if xnull: # exclude nulls
mask = labels[0] == -1
for lab in labels[1:nlev]:
mask |= lab == -1
out[mask] = -1
if nlev == len(shape): # all levels done!
return out
# compress what has been done so far in order to avoid overflow
# to retain lexical ranks, obs_ids should be sorted
comp_ids, obs_ids = _compress_group_index(out, sort=sort)
labels = [comp_ids] + labels[nlev:]
shape = [len(obs_ids)] + shape[nlev:]
return loop(labels, shape)
def maybe_lift(lab, size): # pormote nan values
return (lab + 1, size + 1) if (lab == -1).any() else (lab, size)
labels = map(_ensure_int64, labels)
if not xnull:
labels, shape = map(list, zip(*map(maybe_lift, labels, shape)))
return loop(list(labels), list(shape))
_INT64_MAX = np.iinfo(np.int64).max
def _int64_overflow_possible(shape):
the_prod = long(1)
for x in shape:
the_prod *= long(x)
return the_prod >= _INT64_MAX
def decons_group_index(comp_labels, shape):
# reconstruct labels
if _int64_overflow_possible(shape):
# at some point group indices are factorized,
# and may not be deconstructed here! wrong path!
raise ValueError('cannot deconstruct factorized group indices!')
label_list = []
factor = 1
y = 0
x = comp_labels
for i in reversed(range(len(shape))):
labels = (x - y) % (factor * shape[i]) // factor
np.putmask(labels, comp_labels < 0, -1)
label_list.append(labels)
y = labels * factor
factor *= shape[i]
return label_list[::-1]
def decons_obs_group_ids(comp_ids, obs_ids, shape, labels, xnull):
"""
reconstruct labels from observed group ids
Parameters
----------
xnull: boolean,
if nulls are excluded; i.e. -1 labels are passed through
"""
from pandas.hashtable import unique_label_indices
if not xnull:
lift = np.fromiter(((a == -1).any() for a in labels), dtype='i8')
shape = np.asarray(shape, dtype='i8') + lift
if not _int64_overflow_possible(shape):
# obs ids are deconstructable! take the fast route!
out = decons_group_index(obs_ids, shape)
return out if xnull or not lift.any() \
else [x - y for x, y in zip(out, lift)]
i = unique_label_indices(comp_ids)
i8copy = lambda a: a.astype('i8', subok=False, copy=True)
return [i8copy(lab[i]) for lab in labels]
def _indexer_from_factorized(labels, shape, compress=True):
ids = get_group_index(labels, shape, sort=True, xnull=False)
if not compress:
ngroups = (ids.size and ids.max()) + 1
else:
ids, obs = _compress_group_index(ids, sort=True)
ngroups = len(obs)
return _get_group_index_sorter(ids, ngroups)
def _lexsort_indexer(keys, orders=None, na_position='last'):
labels = []
shape = []
if isinstance(orders, bool):
orders = [orders] * len(keys)
elif orders is None:
orders = [True] * len(keys)
for key, order in zip(keys, orders):
# we are already a Categorical
if is_categorical_dtype(key):
c = key
# create the Categorical
else:
c = Categorical(key, ordered=True)
if na_position not in ['last', 'first']:
raise ValueError('invalid na_position: {!r}'.format(na_position))
n = len(c.categories)
codes = c.codes.copy()
mask = (c.codes == -1)
if order: # ascending
if na_position == 'last':
codes = np.where(mask, n, codes)
elif na_position == 'first':
codes += 1
else: # not order means descending
if na_position == 'last':
codes = np.where(mask, n, n - codes - 1)
elif na_position == 'first':
codes = np.where(mask, 0, n - codes)
if mask.any():
n += 1
shape.append(n)
labels.append(codes)
return _indexer_from_factorized(labels, shape)
def _nargsort(items, kind='quicksort', ascending=True, na_position='last'):
"""
This is intended to be a drop-in replacement for np.argsort which
handles NaNs. It adds ascending and na_position parameters.
GH #6399, #5231
"""
# specially handle Categorical
if is_categorical_dtype(items):
return items.argsort(ascending=ascending)
items = np.asanyarray(items)
idx = np.arange(len(items))
mask = isnull(items)
non_nans = items[~mask]
non_nan_idx = idx[~mask]
nan_idx = np.nonzero(mask)[0]
if not ascending:
non_nans = non_nans[::-1]
non_nan_idx = non_nan_idx[::-1]
indexer = non_nan_idx[non_nans.argsort(kind=kind)]
if not ascending:
indexer = indexer[::-1]
# Finally, place the NaNs at the end or the beginning according to
# na_position
if na_position == 'last':
indexer = np.concatenate([indexer, nan_idx])
elif na_position == 'first':
indexer = np.concatenate([nan_idx, indexer])
else:
raise ValueError('invalid na_position: {!r}'.format(na_position))
return indexer
class _KeyMapper(object):
"""
Ease my suffering. Map compressed group id -> key tuple
"""
def __init__(self, comp_ids, ngroups, labels, levels):
self.levels = levels
self.labels = labels
self.comp_ids = comp_ids.astype(np.int64)
self.k = len(labels)
self.tables = [_hash.Int64HashTable(ngroups) for _ in range(self.k)]
self._populate_tables()
def _populate_tables(self):
for labs, table in zip(self.labels, self.tables):
table.map(self.comp_ids, labs.astype(np.int64))
def get_key(self, comp_id):
return tuple(level[table.get_item(comp_id)]
for table, level in zip(self.tables, self.levels))
def _get_indices_dict(label_list, keys):
shape = list(map(len, keys))
group_index = get_group_index(label_list, shape, sort=True, xnull=True)
ngroups = ((group_index.size and group_index.max()) + 1) \
if _int64_overflow_possible(shape) \
else np.prod(shape, dtype='i8')
sorter = _get_group_index_sorter(group_index, ngroups)
sorted_labels = [lab.take(sorter) for lab in label_list]
group_index = group_index.take(sorter)
return lib.indices_fast(sorter, group_index, keys, sorted_labels)
# ----------------------------------------------------------------------
# sorting levels...cleverly?
def _get_group_index_sorter(group_index, ngroups):
"""
_algos.groupsort_indexer implements `counting sort` and it is at least
O(ngroups), where
ngroups = prod(shape)
shape = map(len, keys)
that is, linear in the number of combinations (cartesian product) of unique
values of groupby keys. This can be huge when doing multi-key groupby.
np.argsort(kind='mergesort') is O(count x log(count)) where count is the
length of the data-frame;
Both algorithms are `stable` sort and that is necessary for correctness of
groupby operations. e.g. consider:
df.groupby(key)[col].transform('first')
"""
count = len(group_index)
alpha = 0.0 # taking complexities literally; there may be
beta = 1.0 # some room for fine-tuning these parameters
if alpha + beta * ngroups < count * np.log(count):
sorter, _ = _algos.groupsort_indexer(_ensure_int64(group_index),
ngroups)
return _ensure_platform_int(sorter)
else:
return group_index.argsort(kind='mergesort')
def _compress_group_index(group_index, sort=True):
"""
Group_index is offsets into cartesian product of all possible labels. This
space can be huge, so this function compresses it, by computing offsets
(comp_ids) into the list of unique labels (obs_group_ids).
"""
size_hint = min(len(group_index), _hash._SIZE_HINT_LIMIT)
table = _hash.Int64HashTable(size_hint)
group_index = _ensure_int64(group_index)
# note, group labels come out ascending (ie, 1,2,3 etc)
comp_ids, obs_group_ids = table.get_labels_groupby(group_index)
if sort and len(obs_group_ids) > 0:
obs_group_ids, comp_ids = _reorder_by_uniques(obs_group_ids, comp_ids)
return comp_ids, obs_group_ids
def _reorder_by_uniques(uniques, labels):
# sorter is index where elements ought to go
sorter = uniques.argsort()
# reverse_indexer is where elements came from
reverse_indexer = np.empty(len(sorter), dtype=np.int64)
reverse_indexer.put(sorter, np.arange(len(sorter)))
mask = labels < 0
# move labels to right locations (ie, unsort ascending labels)
labels = algos.take_nd(reverse_indexer, labels, allow_fill=False)
np.putmask(labels, mask, -1)
# sort observed ids
uniques = algos.take_nd(uniques, sorter, allow_fill=False)
return uniques, labels
def _groupby_indices(values):
if is_categorical_dtype(values):
# we have a categorical, so we can do quite a bit
# bit better than factorizing again
reverse = dict(enumerate(values.categories))
codes = values.codes.astype('int64')
_, counts = _hash.value_count_int64(codes, False)
else:
reverse, codes, counts = _algos.group_labels(
_values_from_object(_ensure_object(values)))
return _algos.groupby_indices(reverse, codes, counts)
def numpy_groupby(data, labels, axis=0):
s = np.argsort(labels)
keys, inv = np.unique(labels, return_inverse=True)
i = inv.take(s)
groups_at = np.where(i != np.concatenate(([-1], i[:-1])))[0]
ordered_data = data.take(s, axis=axis)
group_sums = np.add.reduceat(ordered_data, groups_at, axis=axis)
return group_sums