Source code for heartview.pipeline.SQA

import pandas as pd
import numpy as np
import plotly.graph_objects as go
import dash_bootstrap_components as dbc
from tqdm import tqdm
from math import ceil
from scipy.interpolate import interp1d


# ============================== CARDIOVASCULAR ==============================
[docs] class Cardio: """ A class for signal quality assessment on cardiovascular data, including electrocardiograph (ECG) or photoplethysmograph (PPG) data. Parameters/Attributes --------------------- fs : int The sampling rate of the cardiovascular data. """ def __init__(self, fs): """ Initialize the Cardiovascular object. Parameters ---------- fs : int The sampling rate of the ECG or PPG recording. """ self.fs = int(fs)
[docs] def compute_metrics(self, data, beats_ix, artifacts_ix, ts_col = None, seg_size = 60, min_hr = 40, rolling_window = None, rolling_step = 15, show_progress = True): """ Compute all SQA metrics for cardiovascular data by segment or moving window. Metrics per segment or moving window include numbers of detected, expected, missing, and artifactual beats and percentages of missing and artifactual beats. Parameters ---------- data : pandas.DataFrame A DataFrame containing pre-processed ECG or PPG data. beats_ix : array_like An array containing the indices of detected beats. artifacts_ix : array_like An array containing the indices of artifactual beats. seg_size : int The segment size in seconds; by default, 60. min_hr : int, float The minimum acceptable heart rate against which the number of beats in the last partial segment will be compared; by default, 40. ts_col : str, optional The name of the column containing timestamps; by default, None. If a string value is given, the output will contain a timestamps column. rolling_window : int, optional The size, in seconds, of the sliding window across which to compute the SQA metrics; by default, None. rolling_step : int, optional The step size, in seconds, of the sliding windows; by default, 15. show_progress : bool, optional Whether to display a progress bar while the function runs; by default, True. Returns ------- metrics : pandas.DataFrame A DataFrame with all computed SQA metrics per segment. Notes ----- If a value is given in the `rolling_window` parameter, the rolling window approach will override the segmented approach, ignoring any `seg_size` value. Examples -------- >>> from heartview.pipeline import SQA >>> sqa = SQA.Cardio(fs = 1000) >>> artifacts_ix = sqa.identify_artifacts(beats_ix, method = 'both') >>> cardio_qa = sqa.compute_metrics(ecg, beats_ix, artifacts_ix, \ ... ts_col = 'Timestamp', \ ... seg_size = 60, min_hr = 40) """ df = data.copy() df.index = df.index.astype(int) df.loc[beats_ix, 'Beat'] = 1 if rolling_window is not None: metrics = pd.DataFrame() if ts_col is not None: seconds = self.get_seconds(data, beats_ix, ts_col, show_progress = show_progress) s = 1 for n in tqdm(range(0, len(seconds), rolling_step), disable = not show_progress): # Get missing beats window_missing = seconds.iloc[n:(n + rolling_window)] n_expected = round(window_missing['Mean HR'].median() * (seg_size / 60), 0) n_detected = window_missing['N Beats'].sum() n_missing = (n_expected - n_detected) \ if n_expected > n_detected else 0 perc_missing = round((n_missing / n_expected) * 100, 2) ts = window_missing['Timestamp'].iloc[0] # Summarize artifactual beats artifacts = self.get_artifacts( df, beats_ix, artifacts_ix, seg_size = 1, ts_col = ts_col) window_artifact = artifacts.iloc[n:(n + rolling_window)] n_artifact = window_artifact['N Artifact'].sum() perc_artifact = round((n_artifact / n_detected) * 100, 2) # Output summary metrics = pd.concat([metrics, pd.DataFrame.from_records([{ 'Moving Window': s, 'Timestamp': ts, 'N Expected': n_expected, 'N Detected': n_detected, 'N Missing': n_missing, '% Missing': perc_missing, 'N Artifact': n_artifact, '% Artifact': perc_artifact }])], ignore_index = True).reset_index(drop = True) s += 1 else: seconds = self.get_seconds(data, beats_ix, show_progress = show_progress) s = 1 for n in tqdm(range(0, len(seconds), rolling_step), disable = not show_progress): # Get missing beats window_missing = seconds.iloc[n:(n + rolling_window)] n_expected = round(window_missing['Mean HR'].median() * (seg_size / 60), 0) n_detected = window_missing['N Beats'].sum() n_missing = (n_expected - n_detected) \ if n_expected > n_detected else 0 perc_missing = round((n_missing / n_expected) * 100, 2) # Get artifactual beats artifacts = self.get_artifacts( df, beats_ix, artifacts_ix, seg_size = 1) window_artifact = artifacts.iloc[n:(n + rolling_window)] n_artifact = window_artifact['N Artifact'].sum() perc_artifact = round((n_artifact / n_detected) * 100, 2) # Output summary metrics = pd.concat([metrics, pd.DataFrame.from_records([{ 'Moving Window': s, 'N Expected': n_expected, 'N Detected': n_detected, 'N Missing': n_missing, '% Missing': perc_missing, 'N Artifact': n_artifact, '% Artifact': perc_artifact }])], ignore_index = True).reset_index(drop = True) s += 1 # Handle last partial rolling window of data last_seg_len = len(seconds) % rolling_window if last_seg_len > 0: last_detected = metrics['N Detected'].iloc[-1] last_expected_ratio = min_hr / metrics['N Expected'].iloc[:-1].median() last_expected = last_expected_ratio * last_seg_len if last_expected > last_detected: last_n_missing = last_expected - last_detected last_perc_missing = round( (last_n_missing / last_expected) * 100, 2) else: last_perc_missing = 0 last_n_missing = 0 metrics['N Expected'].iloc[-1] = last_expected metrics['N Missing'].iloc[-1] = last_n_missing metrics['% Missing'].iloc[-1] = last_perc_missing else: if ts_col is not None: missing = self.get_missing( df, beats_ix, seg_size, min_hr = min_hr, ts_col = ts_col, show_progress = show_progress) artifacts = self.get_artifacts( df, beats_ix, artifacts_ix, seg_size, ts_col) metrics = pd.merge(missing, artifacts, on = ['Segment', 'Timestamp']) metrics['Invalid'] = metrics['N Detected'].apply( lambda n: 1 if n < int(min_hr * (seg_size/60)) or n > 220 else np.nan) else: missing = self.get_missing( df, beats_ix, seg_size, show_progress = show_progress) artifacts = self.get_artifacts( df, beats_ix, artifacts_ix, seg_size) metrics = pd.merge(missing, artifacts, on = ['Segment']) metrics['Invalid'] = metrics['N Detected'].apply( lambda x: 1 if x < int(min_hr * (seg_size/60)) or x > 220 else np.nan) return metrics
[docs] def display_summary_table(self, sqa_df): """ Display the SQA summary table. Parameters ---------- sqa_df : pandas.DataFrame The DataFrame containing the SQA metrics per segment. Returns ------- table : dash_bootstrap_components.Table Summary table for SQA metrics. """ missing_n = len(sqa_df.loc[sqa_df['N Missing'] > 0]) artifact_n = len(sqa_df.loc[sqa_df['N Artifact'] > 0]) invalid_n = len(sqa_df.loc[sqa_df['Invalid'] == 1]) avg_missing = '{0:.2f}%'.format(sqa_df['% Missing'].mean()) avg_artifact = '{0:.2f}%'.format( sqa_df.loc[sqa_df['% Artifact'] > 0, '% Artifact'].mean()) summary = pd.DataFrame({ 'Signal Quality Metrics': ['Segments with Missing Beats', 'Segments with Artifactual Beats', 'Segments with Invalid Beats', 'Average % Missing Beats/Segment', 'Average % Artifactual Beats/Segment'], '': [missing_n, artifact_n, invalid_n, avg_missing, avg_artifact] }) summary.set_index('Signal Quality Metrics', inplace = True) table = dbc.Table.from_dataframe( summary, index = True, className = 'segmentTable', striped = False, hover = False, bordered = False ) return table
[docs] def get_artifacts(self, data, beats_ix, artifacts_ix, seg_size = 60, ts_col = None): """ Summarize the number and proportion of artifactual beats per segment. Parameters ---------- data : pandas.DataFrame A DataFrame containing the pre-processed ECG or PPG data. beats_ix : array_like An array containing the indices of detected beats. artifacts_ix : array_like An array containing the indices of artifactual beats. This is outputted from `SQA.Cardio.identify_artifacts()`. seg_size : int The size of the segment in seconds; by default, 60. ts_col : str, optional The name of the column containing timestamps; by default, None. If a string value is given, the output will contain a timestamps column. Returns ------- artifacts : pandas.DataFrame A DataFrame with the number and proportion of artifactual beats per segment. See Also -------- SQA.Cardio.identify_artifacts : Identify artifactual beats using both or either of the methods. """ df = data.copy() df.loc[beats_ix, 'Beat'] = 1 df.loc[artifacts_ix, 'Artifact'] = 1 n_seg = ceil(len(df) / (self.fs * seg_size)) segments = pd.Series(np.arange(1, n_seg + 1)) n_detected = df.groupby( df.index // (self.fs * seg_size))['Beat'].sum() n_artifact = df.groupby( df.index // (self.fs * seg_size))['Artifact'].sum() perc_artifact = round((n_artifact / n_detected) * 100, 2) if ts_col is not None: timestamps = df.groupby( df.index // (self.fs * seg_size)).first()[ts_col] artifacts = pd.concat([ segments, timestamps, n_artifact, perc_artifact, ], axis = 1) artifacts.columns = [ 'Segment', 'Timestamp', 'N Artifact', '% Artifact', ] else: artifacts = pd.concat([ segments, n_artifact, perc_artifact, ], axis = 1) artifacts.columns = [ 'Segment', 'N Artifact', '% Artifact', ] return artifacts
[docs] def identify_artifacts(self, beats_ix, method, initial_hr = None, prev_n = None, neighbors = None, tol = None): """ Identify locations of artifactual beats in cardiovascular data based on the criterion beat difference approach by Berntson et al. (1990), the Hegarty-Craver et al. (2018) approach, or both. Parameters ---------- beats_ix : array_like An array containing the indices of detected beats. method : str The artifact identification method for identifying artifacts. This must be 'hegarty', 'cbd', or 'both'. initial_hr : int, float, or 'auto', optional The heart rate value for the first interbeat interval (IBI) to be validated against; by default, 'auto' for automatic calculation using the mean heart rate value obtained from six consecutive IBIs with the smallest average successive difference. Required for the 'hegarty' method. prev_n : int, optional The number of preceding IBIs to validate against; by default, 6. Required for 'hegarty' method. neighbors : int, optional The number of surrounding IBIs with which to derive the criterion beat difference score; by default, 5. Required for 'cbd' method. tol : float, optional A configurable hyperparameter used to fine-tune the stringency of the criterion beat difference test; by default, 1. Required for 'cbd' method. Returns ------- artifacts_ix : array_like An array containing the indices of identified artifact beats. Notes ----- The source code for the criterion beat difference test is from work by Hoemann et al. (2020). References ---------- Berntson, G., Quigley, K., Jang, J., Boysen, S. (1990). An approach to artifact identification: Application to heart period data. Psychophysiology, 27(5), 586–598. Hegarty-Craver, M. et al. (2018). Automated respiratory sinus arrhythmia measurement: Demonstration using executive function assessment. Behavioral Research Methods, 50, 1816–1823. Hoemann, K. et al. (2020). Context-aware experience sampling reveals the scale of variation in affective experience. Scientific Reports, 10(1), 1–16. """ def identify_artifacts_hegarty(beats_ix, initial_hr = 'auto', prev_n = 6): """Identify locations of artifactual beats in cardiovascular data based on the approach by Hegarty-Craver et al. (2018).""" ibis = (np.diff(beats_ix) / self.fs) * 1000 beats = beats_ix[1:] # drop the first beat artifact_beats = [] valid_beats = [beats_ix[0]] # assume first beat is valid # Set the initial IBI to compare against if initial_hr == 'auto': successive_diff = np.abs(np.diff(ibis)) min_diff_ix = np.convolve( successive_diff, np.ones(6) / 6, mode = 'valid').argmin() first_ibi = ibis[min_diff_ix:min_diff_ix + 6].mean() else: first_ibi = 60000 / initial_hr for n in range(len(ibis)): current_ibi = ibis[n] current_beat = beats[n] # Check against an estimate of the first N IBIs if n < prev_n: if n == 0: ibi_estimate = first_ibi else: next_five = np.insert(ibis[:n], 0, first_ibi) ibi_estimate = np.median(next_five) # Check against an estimate of the preceding N IBIs else: ibi_estimate = np.median(ibis[n - (prev_n):n]) # Set the acceptable/valid range of IBIs low = (26 / 32) * ibi_estimate high = (44 / 32) * ibi_estimate if low <= current_ibi <= high: valid_beats.append(current_beat) else: artifact_beats.append(current_beat) return np.array(valid_beats), np.array(artifact_beats) def identify_artifacts_cbd(beats_ix, neighbors = 5, tol = 1): """Identify locations of abnormal interbeat intervals (IBIs) using the criterion beat difference test by Berntson et al. (1990).""" # Derive IBIs from beat indices ibis = ((np.ediff1d(beats_ix)) / self.fs) * 1000 # Compute consecutive absolute differences across IBIs ibi_diffs = np.abs(np.ediff1d(ibis)) # Initialize an array to store "bad" IBIs ibi_bad = np.zeros(shape = len(ibis)) artifact_beats = [] if len(ibi_diffs) < neighbors: neighbors = len(ibis) for ii in range(len(ibi_diffs)): # If there are not enough neighbors in the beginning if ii < int(neighbors / 2) + 1: select = np.concatenate( (ibi_diffs[:ii], ibi_diffs[(ii + 1):(neighbors + 1)])) select_ibi = np.concatenate( (ibis[:ii], ibis[(ii + 1):(neighbors + 1)])) # If there are not enough neighbors at the end elif (len(ibi_diffs) - ii) < (int(neighbors / 2) + 1) and ( len(ibi_diffs) - ii) > 1: select = np.concatenate( (ibi_diffs[-(neighbors - 1):ii], ibi_diffs[ii + 1:])) select_ibi = np.concatenate( (ibis[-(neighbors - 1):ii], ibis[ii + 1:])) # If there is only one neighbor left to check against elif len(ibi_diffs) - ii == 1: select = ibi_diffs[-(neighbors - 1):-1] select_ibi = ibis[-(neighbors - 1):-1] else: select = np.concatenate( (ibi_diffs[ii - int(neighbors / 2):ii], ibi_diffs[(ii + 1):(ii + 1 + int(neighbors / 2))])) select_ibi = np.concatenate( (ibis[ii - int(neighbors / 2):ii], ibis[(ii + 1):(ii + 1 + int(neighbors / 2))])) # Calculate the quartile deviation QD = self._quartile_deviation(select) # Calculate the maximum expected difference (MED) MED = 3.32 * QD # Calculate the minimal artifact difference (MAD) MAD = (np.median(select_ibi) - 2.9 * QD) / 3 # Calculate the criterion beat difference score criterion_beat_diff = (MED + MAD) / 2 # Find indices of IBIs that fail the CBD check if (ibi_diffs[ii]) > tol * criterion_beat_diff: bad_neighbors = int(neighbors * 0.25) if ii + (bad_neighbors - 1) < len(beats_ix): artifact_beats.append(beats_ix[ii:(ii + bad_neighbors)]) else: artifact_beats.append( beats_ix[ii:(ii + (bad_neighbors - 1))]) ibi_bad[ii + 1] = 1 artifact_beats = np.array(artifact_beats).flatten() return artifact_beats if method == 'hegarty': initial_hr = initial_hr if initial_hr is not None else 'auto' prev_n = prev_n if prev_n is not None else 6 _, artifacts_ix = identify_artifacts_hegarty( beats_ix, initial_hr, prev_n) elif method == 'cbd': neighbors = neighbors if neighbors is not None else 5 tol = tol if tol is not None else 1 artifacts_ix = identify_artifacts_cbd( beats_ix, neighbors, tol) elif method == 'both': initial_hr = initial_hr if initial_hr is not None else 'auto' prev_n = prev_n if prev_n is not None else 6 neighbors = neighbors if neighbors is not None else 5 tol = tol if tol is not None else 1 _, artifact_hegarty = identify_artifacts_hegarty( beats_ix, initial_hr, prev_n) artifact_cbd = identify_artifacts_cbd( beats_ix, neighbors, tol) artifacts_ix = np.union1d(artifact_hegarty, artifact_cbd) else: raise ValueError( 'Invalid method. Method must be \'hegarty\', \'cbd\', ' 'or \'both\'.') return artifacts_ix
[docs] def get_missing(self, data, beats_ix, seg_size = 60, min_hr = 40, ts_col = None, show_progress = True): """ Summarize the number and proportion of missing beats per segment. Parameters ---------- data : pandas.DataFrame The DataFrame containing the pre-processed ECG or PPG data. beats_ix : array-like An array containing the indices of detected beats. seg_size : int The size of the segment in seconds; by default, 60. min_hr : int, float The minimum acceptable heart rate against which the number of beats in the last partial segment will be compared; by default, 40. ts_col : str, optional The name of the column containing timestamps; by default, None. If a string value is given, the output will contain a timestamps column. show_progress : bool, optional Whether to display a progress by while the function runs; by default, True. Returns ------- missing : pandas.DataFrame A DataFrame with detected, expected, and missing numbers of beats per segment. """ seconds = self.get_seconds(data, beats_ix, ts_col, show_progress) seconds.index = seconds.index.astype(int) n_seg = ceil(len(seconds) / seg_size) segments = pd.Series(np.arange(1, n_seg + 1)) n_expected = ( seconds.groupby(seconds.index // seg_size)[ 'Mean HR'].median() * (seg_size / 60) ).fillna(0) n_detected = seconds.groupby( seconds.index // seg_size)['N Beats'].sum() n_missing = (n_expected - n_detected).clip(lower = 0) perc_missing = round((n_missing / n_expected) * 100, 2) # Handle last partial segment of data last_seg_len = len(seconds) % seg_size if last_seg_len > 0: last_detected = n_detected.iloc[-1] last_expected_ratio = min_hr / n_expected.iloc[:-1].median() last_expected = last_expected_ratio * last_seg_len if last_expected > last_detected: last_n_missing = last_expected - last_detected last_perc_missing = round( (last_n_missing / last_expected) * 100, 2) else: last_perc_missing = 0 last_n_missing = 0 n_expected.iloc[-1] = last_expected n_missing.iloc[-1] = last_n_missing perc_missing.iloc[-1] = last_perc_missing # Cast to int n_expected = n_expected.astype(int) n_missing = n_missing.astype(int) if ts_col is not None: timestamps = seconds.groupby( seconds.index // seg_size).first()['Timestamp'] missing = pd.concat([ segments, timestamps, n_detected, n_expected, n_missing, perc_missing, ], axis = 1) missing.columns = [ 'Segment', 'Timestamp', 'N Detected', 'N Expected', 'N Missing', '% Missing', ] else: missing = pd.concat([ segments, n_detected, n_expected, n_missing, perc_missing, ], axis = 1) missing.columns = [ 'Segment', 'N Detected', 'N Expected', 'N Missing', '% Missing', ] return missing
[docs] def get_seconds(self, data, beats_ix, ts_col = None, show_progress = True): """Get second-by-second HR, IBI, and beat counts from ECG or PPG data according to the approach by Graham (1978). Parameters ---------- data : pd.DataFrame The DataFrame containing the pre-processed ECG or PPG data. beats_ix : array-like An array containing the indices of detected beats. ts_col : str, optional The name of the column containing timestamps; by default, None. If a string value is given, the output will contain a timestamps column. show_progress : bool, optional Whether to display a progress bar while the function runs; by default, True. Returns ------- interval_data : pd.DataFrame A DataFrame containing second-by-second HR and IBI values. Notes ----- Rows with `NaN` values in the resulting DataFrame `interval_data` denote seconds during which no beats in the data were detected. References ---------- Graham, F. K. (1978). Constraints on measuring heart rate and period sequentially through real and cardiac time. Psychophysiology, 15(5), 492–495. """ df = data.copy() temp_beat = '_temp_beat' df.index = df.index.astype(int) df.loc[beats_ix, temp_beat] = 1 interval_data = [] # Iterate over each second s = 1 for i in tqdm(range(0, len(df), self.fs), disable = not show_progress): # Get data at the current second and evaluation window current_sec = df.iloc[i:(i + self.fs)] if i == 0: # Look at current and next second window = df.iloc[:(i + self.fs)] else: # Look at previous, current, and next second window = df.iloc[(i - self.fs):(min(i + self.fs, len(df)))] # Get mean IBI and HR values from the detected beats current_beats = current_sec[current_sec[temp_beat] == 1].index.values window_beats = window[window[temp_beat] == 1].index.values ibis = np.diff(window_beats) / self.fs * 1000 if len(ibis) == 0: mean_ibi = np.nan mean_hr = np.nan else: mean_ibi = np.mean(ibis) hrs = 60000 / ibis r_hrs = 1 / hrs mean_hr = 1 / np.mean(r_hrs) # Append values for the current second if ts_col is not None: interval_data.append({ 'Second': s, 'Timestamp': current_sec.iloc[0][ts_col], 'Mean HR': mean_hr, 'Mean IBI': mean_ibi, 'N Beats': len(current_beats) }) else: interval_data.append({ 'Second': s, 'Mean HR': mean_hr, 'Mean IBI': mean_ibi, 'N Beats': len(current_beats) }) s += 1 interval_data = pd.DataFrame(interval_data) return interval_data
[docs] def plot_missing(self, df, invalid_thresh = 30, title = None): """ Plot detected and missing beat counts. Parameters ---------- df : pandas.DataFrame() The DataFrame containing SQA metrics per segment. invalid_thresh : int, float The minimum number of beats detected for a segment to be considered valid; by default, 30. title : str, optional """ max_beats = ceil(df['N Detected'].max() / 10) * 10 nearest = ceil(max_beats / 2) * 2 dtick_value = nearest / 5 fig = go.Figure( data = [ go.Bar( x = df['Segment'], y = df['N Expected'], name = 'Missing', marker = dict(color = '#f2816d'), hovertemplate = '<b>Segment %{x}:</b> %{customdata:.0f} ' 'missing<extra></extra>'), go.Bar( x = df['Segment'], y = df['N Detected'], name = 'Detected', marker = dict(color = '#313c42'), hovertemplate = '<b>Segment %{x}:</b> %{y:.0f} ' 'detected<extra></extra>') ] ) fig.data[0].update(customdata = df['N Missing']) # Get invalid segment data points invalid_x = [] invalid_y = [] invalid_text = [] for segment_num, n_detected in zip(df['Segment'], df['N Detected']): if n_detected < invalid_thresh: invalid_x.append(segment_num) invalid_y.append( n_detected + 3) invalid_text.append('<b>!</b>') # Add scatter trace for invalid markers if invalid_x: fig.add_trace(go.Scatter( x = invalid_x, y = invalid_y, mode = 'text', text = invalid_text, textposition = 'top center', textfont = dict(size = 20, color = '#db0f0f'), showlegend = False, hoverinfo = 'skip' # disable tooltips )) if invalid_x: fig.add_annotation( text = '<span style="color: #db0f0f"><b>!</b></span> ' 'Invalid Number of Beats ', align = 'right', showarrow = False, xref = 'paper', yref = 'paper', x = 1, y = 1.3) fig.update_layout( xaxis_title = 'Segment Number', xaxis = dict( tickmode = 'linear', dtick = 1, range = [df['Segment'].min() - 0.5, df['Segment'].max() + 0.5]), yaxis = dict( title = 'Number of Beats', range = [0, max_beats], dtick = dtick_value), legend = dict( orientation = 'h', yanchor = 'bottom', y = 1.0, xanchor = 'right', x = 1.0), font = dict(family = 'Poppins', size = 13), height = 289, margin = dict(t = 70, r = 20, l = 40, b = 65), barmode = 'overlay', template = 'simple_white', ) if title is not None: fig.update_layout( title = title ) return fig
[docs] def plot_artifact(self, df, invalid_thresh = 30, title = None): """ Plot detected and artifact beat counts. Parameters ---------- df : pandas.DataFrame() The DataFrame containing SQA metrics per segment. invalid_thresh : int, float The minimum number of beats detected for a segment to be considered valid; by default, 30. title : str, optional """ max_beats = ceil(df['N Detected'].max() / 10) * 10 nearest = ceil(max_beats / 2) * 2 dtick_value = nearest / 5 fig = go.Figure( data = [ go.Bar( x = df['Segment'], y = df['N Detected'], name = 'Detected', marker = dict(color = '#313c42'), hovertemplate = '<b>Segment %{x}:</b> %{y:.0f} ' 'detected<extra></extra>'), go.Bar( x = df['Segment'], y = df['N Artifact'], name = 'Artifact', marker = dict(color = '#f2b463'), hovertemplate = '<b>Segment %{x}:</b> %{y:.0f} ' 'artifact<extra></extra>') ], ) # Get invalid segment data points invalid_x = [] invalid_y = [] invalid_text = [] for segment_num, n_detected in zip(df['Segment'], df['N Detected']): if n_detected < invalid_thresh: invalid_x.append(segment_num) invalid_y.append( n_detected + 3) invalid_text.append('<b>!</b>') # Add scatter trace for invalid markers if invalid_x: fig.add_trace(go.Scatter( x = invalid_x, y = invalid_y, mode = 'text', text = invalid_text, textposition = 'top center', textfont = dict(size = 20, color = '#db0f0f'), showlegend = False, hoverinfo = 'skip' # disable tooltips )) if invalid_x: fig.add_annotation( text = '<span style="color: #db0f0f"><b>!</b></span> ' 'Invalid Number of Beats ', align = 'right', showarrow = False, xref = 'paper', yref = 'paper', x = 1, y = 1.3) fig.update_layout( xaxis_title = 'Segment Number', xaxis = dict( tickmode = 'linear', dtick = 1, range = [df['Segment'].min() - 0.5, df['Segment'].max() + 0.5]), yaxis = dict( title = 'Number of Beats', range = [0, max_beats], dtick = dtick_value), legend = dict( orientation = 'h', yanchor = 'bottom', y = 1.0, xanchor = 'right', x = 1.0, traceorder = 'reversed'), font = dict(family = 'Poppins', size = 13), height = 289, margin = dict(t = 70, r = 20, l = 40, b = 65), barmode = 'overlay', template = 'simple_white', ) if title is not None: fig.update_layout( title = title ) return fig
def _get_iqr(self, data): """Compute the interquartile range of a data array.""" q75, q25 = np.percentile(data, [75, 25]) iqr = q75 - q25 return iqr def _quartile_deviation(self, data): """Compute the quartile deviation in the criterion beat difference test.""" iqr = self._get_iqr(data) QD = iqr * 0.5 return QD
# =================================== EDA ====================================
[docs] class EDA: """ A class for signal quality assessment on electrodermal activity (EDA) data. Parameters/Attributes --------------------- fs : int The sampling rate of the EDA data. eda_min : float, optional The minimum acceptable value for EDA data in microsiemens; by default, 0.05 uS. eda_max : float, optional The maximum acceptable value for EDA data in microsiemens; by default, 60 uS. eda_max_slope : float, optional The maximum slope of EDA data in microsiemens per second; by default, 5 uS/sec. temp_min : float, optional The minimum acceptable temperature in degrees Celsius; by default, 20. temp_max : float, optional The maximum acceptable temperature in degrees Celsius; by default, 40. invalid_spread_dur : float, optional The transition radius for artifacts in seconds; by default, 2. """ def __init__(self, fs, eda_min = 0.2, eda_max = 40, eda_max_slope = 5, temp_min = 20, temp_max = 40, invalid_spread_dur = 2): """ Initialize the EDA object. Parameters ---------- fs : int The sampling rate of the ECG or PPG recording. eda_min : float, optional The minimum acceptable value for EDA data in microsiemens; by default, 0.05 uS. eda_max : float, optional The maximum acceptable value for EDA data in microsiemens; by default, 60 uS. eda_max_slope : float, optional The maximum slope of EDA data in microsiemens per second; by default, 5 uS/sec. temp_min : float, optional The minimum acceptable temperature in degrees Celsius; by default, 20. temp_max : float, optional The maximum acceptable temperature in degrees Celsius; by default, 40. invalid_spread_dur : float, optional The transition radius for artifacts in seconds; by default, 2. """ self.fs = fs self.eda_min = eda_min self.eda_max = eda_max self.eda_max_slope = eda_max_slope self.temp_min = temp_min self.temp_max = temp_max self.invalid_spread_dur = invalid_spread_dur
[docs] def compute_metrics(self, signal, temp, timestamps = None, preprocessed = True, seg_size = 60, rolling_window = None, rolling_step = None): """ Summarize the number and proportion of valid and invalid data points in an electrodermal activity (EDA) signal per segment or across sliding windows. Parameters ---------- signal : array_like An array containing the EDA signal in microsiemens. temp : array_like, optional An array containing temperature data in Celsius; by default, None. timestamps : array_like, optional An array containing timestamps corresponding to the EDA data points; by default, None. preprocessed : boolean, optional Whether filtered EDA data is being inputted; by default, True. seg_size : int The segment size in seconds; by default, 60. rolling_window : int, optional The size, in seconds, of the sliding window across which to compute the EDA SQA metrics; by default, None. rolling_step : int, optional The step size, in seconds, of the sliding windows; by default, 15. Returns ------- metrics : pandas.DataFrame A DataFrame containing quality assessment metrics per segment. Notes ----- If a value is given in the `rolling_window` parameter, the rolling window approach will override the segmented approach, ignoring any `seg_size` value. See Also -------- SQA.EDA.assess_eda_quality : Identify locations of invalid and valid EDA data points. """ eda_min = self.eda_min eda_max = self.eda_max eda_max_slope = self.eda_max_slope temp_min = self.temp_min temp_max = self.temp_max invalid_spread_dur = self.invalid_spread_dur if seg_size < 0 or \ (rolling_window is not None and rolling_window < 0): raise ValueError('Window size must be set to a positive integer.') metrics = pd.DataFrame() seg_name = 'Moving Window' if rolling_window is not None else 'Segment' # Assess EDA quality edaqa = self.assess_eda_quality( signal, temp, preprocessed, seg_size, rolling_window, rolling_step) # Summarize EDA QA results for segment, qa in edaqa.items(): metrics = pd.concat([metrics, pd.DataFrame.from_records([{ seg_name: segment, 'N Valid': len(qa['valid']), '% Valid': round( (len(qa['valid']) / qa['length']) * 100, 2), 'N Invalid': len(qa['invalid']), '% Invalid': round( (len(qa['invalid']) / qa['length']) * 100, 2) }])], ignore_index = True) if timestamps is not None: if rolling_window is not None: metrics.insert(1, 'Timestamp', np.array( timestamps[::int(rolling_step * self.fs)])) else: metrics.insert(1, 'Timestamp', np.array( timestamps[::int(seg_size * self.fs)])) return metrics
[docs] def assess_eda_quality(self, signal, temp = None, preprocessed = True, seg_size = 60, rolling_window = None, rolling_step = 15): """ Identifies valid and invalid data points in electrodermal activity using the automated quality assessment procedure by Kleckner et al. (2017) by segment or across sliding windows. Parameters ---------- signal : array_like An array containing the EDA signal in microsiemens. temp : array_like An array containing temperature data in Celsius; by default, None. preprocessed : boolean, optional Whether filtered EDA data is being inputted; by default, True. seg_size : int The segment size in seconds; by default, 60. rolling_window : int, optional The size, in seconds, of the sliding window across which to compute the EDA SQA metrics; by default, None. rolling_step : int, optional The step size, in seconds, of the sliding windows; by default, 15. Returns ------- edaqa : dict A dictionary containing key-value pairs of segment or rolling window numbers and their corresponding dictionaries of valid and invalid EDA indices and lengths. Keys of nested dictionaries are 'valid', 'invalid', and 'length'. References ---------- Kleckner, I. R., Jones, R. M., Wilder-Smith, O., Wormwood, J. B., Akcakaya, M., Quigley, K. S., ... & Goodwin, M. S. (2017). Simple, transparent, and flexible automated quality assessment procedures for ambulatory electrodermal activity data. IEEE Transactions on Biomedical Engineering, 65(7), 1460-1467. """ eda_min = self.eda_min eda_max = self.eda_max eda_max_slope = self.eda_max_slope temp_min = self.temp_min temp_max = self.temp_max invalid_spread_dur = self.invalid_spread_dur # Check inputs if eda_min >= eda_max: raise ValueError("`eda_min` must be smaller than `eda_max`.") if temp_min >= temp_max: raise ValueError("`temp_min` must be smaller than `temp_max`.") # Get the sampling interval sampling_interval = 1 / self.fs # Filter data based on the methods in Kleckner et al. (2017) if not preprocessed: try: window = int(2 * self.fs) b = np.ones(window) / window signal = np.convolve(signal, b, mode = 'same') signal = self._filter_data(signal, window = 2) except ValueError: pass # Filter temperature data if temp is not None: window = int(2 * self.fs) b = np.ones(window) / window temp = np.convolve(temp, b, mode = 'same') temp = self._filter_data(temp, window = 2) def _edaqa(signal): """Evaluate the input signal against the automated EDA QA rules in Kleckner et al. (2017).""" nonlocal eda_min, eda_max, eda_max_slope, temp, temp_min, \ temp_max, sampling_interval slopes = np.concatenate([[0], np.diff(signal) / sampling_interval]) if temp is not None: # Handle unequal lengths of EDA and temp arrays if len(signal) != len(temp): temp = self._equalize_temp(signal, temp) invalid_checks = ( (signal < eda_min) | (signal > eda_max) | # Rule 1 (np.abs(slopes) > eda_max_slope) | # Rule 2 (temp < temp_min) | (temp > temp_max) # Rule 3 ) else: invalid_checks = ( (signal < eda_min) | (signal > eda_max) | # Rule 1 (np.abs(slopes) > eda_max_slope) # Rule 2 ) # Determine number of data points to spread for Rule 4 invalid_spread_length = int( invalid_spread_dur / sampling_interval) # Rule #4 invalid_data = np.zeros_like(signal, dtype = bool) for d in range(len(invalid_checks)): if invalid_checks[d]: start_idx = max(d - invalid_spread_length + 1, 0) end_idx = min(d + invalid_spread_length, len(invalid_checks)) invalid_data[start_idx:end_idx] = True valid_ix = np.where(~invalid_data)[0] invalid_ix = np.where(invalid_data)[0] return valid_ix, invalid_ix # Quality assessment of EDA data edaqa = {} if rolling_window is not None: w = 1 for n in range(0, len(signal), rolling_step): window = np.array(signal[n:n + int(self.fs * rolling_window)]) valid_ix, invalid_ix = _edaqa(window) edaqa[w] = {'valid': valid_ix, 'invalid': invalid_ix, 'length': len(window)} w += 1 else: s = 1 for n in range(0, len(signal), int(self.fs * seg_size)): segment = np.array(signal[n:n + int(self.fs * seg_size)]) valid_ix, invalid_ix = _edaqa(segment) edaqa[s] = {'valid': valid_ix, 'invalid': invalid_ix, 'length': len(segment)} s += 1 return edaqa
[docs] def plot_edaqa(self, metrics, title = None): """ Plot percentages of valid and invalid EDA data. Parameters ---------- metrics : pandas.DataFrame() The DataFrame outputted from `SQA.EDA.compute_metrics()` that contains EDA QA metrics per segment or sliding window. title : str, optional Returns ------- fig : plotly.graph_objects.Figure A figure containing a bar chart of percentages of invalid and valid EDA data points by segment or sliding window. See Also -------- SQA.EDA.compute_metrics : Summarize EDA QA metrics by segment or sliding window. """ def colormap(value): if value <= 25: return '#139253' # green elif (value > 25) & (value <= 90): return '#f2ac42' # yellow else: return '#f25847' # red colors = [colormap(value) for value in metrics['% Invalid']] fig = go.Figure( data = [ go.Bar( x = metrics['Segment'], y = [100] * len(metrics), # Height of 100% for each bar opacity = 0.3, # Set opacity to make them light grey hoverinfo = 'none', showlegend = False, marker = dict(color = 'lightgrey')), go.Bar( x = metrics['Segment'], y = metrics['% Invalid'], name = 'Invalid', marker = dict(color = colors), showlegend = False, hovertemplate = '<b>Segment %{x}:</b> %{y:.1f}% ' 'invalid<extra></extra>') ] ) fig.update_layout( xaxis_title = 'Segment Number', xaxis = dict(tickmode = 'linear', dtick = 1), yaxis = dict( title = '% Invalid', range = [0, 100], dtick = 20), font = dict(family = 'Poppins', size = 13), height = 289, margin = dict(t = 70, r = 20, l = 40, b = 65), barmode = 'overlay', template = 'simple_white', ) if title is not None: fig.update_layout( title = title ) return fig
def _equalize_temp(self, eda, temp): """Interpolate or truncate data in the temperature array to match the length of the EDA data array.""" eda_ix = np.arange(len(eda)) temp_ix = np.arange(len(temp)) if len(temp) < len(eda): interp_func = interp1d(temp_ix, temp, kind = 'linear', fill_value = 'extrapolate') temp = interp_func(eda_ix) if len(temp) > len(eda): temp = temp[:len(eda)] return temp def _filter_data(self, data, window): """Filter EDA and temperature data based on the approach in Kleckner et al. (2017).""" window = int(window * self.fs) b = np.ones(window) / window filtered = np.convolve(data, b, mode = 'same') return filtered