'''Parametric Mixture Distributions
Created on Sat Jun 04 2011
Author: Josef Perktold
Notes:
Compound Poisson has mass point at zero
http://en.wikipedia.org/wiki/Compound_Poisson_distribution
and would need special treatment
need a distribution that has discrete mass points and contiuous range, e.g.
compound Poisson, Tweedie (for some parameter range),
pdf of Tobit model (?) - truncation with clipping
Question: Metaclasses and class factories for generating new distributions from
existing distributions by transformation, mixing, compounding
'''
from __future__ import print_function
import numpy as np
from scipy import stats
[docs]class ParametricMixtureD(object):
'''mixtures with a discrete distribution
The mixing distribution is a discrete distribution like scipy.stats.poisson.
All distribution in the mixture of the same type and parameterized
by the outcome of the mixing distribution and have to be a continuous
distribution (or have a pdf method).
As an example, a mixture of normal distributed random variables with
Poisson as the mixing distribution.
assumes vectorized shape, loc and scale as in scipy.stats.distributions
assume mixing_dist is frozen
initialization looks fragile for all possible cases of lower and upper
bounds of the distributions.
'''
[docs] def __init__(self, mixing_dist, base_dist, bd_args_func, bd_kwds_func,
cutoff=1e-3):
'''create a mixture distribution
Parameters
----------
mixing_dist : discrete frozen distribution
mixing distribution
base_dist : continuous distribution
parameterized distributions in the mixture
bd_args_func : callable
function that builds the tuple of args for the base_dist.
The function obtains as argument the values in the support of
the mixing distribution and should return an empty tuple or
a tuple of arrays.
bd_kwds_func : callable
function that builds the dictionary of kwds for the base_dist.
The function obtains as argument the values in the support of
the mixing distribution and should return an empty dictionary or
a dictionary with arrays as values.
cutoff : float
If the mixing distribution has infinite support, then the
distribution is truncated with approximately (subject to integer
conversion) the cutoff probability in the missing tail. Random
draws that are outside the truncated range are clipped, that is
assigned to the highest or lowest value in the truncated support.
'''
self.mixing_dist = mixing_dist
self.base_dist = base_dist
#self.bd_args = bd_args
if not np.isneginf(mixing_dist.dist.a):
lower = mixing_dist.dist.a
else:
lower = mixing_dist.ppf(1e-4)
if not np.isposinf(mixing_dist.dist.b):
upper = mixing_dist.dist.b
else:
upper = mixing_dist.isf(1e-4)
self.ma = lower
self.mb = upper
mixing_support = np.arange(lower, upper+1)
self.mixing_probs = mixing_dist.pmf(mixing_support)
self.bd_args = bd_args_func(mixing_support)
self.bd_kwds = bd_kwds_func(mixing_support)
[docs] def rvs(self, size=1):
mrvs = self.mixing_dist.rvs(size)
#TODO: check strange cases ? this assumes continous integers
mrvs_idx = (np.clip(mrvs, self.ma, self.mb) - self.ma).astype(int)
bd_args = tuple(md[mrvs_idx] for md in self.bd_args)
bd_kwds = dict((k, self.bd_kwds[k][mrvs_idx]) for k in self.bd_kwds)
kwds = {'size':size}
kwds.update(bd_kwds)
rvs = self.base_dist.rvs(*self.bd_args, **kwds)
return rvs, mrvs_idx
[docs] def pdf(self, x):
x = np.asarray(x)
if np.size(x) > 1:
x = x[...,None] #[None, ...]
bd_probs = self.base_dist.pdf(x, *self.bd_args, **self.bd_kwds)
prob = (bd_probs * self.mixing_probs).sum(-1)
return prob, bd_probs
[docs] def cdf(self, x):
x = np.asarray(x)
if np.size(x) > 1:
x = x[...,None] #[None, ...]
bd_probs = self.base_dist.cdf(x, *self.bd_args, **self.bd_kwds)
prob = (bd_probs * self.mixing_probs).sum(-1)
return prob, bd_probs
#try:
[docs]class ClippedContinuous(object):
'''clipped continuous distribution with a masspoint at clip_lower
Notes
-----
first version, to try out possible designs
insufficient checks for valid arguments and not clear
whether it works for distributions that have compact support
clip_lower is fixed and independent of the distribution parameters.
The clip_lower point in the pdf has to be interpreted as a mass point,
i.e. different treatment in integration and expect function, which means
none of the generic methods for this can be used.
maybe this will be better designed as a mixture between a degenerate or
discrete and a continuous distribution
Warning: uses equality to check for clip_lower values in function
arguments, since these are floating points, the comparison might fail
if clip_lower values are not exactly equal.
We could add a check whether the values are in a small neighborhood, but
it would be expensive (need to search and check all values).
'''
[docs] def __init__(self, base_dist, clip_lower):
self.base_dist = base_dist
self.clip_lower = clip_lower
def _get_clip_lower(self, kwds):
'''helper method to get clip_lower from kwds or attribute
'''
if not 'clip_lower' in kwds:
clip_lower = self.clip_lower
else:
clip_lower = kwds.pop('clip_lower')
return clip_lower, kwds
[docs] def rvs(self, *args, **kwds):
clip_lower, kwds = self._get_clip_lower(kwds)
rvs_ = self.base_dist.rvs(*args, **kwds)
#same as numpy.clip ?
rvs_[rvs_ < clip_lower] = clip_lower
return rvs_
[docs] def pdf(self, x, *args, **kwds):
x = np.atleast_1d(x)
if not 'clip_lower' in kwds:
clip_lower = self.clip_lower
else:
#allow clip_lower to be a possible parameter
clip_lower = kwds.pop('clip_lower')
pdf_raw = np.atleast_1d(self.base_dist.pdf(x, *args, **kwds))
clip_mask = (x == self.clip_lower)
if np.any(clip_mask):
clip_prob = self.base_dist.cdf(clip_lower, *args, **kwds)
pdf_raw[clip_mask] = clip_prob
#the following will be handled by sub-classing rv_continuous
pdf_raw[x < clip_lower] = 0
return pdf_raw
[docs] def cdf(self, x, *args, **kwds):
if not 'clip_lower' in kwds:
clip_lower = self.clip_lower
else:
#allow clip_lower to be a possible parameter
clip_lower = kwds.pop('clip_lower')
cdf_raw = self.base_dist.cdf(x, *args, **kwds)
#not needed if equality test is used
## clip_mask = (x == self.clip_lower)
## if np.any(clip_mask):
## clip_prob = self.base_dist.cdf(clip_lower, *args, **kwds)
## pdf_raw[clip_mask] = clip_prob
#the following will be handled by sub-classing rv_continuous
#if self.a is defined
cdf_raw[x < clip_lower] = 0
return cdf_raw
[docs] def sf(self, x, *args, **kwds):
if not 'clip_lower' in kwds:
clip_lower = self.clip_lower
else:
#allow clip_lower to be a possible parameter
clip_lower = kwds.pop('clip_lower')
sf_raw = self.base_dist.sf(x, *args, **kwds)
sf_raw[x <= clip_lower] = 1
return sf_raw
[docs] def ppf(self, x, *args, **kwds):
raise NotImplementedError
[docs] def plot(self, x, *args, **kwds):
clip_lower, kwds = self._get_clip_lower(kwds)
mass = self.pdf(clip_lower, *args, **kwds)
xr = np.concatenate(([clip_lower+1e-6], x[x>clip_lower]))
import matplotlib.pyplot as plt
#x = np.linspace(-4, 4, 21)
#plt.figure()
plt.xlim(clip_lower-0.1, x.max())
#remove duplicate calculation
xpdf = self.pdf(x, *args, **kwds)
plt.ylim(0, max(mass, xpdf.max())*1.1)
plt.plot(xr, self.pdf(xr, *args, **kwds))
#plt.vline(clip_lower, self.pdf(clip_lower, *args, **kwds))
plt.stem([clip_lower], [mass],
linefmt='b-', markerfmt='bo', basefmt='r-')
return
if __name__ == '__main__':
doplots = 1
#*********** Poisson-Normal Mixture
mdist = stats.poisson(2.)
bdist = stats.norm
bd_args_fn = lambda x: ()
#bd_kwds_fn = lambda x: {'loc': np.atleast_2d(10./(1+x))}
bd_kwds_fn = lambda x: {'loc': x, 'scale': 0.1*np.ones_like(x)} #10./(1+x)}
pd = ParametricMixtureD(mdist, bdist, bd_args_fn, bd_kwds_fn)
print(pd.pdf(1))
p, bp = pd.pdf(np.linspace(0,20,21))
pc, bpc = pd.cdf(np.linspace(0,20,21))
print(pd.rvs())
rvs, m = pd.rvs(size=1000)
if doplots:
import matplotlib.pyplot as plt
plt.hist(rvs, bins = 100)
plt.title('poisson mixture of normal distributions')
#********** clipped normal distribution (Tobit)
bdist = stats.norm
clip_lower_ = 0. #-0.5
cnorm = ClippedContinuous(bdist, clip_lower_)
x = np.linspace(1e-8, 4, 11)
print(cnorm.pdf(x))
print(cnorm.cdf(x))
if doplots:
#plt.figure()
#cnorm.plot(x)
plt.figure()
cnorm.plot(x = np.linspace(-1, 4, 51), loc=0.5, scale=np.sqrt(2))
plt.title('clipped normal distribution')
fig = plt.figure()
for i, loc in enumerate([0., 0.5, 1.,2.]):
fig.add_subplot(2,2,i+1)
cnorm.plot(x = np.linspace(-1, 4, 51), loc=loc, scale=np.sqrt(2))
plt.title('clipped normal, loc = %3.2f' % loc)
loc = 1.5
rvs = cnorm.rvs(loc=loc, size=2000)
plt.figure()
plt.hist(rvs, bins=50)
plt.title('clipped normal rvs, loc = %3.2f' % loc)
#plt.show()