Source code for gccestimating

"""Generalized Cross Correlation Estimators.

Istanciation Signatures:

- `gcc = GCC(sig1, sig2, fftlen)`
- `gcc = GCC.from_spectra(spec1, spec2, onesided=True)`

Estimators:

`gcc.cc()`, `gcc.roth()`, `gcc.scot()`, `gcc.phat()`, `gcc.ht()`

`gcc.gamma12()`

"""
from dataclasses import dataclass as _dataclass
import numpy as _np



[docs]class GCC(object): """Returns a GCC instance. Provides estimation methods for Generalized Cross Correlation. Parameters ---------- sig1 : ndarray First signal. sig2 : ndarray Second signal. fftlen : int or None Length of fft to be computed. If None, it will be calculated automatically as next power of two. Returns ------- gcc : GCC Examples -------- >>> import numpy as np >>> import matplotlib.pylab as plt >>> from gccestimating import GCC, corrlags >>> # generate some noise signals >>> nsamp = 1024 >>> noise1 = 0.5*np.random.randn(nsamp) >>> sig1 = np.zeros(nsamp) + noise1 >>> noise2 = 0.5*np.random.randn(nsamp) >>> sig2 = np.zeros_like(sig1) + noise2 >>> noise_both = np.random.randn(256) >>> sig1[:256] = noise_both >>> sig2[500:756] = noise_both >>> # create a lags array >>> lags = corrlags(2*nsamp-1, samplerate=1) >>> # Create the a GCC instance >>> gcc = GCC(sig1, sig2) >>> def mkplot(est, p): >>> plt.subplot(p) >>> plt.plot(lags, est.sig, label=est.name) >>> plt.legend() >>> # calculate the standard cc estimate >>> cc_est = gcc.cc() >>> # plot it using the mkplot function >>> mkplot(cc_est, 611) >>> # plot the other estimates >>> mkplot(gcc.scot(), 612) >>> mkplot(gcc.phat(), 613) >>> mkplot(gcc.roth(), 614) >>> mkplot(gcc.ht(), 615) >>> mkplot(gcc.eckart(noise_both, noise1, noise2), 616) >>> plt.figure() >>> plt.plot(np.correlate(sig1, sig2, 'full')) >>> plt.plot(gcc.cc()) >>> plt.show() """
[docs] @classmethod def from_spectra(cls, spec1, spec2, onesided=True): """Returns a GCC instance. Parameters ---------- spec1 : ndarray First spectrum. spec2 : ndarray Second spectrum. onesided : bool If you provide twosided Spectra (e.g. of comples signals) set to False. Default is True. Returns ------- gcc : GCC """ instance = cls() length1 = len(spec1) if len(spec2) != length1: raise ValueError('Spectra must be of same dimension.') instance._spec1 = spec1 instance._spec2 = spec2 if onesided: instance._fftlen = 2*length1-2 if length1%2 else 2*length1-1 instance._fft = _np.fft.rfft instance._ifft = _np.fft.irfft else: instance._fftlen = length1 instance._fft = _np.fft.fft instance._ifft = _np.fft.ifft instance._corrlen = instance._fftlen - 1 return instance
def __init__(self, sig1=None, sig2=None, fftlen=None): """Returns a GCC instance. Parameters ---------- sig1 : ndarray (N,) First signal. sig2 : ndarray (M,) Second signal. fftlen : int or None Length of fft to be computed. Will be calculated automatically if None (next power of two of 2max(N,M)-1). Returns ------- gcc : GCC """ self._sig1 = None self._sig2 = None self._spec11 = None self._spec22 = None self._spec12 = None self._gamma12 = None self._cc = None self._roth = None self._scot = None self._phat = None self._eckart = None self._ht = None self._corrlen = None self._fftlen = fftlen if sig1 is not None: self._from_signals(sig1, sig2, fftlen) def _from_signals(self, sig1, sig2, fftlen=None): corrlen = len(sig1) + len(sig2) - 1 fftlen = fftlen or int(2**_np.ceil(_np.log2(corrlen))) fft, ifft = _get_fftfuncs(sig1, sig2) spec1 = fft(sig1, fftlen) spec2 = fft(sig2, fftlen) self._corrlen = corrlen self._fftlen = fftlen self._fft = fft self._ifft = ifft self._sig1 = sig1 self._sig2 = sig2 self._spec1 = spec1 self._spec2 = spec2 def _backtransform(self, spec): sig = self._ifft(spec, self._fftlen) sig = _np.roll(sig, len(sig)//2) start = (len(sig)-self._corrlen)//2 + 1 return sig[start:start+self._corrlen] @property def spec11(self): """Returns auto power spectrum of first signal.""" if self._spec11 is None: self._spec11 = _np.real(self._spec1 * _np.conj(self._spec1)) return self._spec11 @property def spec22(self): """Returns auto power spectrum of second signal.""" if self._spec22 is None: self._spec22 = _np.real(self._spec2 * _np.conj(self._spec2)) return self._spec22 @property def spec12(self): """Returns cross power spectrum of first and second signal.""" if self._spec12 is None: self._spec12 = self._spec1*_np.conj(self._spec2) return self._spec12
[docs] @_dataclass(init=True, repr=True, eq=True) class Estimate(): """Data of an Estimate. Instances are returned by estimators in GCC. Parameters ---------- name : str Name of the estimator. sig : ndarray Estimator signal array (Rxy(t), Cross Correlation). spec : ndarray Estimator spectrum (Rxy(f)). """ name: str sig: _np.ndarray spec: _np.ndarray def __array__(self, dtype=None): if dtype is not None: return self.sig.astype(dtype) else: return self.sig def __len__(self): return len(self.sig) def index_to_lag(self, index, samplerate=None): lag = (index - len(self.sig)//2) if samplerate: lag /= samplerate return lag
[docs] def cc(self): """Returns GCC estimate $\\mathcal{F}^{-1} (S_{xy})$ """ if self._cc is None: self._cc = GCC.Estimate( name='CC', sig=self._backtransform(self.spec12), spec=self.spec12) return self._cc
[docs] def roth(self): """Returns GCC Roth estimate $\\mathcal{F}^{-1} (S_{xy}/S_{xx})$ """ if self._roth is None: spec = self.spec12 / _prevent_zerodivision(self.spec11) self._roth = GCC.Estimate( name='Roth', sig=self._backtransform(spec), spec=spec) return self._roth
[docs] def scot(self): """Returns GCC SCOT estimate Smoothed gamma12 Transformed GCC. $\\mathcal{F}^{-1} (S_{xy}/\\sqrt{S_{xx}S_{yy}})$ """ if self._scot is None: spec = self.gamma12() self._scot = GCC.Estimate( name='SCOT', sig=self._backtransform(spec), spec=spec) return self._scot
[docs] def gamma12(self): """Returns gamma12 $\\gamma_{12}(f)$""" if self._gamma12 is None: self._gamma12 = self.spec12 / _prevent_zerodivision( _np.sqrt(self.spec11*self.spec22)) return self._gamma12
[docs] def coherence(self): """Returns the coherence.""" return self.gamma12()**2
[docs] def phat(self): """Returns GCC PHAT estimate PHAse Transformed GCC. $\\mathcal{F}^{-1}(S_{xy}/|S_{xy}|)$ """ if self._phat is None: spec = self.spec12 / _prevent_zerodivision(_np.abs(self.spec12)) self._phat = GCC.Estimate( name='PHAT', sig=self._backtransform(spec), spec=spec) return self._phat
[docs] def eckart(self, sig0, noise1, noise2): """Returns an eckart estimate. Parameters ---------- sig0 : ndarray estimate of the actual signal to be correlated. noise1 : ndarray estimated noise in sig1. noise2 : ndarray estimated noise in sig2 Returns ------- estmate : GCC.Estimate Note ---- only implemented, not fully tested. """ if self._eckart is None: spec_sig0 = self._fft(sig0, self._fftlen) spec_noise1 = self._fft(noise1, self._fftlen) spec_noise2 = self._fft(noise2, self._fftlen) spec_sig00 = _np.real(spec_sig0*spec_sig0.conj()) spec_noise11 = _np.real(spec_noise1*spec_noise1.conj()) spec_noise22 = _np.real(spec_noise2*spec_noise2.conj()) weight = spec_sig00 /_prevent_zerodivision( spec_noise11*spec_noise22) spec = self.spec12*weight self._eckart = GCC.Estimate( name='Eckart', sig=self._backtransform(spec), spec=spec) return self._eckart
[docs] def ht(self): """Returns GCC HT estimate""" if self._ht is None: coh = _np.abs(self.gamma12())**2 spec = self.spec12*coh/_prevent_zerodivision(_np.abs(self.spec12)*(1-coh)) self._ht = GCC.Estimate( name='HT', sig=self._backtransform(spec), spec=spec) return self._ht
[docs]def corrlags(corrlen, samplerate=1): """Returns array of lags. Parameters ---------- corrlen : int Lenght of correlation function (usually 2N-1). samplerate : scalar Returns ------- lags : ndarray """ dt = 1 / samplerate la = corrlen // 2 lb = la+1 if corrlen%2 else la return _np.arange(-la*dt, lb*dt, dt)
def _get_fftfuncs(*signals): """Returns fft, ifft function depending on given signals data type.""" if _np.all([_np.all(_np.isreal(sig)) for sig in signals]): return _np.fft.rfft, _np.fft.irfft else: return _np.fft.fft, _np.fft.ifft def _prevent_zerodivision(sig, reg=1e-12, rep=1e-12): """Replaces values smaler reg. Same for negative values and negative reg. Replaces `sig < reg & sig >= 0` with `rep` and `sig > -reg & sig <= 0` with `-rep` Parameters ---------- sig : ndarray Will be modified by this function. Provide a copy of your array if original is needed. reg : scalar All values around 0 (-reg, reg) are replaced by `rep` rep : scalar Replace value. Returns ------- sig : ndarray Modified sig usable for division. """ reg = abs(reg) rep = abs(rep) sig[_np.logical_and(sig < reg, sig >= 0)] = rep sig[_np.logical_and(sig > -reg, sig <= 0)] = -rep return sig