Source code for physioview.pipeline.ECG

from typing import Literal
from scipy.signal import butter, cheby1, ellip, filtfilt, find_peaks, \
    hilbert, iirnotch, lfilter, sosfiltfilt
from scipy.ndimage import uniform_filter1d
import pandas as pd
import numpy as np

# ============================== ECG Filters =================================
[docs] class Filters: """ A class for filtering raw electrocardiogram (ECG) signals. Parameters/Attributes --------------------- fs : int The sampling rate of the ECG signal. powerline_freq : int The powerline interference frequency. This value must be either 50 or 60 (by default, 60). The 50 Hz power grid is prevalent in many European, Asian, and African countries. """ def __init__(self,fs: int, powerline_freq: Literal[50, 60] = 60): """ Initialize the Filters object. Parameters ---------- fs : int The sampling rate of the ECG signal. powerline_freq : {50, 60} The powerline interference frequency. This value must be either 50 or 60 (by default, 60). The 50 Hz power grid is prevalent in many European, Asian, and African countries. """ self.fs = fs if powerline_freq not in [50, 60]: raise ValueError('The `powerline_freq` parameter must be set to ' '50 or 60 for the powerline interference ' 'frequency.') else: self.pl_freq = powerline_freq
[docs] def baseline_wander( self, signal: np.ndarray, cutoff: float = 0.05, order: int = 2 ) -> np.ndarray: """ Apply a high-pass filter to remove baseline wander from ECG data. Parameters ---------- signal : array_like An array containing the noisy signal data. cutoff : float The cut-off frequency at which frequencies below this value in the signal are attenuated; by default, 0.05 Hz. order : int The filter order, i.e., the number of samples required to produce the desired filtered output; by default, 2. Returns ------- filtered : array_like An array containing the filtered signal data. """ nyquist = 0.5 * self.fs highcut = cutoff / nyquist b, a = butter(order, highcut, btype = 'high', analog = False) filtered = filtfilt(b, a, signal) return filtered
[docs] def muscle_noise( self, signal: np.ndarray, lowcut: float = 30, highcut: float= 100, order: int = 2 ) -> np.ndarray: """ Apply a bandstop filter to remove muscle (EMG) noise from ECG data. Parameters ---------- signal : array_like An array containing the noisy signal data. lowcut : float, optional The lower cutoff frequency of the bandstop filter; by default, 30. highcut : float, optional The upper cutoff frequency of the bandstop filter; by default, 100 order : int, optional The filter order, i.e., the number of samples required to produce the desired filtered output; by default, 2. Returns ------- filtered : array_like The filtered signal with muscle/EMG noise removed. """ nyquist = 0.5 * self.fs low = lowcut / nyquist high = highcut / nyquist sos = butter(order, [low, high], btype = 'bandstop', analog = False, output = 'sos') filtered = sosfiltfilt(sos, signal) return filtered
[docs] def powerline_interference( self, signal: np.ndarray, q: float = 30 ) -> np.ndarray: """ Filter out powerline interference at a specified frequency. Parameters ---------- signal : array_like An array containing the noisy signal data. q : float, optional The quality factor, i.e., how narrow or wide the stopband is for a notch filter (by default, 30). A higher quality factor indicates a narrower bandpass. Returns ------- filtered : array_like An array containing the filtered signal data. """ w0 = 2 * np.pi * self.pl_freq / self.fs b, a = iirnotch(w0, q) filtered = filtfilt(b, a, signal) return filtered
[docs] def filter_signal( self, signal: np.ndarray, lowcut: float = 1, highcut: float = 15, rp: float= 0.15, rs: float = 80, order: int = 2 ) -> np.ndarray: """ Filter out artifact from ECG data due to powerline interference, baseline wander, movement, and muscle noise using an elliptic (Cauer) bandpass filter. Parameters ---------- signal : array_like An array containing the ECG data to be filtered. lowcut : float, optional The lower cutoff frequency (in Hz) of the bandpass filter below which frequencies are attenuated; by default, 1 Hz. highcut : float, optional The upper cutoff frequency (in Hz) of the bandpass filter above which frequencies are attenuated; by default, 15 Hz. rp : float, optional The maximum ripple (in dB) allowed in the passband; by default, 0.15 dB. Smaller values make the passband flatter (i.e., allow less distortion of ECG frequencies). rs : float, optional The minimum ripple (in dB) allowed in the stopband; by default, 80 dB. Larger values reject more noise. order : int, optional The filter order. Lower orders yield smoother transitions but less sharp cutoff; by default, 2. Returns ------- filtered : array_like An array containing the bandpass-filtered ECG signal. Notes ----- This is the default filter used in the PhysioView Dashboard. """ nyq = 0.5 * self.fs low = lowcut / nyq high = highcut / nyq b, a = ellip(order, rp, rs, [low, high], btype = 'band') filtered = filtfilt(b, a, signal) return filtered
# ======================= ECG Beat Detection Methods =========================
[docs] class BeatDetectors: """ A class for detecting beats in electrocardiogram (ECG) signals using popular algorithms. Parameters/Attributes --------------------- fs : int The sampling rate of the ECG signal. preprocessed : bool, optional Whether the ECG signal is preprocessed or not; by default, True. Notes ----- Both unfiltered and filtered ECG data may be passed as inputs to the beat detection functions. If unfiltered data is passed, the `preprocessed` parameter should be set to `False`, and the beat detection functions will pre-process the signal using the algorithm's original pre-processing procedures. """ def __init__(self, fs: int, preprocessed: bool = True): """ Initialize the BeatDetectors object. Parameters ---------- fs : int The sampling rate of the ECG signal. preprocessed : bool, optional Whether the ECG signal is preprocessed or not; by default, True. """ self.fs = fs if not isinstance(preprocessed, bool): raise ValueError( 'The `preprocessed` attribute must be True or False.') else: self.preprocessed = preprocessed
[docs] def engzee( self, signal: np.ndarray ) -> np.ndarray: """ Extract QRS complex locations from an ECG signal with the Engelse and Zeelenberg (1979) algorithm, modified by Lourenço et al. (2011). Parameters ---------- signal : array_like An array containing the ECG signal. Returns ------- ecg_beats : array_like An array containing the indices of detected R peaks. References ---------- Engelse, W. A. H., & Zeelenberg, C. (1979). A single scan algorithm for QRS detection and feature extraction. IEEE Computers in Cardiology, 6, 37-42. Lourenço, A., Fred, A., & Ribeiro, B. (2012). Real time electrocardiogram segmentation for finger based ECG biometrics. In Proceedings of the 2012 Eighth International Conference on Intelligent Information Hiding and Multimedia Signal Processing (IIH-MSP) (pp. 21-24). IEEE. """ if not self.preprocessed: # Pre-process data using built-in filters signal = Filters.filter_signal(signal, self.fs) else: pass # Differentiate the input signal diff = np.zeros(len(signal)) for i in range(4, len(diff)): diff[i] = signal[i] - signal[i - 4] # Apply low-pass filter to the differentiated signal ci = [1, 4, 6, 4, 1] # coefficients low_pass = lfilter(ci, 1, diff) # Zero out the first part of the signal to avoid edge effects low_pass[: int(0.2 * self.fs)] = 0 # Define threshold parameters ms200 = int(0.2 * self.fs) ms1200 = int(1.2 * self.fs) ms160 = int(0.16 * self.fs) neg_threshold = int(0.01 * self.fs) # Initialize variables for threshold calculation M = 0 M_list = [] neg_m = [] MM = [] M_slope = np.linspace(1.0, 0.6, ms1200 - ms200) # Initialize lists for detected QRS complexes and R-peaks QRS = [] ecg_beats = [] # Initialize variables for peak detection counter = 0 # counter for tracking negative peaks thi_list = [] # list of indices where threshold is initially crossed thi = False # if threshold is initially crossed thf_list = [] # list of indices where threshold is finally crossed thf = False # if threshold is finallay crossed newM5 = False # if new M5 value is calculated # Compensate for potential delay in peak detection engzee_fake_delay = 0 # Loop through the low-pass filtered signal for peak detection for i in range(len(low_pass)): # Calculate threshold 'M' based on current QRS complex if i < 5 * self.fs: M = 0.6 * np.max(low_pass[: i + 1]) MM.append(M) if len(MM) > 5: MM.pop(0) elif QRS and i < QRS[-1] + ms200: newM5 = 0.6 * np.max(low_pass[QRS[-1]: i]) if newM5 > 1.5 * MM[-1]: newM5 = 1.1 * MM[-1] elif newM5 and QRS and i == QRS[-1] + ms200: MM.append(newM5) if len(MM) > 5: MM.pop(0) M = np.mean(MM) elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200: M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)] elif QRS and i > QRS[-1] + ms1200: M = 0.6 * np.mean(MM) M_list.append(M) neg_m.append(-M) # Detect QRS complexes if not QRS and low_pass[i] > M: QRS.append(i) thi_list.append(i) thi = True elif QRS and i > QRS[-1] + ms200 and low_pass[i] > M: QRS.append(i) thi_list.append(i) thi = True # Check for negative threshold crossing within defined window if thi and i < thi_list[-1] + ms160: if low_pass[i] < -M and low_pass[i - 1] > -M: thf = True if thf and low_pass[i] < -M: thf_list.append(i) counter += 1 elif low_pass[i] > -M and thf: counter = 0 thi = False thf = False elif thi and i > thi_list[-1] + ms160: counter = 0 thi = False thf = False # Check if the number of negative threshold crossings exceeds # the threshold if counter > neg_threshold: # Extract the unfiltered section around the detected peak unfiltered_section = signal[ thi_list[-1] - int(0.01 * self.fs): i] # Detect R-peaks and append to the list ecg_beats.append( engzee_fake_delay + np.argmax(unfiltered_section) + thi_list[-1] - int(0.01 * self.fs)) counter = 0 thi = False thf = False # Remove the first detection as it requires QRS complex amplitude # for the threshold ecg_beats.pop(0) # Convert list of beats to numpy array and return ecg_beats = np.array(ecg_beats, dtype = 'int') ecg_beats = self._remove_dupes(ecg_beats) return ecg_beats
[docs] def manikandan( self, signal: np.ndarray, adaptive_threshold: bool = True, window: float = 0.44 ) -> np.ndarray: """ Extract R peak locations from an ECG signal with the Manikandan and Soman (2012) algorithm. Parameters ---------- signal : array_like An array containing the ECG signal. adaptive_threshold : boolean, optional Whether to refine beat detection using adaptive thresholding; by default, True. window : float, optional The size (in seconds) of the sliding search window for the adaptive threshold; by default, 0.44 seconds (440 milliseconds). Returns ------- ecg_beats : array_like An array containing the indices of detected R peaks. References ---------- Manikandan, R., & Soman, K. P. (2012). A novel method for detecting R-peaks in electrocardiogram (ECG) signal. Biomedical Signal Processing and Control, 7(2), 118-128. """ def _adapt_thresh(signal, beats_ix, fs, window, step = 0.1): """Get beat indices with amplitudes that pass a minimum reference threshold.""" window_len = int(fs * window) window_step = int(fs * step) signal_beats = pd.DataFrame({'signal': signal}) signal_beats.loc[beats_ix, 'beat'] = 1 for n in range(0, len(signal_beats), window_step): w = signal_beats[n:(n + window_len)] if not w.beat.sum() >= 2: pass else: s = w['signal'] beats = w[w.beat == 1].index.values if len(beats) == 2: thresh = (s[beats].min() + s[beats].max()) * 0.5 if len(beats) > 2: thresh = (s[beats].median() + s[beats].max()) * 0.5 reject = s[beats][s[beats] < thresh].index signal_beats.loc[reject, 'beat'] = np.nan passing_beats = signal_beats[signal_beats.beat == 1].index.values return passing_beats if not self.preprocessed: signal = self._cheby1_filter(signal) else: pass signal = np.array(signal) # Differentiate the ECG signal dn = (np.append(signal[1:], 0) - signal) # Normalize the differentiated signal dtn = dn / (np.max(abs(dn))) # Compute their absolute values an = abs(dtn) # Compute their energy values en = an ** 2 # Compute the Shannon entropy and energy values sen = -abs(dtn) * np.log10(abs(dtn)) # entropy sn = -(dtn ** 2) * np.log10(dtn ** 2) # energy # Set the window size according to the duration of a QRS complex # (usually 120–150 ms) window_len = int(0.15 * self.fs) # Apply a moving average filter to the energy values sn_f = np.insert(self._ma_cumulative_sum(sn, window_len), 0, [0] * (window_len - 1)) # Apply the Hilbert transform to the filtered energy values zn = np.imag(hilbert(sn_f)) # Apply a moving average filter again to remove low-frequency drift # 2.5 sec from Manikandan & Soman (900 samples) # 2.5 sec in 500 Hz == 1250 samples ma_len = int(self.fs * 2.5) zn_ma = np.insert( self._ma_cumulative_sum(zn, ma_len), 0, [0] * (ma_len - 1)) # Compute the difference to get only the high-frequency components zn_ma_s = zn - zn_ma # Look for zero-crossings: https://stackoverflow.com/a/28766902/6205282 idx = np.argwhere(np.diff(np.sign(zn_ma_s)) > 0).flatten().tolist() # Prepare a container for windows idx_search = [] ecg_beats = np.empty(0, dtype = int) search_window_half = round(self.fs * .12) for n in idx: lows = np.arange(n - search_window_half, n) highs = np.arange(n + 1, (n + search_window_half + 1)) if highs[-1] >= len(signal): highs = np.delete( highs, np.arange( np.where(highs == len(signal))[0], len(highs))) ecg_window = np.concatenate((lows, [n], highs)) idx_search.append(ecg_window) ecg_window_wave = signal[ecg_window] peak_loc = ecg_window[ np.where(ecg_window_wave == np.max(ecg_window_wave))[0]] if peak_loc.item() > 0: ecg_beats = np.append(ecg_beats, peak_loc) ecg_beats = self._remove_dupes(ecg_beats) if adaptive_threshold is True: passing_beats = _adapt_thresh(signal, ecg_beats, self.fs, window) return passing_beats else: return ecg_beats
[docs] def nabian( self, signal: np.ndarray ) -> np.ndarray: """ Extract R peak locations from an ECG signal with the Nabian et al. (2018) algorithm. Parameters ---------- signal : array_like An array containing the ECG signal. Returns ------- ecg_beats : array_like An array containing the indices of detected R peaks. References ---------- Nabian, M., et al. (2018). An open-source feature extraction tool for the analysis of peripheral physiological data. IEEE Journal of Translational Engineering in Health and Medicine, 6, 1-11. """ if not self.preprocessed: signal = self._elliptic_bandpass_filter(signal) else: pass window_size = int(0.4 * self.fs) peaks = np.zeros(len(signal)) for i in range(1 + window_size, len(signal) - window_size): ecg_window = signal[i - window_size: i + window_size] rpeak = np.argmax(ecg_window) if i == (i - window_size - 1 + rpeak): peaks[i] = 1 ecg_beats = np.where(peaks == 1)[0] ecg_beats = self._remove_dupes(ecg_beats) return ecg_beats
[docs] def pantompkins( self, signal: np.ndarray ) -> np.ndarray: """ Extract QRS complex locations from an ECG signal with the Pan & Tompkins (1985) algorithm. Parameters ---------- signal : array_like An array containing the ECG signal. Returns ------- ecg_beats : array_like An array containing the indices of detected beats. References ---------- Pan, S. J., & Tompkins, W. J. (1985). A real-time QRS detection algorithm. IEEE Transactions on Biomedical Engineering, 32(3), 230-236. """ if not self.preprocessed: signal = self._butter_bandpass_filter(signal, method = 'pan') else: pass # Compute and square the differentiated signal diff = np.diff(signal) squared = diff * diff # Integrate the signal over a moving window of 150 ms in size window_size = int(0.15 * self.fs) mwa = uniform_filter1d( squared, window_size, origin = (window_size - 1) // 2) head_size = min(window_size - 1, len(squared)) mwa[:head_size] = np.cumsum(signal[:head_size]) / np.linspace( 1, head_size, head_size) mwa[: int(0.2 * self.fs)] = 0 # Set the minimum distance criteria for peak validation min_peak_dist = int(0.3 * self.fs) min_missed_dist = int(0.25 * self.fs) # Initialize an array to store the beats ecg_beats = [] # Preset running estimates of the signal and noise peaks SPKI = 0.0 NPKI = 0.0 last_peak = 0 last_index = -1 peaks, _ = find_peaks(mwa, plateau_size = (1, 1)) for i, peak in enumerate(peaks): peak_value = mwa[peak] # Update the first threshold value threshold_I1 = NPKI + 0.25 * (SPKI - NPKI) # Check the current peak amplitude against the first threshold # and whether its location is greater than the minimum peak # distance from the last detected peak if (peak_value > threshold_I1) and peak > ( last_peak + min_peak_dist): ecg_beats.append(peak) # "Missed" IBI threshold is based on the previous eight IBIs if len(ecg_beats) > 9: IBI_avg = (ecg_beats[-2] - ecg_beats[-10]) // 8 IBI_missed = int(1.66 * IBI_avg) if (peak - last_peak) > IBI_missed: missed_peaks = peaks[last_index + 1: i] # Check the missed IBIs against the minimum missed # IBI distance missed_peaks = missed_peaks[ (missed_peaks > last_peak + min_missed_dist) & ( missed_peaks < peak - min_missed_dist)] # Update the second threshold value threshold_I2 = 0.5 * threshold_I1 # Check the peak amplitude at the missed peak location # against the second threshold missed_peaks = missed_peaks[ mwa[missed_peaks] > threshold_I2] if len(missed_peaks) > 0: ecg_beats[-1] = missed_peaks[ np.argmax(mwa[missed_peaks])] ecg_beats.append(peak) last_peak = peak last_index = i SPKI = 0.125 * peak_value + 0.875 * SPKI else: NPKI = 0.125 * peak_value + 0.875 * NPKI ecg_beats = self._remove_dupes(ecg_beats) return ecg_beats
def _ma_cumulative_sum( self, signal: np.ndarray, window_len: int ) -> np.ndarray: """Moving average filter using cumulative sums.""" cumsum = np.cumsum(np.insert(signal, 0, 0)) ma = (cumsum[window_len:] - cumsum[:-window_len]) / float(window_len) return ma def _ma_convolution( self, signal: np.ndarray, window_len: int ) -> np.ndarray: """Moving average filter using convolution.""" padded_diff = np.pad( signal, (window_len // 2, window_len // 2), mode = 'constant') ma = np.convolve( padded_diff, np.ones(window_len) / window_len, mode = 'valid') return ma def _butter_bandpass_filter( self, signal: np.ndarray, method: str ) -> np.ndarray: """A Butterworth bandpass filter, used in the preprocessing procedure of the Pan & Tompkins (1985) and Hamilton & Tompkins (1986) QRS detection algorithms. All parameters, including the passband frequency range (`Wn`) and order (`N`) of the filter, are provided as default values according to the algorithms.""" if method not in ['pan', 'hamilton']: raise ValueError('The `method` parameter must take either \'pan\' ' 'or \'hamilton\'.') else: if method == 'pan': lowcut = 0.5 highcut = 15 if method == 'hamilton': # Based on https://github.com/berndporr/py-ecg-detectors/ lowcut = 8 highcut = 16 nyquist = 0.5 * self.fs low = lowcut / nyquist high = highcut / nyquist b, a = butter(N = 2, Wn = [low, high], btype = 'band') preprocessed = filtfilt(b, a, signal) return preprocessed def _elliptic_bandpass_filter( self, signal: np.ndarray ) -> np.ndarray: """An elliptic bandpass filter, used in the preprocessing procedure of the Nabian et al. (2018) R peak detection algorithm. All parameters, including the lower and upper cutoff frequencies (`Wn`), passband ripple (`rp`), stopband attenuation (`rs`), and order (`N`) of the filter, are provided as default values according to the algorithm.""" nyq = 0.5 * self.fs lowcut = 0.5 highcut = 50 low = lowcut / nyq high = highcut / nyq b, a = ellip(N = 2, rs = 0.5, rp = 40, Wn = [low, high], btype = 'band') preprocessed = filtfilt(b, a, signal) return preprocessed def _cheby1_filter( self, signal: np.ndarray ) -> np.ndarray: """A Chebyshev Type I bandpass filter, used in the preprocessing procedure of the Manikandan and Soman (2012) beat detection algorithm. All parameters, including the lower and upper cutoff frequencies (`Wn`), passband ripple (`rp`), and order (`N`), of the filter are provided as default values according to the algorithm.""" nyquist = 0.5 * self.fs lowcut = 6 highcut = 18 low = lowcut / nyquist high = highcut / nyquist b, a = cheby1(N = 4, rp = 1, Wn = [low, high], btype = 'bandpass') preprocessed = filtfilt(b, a, signal) return preprocessed def _remove_dupes( self, ecg_beats: np.ndarray ) -> np.ndarray: """Remove duplicate indices in an `ecg_beats` array.""" ecg_beats = np.array(ecg_beats) unique_values, unique_ix = np.unique(ecg_beats, return_index = True) dupe_removed = ecg_beats[sorted(unique_ix)] return dupe_removed