134 lines
5.1 KiB
Python
134 lines
5.1 KiB
Python
![]() |
"""Calculate the statistical Significance of IMFs."""
|
|||
|
import logging
|
|||
|
import math
|
|||
|
|
|||
|
import numpy as np
|
|||
|
from scipy import stats
|
|||
|
from scipy.signal import find_peaks
|
|||
|
|
|||
|
|
|||
|
# helper function: Find mean period of an IMF
|
|||
|
def mean_period(data):
|
|||
|
"""Return mean-period of signal."""
|
|||
|
peaks = len(find_peaks(data, height=0)[0])
|
|||
|
return len(data) / peaks if peaks > 0 else len(data)
|
|||
|
|
|||
|
|
|||
|
# helper function: find energy of signal/IMF
|
|||
|
def energy(data):
|
|||
|
"""Return energy of signal."""
|
|||
|
return sum(pow(data, 2))
|
|||
|
|
|||
|
|
|||
|
# helper function: find IMF significance in 'a priori' test
|
|||
|
def significance_apriori(energy_density, T, N, alpha):
|
|||
|
"""Check a priori significance and Return True if significant else False."""
|
|||
|
k = abs(stats.norm.ppf((1 - alpha) / 2))
|
|||
|
upper_limit = -T + (k * (math.sqrt(2 / N) * math.exp(T / 2)))
|
|||
|
lower_limit = -T - (k * (math.sqrt(2 / N) * math.exp(T / 2)))
|
|||
|
|
|||
|
return not (lower_limit <= energy_density <= upper_limit)
|
|||
|
|
|||
|
|
|||
|
# helper function: find significance in 'a posteriori' test
|
|||
|
def significance_aposteriori(scaled_energy_density, T, N, alpha):
|
|||
|
"""Check a posteriori significance and Return True if significant else False."""
|
|||
|
k = abs(stats.norm.ppf((1 - alpha) / 2))
|
|||
|
upper_limit = -T + (k * (math.sqrt(2 / N) * math.exp(T / 2)))
|
|||
|
return not (scaled_energy_density <= upper_limit)
|
|||
|
|
|||
|
|
|||
|
def whitenoise_check(IMFs: np.ndarray, test_name: str = "aposteriori", rescaling_imf: int = 1, alpha: float = 0.95):
|
|||
|
"""Whitenoise statistical significance test.
|
|||
|
|
|||
|
Performs whitenoise test as described by Wu & Huang [Wu2004]_.
|
|||
|
|
|||
|
References
|
|||
|
----------
|
|||
|
.. [Wu2004] Zhaohua Wu, and Norden E. Huang. "A Study of the Characteristics of White Noise Using the
|
|||
|
Empirical Mode Decomposition Method." Proceedings: Mathematical, Physical and Engineering
|
|||
|
Sciences, vol. 460, no. 2046, The Royal Society, 2004, pp. 1597–611, http://www.jstor.org/stable/4143111.
|
|||
|
|
|||
|
Parameters
|
|||
|
----------
|
|||
|
IMFs: np.ndarray
|
|||
|
(Required) numpy array containing IMFs computed from a normalized signal
|
|||
|
test_name: str
|
|||
|
(Optional) Test type. Supported values: 'apriori', 'aposteriori'. (default 'aposteriori')
|
|||
|
rescaling_imf: int
|
|||
|
(Optional) ith IMF of the signal used in rescaling for 'a posteriori' test. (default 1)
|
|||
|
alpha: float
|
|||
|
(Optional) The percentiles at which the test is to be performed; 0 < alpha < 1; (default 0.95)
|
|||
|
|
|||
|
Returns
|
|||
|
-------
|
|||
|
Optional dictionary
|
|||
|
Returns dictionary with keys and values as IMFs' number and test result, respetively,
|
|||
|
Test results can be either 0 (fail) or 1 (pass).
|
|||
|
In case of problems with the input imfs, e.g. NaN values or no imfs, we return None.
|
|||
|
|
|||
|
Examples
|
|||
|
--------
|
|||
|
>>> import numpy as np
|
|||
|
>>> from PyEMD import EMD
|
|||
|
>>> from PyEMD.checks import whitenoise_check
|
|||
|
>>> T = np.linspace(0, 1, 100)
|
|||
|
>>> S = np.sin(2*2*np.pi*T)
|
|||
|
>>> emd = EMD()
|
|||
|
>>> imfs = emd(S)
|
|||
|
>>> significant_imfs = whitenoise_check(imfs, test_name='apriori')
|
|||
|
>>> significant_imfs
|
|||
|
{1: 1, 2: 1}
|
|||
|
>>> type(significant_imfs)
|
|||
|
<class 'dict'>
|
|||
|
|
|||
|
"""
|
|||
|
assert isinstance(IMFs, np.ndarray), "Invalid Data type, Pass a numpy.ndarray containing IMFs"
|
|||
|
# check if IMFs are empty or not
|
|||
|
if len(IMFs) == 0 or len(IMFs[0]) == 0:
|
|||
|
logging.getLogger("PyEMD").warning("Detected empty input. Skipping check.")
|
|||
|
return None
|
|||
|
|
|||
|
assert isinstance(alpha, float), "Invalid Data type for alpha, pass a float value between (0,1)"
|
|||
|
assert 0 < alpha < 1, "alpha value should be in between (0,1)"
|
|||
|
assert test_name in ("apriori", "aposteriori"), "Invalid test type"
|
|||
|
assert isinstance(rescaling_imf, int), "Invalid data type for rescaling_imf, pass a int value"
|
|||
|
assert 0 < rescaling_imf <= len(IMFs), "Invalid rescaling IMF"
|
|||
|
|
|||
|
if np.any(np.isnan(IMFs)):
|
|||
|
# Return None if input has NaN
|
|||
|
logging.getLogger("PyEMD").warning("Detected NaN values during whitenoise check. Skipping check.")
|
|||
|
return None
|
|||
|
|
|||
|
N = len(IMFs[0])
|
|||
|
output = {}
|
|||
|
if test_name == "apriori":
|
|||
|
for idx, imf in enumerate(IMFs):
|
|||
|
log_T = math.log(mean_period(imf))
|
|||
|
energy_density = math.log(energy(imf) / N)
|
|||
|
sig_priori = significance_apriori(energy_density, log_T, N, alpha)
|
|||
|
|
|||
|
output[idx + 1] = int(sig_priori)
|
|||
|
|
|||
|
elif test_name == "aposteriori":
|
|||
|
scaling_imf_mean_period = math.log(mean_period(IMFs[rescaling_imf - 1]))
|
|||
|
scaling_imf_energy_density = math.log(energy(IMFs[rescaling_imf - 1]) / N)
|
|||
|
|
|||
|
k = abs(stats.norm.ppf((1 - alpha) / 2))
|
|||
|
up_limit = -scaling_imf_mean_period + (k * math.sqrt(2 / N) * math.exp(scaling_imf_mean_period) / 2)
|
|||
|
|
|||
|
scaling_factor = up_limit - scaling_imf_energy_density
|
|||
|
|
|||
|
for idx, imf in enumerate(IMFs):
|
|||
|
log_T = math.log(mean_period(imf))
|
|||
|
energy_density = math.log(energy(imf) / N)
|
|||
|
scaled_energy_density = energy_density + scaling_factor
|
|||
|
sig_aposteriori = significance_aposteriori(scaled_energy_density, log_T, N, alpha)
|
|||
|
|
|||
|
output[idx + 1] = int(sig_aposteriori)
|
|||
|
|
|||
|
else:
|
|||
|
raise AssertionError("Only 'apriori' and 'aposteriori' are allowed")
|
|||
|
|
|||
|
return output
|