import numpy as np
import pandas as pd
import cvxopt as cv
from typing import Literal, Optional, Union
from scipy.signal import butter, convolve, ellip, filtfilt, find_peaks, \
firwin, resample_poly
from math import gcd
from scipy.fft import fft, ifft, fftfreq
from flirt.eda import get_eda_features
# ============================== EDA Filters =================================
[docs]
class Filters:
"""
A class for filtering raw electrodermal activity (EDA) data.
Parameters/Attributes
---------------------
fs : int
The sampling rate of the EDA signal.
"""
def __init__(self, fs):
"""
Initialize the Filters object.
Parameters
----------
fs : int
The sampling rate of the PPG signal.
"""
self.fs = fs
[docs]
def lowpass_butter(self, signal, cutoff = 2, order = 3):
"""
Filter an EDA signal with a Butterworth low-pass filter.
Parameters
----------
signal : array_like
An array containing the raw EDA signal.
cutoff : int, float
The cut-off frequency at which frequencies above this value in the
EDA signal are attenuated; by default, 2 Hz.
order : int
The filter order, i.e., the number of samples required to produce
the desired filtered output; by default, 3.
Returns
-------
filtered : array_like
An array containing the filtered EDA signal.
"""
nyq = 0.5 * self.fs
normalized_cutoff = cutoff / nyq
b, a = butter(order, normalized_cutoff, btype = 'low', analog = False)
filtered = filtfilt(b, a, signal)
return filtered
[docs]
def lowpass_elliptic(
self,
signal: np.ndarray,
cutoff: float = 1.0,
order: int = 4,
rp: float = 1,
rs: float = 40
) -> np.ndarray:
"""
Apply an elliptic low-pass filter to an EDA signal.
Parameters
----------
signal : array_like
An array containing the raw EDA signal to be filtered.
cutoff : float, optional
The cutoff frequency in Hz; by default, 1 Hz.
order : int, optional
The filter order; by default, 4.
rp : float, optional
Maximum ripple (in dB) allowed in the passband; by default, 1 dB.
rs : float, optional
Minimum attenuation (in dB) required in the stopband;
by default, 40 dB.
Returns
-------
filtered : array_like
An array containing the filtered EDA signal.
"""
nyq = 0.5 * self.fs
wn = cutoff / nyq
b, a = ellip(order, rp, rs, wn, btype = 'low')
filtered = filtfilt(b, a, signal)
return filtered
[docs]
def lowpass_gaussian(self, signal, cutoff: float = 1.0):
"""
Apply a frequency-domain Gaussian low-pass filter to an EDA signal.
Parameters
----------
signal : array_like
An array containing the raw EDA signal.
cutoff : float, optional
The desired cutoff frequency in Hz; by default, 1.0 Hz.
Returns
-------
filtered : array_like
An array containing the low-pass filtered EDA signal.
References
----------
Nabian, M., et al. (2018). An open-source feature extraction tool for
the analysis of peripheral physiological data. IEEE Journal of
Translational Engineering in Health and Medicine, 6, 1-11.
"""
n = len(signal)
freqs = fftfreq(n, d = 1 / self.fs)
# Compute the FFT of the EDA signal
sig_fft = fft(signal)
# Compute the Gaussian frequency response
# Attenuate frequencies above cutoff symmetrically for +/- freqs
gaussian_response = np.exp(-0.5 * (freqs / cutoff) ** 2)
# Filter in the frequency domain
sig_fft_filtered = sig_fft * gaussian_response
# Transform to time domain
filtered = np.real(ifft(sig_fft_filtered))
return filtered
[docs]
def filter_signal(
self,
signal: np.ndarray,
fs: int = 4,
cutoff: float = 0.35,
filter_length: int = 2057,
window_type: Literal['hamming', 'hann', 'blackman'] = 'hamming'
) -> np.ndarray:
"""
Apply a FIR low-pass filter to an EDA signal.
Parameters
----------
signal : array_like
An array containing the raw EDA signal to be filtered.
fs : int, optional
The sampling rate of the signal in Hz; by default, 4 Hz.
cutoff : float, optional
The cut-off frequency above which frequencies are attenuated;
by default, 0.35 Hz.
filter_length : int, optional
The length of the FIR filter (number of taps); by default, 2057.
Larger values give smoother filtering but increase computation
and delay.
window_type : {'hamming', 'hann', 'blackman'}, optional
The type of window function to use for FIR design;
by default, 'hamming'.
Returns
-------
filtered : array_like
An array containing the filtered EDA signal.
References
----------
Kleckner, I.R., Jones, R.M., Wilder-Smith, O., Wormwood, J.B.,
Akcakaya, M., Quigley, K.S., ... & Goodwin, M.S. (2017). Simple,
transparent, and flexible automated quality assessment procedures for
ambulatory electrodermal activity data. IEEE Transactions on
Biomedical Engineering, 65(7), 1460-1467.
Notes
-----
This is the default filter used in the PhysioView Dashboard.
"""
# Normalize cutoff frequency to Nyquist
cutoff_norm = cutoff / (fs / 2)
# Design FIR filter
fir_coeff = firwin(
numtaps = filter_length,
cutoff = cutoff_norm,
window = window_type
)
# Apply filter to signal
filtered = filtfilt(fir_coeff, [1.0], signal)
return filtered
[docs]
def moving_average(self, signal, window_len):
"""
Apply a moving average filter to an EDA signal.
Parameters
----------
signal : array_like
An array containing the raw EDA signal.
window_len : int or float
The moving average window size, in seconds.
Returns
-------
ma : array_like
An array containing the moving average of the EDA signal.
"""
samples = int(window_len * self.fs)
kernel = np.ones(samples) / samples
ma = np.convolve(signal, kernel, mode = 'same')
epsilon = np.finfo(float).eps # pad the beginning with epsilons
ma = np.concatenate((ma, np.full(len(signal) - len(ma), epsilon)))
return ma
# ======================== Other EDA Data Processing =========================
[docs]
def detect_scr_peaks(
phasic: Union[np.ndarray, pd.Series],
smooth_size: int = 20,
min_amp_thresh: float = 0.1,
min_peak_amp: Optional[float] = None,
) -> np.ndarray:
"""
Detect skin conductance response (SCR) peaks in a phasic EDA signal using
the Nabian et al. (2018) approach.
Parameters
----------
phasic : array-like
Phasic component of EDA signal.
smooth_size : int, optional
The size of Bartlett window for smoothing derivative; by default, 20.
min_amp_thresh : float, optional
The minimum SCR amplitude threshold, relative to the max detected
amplitude; by default, 0.1 (i.e., 10%).
min_peak_amp : float, optional
The minimum amplitude for a SCR peak to be considered valid; by
default, None.
Returns
-------
peaks : np.ndarray
An array containing indices of detected SCR peaks.
References
----------
Nabian, M., et al. (2018). An open-source feature extraction tool for
the analysis of peripheral physiological data. IEEE Journal of
Translational Engineering in Health and Medicine, 6, 1-11.
"""
# Differentiate phasic signal
diff = np.diff(phasic, prepend = phasic[0])
# Smooth derivative with Bartlett kernel
kernel = np.bartlett(smooth_size)
kernel /= kernel.sum()
diff_smoothed = convolve(diff, kernel, mode="same")
# Find zero crossings
def _get_zero_crossings(sig, direction = 'positive'):
zc = np.where(np.diff(np.sign(sig)) != 0)[0]
if direction == 'positive':
return [i for i in zc if sig[i] < 0 and sig[i+1] >= 0]
elif direction == 'negative':
return [i for i in zc if sig[i] > 0 and sig[i+1] <= 0]
else:
return zc
pos_crossings = _get_zero_crossings(diff_smoothed, 'positive')
neg_crossings = _get_zero_crossings(diff_smoothed, 'negative')
# Ensure onset before offset
if len(neg_crossings) > 0 and len(pos_crossings) > 0:
if neg_crossings[0] < pos_crossings[0]:
neg_crossings = neg_crossings[1:]
n_pairs = min(len(pos_crossings), len(neg_crossings))
pos_crossings = pos_crossings[:n_pairs]
neg_crossings = neg_crossings[:n_pairs]
# Get peaks
candidates = []
for onset, offset in zip(pos_crossings, neg_crossings):
window = phasic[onset:offset]
if len(window) == 0:
continue
peak_idx = onset + np.argmax(window)
amp = phasic[peak_idx] - phasic[onset]
candidates.append((peak_idx, amp))
# Apply sequential amplitude threshold
if len(candidates) == 0:
return np.array([])
peaks, amps = [], []
for idx, amp in candidates:
if not amps:
if (min_peak_amp is None) or (amp >= min_peak_amp):
peaks.append(idx)
amps.append(amp)
else:
use_rel = amp >= (min_amp_thresh * max(amps))
use_abs = (min_peak_amp is None) or (amp >= min_peak_amp)
if use_abs and use_rel:
peaks.append(idx)
amps.append(amp)
return np.array(peaks)
[docs]
def compute_tonic_scl(
signal: np.ndarray,
fs: int = 4,
seg_size: Optional[int] = None,
) -> Union[float, np.ndarray]:
"""
Compute the tonic skin conductance level (SCL) as the mean of the EDA
signal over a given segment, while excluding intervals corresponding to
skin conductance responses (SCRs).
Parameters
----------
signal: array_like
An array containing the filtered EDA signal.
fs : int, optional
The sampling rate of the EDA signal; by default, 4 Hz.
seg_size : int, optional
The segment size (in seconds) of the EDA signal; by default, None.
If given, the EDA signal is divided into non-overlapping segments of
this length and a tonic SCL is computed for each segment. If None,
the tonic SCL is computed once across the whole signal.
Returns
-------
tonic_scl : float or np.ndarray
The tonic skin conductance level (SCL).
Notes
-----
SCRs are detected based on amplitude and temporal criteria, and the data
from their onset through recovery are omitted from the calculation. If no
SCRs are detected, the tonic SCL is equivalent to the mean of the entire
signal.
"""
def _scr_intervals(
min_height: float = 0.05,
min_rise_time: float = 1.0,
min_recovery_time: float = 2.0
) -> list[tuple]:
"""
Detect SCR intervals (start, end) from an EDA signal.
Parameters
----------
min_height : float, optional
The minimum SCR amplitude (in µS) to be considered valid;
by default, 0.05 uS.
min_rise_time : float, optional
Minimum time (in seconds) from SCR onset to peak;
by default, 1 second.
min_recovery_time : float, optional
Minimum time (in seconds) from peak back to baseline;
by default, 2 seconds.
Returns
-------
scr_intervals : list of tuple
A list of (start_index, end_index) intervals for each detected
SCR.
"""
# Differentiate signal to emphasize rises
diff_sig = np.diff(signal, prepend = signal[0])
# Detect candidate peaks in the EDA signal itself
min_distance = int((min_rise_time + min_recovery_time) * fs)
peaks, props = find_peaks(signal, height = min_height,
distance = min_distance)
# Get start and end indices of SCR intervals
scr_intervals = []
for peak in peaks:
start = peak
# where derivative last went negative before the peak
while start > 0 and diff_sig[start] > 0:
start -= 1
end = peak
# where derivative first goes negative after the peak
while end < len(signal) - 1 and diff_sig[end] < 0:
end += 1
scr_intervals.append((start, end))
return scr_intervals
def _masked_mean(sig_segment: np.ndarray) -> float:
"""Compute mean excluding SCR intervals."""
scr_intervals = _scr_intervals()
mask = np.ones(len(sig_segment), dtype = bool)
for start, end in scr_intervals:
mask[start:end] = False
return np.mean(sig_segment[mask]) if np.any(mask) else np.nan
if seg_size is None:
# One tonic SCL for the whole signal
tonic_scl = _masked_mean(signal)
else:
# Segmented tonic SCL
seg_len = int(seg_size * fs)
n_segments = len(signal) // seg_len
tonic_scl = []
for i in range(n_segments):
start, end = i * seg_len, (i + 1) * seg_len
tonic_scl.append(_masked_mean(signal[start:end]))
tonic_scl = np.array(tonic_scl)
return tonic_scl
[docs]
def decompose_eda(
signal: np.ndarray,
fs: int,
show_progress: bool = True
) -> tuple:
"""
Extract the phasic and tonic components of an electrodermal activity (EDA)
signal using the convex optimization approach by Greco et al. (2015).
This is an alias function for `_cvxEDA()` in this module.
Parameters
----------
signal : array_like
An array containing the EDA signal.
fs : float
The sampling rate of the EDA signal.
Returns
-------
phasic : array_like
The phasic component (fast-moving changes) of the EDA signal.
tonic : array_like
The tonic component (slow-moving changes) of the EDA signal.
References
----------
Greco, A., Valenza, G., Lanata, A., Scilingo, E. P., & Citi, L. (2016).
cvxEDA: A convex optimization approach to electrodermal activity
processing. IEEE Transactions on Biomedical Engineering, 63(4), 797–804.
"""
phasic, _, tonic, _, _, _, _ = _cvxEDA(
signal, fs, options = {'show_progress': show_progress})
return phasic, tonic
[docs]
def compute_features(
signal: np.ndarray,
fs: int,
window_size: int = 180,
step_size: int = 60,
) -> pd.DataFrame:
"""
Compute statistical EDA features from phasic and tonic components of
an EDA signal.
Parameters
----------
signal : array_like
An array containing the raw EDA signal.
fs : int
The sampling rate of the EDA signal.
window_size : int, optional
The size of the window, in seconds, over which EDA features are
computed; by default, 180.
step_size : int, optional
The time interval, in seconds, at which EDA features are computed;
by default, 60.
"""
eda = pd.Series(signal)
eda_features = get_eda_features(
data = eda,
window_length = window_size,
window_step_size = step_size,
data_frequency = fs
)
eda_features.reset_index(drop = True, inplace = True)
return eda_features
[docs]
def resample(
signal: np.ndarray,
fs: int,
new_fs: int
) -> np.ndarray:
"""
Resample a signal to a new sampling frequency using polyphase filtering.
Parameters
----------
signal : array_like
The input signal (1D array).
fs : int
The original sampling frequency of the signal.
new_fs : int
The desired sampling frequency.
Returns
-------
rs : array_like
The resampled signal.
"""
# Ensure integer values
fs = int(round(fs))
new_fs = int(round(new_fs))
# Simplify the up/down ratio
g = gcd(fs, new_fs)
up = new_fs // g
down = fs // g
# Polyphase resampling with anti-aliasing
rs = resample_poly(signal, up, down)
rs = np.asarray(rs).flatten()
return rs
def _cvxEDA(
signal: np.ndarray,
fs: int = 4,
tau0: float = 2.,
tau1: float = 0.7,
delta_knot: float = 10.,
alpha: float = 8e-4,
gamma: float = 1e-2,
solver: 'QP solver' = None,
options: dict = {'reltol': 1e-9, 'show_progress': True}
) -> tuple:
"""
Decompose an EDA signal into its phasic and tonic components using the
convex optimization approach by Greco et al. (2015).
Parameters
----------
signal : array_like
An array containing the EDA signal.
fs : int, optional
The sampling rate of the EDA signal; by default, 4 Hz.
tau0 : float, optional
Slow time constant of the Bateman function; by default, 2.0.
tau1 : float, optional
Fast time constant of the Bateman function; by default, 0.7.
delta_knot : float, optional
Time between knots of the tonic spline function; by default, 10.0.
alpha : float, optional
Penalization for the sparse SMNA driver; by default, 8e-4.
gamma : float, optional
Penalization for the tonic spline coefficients; by default, 1e-2.
solver : object, optional
Sparse QP solver to be used.
options : dict, optional
Solver options.
Returns
-------
r : array_like
The phasic component.
p : array_like
Sparse SMNA driver of phasic component.
t : array_like
The tonic component.
l : array_like
Coefficients of tonic spline.
d : array_like
The Offset and slope of the linear drift term.
e : array_like
Model residuals.
obj : float
Value of objective function being minimized (equation 15 in paper).
Notes
-----
Copyright (C) 2014-2015 Luca Citi, Alberto Greco
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You may contact the original author by e-mail (lciti@ieee.org).
If you use this program in support of published research, please include a
citation of the reference below. If you use this code in a software
package, please explicitly inform end users of this copyright notice and
ask them to cite the reference above in their published research.
References
----------
A. Greco, G. Valenza, A. Lanata, E. P. Scilingo, & L. Citi. (2015). cvxEDA:
A convex optimization approach to electrodermal activity processing. IEEE
Transactions on Biomedical Engineering, 63(4): 797-804.
"""
n = len(signal)
y = cv.matrix(signal)
delta = 1. / fs
# Bateman ARMA model
a1 = 1. / min(tau1, tau0) # a1 > a0
a0 = 1. / max(tau1, tau0)
ar = np.array(
[(a1 * delta + 2.) * (a0 * delta + 2.), 2. * a1 * a0 * delta ** 2 - 8.,
(a1 * delta - 2.) * (a0 * delta - 2.)]) / ((a1 - a0) * delta ** 2)
ma = np.array([1., 2., 1.])
# Matrices for ARMA model
i = np.arange(2, n)
A = cv.spmatrix(np.tile(ar, (n - 2, 1)), np.c_[i, i, i],
np.c_[i, i - 1, i - 2], (n, n))
M = cv.spmatrix(np.tile(ma, (n - 2, 1)), np.c_[i, i, i],
np.c_[i, i - 1, i - 2], (n, n))
# Spline
delta_knot_s = int(round(delta_knot / delta))
spl = np.r_[np.arange(1., delta_knot_s),
np.arange(delta_knot_s, 0., -1.)] # order 1
spl = np.convolve(spl, spl, 'full')
spl /= max(spl)
# Matrix of spline regressors
i = (np.c_[np.arange(-(len(spl) // 2), (len(spl) + 1) // 2)] +
np.r_[np.arange(0, n, delta_knot_s)])
nB = i.shape[1]
j = np.tile(np.arange(nB), (len(spl), 1))
p = np.tile(spl, (nB, 1)).T
valid = (i >= 0) & (i < n)
B = cv.spmatrix(p[valid], i[valid], j[valid])
# Trend
C = cv.matrix(np.c_[np.ones(n), np.arange(1., n + 1.) / n])
nC = C.size[1]
# Solve the problem:
# .5*(M*q + B*l + C*d - y)^2 + alpha*sum(A,1)*p + .5*gamma*l'*l
# s.t. A*q >= 0
old_options = cv.solvers.options.copy()
cv.solvers.options.clear()
cv.solvers.options.update(options)
if solver == 'conelp':
# Use conelp
z = lambda m, n: cv.spmatrix([], [], [], (m, n))
G = cv.sparse([
[-A, z(2, n), M, z(nB + 2, n)], [z(n + 2, nC), C, z(nB + 2, nC)],
[z(n, 1), -1, 1, z(n + nB + 2, 1)],
[z(2 * n + 2, 1), -1, 1, z(nB, 1)],
[z(n + 2, nB), B, z(2, nB),
cv.spmatrix(1.0, range(nB), range(nB))]])
h = cv.matrix([z(n, 1), .5, .5, y, .5, .5, z(nB, 1)])
c = cv.matrix(
[(cv.matrix(alpha, (1, n)) * A).T, z(nC, 1), 1, gamma, z(nB, 1)])
res = cv.solvers.conelp(c, G, h, dims = {
'l': n,
'q': [n + 2, nB + 2],
's': []
})
obj = res['primal objective']
else:
# Use qp
Mt, Ct, Bt = M.T, C.T, B.T
H = cv.sparse([
[Mt * M, Ct * M, Bt * M], [Mt * C, Ct * C, Bt * C],
[Mt * B, Ct * B,
Bt * B + gamma * cv.spmatrix(1.0, range(nB), range(nB))]
])
f = cv.matrix(
[(cv.matrix(alpha, (1, n)) * A).T - Mt * y, -(Ct * y), -(Bt * y)])
res = cv.solvers.qp(H, f, cv.spmatrix(-A.V, A.I, A.J, (n, len(f))),
cv.matrix(0., (n, 1)), solver = solver)
obj = res['primal objective'] + .5 * (y.T * y)
cv.solvers.options.clear()
cv.solvers.options.update(old_options)
l = res['x'][-nB:]
d = res['x'][n:n + nC]
t = B * l + C * d
q = res['x'][:n]
p = A * q
r = M * q
e = y - r - t
return tuple(np.array(a).ravel() for a in (r, p, t, l, d, e, obj))