'''getting started with diffusions, continuous time stochastic processes
Author: josef-pktd
License: BSD
References
----------
An Algorithmic Introduction to Numerical Simulation of Stochastic Differential
Equations
Author(s): Desmond J. Higham
Source: SIAM Review, Vol. 43, No. 3 (Sep., 2001), pp. 525-546
Published by: Society for Industrial and Applied Mathematics
Stable URL: http://www.jstor.org/stable/3649798
http://www.sitmo.com/ especially the formula collection
Notes
-----
OU process: use same trick for ARMA with constant (non-zero mean) and drift
some of the processes have easy multivariate extensions
*Open Issues*
include xzero in returned sample or not? currently not
*TODOS*
* Milstein from Higham paper, for which processes does it apply
* Maximum Likelihood estimation
* more statistical properties (useful for tests)
* helper functions for display and MonteCarlo summaries (also for testing/checking)
* more processes for the menagerie (e.g. from empirical papers)
* characteristic functions
* transformations, non-linear e.g. log
* special estimators, e.g. Ait Sahalia, empirical characteristic functions
* fft examples
* check naming of methods, "simulate", "sample", "simexact", ... ?
stochastic volatility models: estimation unclear
finance applications ? option pricing, interest rate models
'''
from __future__ import print_function
import numpy as np
from scipy import stats, signal
import matplotlib.pyplot as plt
#np.random.seed(987656789)
[docs]class Diffusion(object):
'''Wiener Process, Brownian Motion with mu=0 and sigma=1
'''
[docs] def __init__(self):
pass
[docs] def simulateW(self, nobs=100, T=1, dt=None, nrepl=1):
'''generate sample of Wiener Process
'''
dt = T*1.0/nobs
t = np.linspace(dt, 1, nobs)
dW = np.sqrt(dt)*np.random.normal(size=(nrepl, nobs))
W = np.cumsum(dW,1)
self.dW = dW
return W, t
[docs] def expectedsim(self, func, nobs=100, T=1, dt=None, nrepl=1):
'''get expectation of a function of a Wiener Process by simulation
initially test example from
'''
W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
U = func(t, W)
Umean = U.mean(0)
return U, Umean, t
[docs]class AffineDiffusion(Diffusion):
'''
differential equation:
:math::
dx_t = f(t,x)dt + \sigma(t,x)dW_t
integral:
:math::
x_T = x_0 + \\int_{0}^{T}f(t,S)dt + \\int_0^T \\sigma(t,S)dW_t
TODO: check definition, affine, what about jump diffusion?
'''
[docs] def __init__(self):
pass
[docs] def sim(self, nobs=100, T=1, dt=None, nrepl=1):
# this doesn't look correct if drift or sig depend on x
# see arithmetic BM
W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
dx = self._drift() + self._sig() * W
x = np.cumsum(dx,1)
xmean = x.mean(0)
return x, xmean, t
[docs] def simEM(self, xzero=None, nobs=100, T=1, dt=None, nrepl=1, Tratio=4):
'''
from Higham 2001
TODO: reverse parameterization to start with final nobs and DT
TODO: check if I can skip the loop using my way from exactprocess
problem might be Winc (reshape into 3d and sum)
TODO: (later) check memory efficiency for large simulations
'''
#TODO: reverse parameterization to start with final nobs and DT
nobs = nobs * Tratio # simple way to change parameter
# maybe wrong parameterization,
# drift too large, variance too small ? which dt/Dt
# _drift, _sig independent of dt is wrong
if xzero is None:
xzero = self.xzero
if dt is None:
dt = T*1.0/nobs
W, t = self.simulateW(nobs=nobs, T=T, dt=dt, nrepl=nrepl)
dW = self.dW
t = np.linspace(dt, 1, nobs)
Dt = Tratio*dt;
L = nobs/Tratio; # L EM steps of size Dt = R*dt
Xem = np.zeros((nrepl,L)); # preallocate for efficiency
Xtemp = xzero
Xem[:,0] = xzero
for j in np.arange(1,L):
#Winc = np.sum(dW[:,Tratio*(j-1)+1:Tratio*j],1)
Winc = np.sum(dW[:,np.arange(Tratio*(j-1)+1,Tratio*j)],1)
#Xtemp = Xtemp + Dt*lamda*Xtemp + mu*Xtemp*Winc;
Xtemp = Xtemp + self._drift(x=Xtemp) + self._sig(x=Xtemp) * Winc
#Dt*lamda*Xtemp + mu*Xtemp*Winc;
Xem[:,j] = Xtemp
return Xem
'''
R = 4; Dt = R*dt; L = N/R; % L EM steps of size Dt = R*dt
Xem = zeros(1,L); % preallocate for efficiency
Xtemp = Xzero;
for j = 1:L
Winc = sum(dW(R*(j-1)+1:R*j));
Xtemp = Xtemp + Dt*lambda*Xtemp + mu*Xtemp*Winc;
Xem(j) = Xtemp;
end
'''
[docs]class ExactDiffusion(AffineDiffusion):
'''Diffusion that has an exact integral representation
this is currently mainly for geometric, log processes
'''
[docs] def __init__(self):
pass
[docs] def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
'''ddt : discrete delta t
should be the same as an AR(1)
not tested yet
'''
t = np.linspace(ddt, nobs*ddt, nobs)
#expnt = np.exp(-self.lambd * t)
expddt = np.exp(-self.lambd * ddt)
normrvs = np.random.normal(size=(nrepl,nobs))
#do I need lfilter here AR(1) ? if mean reverting lag-coeff<1
#lfilter doesn't handle 2d arrays, it does?
inc = self._exactconst(expddt) + self._exactstd(expddt) * normrvs
return signal.lfilter([1.], [1.,-expddt], inc)
[docs] def exactdist(self, xzero, t):
expnt = np.exp(-self.lambd * t)
meant = xzero * expnt + self._exactconst(expnt)
stdt = self._exactstd(expnt)
return stats.norm(loc=meant, scale=stdt)
[docs]class ArithmeticBrownian(AffineDiffusion):
'''
:math::
dx_t &= \\mu dt + \\sigma dW_t
'''
[docs] def __init__(self, xzero, mu, sigma):
self.xzero = xzero
self.mu = mu
self.sigma = sigma
def _drift(self, *args, **kwds):
return self.mu
def _sig(self, *args, **kwds):
return self.sigma
[docs] def exactprocess(self, nobs, xzero=None, ddt=1., nrepl=2):
'''ddt : discrete delta t
not tested yet
'''
if xzero is None:
xzero = self.xzero
t = np.linspace(ddt, nobs*ddt, nobs)
normrvs = np.random.normal(size=(nrepl,nobs))
inc = self._drift + self._sigma * np.sqrt(ddt) * normrvs
#return signal.lfilter([1.], [1.,-1], inc)
return xzero + np.cumsum(inc,1)
[docs] def exactdist(self, xzero, t):
expnt = np.exp(-self.lambd * t)
meant = self._drift * t
stdt = self._sigma * np.sqrt(t)
return stats.norm(loc=meant, scale=stdt)
[docs]class GeometricBrownian(AffineDiffusion):
'''Geometric Brownian Motion
:math::
dx_t &= \\mu x_t dt + \\sigma x_t dW_t
$x_t $ stochastic process of Geometric Brownian motion,
$\mu $ is the drift,
$\sigma $ is the Volatility,
$W$ is the Wiener process (Brownian motion).
'''
[docs] def __init__(self, xzero, mu, sigma):
self.xzero = xzero
self.mu = mu
self.sigma = sigma
def _drift(self, *args, **kwds):
x = kwds['x']
return self.mu * x
def _sig(self, *args, **kwds):
x = kwds['x']
return self.sigma * x
[docs]class OUprocess(AffineDiffusion):
'''Ornstein-Uhlenbeck
:math::
dx_t&=\\lambda(\\mu - x_t)dt+\\sigma dW_t
mean reverting process
TODO: move exact higher up in class hierarchy
'''
[docs] def __init__(self, xzero, mu, lambd, sigma):
self.xzero = xzero
self.lambd = lambd
self.mu = mu
self.sigma = sigma
def _drift(self, *args, **kwds):
x = kwds['x']
return self.lambd * (self.mu - x)
def _sig(self, *args, **kwds):
x = kwds['x']
return self.sigma * x
[docs] def exact(self, xzero, t, normrvs):
#TODO: aggregate over time for process with observations for all t
# i.e. exact conditional distribution for discrete time increment
# -> exactprocess
#TODO: for single t, return stats.norm -> exactdist
expnt = np.exp(-self.lambd * t)
return (xzero * expnt + self.mu * (1-expnt) +
self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd) * normrvs)
[docs] def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
'''ddt : discrete delta t
should be the same as an AR(1)
not tested yet
# after writing this I saw the same use of lfilter in sitmo
'''
t = np.linspace(ddt, nobs*ddt, nobs)
expnt = np.exp(-self.lambd * t)
expddt = np.exp(-self.lambd * ddt)
normrvs = np.random.normal(size=(nrepl,nobs))
#do I need lfilter here AR(1) ? lfilter doesn't handle 2d arrays, it does?
from scipy import signal
#xzero * expnt
inc = ( self.mu * (1-expddt) +
self.sigma * np.sqrt((1-expddt*expddt)/2./self.lambd) * normrvs )
return signal.lfilter([1.], [1.,-expddt], inc)
[docs] def exactdist(self, xzero, t):
#TODO: aggregate over time for process with observations for all t
#TODO: for single t, return stats.norm
expnt = np.exp(-self.lambd * t)
meant = xzero * expnt + self.mu * (1-expnt)
stdt = self.sigma * np.sqrt((1-expnt*expnt)/2./self.lambd)
from scipy import stats
return stats.norm(loc=meant, scale=stdt)
[docs] def fitls(self, data, dt):
'''assumes data is 1d, univariate time series
formula from sitmo
'''
# brute force, no parameter estimation errors
nobs = len(data)-1
exog = np.column_stack((np.ones(nobs), data[:-1]))
parest, res, rank, sing = np.linalg.lstsq(exog, data[1:])
const, slope = parest
errvar = res/(nobs-2.)
lambd = -np.log(slope)/dt
sigma = np.sqrt(-errvar * 2.*np.log(slope)/ (1-slope**2)/dt)
mu = const / (1-slope)
return mu, lambd, sigma
[docs]class SchwartzOne(ExactDiffusion):
'''the Schwartz type 1 stochastic process
:math::
dx_t = \\kappa (\\mu - \\ln x_t) x_t dt + \\sigma x_tdW \\
The Schwartz type 1 process is a log of the Ornstein-Uhlenbeck stochastic
process.
'''
[docs] def __init__(self, xzero, mu, kappa, sigma):
self.xzero = xzero
self.mu = mu
self.kappa = kappa
self.lambd = kappa #alias until I fix exact
self.sigma = sigma
def _exactconst(self, expnt):
return (1-expnt) * (self.mu - self.sigma**2 / 2. /self.kappa)
def _exactstd(self, expnt):
return self.sigma * np.sqrt((1-expnt*expnt)/2./self.kappa)
[docs] def exactprocess(self, xzero, nobs, ddt=1., nrepl=2):
'''uses exact solution for log of process
'''
lnxzero = np.log(xzero)
lnx = super(self.__class__, self).exactprocess(xzero, nobs, ddt=ddt, nrepl=nrepl)
return np.exp(lnx)
[docs] def exactdist(self, xzero, t):
expnt = np.exp(-self.lambd * t)
#TODO: check this is still wrong, just guessing
meant = np.log(xzero) * expnt + self._exactconst(expnt)
stdt = self._exactstd(expnt)
return stats.lognorm(loc=meant, scale=stdt)
[docs] def fitls(self, data, dt):
'''assumes data is 1d, univariate time series
formula from sitmo
'''
# brute force, no parameter estimation errors
nobs = len(data)-1
exog = np.column_stack((np.ones(nobs),np.log(data[:-1])))
parest, res, rank, sing = np.linalg.lstsq(exog, np.log(data[1:]))
const, slope = parest
errvar = res/(nobs-2.) #check denominator estimate, of sigma too low
kappa = -np.log(slope)/dt
sigma = np.sqrt(errvar * kappa / (1-np.exp(-2*kappa*dt)))
mu = const / (1-np.exp(-kappa*dt)) + sigma**2/2./kappa
if np.shape(mu)== (1,): mu = mu[0] # how to remove scalar array ?
if np.shape(sigma)== (1,): sigma = sigma[0]
#mu, kappa are good, sigma too small
return mu, kappa, sigma
[docs]class BrownianBridge(object):
[docs] def __init__(self):
pass
[docs] def simulate(self, x0, x1, nobs, nrepl=1, ddt=1., sigma=1.):
nobs=nobs+1
dt = ddt*1./nobs
t = np.linspace(dt, ddt-dt, nobs)
t = np.linspace(dt, ddt, nobs)
wm = [t/ddt, 1-t/ddt]
#wmi = wm[1]
#wm1 = x1*wm[0]
wmi = 1-dt/(ddt-t)
wm1 = x1*(dt/(ddt-t))
su = sigma* np.sqrt(t*(1-t)/ddt)
s = sigma* np.sqrt(dt*(ddt-t-dt)/(ddt-t))
x = np.zeros((nrepl, nobs))
x[:,0] = x0
rvs = s*np.random.normal(size=(nrepl,nobs))
for i in range(1,nobs):
x[:,i] = x[:,i-1]*wmi[i] + wm1[i] + rvs[:,i]
return x, t, su
[docs]class CompoundPoisson(object):
'''nobs iid compound poisson distributions, not a process in time
'''
[docs] def __init__(self, lambd, randfn=np.random.normal):
if len(lambd) != len(randfn):
raise ValueError('lambd and randfn need to have the same number of elements')
self.nobj = len(lambd)
self.randfn = randfn
self.lambd = np.asarray(lambd)
[docs] def simulate(self, nobs, nrepl=1):
nobj = self.nobj
x = np.zeros((nrepl, nobs, nobj))
N = np.random.poisson(self.lambd[None,None,:], size=(nrepl,nobs,nobj))
for io in range(nobj):
randfnc = self.randfn[io]
nc = N[:,:,io]
#print nrepl,nobs,nc
#xio = randfnc(size=(nrepl,nobs,np.max(nc))).cumsum(-1)[np.arange(nrepl)[:,None],np.arange(nobs),nc-1]
rvs = randfnc(size=(nrepl,nobs,np.max(nc)))
print('rvs.sum()', rvs.sum(), rvs.shape)
xio = rvs.cumsum(-1)[np.arange(nrepl)[:,None],np.arange(nobs),nc-1]
#print xio.shape
x[:,:,io] = xio
x[N==0] = 0
return x, N
'''
randn('state',100) % set the state of randn
T = 1; N = 500; dt = T/N; t = [dt:dt:1];
M = 1000; % M paths simultaneously
dW = sqrt(dt)*randn(M,N); % increments
W = cumsum(dW,2); % cumulative sum
U = exp(repmat(t,[M 1]) + 0.5*W);
Umean = mean(U);
plot([0,t],[1,Umean],'b-'), hold on % plot mean over M paths
plot([0,t],[ones(5,1),U(1:5,:)],'r--'), hold off % plot 5 individual paths
xlabel('t','FontSize',16)
ylabel('U(t)','FontSize',16,'Rotation',0,'HorizontalAlignment','right')
legend('mean of 1000 paths','5 individual paths',2)
averr = norm((Umean - exp(9*t/8)),'inf') % sample error
'''
if __name__ == '__main__':
doplot = 1
nrepl = 1000
examples = []#['all']
if 'all' in examples:
w = Diffusion()
# Wiener Process
# ^^^^^^^^^^^^^^
ws = w.simulateW(1000, nrepl=nrepl)
if doplot:
plt.figure()
tmp = plt.plot(ws[0].T)
tmp = plt.plot(ws[0].mean(0), linewidth=2)
plt.title('Standard Brownian Motion (Wiener Process)')
func = lambda t, W: np.exp(t + 0.5*W)
us = w.expectedsim(func, nobs=500, nrepl=nrepl)
if doplot:
plt.figure()
tmp = plt.plot(us[0].T)
tmp = plt.plot(us[1], linewidth=2)
plt.title('Brownian Motion - exp')
#plt.show()
averr = np.linalg.norm(us[1] - np.exp(9*us[2]/8.), np.inf)
print(averr)
#print us[1][:10]
#print np.exp(9.*us[2][:10]/8.)
# Geometric Brownian
# ^^^^^^^^^^^^^^^^^^
gb = GeometricBrownian(xzero=1., mu=0.01, sigma=0.5)
gbs = gb.simEM(nobs=100, nrepl=100)
if doplot:
plt.figure()
tmp = plt.plot(gbs.T)
tmp = plt.plot(gbs.mean(0), linewidth=2)
plt.title('Geometric Brownian')
plt.figure()
tmp = plt.plot(np.log(gbs).T)
tmp = plt.plot(np.log(gbs.mean(0)), linewidth=2)
plt.title('Geometric Brownian - log-transformed')
ab = ArithmeticBrownian(xzero=1, mu=0.05, sigma=1)
abs = ab.simEM(nobs=100, nrepl=100)
if doplot:
plt.figure()
tmp = plt.plot(abs.T)
tmp = plt.plot(abs.mean(0), linewidth=2)
plt.title('Arithmetic Brownian')
# Ornstein-Uhlenbeck
# ^^^^^^^^^^^^^^^^^^
ou = OUprocess(xzero=2, mu=1, lambd=0.5, sigma=0.1)
ous = ou.simEM()
oue = ou.exact(1, 1, np.random.normal(size=(5,10)))
ou.exact(0, np.linspace(0,10,10/0.1), 0)
ou.exactprocess(0,10)
print(ou.exactprocess(0,10, ddt=0.1,nrepl=10).mean(0))
#the following looks good, approaches mu
oues = ou.exactprocess(0,100, ddt=0.1,nrepl=100)
if doplot:
plt.figure()
tmp = plt.plot(oues.T)
tmp = plt.plot(oues.mean(0), linewidth=2)
plt.title('Ornstein-Uhlenbeck')
# SchwartsOne
# ^^^^^^^^^^^
so = SchwartzOne(xzero=0, mu=1, kappa=0.5, sigma=0.1)
sos = so.exactprocess(0,50, ddt=0.1,nrepl=100)
print(sos.mean(0))
print(np.log(sos.mean(0)))
doplot = 1
if doplot:
plt.figure()
tmp = plt.plot(sos.T)
tmp = plt.plot(sos.mean(0), linewidth=2)
plt.title('Schwartz One')
print(so.fitls(sos[0,:],dt=0.1))
sos2 = so.exactprocess(0,500, ddt=0.1,nrepl=5)
print('true: mu=1, kappa=0.5, sigma=0.1')
for i in range(5):
print(so.fitls(sos2[i],dt=0.1))
# Brownian Bridge
# ^^^^^^^^^^^^^^^
bb = BrownianBridge()
#bbs = bb.sample(x0, x1, nobs, nrepl=1, ddt=1., sigma=1.)
bbs, t, wm = bb.simulate(0, 0.5, 99, nrepl=500, ddt=1., sigma=0.1)
if doplot:
plt.figure()
tmp = plt.plot(bbs.T)
tmp = plt.plot(bbs.mean(0), linewidth=2)
plt.title('Brownian Bridge')
plt.figure()
plt.plot(wm,'r', label='theoretical')
plt.plot(bbs.std(0), label='simulated')
plt.title('Brownian Bridge - Variance')
plt.legend()
# Compound Poisson
# ^^^^^^^^^^^^^^^^
cp = CompoundPoisson([1,1], [np.random.normal,np.random.normal])
cps = cp.simulate(nobs=20000,nrepl=3)
print(cps[0].sum(-1).sum(-1))
print(cps[0].sum())
print(cps[0].mean(-1).mean(-1))
print(cps[0].mean())
print(cps[1].size)
print(cps[1].sum())
#Note Y = sum^{N} X is compound poisson of iid x, then
#E(Y) = E(N)*E(X) eg. eq. (6.37) page 385 in http://ee.stanford.edu/~gray/sp.html
#plt.show()