Source code for visualize_accelerometry.callbacks

"""
UI callback logic for annotation management.

Contains the ``CallbackManager`` class that wires widget events to
state mutations + view updates, plus helper functions for creating
annotation rows and building summary HTML.
"""

import os
from datetime import datetime, timedelta

import pandas as pd
import bokeh.plotting as bp

from .config import DISPLAYED_ANNOTATION_COLUMNS, TIME_FMT
from .data_loading import save_annotations


def capture_new_annotation(start_ts, end_ts, artifact, fname, uname):
    """Build the one-row annotation record for a freshly marked segment.

    Parameters
    ----------
    start_ts, end_ts : Timestamp
        Time bounds of the annotated segment.
    artifact : str
        Activity type (e.g. ``"chair_stand"``, ``"tug"``).
    fname : str
        Path to the signal file; only its basename is stored.
    uname : str
        Username of the annotator.

    Returns
    -------
    DataFrame
        Single-row frame (index ``0``). The ``segment``, ``scoring`` and
        ``review`` flags start at 0, ``notes`` is empty, the time bounds
        are stored both as epoch seconds and as their ``str()`` form, and
        ``annotated_at`` holds the current local time as a string.
    """
    file_key = os.path.basename(fname)
    epoch_start = pd.to_datetime(start_ts).timestamp()
    epoch_end = pd.to_datetime(end_ts).timestamp()

    # Key order defines the column order of the resulting frame.
    row = dict(
        fname=file_key,
        artifact=artifact,
        segment=0,
        scoring=0,
        review=0,
        start_epoch=epoch_start,
        end_epoch=epoch_end,
        start_time=str(start_ts),
        end_time=str(end_ts),
        annotated_at=str(datetime.now()),
        user=uname,
        notes="",
    )
    return pd.DataFrame(row, index=[0])
def build_summary_html(state):
    """Build an HTML summary table for all annotations on the current file.

    The table has one data row with five cells: file start, file end,
    timed annotations grouped by artifact, notes, and review flags.

    Parameters
    ----------
    state : AppState
        Current session state. Reads ``pdf_annotations``, ``fname``,
        ``file_start_timestamp`` and ``file_end_timestamp``.

    Returns
    -------
    str
        HTML string for the summary pane. Free-text values taken from the
        annotations (notes, user names, artifact names) are HTML-escaped.
    """
    # Function-scope import, like the other lazy imports in this module.
    from html import escape

    pdf_annotations = state.pdf_annotations
    fname = state.fname
    file_start = state.file_start_timestamp
    file_end = state.file_end_timestamp

    artifacts = ""
    notes = ""
    reviews = ""

    if pdf_annotations.shape[0] > 0:
        pdf_sel = pdf_annotations.loc[
            pdf_annotations["fname"] == os.path.basename(fname)
        ].reset_index(drop=True)

        if pdf_sel.shape[0] > 0:
            # Coerce before using the .dt accessor: freshly captured rows
            # store their bounds as strings, and review-only flags have no
            # bounds at all (they become NaT and are filtered out below).
            start_col = pd.to_datetime(pdf_sel["start_time"], errors="coerce")
            end_col = pd.to_datetime(pdf_sel["end_time"], errors="coerce")
            has_time = start_col.notna() & end_col.notna()
            pdf_timed = pdf_sel.loc[has_time]

            if pdf_timed.shape[0] > 0:
                start_txt = start_col.loc[has_time].dt.strftime("%d-%m %H:%M:%S")
                end_txt = end_col.loc[has_time].dt.strftime("%d-%m %H:%M:%S")
                # Notes and user names are typed by annotators; escape them
                # so they cannot inject markup into the summary pane.
                users = [escape(str(u)) for u in pdf_timed["user"]]
                pdf_timed = pdf_timed.assign(
                    annotations_txt=[
                        f"{a} - {b} ({u})"
                        for a, b, u in zip(start_txt, end_txt, users)
                    ],
                    notes_txt=[
                        f"{escape(str(n))} ({u})"
                        for n, u in zip(pdf_timed["notes"], users)
                    ],
                )

                # Only plain annotations (not segment/scoring rows) are listed.
                dct_artifacts = {
                    artifact: "<br/>".join(
                        pdf_timed.loc[
                            (pdf_timed["artifact"] == artifact)
                            & (pdf_timed["scoring"] == 0)
                            & (pdf_timed["segment"] == 0)
                        ]["annotations_txt"].tolist()
                    )
                    for artifact in ["chair_stand", "6min_walk", "3m_walk", "tug"]
                }
                dct_artifacts = {k: v for k, v in dct_artifacts.items() if v}
                artifacts = (
                    "<table cellpadding='2'>"
                    + "<tr>"
                    + "".join(f"<td><b>{a}</b></td>" for a in dct_artifacts)
                    + "</tr><tr>"
                    + "".join(f"<td>{dct_artifacts[a]}</td>" for a in dct_artifacts)
                    + "</tr></table>"
                )

                has_note = (
                    pdf_timed["notes"].fillna("").astype(str).str.strip() != ""
                )
                notes = "<br/>".join(pdf_timed.loc[has_note, "notes_txt"].tolist())

            # Reviews can exist without time ranges, so use the full pdf_sel
            pdf_reviews = pdf_sel.loc[pdf_sel["review"] == 1].drop_duplicates(
                subset=["user", "artifact"]
            )
            if pdf_reviews.shape[0] > 0:
                reviewers = pdf_reviews.groupby("artifact")["user"].apply(
                    lambda x: ",".join(map(str, x))
                )
                reviews = "<br/>".join(
                    f"{escape(str(artifact))} : {escape(names)}"
                    for artifact, names in reviewers.items()
                )

    # Guard against None timestamps before first file load
    start_str = (
        pd.to_datetime(file_start).strftime("%d-%m-%Y %H:%M:%S")
        if file_start
        else "N/A"
    )
    end_str = (
        pd.to_datetime(file_end).strftime("%d-%m-%Y %H:%M:%S")
        if file_end
        else "N/A"
    )

    return f"""
    <table style="width:100%; border-collapse:collapse; font-size:12px; margin-top:8px; font-family:'Montserrat',Helvetica,Arial,sans-serif;">
        <tr style="background-color:#58595b;">
            <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Start Time</th>
            <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">End Time</th>
            <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Annotations</th>
            <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Notes</th>
            <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Reviews</th>
        </tr>
        <tr>
            <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{start_str}</td>
            <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{end_str}</td>
            <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{artifacts}</td>
            <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{notes}</td>
            <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{reviews}</td>
        </tr>
    </table>
    """
def _filter_annotations_in_range(pdf_annotations, start_ts, end_ts, uname, fname):
    """Flag the annotations of one user/file that lie inside a time range.

    An annotation qualifies only when both its start and its end fall
    within ``[start_ts, end_ts]`` (bounds inclusive). Rows whose times
    cannot be parsed become NaT and therefore never qualify.

    Parameters
    ----------
    pdf_annotations : DataFrame
        Full annotations DataFrame.
    start_ts, end_ts : Timestamp
        Time bounds of the selection.
    uname : str
        Filter to this user.
    fname : str
        Filter to this file (basename is extracted).

    Returns
    -------
    Series[bool]
        Mask aligned with *pdf_annotations*.
    """
    begins = pd.to_datetime(pdf_annotations["start_time"], errors="coerce")
    finishes = pd.to_datetime(pdf_annotations["end_time"], errors="coerce")

    # Inclusive on both ends, spelled out as explicit comparisons.
    starts_inside = (begins >= start_ts) & (begins <= end_ts)
    ends_inside = (finishes >= start_ts) & (finishes <= end_ts)

    same_user = pdf_annotations["user"] == uname
    same_file = pdf_annotations["fname"] == os.path.basename(fname)

    return starts_inside & ends_inside & same_user & same_file
class CallbackManager:
    """Orchestrates UI callbacks between widget events and AppState.

    Parameters
    ----------
    state : AppState
        Per-session state object.
    widgets : dict
        Name-to-widget mapping populated by ``app.py``. Must include keys
        for all buttons, labels, and layout containers.
    """

    # Widget keys toggled together by ``update_selection``.
    _CREATE_BUTTONS = (
        "btn_clear",
        "btn_tug",
        "btn_3m_walk",
        "btn_6min_walk",
        "btn_chairstand",
    )
    _MODIFY_WIDGETS = (
        "btn_remove",
        "btn_segment",
        "btn_scoring",
        "btn_review",
        "btn_notes",
        "notes_input",
    )

    def __init__(self, state, widgets):
        self.state = state
        self.w = widgets

    # ------------------------------------------------------------------
    # Plot lifecycle
    # ------------------------------------------------------------------

    def _notify(self, msg, duration=3000, kind="info"):
        """Show a toast notification.

        Parameters
        ----------
        msg : str
            Text to display.
        duration : int
            Passed through as the notification's ``duration``.
        kind : str
            Name of the method to call on the notification area
            (``"info"``, ``"success"``, ...).

        Does nothing when ``pn.state.notifications`` is ``None``, so a
        callback never fails just because no notification area exists.
        """
        import panel as pn

        notifications = pn.state.notifications
        if notifications is None:
            return
        getattr(notifications, kind)(msg, duration=duration)

    def update_plot(self, force_rebuild=False, _empty_depth=0):
        """Load data for the current file/anchor and update the plot.

        When an existing plot exists and *force_rebuild* is False, only the
        CDS data and axis ranges are patched via ``update_plot_data``.
        Falls back to a full rebuild when no existing plot is available or
        when the fast path does not report success.

        If the file is empty or cannot be loaded, hands over to
        ``_handle_empty_file``, which notifies and moves to another file.

        Parameters
        ----------
        force_rebuild : bool
            Skip the fast path and rebuild the figures.
        _empty_depth : int
            Number of empty files already skipped in this chain; forwarded
            to ``_handle_empty_file`` to bound the recursion.
        """
        self._notify("Building plot\u2026", duration=2000)
        basename = os.path.splitext(os.path.basename(self.state.fname))[0]

        # Find the file picker entry (e.g. "alan--060294-20221208125829")
        # which includes the assigned user prefix
        label = basename
        for entry in self.state.lst_fnames:
            if entry.endswith(basename):
                label = entry
                break
        self.w["file_label"].object = f"### Annotating: {label}"

        # UI boundary: any load failure is treated like an empty file.
        try:
            pdf = self.state.load_file_data()
        except Exception as ex:
            pdf = None
            print(f"Error loading file {self.state.fname}: {ex}")

        if pdf is None or len(pdf) == 0:
            self._handle_empty_file(_depth=_empty_depth)
            return

        # Fast path: update existing CDS + ranges without rebuilding
        if (
            not force_rebuild
            and self.state.signal_cds is not None
            and self.w.get("main_fig") is not None
        ):
            from .plotting import update_plot_data

            updated = update_plot_data(
                pdf,
                self.state.signal_cds,
                self.w["main_fig"],
                range_source=self.w.get("range_source"),
            )
            if updated:
                self.state.selection_bounds = None
                self.update_annotations()
                self._update_nav_buttons()
                return

        # Full rebuild (first load or fast path failed)
        self._refresh_plot(pdf)
        self.state.selection_bounds = None
        self.update_annotations()
        self._update_nav_buttons()

    def _handle_empty_file(self, _depth=0):
        """Show a notification and skip to the next file in the picker list.

        Uses a depth counter to prevent unbounded recursion when
        consecutive files are all empty: once as many files have been
        skipped as the list contains, it gives up.

        Parameters
        ----------
        _depth : int
            Number of files already skipped in this chain.
        """
        current_fnames = self.state.lst_fnames
        if _depth >= len(current_fnames):
            self._notify("All files are empty or unreadable.", duration=5000)
            return

        basename = os.path.basename(self.state.fname)
        self._notify(
            f"File '{basename}' is empty or could not be loaded. Skipping to next file.",
            duration=5000,
        )

        # Locate the current file in the picker list, ignoring the optional
        # "user--" prefix. Entries without a prefix are matched as-is, the
        # same way ``plot_new_file`` interprets them.
        stem = os.path.splitext(basename)[0]
        current_idx = None
        for i, entry in enumerate(current_fnames):
            if entry.split("--", 1)[-1] == stem:
                current_idx = i
                break

        if current_idx is not None and current_idx + 1 < len(current_fnames):
            next_fname = current_fnames[current_idx + 1]
        elif len(current_fnames) > 0:
            # Wrap around to the first file
            next_fname = current_fnames[0]
        else:
            return

        self.plot_new_file(next_fname, _empty_depth=_depth + 1)

    def _refresh_plot(self, pdf):
        """Rebuild Bokeh figures with new signal data.

        Swaps panes in the stable ``main_content`` Column by index so that
        the Panel layout reference stays valid across rebuilds, stores the
        new figure/sources, and re-wires the selection callback on the new
        signal CDS.

        Parameters
        ----------
        pdf : DataFrame
            Signal data passed to ``make_plot``.
        """
        from .plotting import make_plot

        main_pane, range_pane, main_fig, signal_cds, range_source = make_plot(
            pdf, self.state.annotation_cds
        )

        # Swap panes by index in the stable parent Column
        container = self.w["main_content"]
        container[self.w["main_plot_idx"]] = main_pane
        container[self.w["range_plot_idx"]] = range_pane

        self.w["main_plot"] = main_pane
        self.w["range_plot"] = range_pane
        self.w["main_fig"] = main_fig
        self.w["range_source"] = range_source
        self.state.signal_cds = signal_cds

        # Re-attach the selection callback to the new CDS
        if self.w.get("_selection_wire_fn"):
            signal_cds.selected.on_change("indices", self.w["_selection_wire_fn"])

    # ------------------------------------------------------------------
    # Annotation overlays (no plot rebuild — just CDS data updates)
    # ------------------------------------------------------------------

    def update_annotations(self):
        """Sync annotation overlay CDS data and refresh selection state."""
        self.state.update_annotation_sources()
        self.update_selection()

    def update_selection(self):
        """Update button states and selection tables based on current bounds.

        A selection is active when ``selection_bounds`` is set and the
        username is not the literal ``"None"``. With a selection, the
        annotation-creation buttons are enabled and the modification
        widgets are enabled only if displayed annotations lie fully inside
        the bounds. Without one, everything is disabled. The two selection
        tables are refreshed in both cases.
        """
        s = self.state
        w = self.w
        s.pdf_displayed_annotations = s.get_displayed_annotations()
        bounds = s.selection_bounds

        pdf_sel_data = pd.DataFrame(columns=["start_time", "end_time"])
        pdf_sel_annot = pd.DataFrame(columns=s.pdf_annotations.columns)

        has_selection = bounds is not None and s.username != "None"

        if has_selection:
            start_ts, end_ts = bounds

            # Enable annotation creation buttons
            for key in self._CREATE_BUTTONS:
                w[key].disabled = False

            pdf_sel_data = pd.DataFrame(
                {"start_time": str(start_ts), "end_time": str(end_ts)}, index=[0]
            )

            # Find existing annotations within the selected bounds
            displayed = s.pdf_displayed_annotations
            disp_start = pd.to_datetime(displayed["start_time"], errors="coerce")
            disp_end = pd.to_datetime(displayed["end_time"], errors="coerce")
            pdf_sel_annot = displayed.loc[
                disp_start.between(start_ts, end_ts, inclusive="both")
                & disp_end.between(start_ts, end_ts, inclusive="both")
            ]

            # Convert datetimes to strings for Bokeh DataTable display
            pdf_sel_annot = pdf_sel_annot.assign(
                **{
                    col: pdf_sel_annot[col].astype(str)
                    for col in ["start_time", "end_time"]
                }
            )

            # Modification widgets only enabled when annotations exist in
            # the selection
            has_annots = pdf_sel_annot.shape[0] > 0
            for key in self._MODIFY_WIDGETS:
                w[key].disabled = not has_annots
        else:
            for key in self._CREATE_BUTTONS + self._MODIFY_WIDGETS:
                w[key].disabled = True

        # Push data to the Bokeh DataTable sources
        s.selected_data.data = dict(bp.ColumnDataSource(pdf_sel_data).data)

        annot_cols = [
            c for c in DISPLAYED_ANNOTATION_COLUMNS if c in pdf_sel_annot.columns
        ]
        if not annot_cols:
            pdf_sel_annot = pd.DataFrame(columns=DISPLAYED_ANNOTATION_COLUMNS)
        else:
            pdf_sel_annot = pdf_sel_annot[annot_cols]
        s.selected_annotations.data = dict(bp.ColumnDataSource(pdf_sel_annot).data)

    # ------------------------------------------------------------------
    # Annotation CRUD
    # ------------------------------------------------------------------

    def mark_annotation(self, artifact):
        """Add a new annotation for the selected time range.

        Parameters
        ----------
        artifact : str
            Activity type recorded on the new row.

        Without a selection nothing is added; the overlays are refreshed
        either way.
        """
        s = self.state
        if s.selection_bounds:
            start_ts, end_ts = s.selection_bounds
            pdf_new = capture_new_annotation(
                start_ts, end_ts, artifact, s.fname, s.username
            )
            s.pdf_annotations = pd.concat(
                [s.pdf_annotations, pdf_new], ignore_index=True
            )
        self.update_annotations()

    def toggle_flag(self, flag_name):
        """Toggle a boolean flag (segment/scoring/review) on selected annotations.

        Only the current user's annotations for the current file that lie
        inside the selection are affected. The toggled rows are moved to
        the end of the annotations frame.

        Parameters
        ----------
        flag_name : str
            Column name to toggle (``"segment"``, ``"scoring"``, or ``"review"``).
        """
        s = self.state
        if not s.selection_bounds:
            self.update_annotations()
            return

        start_ts, end_ts = s.selection_bounds
        mask = _filter_annotations_in_range(
            s.pdf_annotations, start_ts, end_ts, s.username, s.fname
        )
        selected = s.pdf_annotations.loc[mask].copy()
        s.pdf_annotations = s.pdf_annotations.loc[~mask]

        # Flip: 0→1, 1→0 (anything other than 1 becomes 1)
        selected = selected.assign(
            **{flag_name: (selected[flag_name] != 1).astype(int)}
        )
        s.pdf_annotations = pd.concat(
            [s.pdf_annotations, selected], ignore_index=True
        )
        self.update_annotations()

    def remove_selected_annotations(self):
        """Delete the current user's annotations within the selection bounds."""
        s = self.state
        if s.selection_bounds:
            start_ts, end_ts = s.selection_bounds
            mask = _filter_annotations_in_range(
                s.pdf_annotations, start_ts, end_ts, s.username, s.fname
            )
            s.pdf_annotations = s.pdf_annotations.loc[~mask]
        self.update_annotations()

    def add_notes(self, notes_text=""):
        """Set notes text on the current user's annotations within the selection.

        Parameters
        ----------
        notes_text : str
            Text written to the ``notes`` column of every matching row.

        The notes input widget is cleared afterwards.
        """
        s = self.state
        if not s.selection_bounds:
            self.update_annotations()
            return

        start_ts, end_ts = s.selection_bounds
        mask = _filter_annotations_in_range(
            s.pdf_annotations, start_ts, end_ts, s.username, s.fname
        )
        selected = s.pdf_annotations.loc[mask].reset_index(drop=True)
        s.pdf_annotations = s.pdf_annotations.loc[~mask]
        selected = selected.assign(notes=notes_text)
        s.pdf_annotations = pd.concat([s.pdf_annotations, selected], ignore_index=True)
        self.w["notes_input"].value = ""
        self.update_annotations()

    def save(self):
        """Persist annotations via ``save_annotations`` and refresh the summary."""
        s = self.state
        s.pdf_annotations = save_annotations(
            s.pdf_annotations, s.username, s.fname
        )
        self.update_annotations()
        self.w["summary"].object = build_summary_html(s)
        self._notify("Annotations exported", duration=3000, kind="success")

    # ------------------------------------------------------------------
    # Navigation
    # ------------------------------------------------------------------

    def plot_new_file(self, fname_with_user, _empty_depth=0):
        """Switch to a different file and update the plot.

        Parameters
        ----------
        fname_with_user : str
            ``"username--filename"`` string from the file picker. A string
            without ``"--"`` is used as the filename unchanged.
        _empty_depth : int
            Forwarded to ``update_plot`` to bound empty-file skipping.
        """
        self._notify("Loading file\u2026", duration=3000)
        s = self.state
        # Reset the anchor so the new file does not inherit the old position.
        s.anchor_timestamp = None
        parts = fname_with_user.split("--", 1)
        s.fname = os.path.join(
            os.path.dirname(s.fname),
            parts[1] if len(parts) == 2 else parts[0],
        )
        self.update_plot(_empty_depth=_empty_depth)
        self.w["summary"].object = build_summary_html(s)

    def move_next_window(self):
        """Advance the anchor timestamp by one full window, clamped to file end.

        The anchor is the centre of the window, so the latest allowed
        anchor is half a window before the file end. Does nothing when the
        file end or the anchor is not set, or when the clamped anchor
        would not move forward.
        """
        s = self.state
        if not s.file_end_timestamp or not s.anchor_timestamp:
            return
        anchor_dt = datetime.strptime(s.anchor_timestamp, TIME_FMT)
        end_dt = datetime.strptime(s.file_end_timestamp, TIME_FMT)

        # Don't advance past the point where the window would exceed file end
        latest_anchor = end_dt - timedelta(seconds=s.windowsize / 2)
        new_anchor = min(anchor_dt + timedelta(seconds=s.windowsize), latest_anchor)
        if new_anchor <= anchor_dt:
            return
        s.anchor_timestamp = new_anchor.strftime(TIME_FMT)
        self.update_plot()

    def move_prev_window(self):
        """Move the anchor timestamp back by one full window, clamped to file start.

        The anchor is the centre of the window, so the earliest allowed
        anchor is half a window after the file start. Does nothing when
        the file start or the anchor is not set, or when the clamped
        anchor would not move backward.
        """
        s = self.state
        if not s.file_start_timestamp or not s.anchor_timestamp:
            return
        anchor_dt = datetime.strptime(s.anchor_timestamp, TIME_FMT)
        start_dt = datetime.strptime(s.file_start_timestamp, TIME_FMT)

        # Don't go before the point where the window would precede file start
        earliest_anchor = start_dt + timedelta(seconds=s.windowsize / 2)
        new_anchor = max(anchor_dt - timedelta(seconds=s.windowsize), earliest_anchor)
        if new_anchor >= anchor_dt:
            return
        s.anchor_timestamp = new_anchor.strftime(TIME_FMT)
        self.update_plot()

    def _update_nav_buttons(self):
        """Enable or disable prev/next buttons based on file boundaries.

        A button is disabled when the window centred on the anchor already
        touches or crosses that boundary. Leaves the buttons untouched
        while any of the three timestamps is missing.
        """
        s = self.state
        if not s.file_start_timestamp or not s.file_end_timestamp or not s.anchor_timestamp:
            return
        anchor_dt = datetime.strptime(s.anchor_timestamp, TIME_FMT)
        start_dt = datetime.strptime(s.file_start_timestamp, TIME_FMT)
        end_dt = datetime.strptime(s.file_end_timestamp, TIME_FMT)
        half_win = timedelta(seconds=s.windowsize / 2)

        self.w["btn_prev"].disabled = (anchor_dt - half_win) <= start_dt
        self.w["btn_next"].disabled = (anchor_dt + half_win) >= end_dt

    def update_anchor_timestamp(self, value):
        """Parse and store a user-entered anchor time string.

        Invalid input is reported on stdout and the stored anchor is left
        unchanged.

        Parameters
        ----------
        value : str
            Time string in ``TIME_FMT``.
        """
        try:
            parsed = datetime.strptime(value, TIME_FMT)
        except (TypeError, ValueError) as ex:
            print(f"Invalid time entered: {value} ({ex})")
            return
        # Round-trip through TIME_FMT so the stored string is normalised.
        self.state.anchor_timestamp = parsed.strftime(TIME_FMT)

    def update_windowsize(self, value):
        """Parse and store a user-entered window size.

        Invalid input (not a number, or not a positive finite number) is
        reported on stdout and the stored window size is left unchanged.

        Parameters
        ----------
        value : str
            Numeric string, optionally suffixed with ``"s"``.
        """
        try:
            size = float(str(value).strip().replace("s", ""))
        except (TypeError, ValueError) as ex:
            print(f"Invalid windowsize: {value} ({ex})")
            return
        # Zero, negative, NaN or infinite windows would break navigation
        # arithmetic; the chained comparison is False for all of them.
        if not 0 < size < float("inf"):
            print(f"Invalid windowsize: {value} (must be a positive, finite number)")
            return
        self.state.windowsize = size

    def update_review_flags(self, new_reviews):
        """Sync file-level review flags with the multi-select widget.

        Review flags are annotation rows with ``review=1`` and no time range
        (``start_time`` is NaT). This method diffs the widget state against
        the current user's review flags for the current file and, when
        they differ, replaces those rows with one row per selected artifact.

        Parameters
        ----------
        new_reviews : list of str
            Currently selected artifact types in the review widget.
        """
        s = self.state
        basename = os.path.basename(s.fname)
        annotations = s.pdf_annotations

        # Review-only rows (no time range) owned by this user for this file
        is_flag = (
            (annotations["user"] == s.username)
            & (annotations["fname"] == basename)
            & (annotations["review"] == 1)
            & (annotations["start_time"].isna())
        )
        current_reviews = annotations.loc[is_flag, "artifact"].tolist()

        if set(new_reviews) != set(current_reviews):
            # Remove existing review-only rows for this user/file
            s.pdf_annotations = annotations.loc[~is_flag]

            # Add new review-flag rows (no time range)
            if new_reviews:
                new_rows = pd.DataFrame(
                    [
                        {
                            "fname": basename,
                            "artifact": artifact,
                            "segment": 0,
                            "scoring": 0,
                            "review": 1,
                            "annotated_at": str(datetime.now()),
                            "user": s.username,
                        }
                        for artifact in new_reviews
                    ]
                )
                s.pdf_annotations = pd.concat(
                    [s.pdf_annotations, new_rows], ignore_index=True
                )

        # Store the refreshed view the same way ``update_selection`` does;
        # calling the getter without keeping its result had no visible effect.
        s.pdf_displayed_annotations = s.get_displayed_annotations()