"""Calculate the statistical Significance of IMFs.""" import logging import math import numpy as np from scipy import stats from scipy.signal import find_peaks # helper function: Find mean period of an IMF def mean_period(data): """Return mean-period of signal.""" peaks = len(find_peaks(data, height=0)[0]) return len(data) / peaks if peaks > 0 else len(data) # helper function: find energy of signal/IMF def energy(data): """Return energy of signal.""" return sum(pow(data, 2)) # helper function: find IMF significance in 'a priori' test def significance_apriori(energy_density, T, N, alpha): """Check a priori significance and Return True if significant else False.""" k = abs(stats.norm.ppf((1 - alpha) / 2)) upper_limit = -T + (k * (math.sqrt(2 / N) * math.exp(T / 2))) lower_limit = -T - (k * (math.sqrt(2 / N) * math.exp(T / 2))) return not (lower_limit <= energy_density <= upper_limit) # helper function: find significance in 'a posteriori' test def significance_aposteriori(scaled_energy_density, T, N, alpha): """Check a posteriori significance and Return True if significant else False.""" k = abs(stats.norm.ppf((1 - alpha) / 2)) upper_limit = -T + (k * (math.sqrt(2 / N) * math.exp(T / 2))) return not (scaled_energy_density <= upper_limit) def whitenoise_check(IMFs: np.ndarray, test_name: str = "aposteriori", rescaling_imf: int = 1, alpha: float = 0.95): """Whitenoise statistical significance test. Performs whitenoise test as described by Wu & Huang [Wu2004]_. References ---------- .. [Wu2004] Zhaohua Wu, and Norden E. Huang. "A Study of the Characteristics of White Noise Using the Empirical Mode Decomposition Method." Proceedings: Mathematical, Physical and Engineering Sciences, vol. 460, no. 2046, The Royal Society, 2004, pp. 1597–611, http://www.jstor.org/stable/4143111. Parameters ---------- IMFs: np.ndarray (Required) numpy array containing IMFs computed from a normalized signal test_name: str (Optional) Test type. Supported values: 'apriori', 'aposteriori'. (default 'aposteriori') rescaling_imf: int (Optional) ith IMF of the signal used in rescaling for 'a posteriori' test. (default 1) alpha: float (Optional) The percentiles at which the test is to be performed; 0 < alpha < 1; (default 0.95) Returns ------- Optional dictionary Returns dictionary with keys and values as IMFs' number and test result, respetively, Test results can be either 0 (fail) or 1 (pass). In case of problems with the input imfs, e.g. NaN values or no imfs, we return None. Examples -------- >>> import numpy as np >>> from PyEMD import EMD >>> from PyEMD.checks import whitenoise_check >>> T = np.linspace(0, 1, 100) >>> S = np.sin(2*2*np.pi*T) >>> emd = EMD() >>> imfs = emd(S) >>> significant_imfs = whitenoise_check(imfs, test_name='apriori') >>> significant_imfs {1: 1, 2: 1} >>> type(significant_imfs) """ assert isinstance(IMFs, np.ndarray), "Invalid Data type, Pass a numpy.ndarray containing IMFs" # check if IMFs are empty or not if len(IMFs) == 0 or len(IMFs[0]) == 0: logging.getLogger("PyEMD").warning("Detected empty input. Skipping check.") return None assert isinstance(alpha, float), "Invalid Data type for alpha, pass a float value between (0,1)" assert 0 < alpha < 1, "alpha value should be in between (0,1)" assert test_name in ("apriori", "aposteriori"), "Invalid test type" assert isinstance(rescaling_imf, int), "Invalid data type for rescaling_imf, pass a int value" assert 0 < rescaling_imf <= len(IMFs), "Invalid rescaling IMF" if np.any(np.isnan(IMFs)): # Return None if input has NaN logging.getLogger("PyEMD").warning("Detected NaN values during whitenoise check. Skipping check.") return None N = len(IMFs[0]) output = {} if test_name == "apriori": for idx, imf in enumerate(IMFs): log_T = math.log(mean_period(imf)) energy_density = math.log(energy(imf) / N) sig_priori = significance_apriori(energy_density, log_T, N, alpha) output[idx + 1] = int(sig_priori) elif test_name == "aposteriori": scaling_imf_mean_period = math.log(mean_period(IMFs[rescaling_imf - 1])) scaling_imf_energy_density = math.log(energy(IMFs[rescaling_imf - 1]) / N) k = abs(stats.norm.ppf((1 - alpha) / 2)) up_limit = -scaling_imf_mean_period + (k * math.sqrt(2 / N) * math.exp(scaling_imf_mean_period) / 2) scaling_factor = up_limit - scaling_imf_energy_density for idx, imf in enumerate(IMFs): log_T = math.log(mean_period(imf)) energy_density = math.log(energy(imf) / N) scaled_energy_density = energy_density + scaling_factor sig_aposteriori = significance_aposteriori(scaled_energy_density, log_T, N, alpha) output[idx + 1] = int(sig_aposteriori) else: raise AssertionError("Only 'apriori' and 'aposteriori' are allowed") return output