Source code for heartview.heartview

from zipfile import ZipFile, ZipExtFile
from tqdm import tqdm
from scipy.signal import resample as scipy_resample
from plotly.subplots import make_subplots
from heartview.pipeline.ACC import compute_magnitude
import warnings
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import datetime as dt
import pyedflib

[docs] class Actiwave: """ A class for convenient pre-processing of data from the Actiwave Cardio device. Parameters/Attributes --------------------- file : str The path of the Actiwave Cardio device file saved in European Data Format (.edf). """ def __init__(self, file): """ Initialize the Actiwave object. Parameters ---------- file : str The path of the Actiwave Cardio device file saved in European Data Format (.edf). """ if not file.endswith(('.edf', '.EDF')): raise ValueError( 'Invalid file path. The `file` parameter must take a string ' 'value ending in \'.EDF\' or \'.edf\'.') else: self.file = file
[docs] def preprocess(self, time_aligned = False): """ Pre-process electrocardiograph (ECG) and acceleration data from an Actiwave Cardio file. Parameters ---------- time_aligned : bool, optional Whether to time-align ECG and acceleration data based on the sampling rate of the ECG data; by default, False. Returns ------- tuple or pandas.DataFrame If `time_aligned` is False, returns a tuple (`ecg`, `acc`), where `ecg` is a DataFrame containing the pre-processed ECG data and `acc` is a DataFrame containing the pre-processed X-, Y-, and Z-axis acceleration data. If `time_aligned` is True, returns a single DataFrame containing time-synced ECG and acceleration data according to the ECG data's timestamps. """ f = pyedflib.EdfReader(self.file) start = dt.datetime.timestamp(f.getStartdatetime()) end = start + f.getFileDuration() ecg, acc = pd.DataFrame(), pd.DataFrame() signal_labels = f.getSignalLabels() ecg_chn = [i for i in range(len(signal_labels)) if 'ECG' in signal_labels[i]] acc_chn = [i for i in range(len(signal_labels)) if 'X' in signal_labels[i] or 'Y' in signal_labels[i] or 'Z' in signal_labels[i]] acc_sig = dict(zip(['X', 'Y', 'Z'], acc_chn)) ecg_fs = f.getSampleFrequency(ecg_chn[0]) acc_fs = f.getSampleFrequency(acc_chn[0]) # Get ECG data ecg['Timestamp'] = np.arange(start, end, (1 / ecg_fs)) ecg['ECG'] = pd.Series(f.readSignal(ecg_chn[0]) / 1000) ecg['Timestamp'] = ecg['Timestamp'].apply( lambda t: dt.datetime.utcfromtimestamp(t)) # Get ACC data acc['Timestamp'] = np.arange(start, end, (1 / acc_fs)) for k, v in acc_sig.items(): acc[k] = pd.Series(f.readSignal(v)) acc['Magnitude'] = np.sqrt(acc[['X', 'Y', 'Z']].apply( lambda x: x ** 2).sum(axis = 1)) acc['Timestamp'] = acc['Timestamp'].apply( lambda t: dt.datetime.utcfromtimestamp(t)) f.close() if time_aligned: resampled = pd.DataFrame() for col in ['X', 'Y', 'Z']: rs = scipy_resample(acc[col], len(ecg)) resampled = pd.concat( [resampled, pd.Series(rs, name = col)], axis = 1) preprocessed = pd.concat([ecg, resampled], axis = 1) return preprocessed else: return ecg, acc
[docs] def get_ecg_fs(self): """ Get the sampling rate of ECG data from an Actiwave Cardio device. Returns ------- fs : int, float The sampling rate of the ECG recording. """ f = pyedflib.EdfReader(self.file) signal_labels = f.getSignalLabels() for chn in range(len(signal_labels)): if 'ECG' in signal_labels[chn]: ecg_chn = chn try: fs = f.getSampleFrequency(ecg_chn) return fs except NameError: raise NameError('No ECG channel found.') finally: f.close()
[docs] def get_acc_fs(self): """ Get the sampling rate of accelerometer data from an Actiwave Cardio device. Returns ------- fs : int, float The sampling rate of the accelerometer recording. """ f = pyedflib.EdfReader(self.file) signal_labels = f.getSignalLabels() for chn in range(len(signal_labels)): if 'X' in signal_labels[chn]: acc_chn = chn try: fs = f.getSampleFrequency(acc_chn) return fs except NameError: raise NameError('No ACC channels found.') finally: f.close()
# ==================== Empatica E4 Pre-Processing and SQA ====================
[docs] class Empatica: """ A class to conveniently pre-process and assess quality of PPG and EDA data from Empatica E4 devices. Attributes ---------- file : str The path of the Empatica archive file with a '.zip' extension. """
[docs] class Data: """A class to store pre-processed data variables.""" def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value)
def __init__(self, file): """ Initialize the Empatica object. Parameters ---------- file : str The path of the Empatica archive file with a '.zip' extension. """ if not file.endswith(('.zip', '.ZIP')): raise ValueError( 'Invalid file path. The `file` parameter must take a string ' 'value ending in \'.zip\' or \'.ZIP\'.') else: self.file = file
[docs] def preprocess(self, time_aligned = False): """ Pre-process all data from the Empatica E4. Parameters ---------- time_aligned : bool, optional Whether to time-align all data based on the signal with the highest sampling rate (i.e. blood volume pulse); by default, False. Returns ------- data : Empatica.Data object An `Empatica.Data` object with the following attributes and corresponding pre-processed data: If `time_aligned` is False: acc : pandas.DataFrame A DataFrame containing the pre-processed ACC data with corresponding timestamps. bvp : pandas.DataFrame A DataFrame containing the pre-processed BVP data with corresponding timestamps. eda : pandas.DataFrame A DataFrame containing the pre-processed EDA data with corresponding timestamps. hr : pandas.DataFrame A DataFrame containing the pre-processed HR data with corresponding timestamps. ibi : pandas.DataFrame A DataFrame containing the pre-processed IBI data with corresponding timestamps and seconds elapsed since the start time of the IBI recording. temp : pandas.DataFrame A DataFrame containing the pre-processed temperature data with corresponding timestamps. start_time : float The Unix-formatted start time of the E4 recording. bvp_fs : float The sampling rate of the BVP recording. eda_fs : float The sampling rate of the EDA recording. If `time_aligned` is True: hrv : pandas.DataFrame A DataFrame containing time-synced BVP, HR, IBI, and acceleration data. eda : pandas.DataFrame A DataFrame containing time-synced EDA, temperature, and acceleration data. start_time : float The Unix-formatted start time of the E4 recording. bvp_fs : float The sampling rate of the BVP recording. eda_fs : float The sampling rate of the EDA recording. Examples -------- >>> from heartview import heartview >>> e4_archive = 'Sample_E4_Data.zip' >>> E4 = heartview.Empatica(e4_archive) >>> ALL_E4_DATA = E4.preprocess() """ with ZipFile(self.file, 'r') as archive: e4_files = archive.namelist() for file in e4_files: if 'ACC' in file: with archive.open(file) as acc_file: acc_data = self.get_acc().acc if 'BVP' in file: with archive.open(file) as bvp_file: bvp_data = self.get_bvp().bvp start_time = self.get_bvp().start bvp_fs = self.get_bvp().fs if 'EDA' in file: with archive.open(file) as eda_file: eda_data = self.get_eda().eda start_time = self.get_eda().start eda_fs = self.get_eda().fs if 'HR' in file: with archive.open(file) as hr_file: hr_data = self.get_hr().hr if 'IBI' in file: with archive.open(file) as ibi_file: ibi_data = self.get_ibi().ibi if 'TEMP' in file: with archive.open(file) as temp_file: temp_data = self.get_temp().temp if time_aligned: # Merge IBI and HR values into BVP data frame full_hrv = pd.merge_asof( bvp_data, ibi_data.drop(['Seconds'], axis = 1), on = 'Timestamp', direction = 'nearest') full_hrv = pd.merge_asof( full_hrv, hr_data, on = 'Timestamp', direction = 'nearest') bvp_ts = bvp_data['Timestamp'].values ibi_ts = ibi_data['Timestamp'].values hr_ts = hr_data['Timestamp'].values ibi_insertion_points = np.searchsorted(bvp_ts, ibi_ts) - 1 hr_insertion_points = np.searchsorted(bvp_ts, hr_ts) full_hrv.loc[~np.isin(np.arange(len(full_hrv)), ibi_insertion_points), 'IBI'] = np.nan full_hrv.loc[~np.isin(np.arange(len(full_hrv)), hr_insertion_points), 'HR'] = np.nan # Resample acceleration data to match BVP and EDA sampling rates acc_rs = pd.DataFrame() acc_cols = ['X', 'Y', 'Z', 'Magnitude'] for ref_data in [bvp_data, eda_data]: acc_rs[acc_cols] = acc_data[acc_cols].apply( lambda a: scipy_resample(a, len(ref_data))) if ref_data is bvp_data: full_hrv = pd.merge(full_hrv, acc_rs, left_index = True, right_index = True) else: full_eda = pd.merge(eda_data, temp_data, on = 'Timestamp', how = 'inner') full_eda = pd.merge(full_eda, acc_rs, left_index = True, right_index = True) data = self.Data(**{'hrv': full_hrv, 'eda': full_eda, 'start': start_time, 'bvp_fs': bvp_fs, 'eda_fs': eda_fs}) else: data = self.Data(**{'acc': acc_data, 'bvp': bvp_data, 'eda': eda_data, 'hr': hr_data, 'ibi': ibi_data, 'temp': temp_data, 'start': start_time, 'bvp_fs': bvp_fs, 'eda_fs': eda_fs}) return data
[docs] def get_acc(self): """ Get the pre-processed acceleration data and its start time and sampling rate from the Empatica E4. Returns ------- acc_data : Empatica.Data object An `Empatica.Data` object with the following attributes and corresponding accelerometer data variables: acc : pandas.DataFrame A DataFrame containing the pre-processed BVP data with corresponding timestamps. start : float The Unix-formatted start time of the BVP recording. fs : int The sampling rate of the BVP data. """ with ZipFile(self.file, 'r') as archive: e4_files = archive.namelist() acc_file = None for file in e4_files: if 'ACC' in file: acc_file = file break if acc_file is None: raise ValueError('No "ACC.csv" file found.') with archive.open(file) as acc_file: acc, acc_start, acc_fs = self._get_e4_data( acc_file, name = ['X', 'Y', 'Z']) acc = acc.apply(lambda x: (x / 64) * 9.81 if x.name != 'Timestamp' else x) acc['Magnitude'] = compute_magnitude( acc['X'], acc['Y'], acc['Z']) acc_data = self.Data(**{'acc': acc, 'start': acc_start, 'fs': acc_fs}) return acc_data
[docs] def get_bvp(self): """ Get the raw blood volume pulse (BVP) data and its start time and sampling rate from the Empatica E4. Returns ------- bvp_data : Empatica.Data object An `Empatica.Data` object with the following attributes and corresponding BVP data variables: bvp : pandas.DataFrame A DataFrame containing the pre-processed BVP data with corresponding timestamps. start : float The Unix-formatted start time of the BVP recording. fs : int The sampling rate of the BVP data. """ with ZipFile(self.file, 'r') as archive: e4_files = archive.namelist() bvp_file = None for file in e4_files: if 'BVP' in file: bvp_file = file break if bvp_file is None: raise ValueError('No "BVP.csv" file found.') with archive.open(bvp_file) as bvp_file: bvp, bvp_start, bvp_fs = self._get_e4_data( bvp_file, name = 'BVP') bvp_data = self.Data(**{'bvp': bvp, 'start': bvp_start, 'fs': bvp_fs}) return bvp_data
[docs] def get_eda(self): """ Get the raw electrodermal activity (EDA) data and its recording start time and sampling rate from the Empatica E4. Returns ------- eda_data : Empatica.Data object An `Empatica.Data` object with the following attributes and corresponding EDA data variables: eda : pandas.DataFrame A DataFrame containing the pre-processed EDA data with corresponding timestamps. start : float The Unix-formatted start time of the EDA recording. fs : int The sampling rate of the EDA data. """ with ZipFile(self.file, 'r') as archive: e4_files = archive.namelist() eda_file = None for file in e4_files: if 'EDA' in file: eda_file = file break if eda_file is None: raise ValueError('No "EDA.csv" file found.') with archive.open(eda_file) as eda_file: eda, eda_start, eda_fs = self._get_e4_data( eda_file, name = 'EDA') eda_data = self.Data(**{'eda': eda, 'start': eda_start, 'fs': eda_fs}) return eda_data
[docs] def get_hr(self): """ Get the pre-processed heart rate (HR) data, start time of the first HR measurement, and sampling rate from the Empatica E4. Returns ------- hr_data : Empatica.Data object An `Empatica.Data` object with the following attributes and corresponding HR data variables: hr : pandas.DataFrame A DataFrame containing the pre-processed HR data with corresponding timestamps. start : float The Unix-formatted start time of the HR measurements. fs : int The sampling rate of the BVP data. """ with ZipFile(self.file, 'r') as archive: e4_files = archive.namelist() hr_file = None for file in e4_files: if 'HR' in file: hr_file = file break if hr_file is None: raise ValueError('No "HR.csv" file found.') with archive.open(file) as hr_file: hr, hr_start, hr_fs = self._get_e4_data( hr_file, name = 'HR') hr_data = self.Data(**{'hr': hr, 'start': hr_start, 'fs': hr_fs}) return hr_data
[docs] def get_ibi(self): """ Get the pre-processed interbeat interval (IBI) data and the start time of the first interval from the Empatica E4. Returns ------- ibi_data : Empatica.Data object An `Empatica.Data` object with the following attributes and corresponding IBI data variables: ibi : pandas.DataFrame A DataFrame containing the pre-processed IBI data with corresponding timestamps. start : int The Unix-formatted start time of the IBI data. """ with ZipFile(self.file, 'r') as archive: e4_files = archive.namelist() ibi_file = None for file in e4_files: if 'IBI' in file: ibi_file = file break if ibi_file is None: raise ValueError('No "IBI.csv" file found.') with archive.open(file) as ibi_file: ibi = pd.read_csv(ibi_file, header = 0, names = ['Seconds', 'IBI']) ibi_file.seek(0) ibi_start = self._get_e4_start_time(ibi_file) ibi['IBI'] *= 1000 ibi.insert( 0, 'Timestamp', (ibi['Seconds'] + ibi_start).apply( lambda t: dt.datetime.utcfromtimestamp(t))) ibi_data = self.Data(**{'ibi': ibi, 'start': ibi_start}) return ibi_data
[docs] def get_temp(self): """ Get the raw skin temperature data and its recording start time and sampling rate from the Empatica E4. Returns ------- temp_data : Empatica.Data object An `Empatica.Data` object with the following attributes and corresponding temperature data variables: temp : pandas.DataFrame A DataFrame containing the pre-processed temperature data with corresponding timestamps. start : float The Unix-formatted start time of the temperature recording. fs : int The sampling rate of the temperature data. """ with ZipFile(self.file, 'r') as archive: e4_files = archive.namelist() temp_file = None for file in e4_files: if 'TEMP' in file: temp_file = file break if temp_file is None: raise ValueError('No "TEMP.csv" file found.') with archive.open(temp_file) as temp_file: temp, temp_start, temp_fs = self._get_e4_data( temp_file, name = 'Temp') temp_data = self.Data(**{'temp': temp, 'start': temp_start, 'fs': temp_fs}) return temp_data
[docs] def get_e4_beats(self, bvp_data, ibi_data, start_time, show_progress = True): """ Get locations of beats from Empatica E4 interbeat interval (IBI) data relative to its blood volume pulse (BVP) data. Parameters ---------- bvp_data : pandas.DataFrame A DataFrame containing the Empatica E4 BVP data, outputted from `Empatica.preprocess()`. ibi_data : pandas.DataFrame A DataFrame containing the Empatica E4 IBI data, outputted from `Empatica.preprocess()`. start_time : int The Unix timestamp of the recording start time. show_progress : bool, optional Whether to display a progress bar while the function runs; by default, True. Returns ------- e4_beats : list A list containing the indices of beats extracted from IBI data of the Empatica E4. """ ibi = ibi_data.copy() bvp = bvp_data.copy() ibi['Unix'] = ibi['Seconds'] + start_time ibi['Timestamp'] = ibi['Unix'].apply( lambda t: dt.datetime.utcfromtimestamp(t)) bvp['Timestamp'] = pd.to_datetime(bvp['Timestamp']) e4_beats = [] for t in tqdm(ibi['Timestamp'], disable = not show_progress): time_diff = np.abs(bvp['Timestamp'] - t) closest_ix = time_diff.idxmin() e4_beats.append(closest_ix) return e4_beats
[docs] def plot_signals(self, segment = 1, seg_size = 60, interactive = True): """ Display a plot of a segment of signals recorded with the Empatica E4 device. Parameters ---------- segment : int, optional The number of the position of the segment to plot; by default, 1. seg_size : int, optional The segment size in seconds; by default, 60. interactive : bool, optional Whether to plot an interactive visualization; by default, True. Returns ------- fig : plotly.graph_objects.Figure or None If `interactive` is True, displays and returns an interactive Plotly figure containing the plotted signals. If `interactive` is False, displays a static figure and returns None. """ data = self.preprocess() dtypes = ('acc', 'bvp', 'eda', 'temp') if interactive: fig = make_subplots( rows = 4, cols = 1, shared_xaxes = True, vertical_spacing = 0.02, row_heights = [0.2, 0.3, 0.3, 0.2]) for n in range(len(dtypes)): if dtypes[n] in ('acc', 'bvp'): fs = data.bvp_fs seg_start = int((segment - 1) * fs * seg_size) seg_end = seg_start + int(fs * seg_size) signal_name = 'ACC' if dtypes[n] == 'acc' else 'BVP' color = 'forestgreen' if dtypes[n] == 'acc' else '#3562bd' ylabel = 'm/s²' if dtypes[n] == 'acc' else '' if dtypes[n] == 'acc': x = data.acc['Timestamp'].iloc[seg_start:seg_end] y = data.acc['Magnitude'].iloc[seg_start:seg_end] else: x = data.bvp['Timestamp'].iloc[seg_start:seg_end] y = data.bvp['BVP'].iloc[seg_start:seg_end] else: fs = data.eda_fs seg_start = int((segment - 1) * fs * seg_size) seg_end = seg_start + int(fs * seg_size) signal_name = 'EDA' if dtypes[n] == 'eda' else 'Temperature' color = '#43c9de' if dtypes[n] == 'eda' else '#8b3ac9' ylabel = 'uS' if dtypes[n] == 'eda' else '°C' if dtypes[n] == 'eda': x = data.eda['Timestamp'].iloc[seg_start:seg_end] y = data.eda['EDA'].iloc[seg_start:seg_end] else: x = data.temp['Timestamp'].iloc[seg_start:seg_end] y = data.temp['Temp'].iloc[seg_start:seg_end] fig.add_trace( go.Scatter( x = x, y = y, name = signal_name, line = dict(color = color, width = 1.5), hovertemplate = f'<b>{signal_name}</b>: %{{y:.2f}} ' f'{ylabel}<extra></extra>'), row = n+1, col = 1) fig.update_yaxes( title_text = ylabel, row = 1, col = 1, showgrid = True, gridwidth = 0.5, gridcolor = 'lightgrey', griddash = 'dot', tickcolor = 'grey', linecolor = 'grey') fig.show() return fig else: fig, axs = plt.subplots(4, 1, figsize = (10, 8)) for n in range(len(dtypes)): fs = data.eda_fs seg_start = int((segment - 1) * fs * seg_size) seg_end = seg_start + int(fs * seg_size) if dtypes[n] in ('acc', 'bvp'): signal_name = 'ACC' if dtypes[n] == 'acc' else 'BVP' color = 'forestgreen' if dtypes[n] == 'acc' else '#3562bd' ylabel = 'm/s²' if dtypes[n] == 'acc' else 'BVP' if dtypes[n] == 'acc': x = data.acc['Timestamp'].iloc[seg_start:seg_end] y = data.acc['Magnitude'].iloc[seg_start:seg_end] else: x = data.bvp['Timestamp'].iloc[seg_start:seg_end] y = data.bvp['BVP'].iloc[seg_start:seg_end] else: signal_name = 'EDA' if dtypes[n] == 'eda' else 'Temperature' color = '#43c9de' if dtypes[n] == 'eda' else '#8b3ac9' ylabel = 'uS' if dtypes[n] == 'eda' else '°C' if dtypes[n] == 'eda': x = data.eda['Timestamp'].iloc[seg_start:seg_end] y = data.eda['EDA'].iloc[seg_start:seg_end] else: x = data.temp['Timestamp'].iloc[seg_start:seg_end] y = data.temp['Temp'].iloc[seg_start:seg_end] for ax in axs: ax.plot(x, y, label = signal_name, color = color, lw = 1.2) ax.set_xlabel('Timestamp') ax.set_ylabel(ylabel) ax.legend(frameon = False) plt.tight_layout() plt.show() return fig, axs
def _get_e4_data(self, file, name): """Extract data from an Empatica E4 file.""" if not isinstance(name, list) and not isinstance(name, str): raise ValueError('The `name` parameter must take either a string ' 'or a list of strings.') else: if isinstance(name, list): col_name = name else: col_name = [name] data = pd.read_csv(file, header = 1, names = col_name) if isinstance(file, str): fs = self._get_e4_fs(file) start_time = self._get_e4_start_time(file) else: if hasattr(file, 'seek'): file.seek(0) fs = self._get_e4_fs(file) file.seek(0) start_time = self._get_e4_start_time(file) timestamps = pd.date_range( start = pd.to_datetime(start_time, unit = 's'), periods = len(data), freq = f'{1 / fs}S') timestamps = pd.Series(timestamps, name = 'Timestamp') data = pd.merge(timestamps, data, left_index = True, right_index = True) return data, start_time, fs def _get_e4_fs(self, file): """Get the sampling rate from an Empatica E4 file.""" contents = pd.read_csv(file, header = None, nrows = 2, usecols = [0]) fs = contents.iloc[1].item() return fs def _get_e4_start_time(self, file): """Get the Unix-formatted start time of an Empatica E4 recording.""" contents = pd.read_csv(file, header = None, nrows = 2, usecols = [0]) if type(file) is ZipExtFile: if 'IBI' in file.name: start = contents.loc[0, 0] else: start = contents.iloc[0].item() else: if file.endswith('IBI.csv'): start = contents.loc[0, 0] else: start = contents.iloc[0].item() return start
# ======================== Other Data Pre-Processing ========================= def get_duration(data, fs, unit = 'sec'): """ Get the duration of a signal. Parameters ---------- data : array_like An array or DataFrame containing the signal. fs : int The sampling rate of the data. unit : str The unit in which the duration should be calculated; by default, in seconds (`sec`). Returns ------- dur : float The duration of the signal. """ dur = len(data) / fs if unit not in ['sec', 's', 'min', 'm', 'hour', 'h']: raise ValueError('The `unit` parameter must take \'sec\', \'min\', ' 'or \'hour\'.') else: if unit in ('min', 'm'): return round((dur / 60), 2) if unit == ('hour', 'h'): return round(((dur / 60) / 60), 2) return round(dur, 2) def segment_data(data, fs, seg_size): """ Segment data into specific window sizes. Parameters ---------- data : pd.DataFrame The DataFrame containing the data to be segmented. fs : int The sampling rate of the data. seg_size : int The window size, in seconds, into which the data should be segmented. Returns ------- df : pd.DataFrame The original DataFrame with data segmented with labels in a 'Segment' column. """ df = data.copy() df.insert(0, 'Segment', 0) segment = 1 for n in range(0, len(df), int(seg_size * fs)): df.loc[n:(n + int(seg_size * fs)), 'Segment'] = segment segment += 1 return df def compute_ibis(data, fs, beats_ix, ts_col = None): """ Compute interbeat intervals from beat locations in electrocardiograph (ECG) or photoplethysmograph (PPG) data. Parameters ---------- data : pd.DataFrame The DataFrame containing the pre-processed ECG/PPG data. fs : int The sampling rate of the ECG/PPG data. beats_ix : array_like An array of indices corresponding to beat occurrences. ts_col : str The name of the column in `data` containing timestamp values; by default, None. Returns ------- ibi : pd.DataFrame A DataFrame containing timestamps and IBI values. """ df = data.copy() ibis = (np.diff(beats_ix) / fs) * 1000 if ts_col is not None: ibi = df[[ts_col]].copy() else: ibi = pd.DataFrame({'Sample': np.arange(len(df)) + 1}) for n, ix in enumerate(beats_ix[1:]): ibi.loc[ix, 'IBI'] = ibis[n] return ibi def plot_cardio_signals(signal, fs, ibi, signal_type, x = 'Timestamp', y = 'Filtered', acc = None, seg_num = 1, seg_size = 60, title = None): """ Create subplots of the electrocardiograph (ECG) or photoplethysmograph (PPG), interbeat interval (IBI), and acceleration data (if any). Parameters ---------- signal : pandas.DataFrame A DataFrame containing the pre-processed ECG or PPG data with beat and artifact occurrences in a "Beat" and "Artifact" column. fs : int The sampling rate of the ECG or PPG data. ibi : pandas.DataFrame A DataFrame containing IBI values in an "IBI" column. signal_type : str The type of cardiovascular data being plotted. This must be either 'ECG' or 'PPG'. x : str, optional The name of the column of values in the `signal` DataFrame to plot along the x-axis; by default, 'Timestamp'. y : str, optional The column name of values to plot along the y-axis; by default, 'Filtered'. acc : pandas.DataFrame, optional A DataFrame containing pre-processed acceleration data with magnitude values in a "Magnitude" column. seg_num : int The segment to plot. seg_size : int The size of the segment, in seconds; by default, 60. Returns ------- fig : plotly.graph_objects.Figure A figure containing subplots of ECG or PPG data with beat annotations and its corresponding IBI data. See Also -------- heartview.compute_ibis : Compute IBIs in a DataFrame time-aligned to its corresponding cardiovascular data. """ seg_start = int((seg_num - 1) * seg_size * fs) seg_end = int(seg_start + (fs * seg_size)) for df in [signal, ibi]: df[x] = pd.to_datetime(df[x]) signal_segment = signal.iloc[seg_start:seg_end] ibi_segment = ibi.iloc[seg_start:seg_end].dropna() x_array = signal_segment[x] if not pd.api.types.is_datetime64_any_dtype(x_array): artifact_hover = '<b>Potential Artifact</b> <extra></extra>' beat_hover = '<b>Beat</b> <extra></extra>' else: artifact_hover = '<b>Potential Artifact</b>: %{x|%H:%M:%S.%3f} ' \ '<extra></extra>' beat_hover = '<b>Beat</b>: %{x|%H:%M:%S.%3f} <extra></extra>' if signal_type == 'PPG' or signal_type == 'BVP': y_axis = 'bvp' else: y_axis = 'mV' if acc is not None: fig = make_subplots(rows = 3, cols = 1, shared_xaxes = True, vertical_spacing = 0.02, row_heights = [0.25, 0.50, 0.25]) # ACC subplot acc = scipy_resample(acc['Magnitude'], len(signal)) acc_segment = acc[seg_start:seg_end] fig.add_trace( go.Scatter( x = x_array, y = acc_segment, name = 'ACC', line = dict(color = 'forestgreen', width = 1.5), hovertemplate = '<b>ACC</b>: %{y:.2f} m/s² <extra></extra>'), row = 1, col = 1) fig.update_yaxes( title_text = 'm/s²', title_standoff = 5, row = 1, col = 1, showgrid = True, gridwidth = 0.5, gridcolor = 'lightgrey', griddash = 'dot', tickcolor = 'grey', linecolor = 'grey') # ECG/PPG subplot fig.add_trace( go.Scatter( x = x_array, y = signal_segment[y], name = signal_type, showlegend = True, line = dict(color = '#3562bd', width = 1.5), hovertemplate = f'<b>{signal_type}:</b> %{{y:.2f}} {y_axis} ' f'<extra></extra>'), row = 2, col = 1) fig.update_yaxes( title_text = y_axis, title_standoff = 5, row = 2, col = 1, showgrid = True, gridwidth = 0.5, gridcolor = 'lightgrey', griddash = 'dot', tickcolor = 'grey', linecolor = 'grey') # IBI subplot fig.add_trace( go.Scatter( x = ibi_segment[x], y = ibi_segment['IBI'], name = 'IBI', line = dict(color = '#eb4034', width = 1.5), hovertemplate = '<b>IBI</b>: %{y:.2f} ms <extra></extra>'), row = 3, col = 1) fig.update_yaxes( title_text = 'ms', row = 3, col = 1, title_standoff = 1, showgrid = True, gridwidth = 0.5, gridcolor = 'lightgrey', griddash = 'dot', tickcolor = 'grey', linecolor = 'grey') # Detected beats fig.add_trace( go.Scatter( x = signal_segment.loc[signal_segment.Beat == 1, x], y = signal_segment.loc[signal_segment.Beat == 1, y], name = 'Detected Beat', showlegend = True, mode = 'markers', marker = dict(color = '#f9c669', size = 6), hovertemplate = beat_hover), row = 2, col = 1) # Artifactual beats fig.add_trace( go.Scatter( x = signal_segment.loc[signal_segment.Artifact == 1, x], y = signal_segment.loc[signal_segment.Artifact == 1, y], name = 'Potential Artifact', showlegend = True, mode = 'markers', marker = dict(color = 'red'), hovertemplate = artifact_hover), row = 2, col = 1) else: fig = make_subplots(rows = 2, cols = 1, shared_xaxes = True, vertical_spacing = 0.02, row_heights = [0.6, 0.4]) # ECG/PPG subplot fig.add_trace( go.Scatter( x = x_array, y = signal_segment[y], name = signal_type, showlegend = True, line = dict(color = '#3562bd', width = 1.5), hovertemplate = f'<b>{signal_type}:</b> %{{y:.2f}} {y_axis} ' f'<extra></extra>'), row = 1, col = 1) fig.update_yaxes( title_text = y_axis, row = 1, col = 1, title_standoff = 5, showgrid = True, gridwidth = 0.5, gridcolor = 'lightgrey', griddash = 'dot', tickcolor = 'grey', linecolor = 'grey') # IBI subplot fig.add_trace( go.Scatter( x = ibi_segment[x], y = ibi_segment['IBI'], name = 'IBI', line = dict(color = '#eb4034', width = 1.5), hovertemplate = '<b>IBI</b>: %{y:.2f} ms <extra></extra>'), row = 2, col = 1) fig.update_yaxes( title_text = 'ms', row = 2, col = 1, title_standoff = 1, showgrid = True, gridwidth = 0.5, gridcolor = 'lightgrey', griddash = 'dot', tickcolor = 'grey', linecolor = 'grey') # Detected beats fig.add_trace( go.Scatter( x = signal_segment.loc[signal_segment.Beat == 1, x], y = signal_segment.loc[signal_segment.Beat == 1, y], name = 'Detected Beat', showlegend = True, mode = 'markers', marker = dict(color = '#f9c669', size = 6), hovertemplate = beat_hover), row = 1, col = 1) # Artifactual beats fig.add_trace( go.Scatter( x = signal_segment.loc[signal_segment.Artifact == 1, x], y = signal_segment.loc[signal_segment.Artifact == 1, y], name = 'Potential Artifact', showlegend = True, mode = 'markers', marker = dict(color = 'red'), hovertemplate = artifact_hover), row = 1, col = 1) # Format shared x-axis x_min = signal_segment[x].min() x_max = signal_segment[x].max() fig.update_xaxes( tickfont = dict(size = 14), tickcolor = 'grey', linecolor = 'grey', range = [x_min, x_max]) # Format figure fig.update_layout( height = 450, title_text = title, template = 'simple_white', font = dict(family = 'Poppins', color = 'black'), legend = dict( font = dict(size = 16), orientation = 'h', yanchor = 'bottom', y = 1.05, xanchor = 'right', x = 1.0), annotations = [dict( text = x.capitalize(), x = 0.5, y = -0.22, showarrow = False, xref = 'paper', yref = 'paper', font = dict(size = 16) )], margin = dict(l = 20, r = 20, t = 60, b = 70) ) return fig def plot_signal(df, x, y, fs, seg_size = 60, segment = 1, n_segments = 1, signal_type = None, peaks = None): """ Visualize a signal. Parameters ---------- df : pandas.DataFrame The DataFrame containing the signal data. x : str The column containing the x-axis value (e.g., `'Time'`). y : str, list The column(s) of the signal data (y-axis values). fs : int, float The sampling rate. seg_size : int The size of the segment, in seconds; by default, 60. segment : int, float, None The segment number; by default, 1. For example, segment `1` denotes the first segment of the recording. This argument can also be set to `None` if `df` contains a 'Segment' column. n_segments : int, float The number of segments to be visualized; by default, 1. signal_type : str The type of signal being plotted (i.e., 'ecg', 'bvp', 'acc', 'ibi'); by default, None. peaks : str The column containing peak occurrences, i.e., a sequence of `0` and/or `1` denoting False or True occurrences of peaks. By default, peaks will be plotted on the first trace. Returns ------- fig : matplotlib.axes.AxesSubplot The signal visualization. """ if segment is None and \ 'segment' in [c.lower() for c in df.columns.tolist()]: seg = df.loc[(df.Segment >= 1) & (df.Segment <= 2)] else: start = int(segment - 1) * seg_size * fs end = int(((segment - 1) + n_segments) * seg_size * fs) seg = df.iloc[start:end] # Set plotting parameters plt.rcParams['font.size'] = 14 palette1 = {'blue': '#4c73c2', 'red': '#eb4034', 'green': '#63b068', 'grey': '#bdbdbd'} palette2 = ['#ec2049', '#176196', '#f7db4f', '#63b068'] # Set up the figure fig = go.Figure() # Plot a single signal if not isinstance(y, list): fig.add_trace(go.Scatter( x = seg[x], y = seg[y], mode = 'lines', hovertemplate = '%{x}' + '<br>%{y:.2f}' + '<extra></extra>', name = f'{y}')) # Add peaks if peaks != None: fig.add_trace(go.Scatter( x = seg[x], y = np.where(seg[peaks] == 1, seg[y], np.nan), mode = 'markers', marker = dict(size = 8, color = 'gold', line_width = 1), hovertemplate = '<b>Peak</b>: %{y} <extra></extra>', name = 'Peaks')) fig.update_layout(yaxis_title = y) # Plot multiple signals else: for yval in range(len(y)): fig.add_trace(go.Scatter( x = seg[x], y = seg[y[yval]], mode = 'lines', line = dict(color = palette2[yval]), hovertemplate = '%{x}' + '<br>%{y:.2f}' + '<extra></extra>', name = f'{y[yval]}')) # Add peaks if peaks is not None: fig.add_trace(go.Scatter( x = seg[x], y = np.where(seg[peaks] == 1, seg[y[0]], np.nan), mode = 'markers', marker = dict(size = 8, color = 'gold', line_width = 1), hovertemplate = '<b>Peak</b>: %{y} <extra></extra>', name = 'Peaks')) # Format the plot fig.update_layout( xaxis_title = x, template = 'simple_white', height = 300, margin = dict(l = 10, r = 30, b = 50, t = 50, pad = 3) ) # Label axes and set trace colors according to signal type if signal_type == 'ecg' or signal_type == 'bvp': if isinstance(y, list): for d in range(len(fig.data)): fig.data[d].line.color = palette2[d] return fig.update_layout(yaxis_title = signal_type.upper()) else: return fig.update_traces( line_color = palette1['blue']).update_layout(yaxis_title = y) elif signal_type == 'acc': if isinstance(y, list): for d in range(len(fig.data)): fig.data[d].line.color = palette2[d] return fig.update_layout(yaxis_title = 'm/s<sup>2</sup>') else: return fig.update_traces( line_color = palette1['green']).update_layout( yaxis_title = 'm/s<sup>2</sup>') elif signal_type == 'ibi': if isinstance(y, list): for d in range(len(fig.data)): fig.data[d].line.color = palette2[d] return fig.update_layout(yaxis_title = 'ms') else: return fig.update_traces( line_color = palette1['red']).update_layout(yaxis_title = 'ms') else: return fig def plot_ibi_from_ecg(df, x, y, segment, n_segments): """ Visualize an IBI series generated from ECG data. Parameters ---------- df : pd.DataFrame The DataFrame containing the signal data. x : str The column containing the x-axis value (e.g., `'Time'`). y : str The column containing the IBI series (e.g., `'IBI'`). segment : int, float The segment number. For example, segment `1` denotes the first segment of the recording. n_segments : int, float The number of segments to be visualized; by default, 1. Returns ------- fig : matplotlib.axes.AxesSubplot The IBI series plot. """ start = int(segment) end = round(segment + n_segments) seg = df.loc[df['Segment'].between(start, end, inclusive = 'both')] plt.rcParams['font.size'] = 14 fig = go.Figure() fig.add_trace(go.Scatter( x = seg[x], y = seg[y], mode = 'lines', marker = dict(color = '#eb4034'), hovertemplate = '%{x}' + '<br>%{y:.2f} ms' + '<extra></extra>', name = f'{y}')) ymin = np.nanmin(seg[y].values.flatten()) * 0.95 ymax = np.nanmax(seg[y].values.flatten()) * 1.05 fig.update_layout( yaxis_range = (ymin, ymax), yaxis_title = 'IBI (ms)', xaxis_title = x, template = 'simple_white', height = 300, margin = dict(l = 50, r = 10, b = 50, t = 30, pad = 10) ) fig.update_yaxes( title_standoff = 10) return fig
[docs] def write_beat_editor_file(data, fs, signal_col, beats_col, ts_col = None, filename = None): """ Create a JSON file for input to the Beat Editor. Parameters ---------- data : pd.DataFrame A DataFrame containing the cardiac data. Must contain at least the following columns: - Cardiac signal - Beat occurrences labeled as 1 Optionally, `data` can include: - A timestamp column (specified by `ts_col`). If not provided, sample indices are used. - An "Artifact" column, where artifact occurrences are labeled as 1. This allows the Beat Editor to visualize artifactual beat locations. fs : int The sampling frequency of the signal. signal_col : str The name of the column in `data` containing the cardiac signal. beats_col : str The name of the column in `data` containing beat occurrences. ts_col : str, optional The name of the column in `data` containing the timestamps. If not provided, timestamps are assumed to correspond to the DataFrame index. filename : str, optional The name of the JSON file to write. If no filename is provided, the default filename 'heartview_edit.json' is used. Returns ------- None """ from pathlib import Path # Set the output JSON filename if filename is None: json_filename = 'heartview_edit.json' else: json_filename = filename + '_edit.json' # Check required columns required_cols = [('signal_col', signal_col), ('beats_col', beats_col)] if ts_col: required_cols.append(('ts_col', ts_col)) for name, col in required_cols: if col not in data.columns: raise ValueError(f'`{name}` not found in input data.') # Check if there are any beats if data[beats_col].sum() == 0: warnings.warn('No beat occurrences found in input data.', UserWarning) # Convert to `datetime` format if provided if ts_col is not None: data[ts_col] = pd.to_datetime(data[ts_col]) data.rename(columns = {ts_col: 'Timestamp'}, inplace = True) else: data.insert(0, 'Sample', data.index + 1) # Format columns for JSON keys and save as JSON if 'Segment' not in data.columns: data.insert(0, 'Segment', (data.index // (fs * 60)) + 1) data.rename(columns = {signal_col: 'Signal', beats_col: 'Beat'}, inplace = True) root = Path(__file__).resolve().parents[1] data_dir = root / 'beat-editor' / 'data' data_dir.mkdir(parents = True, exist_ok = True) json_path = data_dir / json_filename data.to_json(json_path, orient = 'records', date_format = 'epoch', lines = False) print(f'Beat Editor JSON file written to {json_path}')
[docs] def process_beat_edits(orig_data, edits): """ Apply manual corrections from the Beat Editor output to original data. Edits are aligned either by sample index or timestamp, depending on the structure of `orig_data`. Parameters ---------- orig_data : pd.DataFrame A DataFrame containing the original cardiac data inputted to the Beat Editor. Must contain a 'Beat' column and either: - 'Timestamp' column (datetime), or - 'Sample' column (integer sample indices) edits : pd.DataFrame A DataFrame of edit instructions parsed from a Beat Editor `_edited.json` file. Must contain the following columns: - 'x': the location of each edit, in the same unit as either `orig_data['Timestamp']` (datetime) or `orig_data['Sample']` (integer) - 'editType': type of edit, with values 'ADD' or 'DELETE' Returns ------- processed : pd.DataFrame A copy of `orig_data` with the following additional columns: - 'Deletion': 1 where beats were deleted, otherwise NaN - 'Addition': 1 where beats were added, otherwise NaN - 'Unusable': 1 where segments are marked unusable, otherwise NaN - 'Edited': 1 where all final beats are, otherwise NaN """ if all(col not in edits.columns for col in ['x', 'from', 'to']): raise ValueError('Input edits missing necessary columns.') else: processed = orig_data.copy() # Handle beat insertions/deletions if 'x' in edits.columns: mask_x = edits['x'].notna() if 'Timestamp' in processed.columns: edits.loc[mask_x, 'x'] = pd.to_datetime( edits.loc[mask_x, 'x'], unit = 'ms', errors = 'coerce') edits.loc[mask_x, 'Sample'] = edits.loc[mask_x, 'x'].apply( lambda x: (processed['Timestamp'] - x).abs().idxmin() if 'Timestamp' in processed.columns else (processed['Sample'] - x).abs().idxmin() ) # Handle unusable markings if all(col in edits.columns for col in ['from', 'to']): mask_from = edits['from'].notna() mask_to = edits['to'].notna() if 'Timestamp' in processed.columns: edits.loc[mask_from, 'from'] = pd.to_datetime( edits.loc[mask_from, 'from'], unit = 'ms', errors = 'coerce') edits.loc[mask_to, 'to'] = pd.to_datetime( edits.loc[mask_to, 'to'], unit = 'ms', errors = 'coerce') # Store aligned sample indices edits.loc[mask_from, 'Sample_from'] = edits.loc[ mask_from, 'from'].apply( lambda x: (processed['Timestamp'] - x).abs().idxmin() if 'Timestamp' in processed.columns else (processed['Sample'] - x).abs().idxmin() ) edits.loc[mask_to, 'Sample_to'] = edits.loc[mask_to, 'to'].apply( lambda x: (processed['Timestamp'] - x).abs().idxmin() if 'Timestamp' in processed.columns else (processed['Sample'] - x).abs().idxmin() ) # Identify rows for beat deletion and addition deletions = edits.loc[edits.editType == 'DELETE', 'Sample'].values additions = edits.loc[edits.editType == 'ADD', 'Sample'].values # Flag deletions and additions in the processed DataFrame processed.loc[deletions, 'Deletion'] = 1 processed.loc[additions, 'Addition'] = 1 # Add 'unusable' labels for start, end in zip(edits.Sample_from, edits.Sample_to): if pd.notna(start) and pd.notna(end): processed.loc[int(start):int(end), 'Unusable'] = 1 # Add the corrected beat column processed['Edited'] = processed.Beat.copy() processed.loc[processed.Unusable == 1, 'Edited'] = np.nan processed.loc[processed.Deletion == 1, 'Edited'] = np.nan processed.loc[processed.Addition == 1, 'Edited'] = 1 return processed