"""
provide a generic structure to support window functions,
similar to how we have a Groupby object
"""
from __future__ import division
import warnings
import numpy as np
from collections import defaultdict
from pandas.types.generic import (ABCSeries,
ABCDataFrame,
ABCDatetimeIndex,
ABCTimedeltaIndex,
ABCPeriodIndex)
from pandas.types.common import (is_integer,
is_bool,
is_float_dtype,
is_integer_dtype,
needs_i8_conversion,
is_timedelta64_dtype,
is_list_like,
_ensure_float64)
import pandas as pd
from pandas.lib import isscalar
from pandas.core.base import (PandasObject, SelectionMixin,
GroupByMixin)
import pandas.core.common as com
import pandas._window as _window
from pandas.tseries.offsets import DateOffset
from pandas import compat
from pandas.compat.numpy import function as nv
from pandas.util.decorators import (Substitution, Appender,
cache_readonly)
from textwrap import dedent
_shared_docs = dict()
_doc_template = """
Returns
-------
same type as input
See also
--------
pandas.Series.%(name)s
pandas.DataFrame.%(name)s
"""
class _Window(PandasObject, SelectionMixin):
_attributes = ['window', 'min_periods', 'freq', 'center', 'win_type',
'axis', 'on']
exclusions = set()
def __init__(self, obj, window=None, min_periods=None, freq=None,
center=False, win_type=None, axis=0, on=None, **kwargs):
if freq is not None:
warnings.warn("The freq kw is deprecated and will be removed in a "
"future version. You can resample prior to passing "
"to a window function", FutureWarning, stacklevel=3)
self.__dict__.update(kwargs)
self.blocks = []
self.obj = obj
self.on = on
self.window = window
self.min_periods = min_periods
self.freq = freq
self.center = center
self.win_type = win_type
self.axis = obj._get_axis_number(axis) if axis is not None else None
self.validate()
@property
def _constructor(self):
return Window
@property
def is_datetimelike(self):
return None
@property
def _on(self):
return None
@property
def is_freq_type(self):
return self.win_type == 'freq'
def validate(self):
if self.center is not None and not is_bool(self.center):
raise ValueError("center must be a boolean")
if self.min_periods is not None and not \
is_integer(self.min_periods):
raise ValueError("min_periods must be an integer")
def _convert_freq(self, how=None):
""" resample according to the how, return a new object """
obj = self._selected_obj
index = None
if (self.freq is not None and
isinstance(obj, (ABCSeries, ABCDataFrame))):
if how is not None:
warnings.warn("The how kw argument is deprecated and removed "
"in a future version. You can resample prior "
"to passing to a window function", FutureWarning,
stacklevel=6)
obj = obj.resample(self.freq).aggregate(how or 'asfreq')
return obj, index
def _create_blocks(self, how):
""" split data into blocks & return conformed data """
obj, index = self._convert_freq(how)
if index is not None:
index = self._on
# filter out the on from the object
if self.on is not None:
if obj.ndim == 2:
obj = obj.reindex(columns=obj.columns.difference([self.on]),
copy=False)
blocks = obj.as_blocks(copy=False).values()
return blocks, obj, index
def _gotitem(self, key, ndim, subset=None):
"""
sub-classes to define
return a sliced object
Parameters
----------
key : string / list of selections
ndim : 1,2
requested ndim of result
subset : object, default None
subset to act on
"""
# create a new object to prevent aliasing
if subset is None:
subset = self.obj
self = self._shallow_copy(subset)
self._reset_cache()
if subset.ndim == 2:
if isscalar(key) and key in subset or is_list_like(key):
self._selection = key
return self
def __getattr__(self, attr):
if attr in self._internal_names_set:
return object.__getattribute__(self, attr)
if attr in self.obj:
return self[attr]
raise AttributeError("%r object has no attribute %r" %
(type(self).__name__, attr))
def _dir_additions(self):
return self.obj._dir_additions()
def _get_window(self, other=None):
return self.window
@property
def _window_type(self):
return self.__class__.__name__
def __unicode__(self):
""" provide a nice str repr of our rolling object """
attrs = ["{k}={v}".format(k=k, v=getattr(self, k))
for k in self._attributes
if getattr(self, k, None) is not None]
return "{klass} [{attrs}]".format(klass=self._window_type,
attrs=','.join(attrs))
def _get_index(self, index=None):
"""
Return index as ndarrays
Returns
-------
tuple of (index, index_as_ndarray)
"""
if self.is_freq_type:
if index is None:
index = self._on
return index, index.asi8
return index, index
def _prep_values(self, values=None, kill_inf=True, how=None):
if values is None:
values = getattr(self._selected_obj, 'values', self._selected_obj)
# GH #12373 : rolling functions error on float32 data
# make sure the data is coerced to float64
if is_float_dtype(values.dtype):
values = _ensure_float64(values)
elif is_integer_dtype(values.dtype):
values = _ensure_float64(values)
elif needs_i8_conversion(values.dtype):
raise NotImplementedError("ops for {action} for this "
"dtype {dtype} are not "
"implemented".format(
action=self._window_type,
dtype=values.dtype))
else:
try:
values = _ensure_float64(values)
except (ValueError, TypeError):
raise TypeError("cannot handle this type -> {0}"
"".format(values.dtype))
if kill_inf:
values = values.copy()
values[np.isinf(values)] = np.NaN
return values
def _wrap_result(self, result, block=None, obj=None):
""" wrap a single result """
if obj is None:
obj = self._selected_obj
index = obj.index
if isinstance(result, np.ndarray):
# coerce if necessary
if block is not None:
if is_timedelta64_dtype(block.values.dtype):
result = pd.to_timedelta(
result.ravel(), unit='ns').values.reshape(result.shape)
if result.ndim == 1:
from pandas import Series
return Series(result, index, name=obj.name)
return type(obj)(result, index=index, columns=block.columns)
return result
def _wrap_results(self, results, blocks, obj):
"""
wrap the results
Paramters
---------
results : list of ndarrays
blocks : list of blocks
obj : conformed data (may be resampled)
"""
from pandas import Series
from pandas.core.index import _ensure_index
final = []
for result, block in zip(results, blocks):
result = self._wrap_result(result, block=block, obj=obj)
if result.ndim == 1:
return result
final.append(result)
# if we have an 'on' column
# we want to put it back into the results
# in the same location
columns = self._selected_obj.columns
if self.on is not None \
and not self._on.equals(obj.index):
name = self._on.name
final.append(Series(self._on, index=obj.index, name=name))
if self._selection is not None:
selection = _ensure_index(self._selection)
# need to reorder to include original location of
# the on column (if its not already there)
if name not in selection:
columns = self.obj.columns
indexer = columns.get_indexer(selection.tolist() + [name])
columns = columns.take(sorted(indexer))
if not len(final):
return obj.astype('float64')
return pd.concat(final, axis=1).reindex(columns=columns,
copy=False)
def _center_window(self, result, window):
""" center the result in the window """
if self.axis > result.ndim - 1:
raise ValueError("Requested axis is larger then no. of argument "
"dimensions")
from pandas import Series, DataFrame
offset = _offset(window, True)
if offset > 0:
if isinstance(result, (Series, DataFrame)):
result = result.slice_shift(-offset, axis=self.axis)
else:
lead_indexer = [slice(None)] * result.ndim
lead_indexer[self.axis] = slice(offset, None)
result = np.copy(result[tuple(lead_indexer)])
return result
def aggregate(self, arg, *args, **kwargs):
result, how = self._aggregate(arg, *args, **kwargs)
if result is None:
return self.apply(arg, args=args, kwargs=kwargs)
return result
agg = aggregate
_shared_docs['sum'] = dedent("""
%(name)s sum
Parameters
----------
how : string, default None (DEPRECATED)
Method for down- or re-sampling""")
_shared_docs['mean'] = dedent("""
%(name)s mean
Parameters
----------
how : string, default None (DEPRECATED)
Method for down- or re-sampling""")
class Window(_Window):
"""
Provides rolling window calculcations.
.. versionadded:: 0.18.0
Parameters
----------
window : int, or offset
Size of the moving window. This is the number of observations used for
calculating the statistic. Each window will be a fixed size.
If its an offset then this will be the time period of each window. Each
window will be a variable sized based on the observations included in
the time-period. This is only valid for datetimelike indexes. This is
new in 0.19.0
min_periods : int, default None
Minimum number of observations in window required to have a value
(otherwise result is NA). For a window that is specified by an offset,
this will default to 1.
freq : string or DateOffset object, optional (default None) (DEPRECATED)
Frequency to conform the data to before computing the statistic.
Specified as a frequency string or DateOffset object.
center : boolean, default False
Set the labels at the center of the window.
win_type : string, default None
Provide a window type. See the notes below.
on : string, optional
For a DataFrame, column on which to calculate
the rolling window, rather than the index
.. versionadded:: 0.19.0
axis : int or string, default 0
Returns
-------
a Window or Rolling sub-classed for the particular operation
Examples
--------
>>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
>>> df
B
0 0.0
1 1.0
2 2.0
3 NaN
4 4.0
Rolling sum with a window length of 2, using the 'triang'
window type.
>>> df.rolling(2, win_type='triang').sum()
B
0 NaN
1 1.0
2 2.5
3 NaN
4 NaN
Rolling sum with a window length of 2, min_periods defaults
to the window length.
>>> df.rolling(2).sum()
B
0 NaN
1 1.0
2 3.0
3 NaN
4 NaN
Same as above, but explicity set the min_periods
>>> df.rolling(2, min_periods=1).sum()
B
0 0.0
1 1.0
2 3.0
3 2.0
4 4.0
A ragged (meaning not-a-regular frequency), time-indexed DataFrame
>>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]},
....: index = [pd.Timestamp('20130101 09:00:00'),
....: pd.Timestamp('20130101 09:00:02'),
....: pd.Timestamp('20130101 09:00:03'),
....: pd.Timestamp('20130101 09:00:05'),
....: pd.Timestamp('20130101 09:00:06')])
>>> df
B
2013-01-01 09:00:00 0.0
2013-01-01 09:00:02 1.0
2013-01-01 09:00:03 2.0
2013-01-01 09:00:05 NaN
2013-01-01 09:00:06 4.0
Contrasting to an integer rolling window, this will roll a variable
length window corresponding to the time period.
The default for min_periods is 1.
>>> df.rolling('2s').sum()
B
2013-01-01 09:00:00 0.0
2013-01-01 09:00:02 1.0
2013-01-01 09:00:03 3.0
2013-01-01 09:00:05 NaN
2013-01-01 09:00:06 4.0
Notes
-----
By default, the result is set to the right edge of the window. This can be
changed to the center of the window by setting ``center=True``.
The `freq` keyword is used to conform time series data to a specified
frequency by resampling the data. This is done with the default parameters
of :meth:`~pandas.Series.resample` (i.e. using the `mean`).
To learn more about the offsets & frequency strings, please see `this link
<http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases>`__.
The recognized win_types are:
* ``boxcar``
* ``triang``
* ``blackman``
* ``hamming``
* ``bartlett``
* ``parzen``
* ``bohman``
* ``blackmanharris``
* ``nuttall``
* ``barthann``
* ``kaiser`` (needs beta)
* ``gaussian`` (needs std)
* ``general_gaussian`` (needs power, width)
* ``slepian`` (needs width).
"""
def validate(self):
super(Window, self).validate()
window = self.window
if isinstance(window, (list, tuple, np.ndarray)):
pass
elif is_integer(window):
if window < 0:
raise ValueError("window must be non-negative")
try:
import scipy.signal as sig
except ImportError:
raise ImportError('Please install scipy to generate window '
'weight')
if not isinstance(self.win_type, compat.string_types):
raise ValueError('Invalid win_type {0}'.format(self.win_type))
if getattr(sig, self.win_type, None) is None:
raise ValueError('Invalid win_type {0}'.format(self.win_type))
else:
raise ValueError('Invalid window {0}'.format(window))
def _prep_window(self, **kwargs):
"""
provide validation for our window type, return the window
we have already been validated
"""
window = self._get_window()
if isinstance(window, (list, tuple, np.ndarray)):
return com._asarray_tuplesafe(window).astype(float)
elif is_integer(window):
import scipy.signal as sig
# the below may pop from kwargs
def _validate_win_type(win_type, kwargs):
arg_map = {'kaiser': ['beta'],
'gaussian': ['std'],
'general_gaussian': ['power', 'width'],
'slepian': ['width']}
if win_type in arg_map:
return tuple([win_type] + _pop_args(win_type,
arg_map[win_type],
kwargs))
return win_type
def _pop_args(win_type, arg_names, kwargs):
msg = '%s window requires %%s' % win_type
all_args = []
for n in arg_names:
if n not in kwargs:
raise ValueError(msg % n)
all_args.append(kwargs.pop(n))
return all_args
win_type = _validate_win_type(self.win_type, kwargs)
return sig.get_window(win_type, window).astype(float)
def _apply_window(self, mean=True, how=None, **kwargs):
"""
Applies a moving window of type ``window_type`` on the data.
Parameters
----------
mean : boolean, default True
If True computes weighted mean, else weighted sum
how : string, default to None (DEPRECATED)
how to resample
Returns
-------
y : type of input argument
"""
window = self._prep_window(**kwargs)
center = self.center
blocks, obj, index = self._create_blocks(how=how)
results = []
for b in blocks:
try:
values = self._prep_values(b.values)
except TypeError:
results.append(b.values.copy())
continue
if values.size == 0:
results.append(values.copy())
continue
offset = _offset(window, center)
additional_nans = np.array([np.NaN] * offset)
def f(arg, *args, **kwargs):
minp = _use_window(self.min_periods, len(window))
return _window.roll_window(np.concatenate((arg,
additional_nans))
if center else arg, window, minp,
avg=mean)
result = np.apply_along_axis(f, self.axis, values)
if center:
result = self._center_window(result, window)
results.append(result)
return self._wrap_results(results, blocks, obj)
@Substitution(name='rolling')
@Appender(SelectionMixin._see_also_template)
@Appender(SelectionMixin._agg_doc)
def aggregate(self, arg, *args, **kwargs):
result, how = self._aggregate(arg, *args, **kwargs)
if result is None:
# these must apply directly
result = arg(self)
return result
agg = aggregate
@Substitution(name='window')
@Appender(_doc_template)
@Appender(_shared_docs['sum'])
[docs] def sum(self, *args, **kwargs):
nv.validate_window_func('sum', args, kwargs)
return self._apply_window(mean=False, **kwargs)
@Substitution(name='window')
@Appender(_doc_template)
@Appender(_shared_docs['mean'])
[docs] def mean(self, *args, **kwargs):
nv.validate_window_func('mean', args, kwargs)
return self._apply_window(mean=True, **kwargs)
class _GroupByMixin(GroupByMixin):
""" provide the groupby facilities """
def __init__(self, obj, *args, **kwargs):
parent = kwargs.pop('parent', None) # noqa
groupby = kwargs.pop('groupby', None)
if groupby is None:
groupby, obj = obj, obj.obj
self._groupby = groupby
self._groupby.mutated = True
self._groupby.grouper.mutated = True
super(GroupByMixin, self).__init__(obj, *args, **kwargs)
count = GroupByMixin._dispatch('count')
corr = GroupByMixin._dispatch('corr', other=None, pairwise=None)
cov = GroupByMixin._dispatch('cov', other=None, pairwise=None)
def _apply(self, func, name, window=None, center=None,
check_minp=None, how=None, **kwargs):
"""
dispatch to apply; we are stripping all of the _apply kwargs and
performing the original function call on the grouped object
"""
def f(x, name=name, *args):
x = self._shallow_copy(x)
if isinstance(name, compat.string_types):
return getattr(x, name)(*args, **kwargs)
return x.apply(name, *args, **kwargs)
return self._groupby.apply(f)
class _Rolling(_Window):
@property
def _constructor(self):
return Rolling
def _apply(self, func, name=None, window=None, center=None,
check_minp=None, how=None, **kwargs):
"""
Rolling statistical measure using supplied function. Designed to be
used with passed-in Cython array-based functions.
Parameters
----------
func : string/callable to apply
name : string, optional
name of this function
window : int/array, default to _get_window()
center : boolean, default to self.center
check_minp : function, default to _use_window
how : string, default to None (DEPRECATED)
how to resample
Returns
-------
y : type of input
"""
if center is None:
center = self.center
if window is None:
window = self._get_window()
if check_minp is None:
check_minp = _use_window
blocks, obj, index = self._create_blocks(how=how)
index, indexi = self._get_index(index=index)
results = []
for b in blocks:
try:
values = self._prep_values(b.values)
except TypeError:
results.append(b.values.copy())
continue
if values.size == 0:
results.append(values.copy())
continue
# if we have a string function name, wrap it
if isinstance(func, compat.string_types):
cfunc = getattr(_window, func, None)
if cfunc is None:
raise ValueError("we do not support this function "
"in _window.{0}".format(func))
def func(arg, window, min_periods=None):
minp = check_minp(min_periods, window)
# ensure we are only rolling on floats
arg = _ensure_float64(arg)
return cfunc(arg,
window, minp, indexi, **kwargs)
# calculation function
if center:
offset = _offset(window, center)
additional_nans = np.array([np.NaN] * offset)
def calc(x):
return func(np.concatenate((x, additional_nans)),
window, min_periods=self.min_periods)
else:
def calc(x):
return func(x, window, min_periods=self.min_periods)
if values.ndim > 1:
result = np.apply_along_axis(calc, self.axis, values)
else:
result = calc(values)
if center:
result = self._center_window(result, window)
results.append(result)
return self._wrap_results(results, blocks, obj)
class _Rolling_and_Expanding(_Rolling):
_shared_docs['count'] = """%(name)s count of number of non-NaN
observations inside provided window."""
def count(self):
blocks, obj, index = self._create_blocks(how=None)
index, indexi = self._get_index(index=index)
window = self._get_window()
window = min(window, len(obj)) if not self.center else window
results = []
for b in blocks:
if needs_i8_conversion(b.values):
result = b.notnull().astype(int)
else:
try:
result = np.isfinite(b).astype(float)
except TypeError:
result = np.isfinite(b.astype(float)).astype(float)
result[pd.isnull(result)] = 0
result = self._constructor(result, window=window, min_periods=0,
center=self.center).sum()
results.append(result)
return self._wrap_results(results, blocks, obj)
_shared_docs['apply'] = dedent("""
%(name)s function apply
Parameters
----------
func : function
Must produce a single value from an ndarray input
\*args and \*\*kwargs are passed to the function""")
def apply(self, func, args=(), kwargs={}):
# TODO: _level is unused?
_level = kwargs.pop('_level', None) # noqa
window = self._get_window()
offset = _offset(window, self.center)
index, indexi = self._get_index()
def f(arg, window, min_periods):
minp = _use_window(min_periods, window)
return _window.roll_generic(arg, window, minp, indexi,
offset, func, args,
kwargs)
return self._apply(f, func, args=args, kwargs=kwargs,
center=False)
def sum(self, *args, **kwargs):
nv.validate_window_func('sum', args, kwargs)
return self._apply('roll_sum', 'sum', **kwargs)
_shared_docs['max'] = dedent("""
%(name)s maximum
Parameters
----------
how : string, default 'max' (DEPRECATED)
Method for down- or re-sampling""")
def max(self, how=None, *args, **kwargs):
nv.validate_window_func('max', args, kwargs)
if self.freq is not None and how is None:
how = 'max'
return self._apply('roll_max', 'max', how=how, **kwargs)
_shared_docs['min'] = dedent("""
%(name)s minimum
Parameters
----------
how : string, default 'min' (DEPRECATED)
Method for down- or re-sampling""")
def min(self, how=None, *args, **kwargs):
nv.validate_window_func('min', args, kwargs)
if self.freq is not None and how is None:
how = 'min'
return self._apply('roll_min', 'min', how=how, **kwargs)
def mean(self, *args, **kwargs):
nv.validate_window_func('mean', args, kwargs)
return self._apply('roll_mean', 'mean', **kwargs)
_shared_docs['median'] = dedent("""
%(name)s median
Parameters
----------
how : string, default 'median' (DEPRECATED)
Method for down- or re-sampling""")
def median(self, how=None, **kwargs):
if self.freq is not None and how is None:
how = 'median'
return self._apply('roll_median_c', 'median', how=how, **kwargs)
_shared_docs['std'] = dedent("""
%(name)s standard deviation
Parameters
----------
ddof : int, default 1
Delta Degrees of Freedom. The divisor used in calculations
is ``N - ddof``, where ``N`` represents the number of elements.""")
def std(self, ddof=1, *args, **kwargs):
nv.validate_window_func('std', args, kwargs)
window = self._get_window()
index, indexi = self._get_index()
def f(arg, *args, **kwargs):
minp = _require_min_periods(1)(self.min_periods, window)
return _zsqrt(_window.roll_var(arg, window, minp, indexi,
ddof))
return self._apply(f, 'std', check_minp=_require_min_periods(1),
ddof=ddof, **kwargs)
_shared_docs['var'] = dedent("""
%(name)s variance
Parameters
----------
ddof : int, default 1
Delta Degrees of Freedom. The divisor used in calculations
is ``N - ddof``, where ``N`` represents the number of elements.""")
def var(self, ddof=1, *args, **kwargs):
nv.validate_window_func('var', args, kwargs)
return self._apply('roll_var', 'var',
check_minp=_require_min_periods(1), ddof=ddof,
**kwargs)
_shared_docs['skew'] = """Unbiased %(name)s skewness"""
def skew(self, **kwargs):
return self._apply('roll_skew', 'skew',
check_minp=_require_min_periods(3), **kwargs)
_shared_docs['kurt'] = """Unbiased %(name)s kurtosis"""
def kurt(self, **kwargs):
return self._apply('roll_kurt', 'kurt',
check_minp=_require_min_periods(4), **kwargs)
_shared_docs['quantile'] = dedent("""
%(name)s quantile
Parameters
----------
quantile : float
0 <= quantile <= 1""")
def quantile(self, quantile, **kwargs):
window = self._get_window()
index, indexi = self._get_index()
def f(arg, *args, **kwargs):
minp = _use_window(self.min_periods, window)
return _window.roll_quantile(arg, window, minp, indexi,
quantile)
return self._apply(f, 'quantile', quantile=quantile,
**kwargs)
_shared_docs['cov'] = dedent("""
%(name)s sample covariance
Parameters
----------
other : Series, DataFrame, or ndarray, optional
if not supplied then will default to self and produce pairwise output
pairwise : bool, default None
If False then only matching columns between self and other will be used
and the output will be a DataFrame.
If True then all pairwise combinations will be calculated and the
output will be a Panel in the case of DataFrame inputs. In the case of
missing elements, only complete pairwise observations will be used.
ddof : int, default 1
Delta Degrees of Freedom. The divisor used in calculations
is ``N - ddof``, where ``N`` represents the number of elements.""")
def cov(self, other=None, pairwise=None, ddof=1, **kwargs):
if other is None:
other = self._selected_obj
# only default unset
pairwise = True if pairwise is None else pairwise
other = self._shallow_copy(other)
window = self._get_window(other)
def _get_cov(X, Y):
# GH #12373 : rolling functions error on float32 data
# to avoid potential overflow, cast the data to float64
X = X.astype('float64')
Y = Y.astype('float64')
mean = lambda x: x.rolling(window, self.min_periods,
center=self.center).mean(**kwargs)
count = (X + Y).rolling(window=window,
center=self.center).count(**kwargs)
bias_adj = count / (count - ddof)
return (mean(X * Y) - mean(X) * mean(Y)) * bias_adj
return _flex_binary_moment(self._selected_obj, other._selected_obj,
_get_cov, pairwise=bool(pairwise))
_shared_docs['corr'] = dedent("""
%(name)s sample correlation
Parameters
----------
other : Series, DataFrame, or ndarray, optional
if not supplied then will default to self and produce pairwise output
pairwise : bool, default None
If False then only matching columns between self and other will be used
and the output will be a DataFrame.
If True then all pairwise combinations will be calculated and the
output will be a Panel in the case of DataFrame inputs. In the case of
missing elements, only complete pairwise observations will be used.""")
def corr(self, other=None, pairwise=None, **kwargs):
if other is None:
other = self._selected_obj
# only default unset
pairwise = True if pairwise is None else pairwise
other = self._shallow_copy(other)
window = self._get_window(other)
def _get_corr(a, b):
a = a.rolling(window=window, min_periods=self.min_periods,
freq=self.freq, center=self.center)
b = b.rolling(window=window, min_periods=self.min_periods,
freq=self.freq, center=self.center)
return a.cov(b, **kwargs) / (a.std(**kwargs) * b.std(**kwargs))
return _flex_binary_moment(self._selected_obj, other._selected_obj,
_get_corr, pairwise=bool(pairwise))
class Rolling(_Rolling_and_Expanding):
@cache_readonly
def is_datetimelike(self):
return isinstance(self._on,
(ABCDatetimeIndex,
ABCTimedeltaIndex,
ABCPeriodIndex))
@cache_readonly
def _on(self):
if self.on is None:
return self.obj.index
elif (isinstance(self.obj, ABCDataFrame) and
self.on in self.obj.columns):
return pd.Index(self.obj[self.on])
else:
raise ValueError("invalid on specified as {0}, "
"must be a column (if DataFrame) "
"or None".format(self.on))
def validate(self):
super(Rolling, self).validate()
# we allow rolling on a datetimelike index
if (self.is_datetimelike and
isinstance(self.window, (compat.string_types, DateOffset))):
# must be monotonic for on
if not self._on.is_monotonic:
formatted = self.on or 'index'
raise ValueError("{0} must be "
"monotonic".format(formatted))
from pandas.tseries.frequencies import to_offset
try:
freq = to_offset(self.window)
except (TypeError, ValueError):
raise ValueError("passed window {0} in not "
"compat with a datetimelike "
"index".format(self.window))
# we don't allow center
if self.center:
raise NotImplementedError("center is not implemented "
"for datetimelike and offset "
"based windows")
# this will raise ValueError on non-fixed freqs
self.window = freq.nanos
self.win_type = 'freq'
# min_periods must be an integer
if self.min_periods is None:
self.min_periods = 1
elif not is_integer(self.window):
raise ValueError("window must be an integer")
elif self.window < 0:
raise ValueError("window must be non-negative")
@Substitution(name='rolling')
@Appender(SelectionMixin._see_also_template)
@Appender(SelectionMixin._agg_doc)
def aggregate(self, arg, *args, **kwargs):
return super(Rolling, self).aggregate(arg, *args, **kwargs)
agg = aggregate
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['count'])
[docs] def count(self):
# different impl for freq counting
if self.is_freq_type:
return self._apply('roll_count', 'count')
return super(Rolling, self).count()
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['apply'])
[docs] def apply(self, func, args=(), kwargs={}):
return super(Rolling, self).apply(func, args=args, kwargs=kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['sum'])
[docs] def sum(self, *args, **kwargs):
nv.validate_rolling_func('sum', args, kwargs)
return super(Rolling, self).sum(*args, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['max'])
[docs] def max(self, *args, **kwargs):
nv.validate_rolling_func('max', args, kwargs)
return super(Rolling, self).max(*args, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['min'])
[docs] def min(self, *args, **kwargs):
nv.validate_rolling_func('min', args, kwargs)
return super(Rolling, self).min(*args, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['mean'])
[docs] def mean(self, *args, **kwargs):
nv.validate_rolling_func('mean', args, kwargs)
return super(Rolling, self).mean(*args, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['median'])
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['std'])
[docs] def std(self, ddof=1, *args, **kwargs):
nv.validate_rolling_func('std', args, kwargs)
return super(Rolling, self).std(ddof=ddof, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['var'])
[docs] def var(self, ddof=1, *args, **kwargs):
nv.validate_rolling_func('var', args, kwargs)
return super(Rolling, self).var(ddof=ddof, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['skew'])
[docs] def skew(self, **kwargs):
return super(Rolling, self).skew(**kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['kurt'])
[docs] def kurt(self, **kwargs):
return super(Rolling, self).kurt(**kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['quantile'])
[docs] def quantile(self, quantile, **kwargs):
return super(Rolling, self).quantile(quantile=quantile, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['cov'])
[docs] def cov(self, other=None, pairwise=None, ddof=1, **kwargs):
return super(Rolling, self).cov(other=other, pairwise=pairwise,
ddof=ddof, **kwargs)
@Substitution(name='rolling')
@Appender(_doc_template)
@Appender(_shared_docs['corr'])
[docs] def corr(self, other=None, pairwise=None, **kwargs):
return super(Rolling, self).corr(other=other, pairwise=pairwise,
**kwargs)
class RollingGroupby(_GroupByMixin, Rolling):
"""
Provides a rolling groupby implementation
.. versionadded:: 0.18.1
"""
@property
def _constructor(self):
return Rolling
class Expanding(_Rolling_and_Expanding):
"""
Provides expanding transformations.
.. versionadded:: 0.18.0
Parameters
----------
min_periods : int, default None
Minimum number of observations in window required to have a value
(otherwise result is NA).
freq : string or DateOffset object, optional (default None) (DEPRECATED)
Frequency to conform the data to before computing the statistic.
Specified as a frequency string or DateOffset object.
center : boolean, default False
Set the labels at the center of the window.
axis : int or string, default 0
Returns
-------
a Window sub-classed for the particular operation
Examples
--------
>>> df = DataFrame({'B': [0, 1, 2, np.nan, 4]})
B
0 0.0
1 1.0
2 2.0
3 NaN
4 4.0
>>> df.expanding(2).sum()
B
0 NaN
1 1.0
2 3.0
3 3.0
4 7.0
Notes
-----
By default, the result is set to the right edge of the window. This can be
changed to the center of the window by setting ``center=True``.
The `freq` keyword is used to conform time series data to a specified
frequency by resampling the data. This is done with the default parameters
of :meth:`~pandas.Series.resample` (i.e. using the `mean`).
"""
_attributes = ['min_periods', 'freq', 'center', 'axis']
def __init__(self, obj, min_periods=1, freq=None, center=False, axis=0,
**kwargs):
super(Expanding, self).__init__(obj=obj, min_periods=min_periods,
freq=freq, center=center, axis=axis)
@property
def _constructor(self):
return Expanding
def _get_window(self, other=None):
obj = self._selected_obj
if other is None:
return (max(len(obj), self.min_periods) if self.min_periods
else len(obj))
return (max((len(obj) + len(obj)), self.min_periods)
if self.min_periods else (len(obj) + len(obj)))
@Substitution(name='expanding')
@Appender(SelectionMixin._see_also_template)
@Appender(SelectionMixin._agg_doc)
def aggregate(self, arg, *args, **kwargs):
return super(Expanding, self).aggregate(arg, *args, **kwargs)
agg = aggregate
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['count'])
[docs] def count(self, **kwargs):
return super(Expanding, self).count(**kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['apply'])
[docs] def apply(self, func, args=(), kwargs={}):
return super(Expanding, self).apply(func, args=args, kwargs=kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['sum'])
[docs] def sum(self, *args, **kwargs):
nv.validate_expanding_func('sum', args, kwargs)
return super(Expanding, self).sum(*args, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['max'])
[docs] def max(self, *args, **kwargs):
nv.validate_expanding_func('max', args, kwargs)
return super(Expanding, self).max(*args, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['min'])
[docs] def min(self, *args, **kwargs):
nv.validate_expanding_func('min', args, kwargs)
return super(Expanding, self).min(*args, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['mean'])
[docs] def mean(self, *args, **kwargs):
nv.validate_expanding_func('mean', args, kwargs)
return super(Expanding, self).mean(*args, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['median'])
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['std'])
[docs] def std(self, ddof=1, *args, **kwargs):
nv.validate_expanding_func('std', args, kwargs)
return super(Expanding, self).std(ddof=ddof, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['var'])
[docs] def var(self, ddof=1, *args, **kwargs):
nv.validate_expanding_func('var', args, kwargs)
return super(Expanding, self).var(ddof=ddof, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['skew'])
[docs] def skew(self, **kwargs):
return super(Expanding, self).skew(**kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['kurt'])
[docs] def kurt(self, **kwargs):
return super(Expanding, self).kurt(**kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['quantile'])
[docs] def quantile(self, quantile, **kwargs):
return super(Expanding, self).quantile(quantile=quantile, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['cov'])
[docs] def cov(self, other=None, pairwise=None, ddof=1, **kwargs):
return super(Expanding, self).cov(other=other, pairwise=pairwise,
ddof=ddof, **kwargs)
@Substitution(name='expanding')
@Appender(_doc_template)
@Appender(_shared_docs['corr'])
[docs] def corr(self, other=None, pairwise=None, **kwargs):
return super(Expanding, self).corr(other=other, pairwise=pairwise,
**kwargs)
class ExpandingGroupby(_GroupByMixin, Expanding):
"""
Provides a expanding groupby implementation
.. versionadded:: 0.18.1
"""
@property
def _constructor(self):
return Expanding
_bias_template = """
Parameters
----------
bias : boolean, default False
Use a standard estimation bias correction
"""
_pairwise_template = """
Parameters
----------
other : Series, DataFrame, or ndarray, optional
if not supplied then will default to self and produce pairwise output
pairwise : bool, default None
If False then only matching columns between self and other will be used and
the output will be a DataFrame.
If True then all pairwise combinations will be calculated and the output
will be a Panel in the case of DataFrame inputs. In the case of missing
elements, only complete pairwise observations will be used.
bias : boolean, default False
Use a standard estimation bias correction
"""
class EWM(_Rolling):
r"""
Provides exponential weighted functions
.. versionadded:: 0.18.0
Parameters
----------
com : float, optional
Specify decay in terms of center of mass,
:math:`\alpha = 1 / (1 + com),\text{ for } com \geq 0`
span : float, optional
Specify decay in terms of span,
:math:`\alpha = 2 / (span + 1),\text{ for } span \geq 1`
halflife : float, optional
Specify decay in terms of half-life,
:math:`\alpha = 1 - exp(log(0.5) / halflife),\text{ for } halflife > 0`
alpha : float, optional
Specify smoothing factor :math:`\alpha` directly,
:math:`0 < \alpha \leq 1`
.. versionadded:: 0.18.0
min_periods : int, default 0
Minimum number of observations in window required to have a value
(otherwise result is NA).
freq : None or string alias / date offset object, default=None (DEPRECATED)
Frequency to conform to before computing statistic
adjust : boolean, default True
Divide by decaying adjustment factor in beginning periods to account
for imbalance in relative weightings (viewing EWMA as a moving average)
ignore_na : boolean, default False
Ignore missing values when calculating weights;
specify True to reproduce pre-0.15.0 behavior
Returns
-------
a Window sub-classed for the particular operation
Examples
--------
>>> df = DataFrame({'B': [0, 1, 2, np.nan, 4]})
B
0 0.0
1 1.0
2 2.0
3 NaN
4 4.0
>>> df.ewm(com=0.5).mean()
B
0 0.000000
1 0.750000
2 1.615385
3 1.615385
4 3.670213
Notes
-----
Exactly one of center of mass, span, half-life, and alpha must be provided.
Allowed values and relationship between the parameters are specified in the
parameter descriptions above; see the link at the end of this section for
a detailed explanation.
The `freq` keyword is used to conform time series data to a specified
frequency by resampling the data. This is done with the default parameters
of :meth:`~pandas.Series.resample` (i.e. using the `mean`).
When adjust is True (default), weighted averages are calculated using
weights (1-alpha)**(n-1), (1-alpha)**(n-2), ..., 1-alpha, 1.
When adjust is False, weighted averages are calculated recursively as:
weighted_average[0] = arg[0];
weighted_average[i] = (1-alpha)*weighted_average[i-1] + alpha*arg[i].
When ignore_na is False (default), weights are based on absolute positions.
For example, the weights of x and y used in calculating the final weighted
average of [x, None, y] are (1-alpha)**2 and 1 (if adjust is True), and
(1-alpha)**2 and alpha (if adjust is False).
When ignore_na is True (reproducing pre-0.15.0 behavior), weights are based
on relative positions. For example, the weights of x and y used in
calculating the final weighted average of [x, None, y] are 1-alpha and 1
(if adjust is True), and 1-alpha and alpha (if adjust is False).
More details can be found at
http://pandas.pydata.org/pandas-docs/stable/computation.html#exponentially-weighted-windows
"""
_attributes = ['com', 'min_periods', 'freq', 'adjust', 'ignore_na', 'axis']
def __init__(self, obj, com=None, span=None, halflife=None, alpha=None,
min_periods=0, freq=None, adjust=True, ignore_na=False,
axis=0):
self.obj = obj
self.com = _get_center_of_mass(com, span, halflife, alpha)
self.min_periods = min_periods
self.freq = freq
self.adjust = adjust
self.ignore_na = ignore_na
self.axis = axis
self.on = None
@property
def _constructor(self):
return EWM
@Substitution(name='ewm')
@Appender(SelectionMixin._see_also_template)
@Appender(SelectionMixin._agg_doc)
def aggregate(self, arg, *args, **kwargs):
return super(EWM, self).aggregate(arg, *args, **kwargs)
agg = aggregate
def _apply(self, func, how=None, **kwargs):
"""Rolling statistical measure using supplied function. Designed to be
used with passed-in Cython array-based functions.
Parameters
----------
func : string/callable to apply
how : string, default to None (DEPRECATED)
how to resample
Returns
-------
y : type of input argument
"""
blocks, obj, index = self._create_blocks(how=how)
results = []
for b in blocks:
try:
values = self._prep_values(b.values)
except TypeError:
results.append(b.values.copy())
continue
if values.size == 0:
results.append(values.copy())
continue
# if we have a string function name, wrap it
if isinstance(func, compat.string_types):
cfunc = getattr(_window, func, None)
if cfunc is None:
raise ValueError("we do not support this function "
"in _window.{0}".format(func))
def func(arg):
return cfunc(arg, self.com, int(self.adjust),
int(self.ignore_na), int(self.min_periods))
results.append(np.apply_along_axis(func, self.axis, values))
return self._wrap_results(results, blocks, obj)
@Substitution(name='ewm')
@Appender(_doc_template)
[docs] def mean(self, *args, **kwargs):
"""exponential weighted moving average"""
nv.validate_window_func('mean', args, kwargs)
return self._apply('ewma', **kwargs)
@Substitution(name='ewm')
@Appender(_doc_template)
@Appender(_bias_template)
[docs] def std(self, bias=False, *args, **kwargs):
"""exponential weighted moving stddev"""
nv.validate_window_func('std', args, kwargs)
return _zsqrt(self.var(bias=bias, **kwargs))
vol = std
@Substitution(name='ewm')
@Appender(_doc_template)
@Appender(_bias_template)
[docs] def var(self, bias=False, *args, **kwargs):
"""exponential weighted moving variance"""
nv.validate_window_func('var', args, kwargs)
def f(arg):
return _window.ewmcov(arg, arg, self.com, int(self.adjust),
int(self.ignore_na), int(self.min_periods),
int(bias))
return self._apply(f, **kwargs)
@Substitution(name='ewm')
@Appender(_doc_template)
@Appender(_pairwise_template)
[docs] def cov(self, other=None, pairwise=None, bias=False, **kwargs):
"""exponential weighted sample covariance"""
if other is None:
other = self._selected_obj
# only default unset
pairwise = True if pairwise is None else pairwise
other = self._shallow_copy(other)
def _get_cov(X, Y):
X = self._shallow_copy(X)
Y = self._shallow_copy(Y)
cov = _window.ewmcov(X._prep_values(), Y._prep_values(), self.com,
int(self.adjust), int(self.ignore_na),
int(self.min_periods), int(bias))
return X._wrap_result(cov)
return _flex_binary_moment(self._selected_obj, other._selected_obj,
_get_cov, pairwise=bool(pairwise))
@Substitution(name='ewm')
@Appender(_doc_template)
@Appender(_pairwise_template)
[docs] def corr(self, other=None, pairwise=None, **kwargs):
"""exponential weighted sample correlation"""
if other is None:
other = self._selected_obj
# only default unset
pairwise = True if pairwise is None else pairwise
other = self._shallow_copy(other)
def _get_corr(X, Y):
X = self._shallow_copy(X)
Y = self._shallow_copy(Y)
def _cov(x, y):
return _window.ewmcov(x, y, self.com, int(self.adjust),
int(self.ignore_na),
int(self.min_periods),
1)
x_values = X._prep_values()
y_values = Y._prep_values()
cov = _cov(x_values, y_values)
x_var = _cov(x_values, x_values)
y_var = _cov(y_values, y_values)
corr = cov / _zsqrt(x_var * y_var)
return X._wrap_result(corr)
return _flex_binary_moment(self._selected_obj, other._selected_obj,
_get_corr, pairwise=bool(pairwise))
# Helper Funcs
def _flex_binary_moment(arg1, arg2, f, pairwise=False):
from pandas import Series, DataFrame, Panel
if not (isinstance(arg1, (np.ndarray, Series, DataFrame)) and
isinstance(arg2, (np.ndarray, Series, DataFrame))):
raise TypeError("arguments to moment function must be of type "
"np.ndarray/Series/DataFrame")
if (isinstance(arg1, (np.ndarray, Series)) and
isinstance(arg2, (np.ndarray, Series))):
X, Y = _prep_binary(arg1, arg2)
return f(X, Y)
elif isinstance(arg1, DataFrame):
def dataframe_from_int_dict(data, frame_template):
result = DataFrame(data, index=frame_template.index)
if len(result.columns) > 0:
result.columns = frame_template.columns[result.columns]
return result
results = {}
if isinstance(arg2, DataFrame):
if pairwise is False:
if arg1 is arg2:
# special case in order to handle duplicate column names
for i, col in enumerate(arg1.columns):
results[i] = f(arg1.iloc[:, i], arg2.iloc[:, i])
return dataframe_from_int_dict(results, arg1)
else:
if not arg1.columns.is_unique:
raise ValueError("'arg1' columns are not unique")
if not arg2.columns.is_unique:
raise ValueError("'arg2' columns are not unique")
X, Y = arg1.align(arg2, join='outer')
X = X + 0 * Y
Y = Y + 0 * X
res_columns = arg1.columns.union(arg2.columns)
for col in res_columns:
if col in X and col in Y:
results[col] = f(X[col], Y[col])
return DataFrame(results, index=X.index,
columns=res_columns)
elif pairwise is True:
results = defaultdict(dict)
for i, k1 in enumerate(arg1.columns):
for j, k2 in enumerate(arg2.columns):
if j < i and arg2 is arg1:
# Symmetric case
results[i][j] = results[j][i]
else:
results[i][j] = f(*_prep_binary(arg1.iloc[:, i],
arg2.iloc[:, j]))
p = Panel.from_dict(results).swapaxes('items', 'major')
if len(p.major_axis) > 0:
p.major_axis = arg1.columns[p.major_axis]
if len(p.minor_axis) > 0:
p.minor_axis = arg2.columns[p.minor_axis]
return p
else:
raise ValueError("'pairwise' is not True/False")
else:
results = {}
for i, col in enumerate(arg1.columns):
results[i] = f(*_prep_binary(arg1.iloc[:, i], arg2))
return dataframe_from_int_dict(results, arg1)
else:
return _flex_binary_moment(arg2, arg1, f)
def _get_center_of_mass(com, span, halflife, alpha):
valid_count = len([x for x in [com, span, halflife, alpha]
if x is not None])
if valid_count > 1:
raise ValueError("com, span, halflife, and alpha "
"are mutually exclusive")
# Convert to center of mass; domain checks ensure 0 < alpha <= 1
if com is not None:
if com < 0:
raise ValueError("com must satisfy: com >= 0")
elif span is not None:
if span < 1:
raise ValueError("span must satisfy: span >= 1")
com = (span - 1) / 2.
elif halflife is not None:
if halflife <= 0:
raise ValueError("halflife must satisfy: halflife > 0")
decay = 1 - np.exp(np.log(0.5) / halflife)
com = 1 / decay - 1
elif alpha is not None:
if alpha <= 0 or alpha > 1:
raise ValueError("alpha must satisfy: 0 < alpha <= 1")
com = (1.0 - alpha) / alpha
else:
raise ValueError("Must pass one of com, span, halflife, or alpha")
return float(com)
def _offset(window, center):
if not is_integer(window):
window = len(window)
offset = (window - 1) / 2. if center else 0
try:
return int(offset)
except:
return offset.astype(int)
def _require_min_periods(p):
def _check_func(minp, window):
if minp is None:
return window
else:
return max(p, minp)
return _check_func
def _use_window(minp, window):
if minp is None:
return window
else:
return minp
def _zsqrt(x):
result = np.sqrt(x)
mask = x < 0
from pandas import DataFrame
if isinstance(x, DataFrame):
if mask.values.any():
result[mask] = 0
else:
if mask.any():
result[mask] = 0
return result
def _prep_binary(arg1, arg2):
if not isinstance(arg2, type(arg1)):
raise Exception('Input arrays must be of the same type!')
# mask out values, this also makes a common index...
X = arg1 + 0 * arg2
Y = arg2 + 0 * arg1
return X, Y
# Top-level exports
def rolling(obj, win_type=None, **kwds):
from pandas import Series, DataFrame
if not isinstance(obj, (Series, DataFrame)):
raise TypeError('invalid type: %s' % type(obj))
if win_type is not None:
return Window(obj, win_type=win_type, **kwds)
return Rolling(obj, **kwds)
rolling.__doc__ = Window.__doc__
def expanding(obj, **kwds):
from pandas import Series, DataFrame
if not isinstance(obj, (Series, DataFrame)):
raise TypeError('invalid type: %s' % type(obj))
return Expanding(obj, **kwds)
expanding.__doc__ = Expanding.__doc__
def ewm(obj, **kwds):
from pandas import Series, DataFrame
if not isinstance(obj, (Series, DataFrame)):
raise TypeError('invalid type: %s' % type(obj))
return EWM(obj, **kwds)
ewm.__doc__ = EWM.__doc__