Source code for visualize_accelerometry.callbacks

"""
UI callback logic for annotation management.

Contains the ``CallbackManager`` class that wires widget events to
state mutations + view updates, plus helper functions for creating
annotation rows and building summary HTML.
"""

import os
from datetime import datetime, timedelta

import pandas as pd
import bokeh.plotting as bp

from .config import (
    DISPLAYED_ANNOTATION_COLUMNS, TIME_FMT, WALKING_SUGGESTION_COLUMNS,
)
from .data_loading import save_annotations


def _row_to_segment(row):
    """Convert a walking-suggestions xlsx row to the in-memory segment dict."""
    return {
        "start_time": pd.to_datetime(row["start_time"]),
        "end_time": pd.to_datetime(row["end_time"]),
        "duration_s": float(row["duration_s"]),
        "mean_step_freq_hz": float(row["mean_step_freq_hz"]),
        "dismissed": bool(row.get("deleted", False)),
    }


[docs] def capture_new_annotation(start_ts, end_ts, artifact, fname, uname): """Create a single-row DataFrame representing a new annotation. Parameters ---------- start_ts, end_ts : Timestamp Time bounds of the annotated segment. artifact : str Activity type (e.g. ``"chair_stand"``, ``"tug"``). fname : str Path to the signal file (basename is extracted). uname : str Username of the annotator. Returns ------- DataFrame One-row DataFrame matching ``ANNOTATION_COLUMNS``. """ pdf = pd.DataFrame( { "fname": os.path.basename(fname), "artifact": artifact, "segment": 0, "scoring": 0, "review": 0, "start_epoch": pd.to_datetime(start_ts).timestamp(), "end_epoch": pd.to_datetime(end_ts).timestamp(), "start_time": str(start_ts), "end_time": str(end_ts), "annotated_at": str(datetime.now()), "user": uname, "notes": "", }, index=[0], ) return pdf
[docs] def build_summary_html(state): """Build an HTML summary table for all annotations on the current file. Parameters ---------- state : AppState Current session state. Returns ------- str HTML string for the summary pane. """ pdf_annotations = state.pdf_annotations fname = state.fname file_start = state.file_start_timestamp file_end = state.file_end_timestamp artifacts = "" notes = "" reviews = "" if pdf_annotations.shape[0] > 0: pdf_sel = pdf_annotations.loc[ pdf_annotations["fname"] == os.path.basename(fname) ].reset_index(drop=True) if pdf_sel.shape[0] > 0: # Filter out rows with NaT timestamps (review-only flags) # before calling dt.strftime, which would raise on NaT. has_time = pdf_sel["start_time"].notna() & pdf_sel["end_time"].notna() pdf_timed = pdf_sel.loc[has_time].copy() if pdf_timed.shape[0] > 0: pdf_timed = pdf_timed.assign( **{ col: pdf_timed[col].dt.strftime("%d-%m %H:%M:%S") for col in ["start_time", "end_time"] } ) pdf_timed = pdf_timed.assign( annotations_txt=pdf_timed.apply( lambda x: f"{x['start_time']} - {x['end_time']} ({x['user']})", axis=1, ), notes_txt=pdf_timed.apply( lambda x: f"{x['notes']} ({x['user']})", axis=1 ), ) dct_artifacts = { artifact: "<br/>".join( pdf_timed.loc[ (pdf_timed["artifact"] == artifact) & (pdf_timed["scoring"] == 0) & (pdf_timed["segment"] == 0) ]["annotations_txt"].tolist() ) for artifact in ["chair_stand", "6min_walk", "3m_walk", "tug"] } dct_artifacts = {k: v for k, v in dct_artifacts.items() if v} artifacts = ( "<table cellpadding='2'>" + "<tr>" + "".join(f"<td><b>{a}</b></td>" for a in dct_artifacts) + "</tr><tr>" + "".join(f"<td>{dct_artifacts[a]}</td>" for a in dct_artifacts) + "</tr></table>" ) notes = "<br/>".join( pdf_timed.loc[pdf_timed["notes"].fillna("").str.strip() != ""][ "notes_txt" ].tolist() ) # Reviews can exist without time ranges, so use the full pdf_sel pdf_reviews = pdf_sel.loc[pdf_sel["review"] == 1].drop_duplicates( subset=["user", "artifact"] ) if pdf_reviews.shape[0] > 0: pdf_reviews = ( pdf_reviews.groupby("artifact")["user"] .apply(lambda x: ",".join(x)) .reset_index() ) pdf_reviews = pdf_reviews.assign( review_txt=pdf_reviews.apply( lambda x: f"{x['artifact']} : {x['user']}", axis=1 ), ) reviews = "<br/>".join(pdf_reviews["review_txt"].tolist()) # Guard against None timestamps before first file load start_str = ( pd.to_datetime(file_start).strftime("%d-%m-%Y %H:%M:%S") if file_start else "N/A" ) end_str = ( pd.to_datetime(file_end).strftime("%d-%m-%Y %H:%M:%S") if file_end else "N/A" ) return f""" <table style="width:100%; border-collapse:collapse; font-size:12px; margin-top:8px; font-family:'Montserrat',Helvetica,Arial,sans-serif;"> <tr style="background-color:#58595b;"> <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Start Time</th> <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">End Time</th> <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Annotations</th> <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Notes</th> <th style="padding:5px 10px; color:#fff; font-size:11px; text-align:left;">Reviews</th> </tr> <tr> <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{start_str}</td> <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{end_str}</td> <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{artifacts}</td> <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{notes}</td> <td style="padding:5px 10px; border-bottom:1px solid #e0e0e0; font-size:11px;">{reviews}</td> </tr> </table> """
def _filter_annotations_in_range(pdf_annotations, start_ts, end_ts, uname, fname): """Return a boolean mask of annotations within a time range for one user/file. Parameters ---------- pdf_annotations : DataFrame Full annotations DataFrame. start_ts, end_ts : Timestamp Time bounds of the selection. uname : str Filter to this user. fname : str Filter to this file (basename is extracted). Returns ------- Series[bool] Mask aligned with *pdf_annotations*. """ annot_start = pd.to_datetime(pdf_annotations["start_time"], errors="coerce") annot_end = pd.to_datetime(pdf_annotations["end_time"], errors="coerce") mask = ( annot_start.between(start_ts, end_ts, inclusive="both") & annot_end.between(start_ts, end_ts, inclusive="both") & (pdf_annotations["user"] == uname) & (pdf_annotations["fname"] == os.path.basename(fname)) ) return mask
[docs] class CallbackManager: """Orchestrates UI callbacks between widget events and AppState. Parameters ---------- state : AppState Per-session state object. widgets : dict Name-to-widget mapping populated by ``app.py``. Must include keys for all buttons, labels, and layout containers. """ def __init__(self, state, widgets): self.state = state self.w = widgets # ------------------------------------------------------------------ # Plot lifecycle # ------------------------------------------------------------------ def _notify(self, msg, duration=3000, kind="info"): """Show a toast notification in the bottom-right corner.""" import panel as pn getattr(pn.state.notifications, kind)(msg, duration=duration)
[docs] def update_plot(self, force_rebuild=False, _empty_depth=0): """Load data for the current file/anchor and update the plot. When an existing plot exists and *force_rebuild* is False, only the CDS data and axis ranges are patched (much faster than a full figure rebuild). Falls back to a full rebuild when no existing plot is available or when the fast path fails. If the file is empty or unreadable, shows a notification and advances to the next file in the dropdown. """ self._notify("Building plot\u2026", duration=2000) basename = os.path.splitext(os.path.basename(self.state.fname))[0] # Find the file picker entry (e.g. "alan--060294-20221208125829") # which includes the assigned user prefix label = basename for entry in self.state.lst_fnames: if entry.endswith(basename): label = entry break self.w["file_label"].object = f"### Annotating: {label}" try: pdf = self.state.load_file_data() except Exception as ex: pdf = None print(f"Error loading file {self.state.fname}: {ex}") if pdf is None or len(pdf) == 0: self._handle_empty_file(_depth=_empty_depth) return # Fast path: update existing CDS + ranges without rebuilding if ( not force_rebuild and self.state.signal_cds is not None and self.w.get("main_fig") is not None ): from .plotting import update_plot_data updated = update_plot_data( pdf, self.state.signal_cds, self.w["main_fig"], range_source=self.w.get("range_source"), ) if updated: self.state.selection_bounds = None self.update_annotations() self._update_nav_buttons() return # Full rebuild (first load or fast path failed) self._refresh_plot(pdf) self.state.selection_bounds = None self.update_annotations() self._update_nav_buttons()
def _handle_empty_file(self, _depth=0): """Show a notification and skip to the next non-empty file. Uses a depth counter to prevent unbounded recursion when consecutive files are all empty. """ if _depth >= len(self.state.lst_fnames): self._notify("All files are empty or unreadable.", duration=5000) return basename = os.path.basename(self.state.fname) self._notify( f"File '{basename}' is empty or could not be loaded. Skipping to next file.", duration=5000, ) # Find the next file in the dropdown list current_fnames = self.state.lst_fnames current_basename = basename # Find which entries match this file (ignoring user prefix) current_idx = None for i, fn in enumerate(current_fnames): parts = fn.split("--", 1) if len(parts) == 2 and parts[1] == os.path.splitext(current_basename)[0]: current_idx = i break if current_idx is not None and current_idx + 1 < len(current_fnames): next_fname = current_fnames[current_idx + 1] elif len(current_fnames) > 0: # Wrap around to the first file next_fname = current_fnames[0] else: return self.plot_new_file(next_fname, _empty_depth=_depth + 1) def _refresh_plot(self, pdf): """Rebuild Bokeh figures with new signal data. Swaps panes in the stable ``main_content`` Column by index so that the Panel layout reference stays valid across rebuilds. Re-wires the box-select callback on the new signal CDS. """ from .plotting import make_plot main_pane, range_pane, main_fig, signal_cds, range_source = make_plot( pdf, self.state.annotation_cds ) # Swap panes by index in the stable parent Column container = self.w["main_content"] container[self.w["main_plot_idx"]] = main_pane container[self.w["range_plot_idx"]] = range_pane self.w["main_plot"] = main_pane self.w["range_plot"] = range_pane self.w["main_fig"] = main_fig self.w["range_source"] = range_source self.state.signal_cds = signal_cds # Re-attach the selection callback to the new CDS if self.w.get("_selection_wire_fn"): signal_cds.selected.on_change("indices", self.w["_selection_wire_fn"]) # ------------------------------------------------------------------ # Annotation overlays (no plot rebuild — just CDS data updates) # ------------------------------------------------------------------
[docs] def update_annotations(self): """Sync annotation overlay CDS data and refresh selection state.""" self.state.update_annotation_sources() self.update_selection()
[docs] def update_selection(self): """Update button states and selection tables based on current bounds. Enables/disables annotation buttons depending on whether a region is selected and whether existing annotations fall within it. """ s = self.state w = self.w s.pdf_displayed_annotations = s.get_displayed_annotations() bounds = s.selection_bounds pdf_sel_data = pd.DataFrame(columns=["start_time", "end_time"]) pdf_sel_annot = pd.DataFrame(columns=s.pdf_annotations.columns) has_selection = bounds is not None and s.username != "None" if has_selection: start_ts, end_ts = bounds # Enable annotation creation buttons w["btn_clear"].disabled = False w["btn_tug"].disabled = False w["btn_3m_walk"].disabled = False w["btn_6min_walk"].disabled = False w["btn_chairstand"].disabled = False pdf_sel_data = pd.DataFrame( {"start_time": str(start_ts), "end_time": str(end_ts)}, index=[0] ) # Find existing annotations within the selected bounds disp_start = pd.to_datetime(s.pdf_displayed_annotations["start_time"], errors="coerce") disp_end = pd.to_datetime(s.pdf_displayed_annotations["end_time"], errors="coerce") pdf_sel_annot = s.pdf_displayed_annotations.loc[ disp_start.between(start_ts, end_ts, inclusive="both") & disp_end.between(start_ts, end_ts, inclusive="both") ] # Convert datetimes to strings for Bokeh DataTable display pdf_sel_annot = pdf_sel_annot.assign( **{col: pdf_sel_annot[col].astype(str) for col in ["start_time", "end_time"]} ) # Modification buttons only enabled when annotations exist in the selection has_annots = pdf_sel_annot.shape[0] > 0 w["btn_remove"].disabled = not has_annots w["btn_segment"].disabled = not has_annots w["btn_scoring"].disabled = not has_annots w["btn_review"].disabled = not has_annots w["btn_notes"].disabled = not has_annots w["notes_input"].disabled = not has_annots else: for key in [ "btn_clear", "btn_tug", "btn_3m_walk", "btn_6min_walk", "btn_chairstand", "btn_remove", "btn_segment", "btn_scoring", "btn_review", "btn_notes", ]: w[key].disabled = True w["notes_input"].disabled = True # Push data to the Bokeh DataTable sources s.selected_data.data = dict(bp.ColumnDataSource(pdf_sel_data).data) annot_cols = [c for c in DISPLAYED_ANNOTATION_COLUMNS if c in pdf_sel_annot.columns] if not annot_cols: pdf_sel_annot = pd.DataFrame(columns=DISPLAYED_ANNOTATION_COLUMNS) else: pdf_sel_annot = pdf_sel_annot[annot_cols] s.selected_annotations.data = dict(bp.ColumnDataSource(pdf_sel_annot).data)
# ------------------------------------------------------------------ # Annotation CRUD # ------------------------------------------------------------------
[docs] def mark_annotation(self, artifact): """Add a new annotation for the selected time range.""" s = self.state if s.selection_bounds: start_ts, end_ts = s.selection_bounds pdf_new = capture_new_annotation( start_ts, end_ts, artifact, s.fname, s.username ) s.pdf_annotations = pd.concat( [s.pdf_annotations, pdf_new], ignore_index=True ) self.update_annotations()
[docs] def toggle_flag(self, flag_name): """Toggle a boolean flag (segment/scoring/review) on selected annotations. Parameters ---------- flag_name : str Column name to toggle (``"segment"``, ``"scoring"``, or ``"review"``). """ s = self.state if not s.selection_bounds: self.update_annotations() return start_ts, end_ts = s.selection_bounds mask = _filter_annotations_in_range( s.pdf_annotations, start_ts, end_ts, s.username, s.fname ) selected = s.pdf_annotations.loc[mask].copy() s.pdf_annotations = s.pdf_annotations.loc[~mask] # Flip: 0→1, 1→0 selected = selected.assign( **{flag_name: (selected[flag_name] != 1).astype(int)} ) s.pdf_annotations = pd.concat( [s.pdf_annotations, selected], ignore_index=True ) self.update_annotations()
[docs] def remove_selected_annotations(self): """Delete all annotations within the current selection bounds.""" s = self.state if s.selection_bounds: start_ts, end_ts = s.selection_bounds mask = _filter_annotations_in_range( s.pdf_annotations, start_ts, end_ts, s.username, s.fname ) s.pdf_annotations = s.pdf_annotations.loc[~mask] self.update_annotations()
[docs] def add_notes(self, notes_text=""): """Set notes text on all annotations within the selection.""" s = self.state if not s.selection_bounds: self.update_annotations() return start_ts, end_ts = s.selection_bounds mask = _filter_annotations_in_range( s.pdf_annotations, start_ts, end_ts, s.username, s.fname ) selected = s.pdf_annotations.loc[mask].reset_index(drop=True) s.pdf_annotations = s.pdf_annotations.loc[~mask] selected = selected.assign(notes=notes_text) s.pdf_annotations = pd.concat([s.pdf_annotations, selected], ignore_index=True) self.w["notes_input"].value = "" self.update_annotations()
[docs] def save(self): """Persist annotations to disk and refresh the summary.""" s = self.state s.pdf_annotations = save_annotations( s.pdf_annotations, s.username, s.fname ) self.update_annotations() self.w["summary"].object = build_summary_html(s) self._notify("Annotations exported", duration=3000, kind="success")
# ------------------------------------------------------------------ # Navigation # ------------------------------------------------------------------
[docs] def plot_new_file(self, fname_with_user, _empty_depth=0): """Switch to a different file and update the plot. Parameters ---------- fname_with_user : str ``"username--filename"`` string from the file picker. """ self._notify("Loading file\u2026", duration=3000) s = self.state s.anchor_timestamp = None parts = fname_with_user.split("--", 1) s.fname = os.path.join( os.path.dirname(s.fname), parts[1] if len(parts) == 2 else parts[0], ) # Walking suggestions are file-specific; load any persisted ones # for the new file so the user sees their prior detection state. # If no saved suggestions exist for this file, the list is empty. s.clear_walking_suggestions() self.load_persisted_walking_suggestions() self.update_plot(_empty_depth=_empty_depth) self.w["summary"].object = build_summary_html(s)
[docs] def move_next_window(self): """Advance the anchor timestamp by one full window, clamped to file end.""" s = self.state if not s.file_end_timestamp: return anchor_dt = datetime.strptime(s.anchor_timestamp, TIME_FMT) end_dt = datetime.strptime(s.file_end_timestamp, TIME_FMT) new_anchor = anchor_dt + timedelta(seconds=s.windowsize) # Don't advance past the point where the window would exceed file end if new_anchor > end_dt: new_anchor = end_dt - timedelta(seconds=s.windowsize / 2) if new_anchor <= anchor_dt: return s.anchor_timestamp = new_anchor.strftime(TIME_FMT) self.update_plot()
[docs] def move_prev_window(self): """Move the anchor timestamp back by one full window, clamped to file start.""" s = self.state if not s.file_start_timestamp: return anchor_dt = datetime.strptime(s.anchor_timestamp, TIME_FMT) start_dt = datetime.strptime(s.file_start_timestamp, TIME_FMT) new_anchor = anchor_dt - timedelta(seconds=s.windowsize) # Don't go before the point where the window would precede file start if new_anchor < start_dt: new_anchor = start_dt + timedelta(seconds=s.windowsize / 2) if new_anchor >= anchor_dt: return s.anchor_timestamp = new_anchor.strftime(TIME_FMT) self.update_plot()
def _update_nav_buttons(self): """Enable or disable prev/next buttons based on file boundaries.""" s = self.state if not s.file_start_timestamp or not s.file_end_timestamp or not s.anchor_timestamp: return anchor_dt = datetime.strptime(s.anchor_timestamp, TIME_FMT) start_dt = datetime.strptime(s.file_start_timestamp, TIME_FMT) end_dt = datetime.strptime(s.file_end_timestamp, TIME_FMT) half_win = timedelta(seconds=s.windowsize / 2) self.w["btn_prev"].disabled = (anchor_dt - half_win) <= start_dt self.w["btn_next"].disabled = (anchor_dt + half_win) >= end_dt
[docs] def update_anchor_timestamp(self, value): """Parse and store a user-entered anchor time string. Parameters ---------- value : str Time string in ``TIME_FMT``. """ try: self.state.anchor_timestamp = datetime.strptime( value, TIME_FMT ).strftime(TIME_FMT) except Exception as ex: print(f"Invalid time entered: {value} ({ex})")
[docs] def update_windowsize(self, value): """Parse and store a user-entered window size. Parameters ---------- value : str Numeric string, optionally suffixed with ``"s"``. """ try: self.state.windowsize = float(str(value).strip().replace("s", "")) except Exception as ex: print(f"Invalid windowsize: {value} ({ex})")
[docs] def update_review_flags(self, new_reviews): """Sync file-level review flags with the multi-select widget. Review flags are annotation rows with ``review=1`` and no time range (``start_time`` is NaT). This method diffs the widget state against the current review flags and adds/removes rows accordingly. Parameters ---------- new_reviews : list of str Currently selected artifact types in the review widget. """ s = self.state basename = os.path.basename(s.fname) # Get the current review-flag artifacts (rows without time ranges) current_reviews = s.pdf_annotations.loc[ (s.pdf_annotations["user"] == s.username) & (s.pdf_annotations["fname"] == basename) & (s.pdf_annotations["review"] == 1) & (s.pdf_annotations["start_time"].isna()) ]["artifact"].tolist() if set(new_reviews) != set(current_reviews): # Remove existing review-only rows for this user/file s.pdf_annotations = s.pdf_annotations.loc[ ~( (s.pdf_annotations["user"] == s.username) & (s.pdf_annotations["fname"] == basename) & (s.pdf_annotations["review"] == 1) & (s.pdf_annotations["start_time"].isna()) ) ] # Add new review-flag rows (no time range) if new_reviews: new_rows = pd.DataFrame( [ { "fname": basename, "artifact": artifact, "segment": 0, "scoring": 0, "review": 1, "annotated_at": str(datetime.now()), "user": s.username, } for artifact in new_reviews ] ) s.pdf_annotations = pd.concat( [s.pdf_annotations, new_rows], ignore_index=True ).reset_index(drop=True) s.get_displayed_annotations()
# ------------------------------------------------------------------ # Walking detection (Urbanek 2015 SHW) # ------------------------------------------------------------------
[docs] def load_persisted_walking_suggestions(self): """Populate the in-memory suggestion list from the shared xlsx for the currently-loaded file. Loads *all* rows for this file, including dismissed ones — the UI displays dismissed entries as red rows so the user can toggle their state. Only non-dismissed entries appear in the plot overlay CDS. Called on session start and after a file switch so that prior detection runs survive page refresh. """ from .data_loading import load_walking_suggestions s = self.state try: saved = load_walking_suggestions() except Exception as ex: print(f"Walking suggestions: failed to load xlsx: {ex}") saved = None s.walking_suggestions = [] s.walking_suggestion_idx = None if saved is not None and not saved.empty: basename = os.path.basename(s.fname) mine = saved.loc[saved["fname"] == basename] loaded = [_row_to_segment(row) for _, row in mine.iterrows()] s.walking_suggestions = loaded if loaded: # Land on the first non-dismissed if any, else the first row first_active = next( (i for i, seg in enumerate(loaded) if not seg["dismissed"]), 0, ) s.walking_suggestion_idx = first_active self._sync_walking_overlay_cds() self._update_walking_nav_state()
def _sync_walking_overlay_cds(self): """Push only non-dismissed segments into the plot overlay CDS.""" s = self.state active = [ seg for seg in s.walking_suggestions if not seg["dismissed"] ] s.annotation_cds["walking_suggestion"].data = { "start_time": [seg["start_time"] for seg in active], "end_time": [seg["end_time"] for seg in active], }
[docs] def detect_walking(self): """Run sustained-harmonic-walking detection on the entire file. Loads the whole HDF5 file (not just the visible window), scans it with the SHW algorithm, persists the result to the user's walking-suggestions xlsx, then populates the in-memory list and plot overlay with the non-deleted entries. Dismissals from prior sessions for the same file are preserved by matching (start_epoch, end_epoch). """ from .data_loading import ( get_full_filedata, load_walking_suggestions, save_walking_suggestions, ) from .walking_detection import detect_walking_segments s = self.state basename = os.path.basename(s.fname) self._notify("Scanning file for walking segments…", duration=4000) try: pdf_full = get_full_filedata(s.fname) except Exception as ex: print(f"Walking detect: failed to load {s.fname}: {ex}") self._notify("Could not load file for detection.", duration=3000, kind="error") return segments = detect_walking_segments(pdf_full) # Preserve prior dismissals for this file existing = load_walking_suggestions() deleted_keys = set() if not existing.empty: this_file = existing.loc[existing["fname"] == basename] for _, row in this_file.iterrows(): if bool(row.get("deleted", False)): deleted_keys.add( (float(row["start_epoch"]), float(row["end_epoch"])) ) # Build fresh rows for this file, marking previously-dismissed ones detected_at = str(pd.Timestamp.now()) new_rows = [] for seg in segments: start_epoch = pd.to_datetime(seg["start_time"]).timestamp() end_epoch = pd.to_datetime(seg["end_time"]).timestamp() new_rows.append( { "fname": basename, "start_time": str(seg["start_time"]), "end_time": str(seg["end_time"]), "start_epoch": start_epoch, "end_epoch": end_epoch, "duration_s": seg["duration_s"], "mean_step_freq_hz": seg["mean_step_freq_hz"], "detected_at": detected_at, "deleted": (start_epoch, end_epoch) in deleted_keys, } ) # Replace just this file's rows; keep rows for other files intact other_rows = ( existing.loc[existing["fname"] != basename] if not existing.empty else existing ) merged = pd.concat( [other_rows, pd.DataFrame(new_rows, columns=WALKING_SUGGESTION_COLUMNS)], ignore_index=True, ) save_walking_suggestions(merged) # In-memory list: store all segments (including dismissed) so the # UI can render dismissed ones as red rows that can be toggled. in_memory = [] for seg, row in zip(segments, new_rows): in_memory.append( { "start_time": seg["start_time"], "end_time": seg["end_time"], "duration_s": seg["duration_s"], "mean_step_freq_hz": seg["mean_step_freq_hz"], "dismissed": bool(row["deleted"]), } ) s.walking_suggestions = in_memory # Land on the first non-dismissed entry, or 0 if everything's dismissed if in_memory: s.walking_suggestion_idx = next( (i for i, seg in enumerate(in_memory) if not seg["dismissed"]), 0, ) else: s.walking_suggestion_idx = None self._sync_walking_overlay_cds() self._update_walking_nav_state() dismissed_count = sum(1 for seg in in_memory if seg["dismissed"]) active_count = len(in_memory) - dismissed_count if not in_memory: self._notify("No sustained walking detected.", duration=3000) return if active_count == 0: self._notify( f"All {len(in_memory)} detected segments are dismissed.", duration=3000, ) return suffix = "s" if active_count != 1 else "" dismissed_note = ( f" ({dismissed_count} dismissed)" if dismissed_count else "" ) self._notify( f"Found {active_count} walking segment{suffix}{dismissed_note}.", duration=3000, ) # Jump to the first non-dismissed candidate so the user sees one in context self._navigate_to_current_walking()
[docs] def jump_to_walking_suggestion(self, idx): """Jump directly to the i-th walking candidate (from the list click).""" s = self.state if not s.walking_suggestions: return if idx < 0 or idx >= len(s.walking_suggestions): return s.walking_suggestion_idx = idx self._navigate_to_current_walking() self._update_walking_nav_state()
[docs] def toggle_walking_dismissed(self, idx): """Flip the ``dismissed`` flag on a single walking candidate. Dismissed segments stay in the list (rendered red) so the user can reinstate them with another click on the ✕. The plot overlay shows only non-dismissed segments. Change is persisted immediately to the shared walking-suggestions xlsx. """ from .data_loading import ( load_walking_suggestions, save_walking_suggestions, ) s = self.state if not s.walking_suggestions: return if idx < 0 or idx >= len(s.walking_suggestions): return seg = s.walking_suggestions[idx] new_state = not seg["dismissed"] seg["dismissed"] = new_state # Refresh the plot overlay so the dismissed segment disappears # (or reappears) from the orange dashed boxes self._sync_walking_overlay_cds() # Persist via (fname, start_epoch, end_epoch) match — exact across # the Excel round-trip because epoch is stored as a float. try: existing = load_walking_suggestions() if not existing.empty: basename = os.path.basename(s.fname) start_epoch = pd.to_datetime(seg["start_time"]).timestamp() end_epoch = pd.to_datetime(seg["end_time"]).timestamp() mask = ( (existing["fname"] == basename) & (existing["start_epoch"].astype(float) == start_epoch) & (existing["end_epoch"].astype(float) == end_epoch) ) if mask.any(): existing.loc[mask, "deleted"] = new_state save_walking_suggestions(existing) except Exception as ex: print(f"Walking toggle: failed to persist deleted flag: {ex}") self._update_walking_nav_state()
[docs] def clear_walking_suggestions(self): """Discard the current candidate list and clear the overlay.""" self.state.clear_walking_suggestions() self._update_walking_nav_state()
def _navigate_to_current_walking(self): """Recenter the time window on the active walking candidate.""" s = self.state if s.walking_suggestion_idx is None: return seg = s.walking_suggestions[s.walking_suggestion_idx] # Center the window on the candidate's midpoint so the annotator # sees context on both sides. Keeps the current windowsize — # the user can zoom independently if they want a tighter view. midpoint = seg["start_time"] + (seg["end_time"] - seg["start_time"]) / 2 s.anchor_timestamp = midpoint.strftime(TIME_FMT) self.w["time_input"].value = s.anchor_timestamp self.update_plot() def _update_walking_nav_state(self): """Refresh the walking-detection status pane and button states.""" s = self.state segs = s.walking_suggestions idx = s.walking_suggestion_idx # Status text if not segs: html = ( "<div style='font-size:11px;color:#666;padding:4px 0;'>" "No walking suggestions yet.</div>" ) else: seg = segs[idx] dismissed_count = sum(1 for x in segs if x.get("dismissed", False)) tail = ( f" &middot; <span style='color:#c62828;'>{dismissed_count} " f"dismissed</span>" if dismissed_count else "" ) current_note = ( " <span style='color:#c62828;'>(dismissed)</span>" if seg.get("dismissed", False) else "" ) html = ( f"<div style='font-size:11px;padding:4px 0;'>" f"<b>{idx + 1} / {len(segs)}</b>" f"{tail}<br>" f"{seg['duration_s']:.0f}s &middot; " f"{seg['mean_step_freq_hz']:.2f} Hz{current_note}" f"</div>" ) if "walking_status" in self.w: self.w["walking_status"].object = html # Clear button: disabled when nothing to clear if "btn_clear_walking" in self.w: self.w["btn_clear_walking"].disabled = not segs # Scrollable click-to-jump list if "walking_list_col" in self.w: self._populate_walking_list() def _populate_walking_list(self): """Rebuild the click-to-jump button list of detected segments. Each row is a (jump_button, ✕_button) pair. The ✕ toggles the segment's ``dismissed`` state; dismissed rows are rendered with a red, struck-through label. The wide jump button still navigates regardless of dismissed state, so users can review a dismissed segment without reinstating it. """ import panel as pn col = self.w["walking_list_col"] segs = self.state.walking_suggestions active_idx = self.state.walking_suggestion_idx if not segs: col.objects = [] return jump_css_active = [ ":host { font-size: 10px; flex: 1 1 auto !important; }" "button { text-align: left !important; padding: 2px 6px !important; " "white-space: nowrap !important; overflow: hidden !important; " "text-overflow: ellipsis !important; }" ] # Dismissed rows: muted background, red strike-through text. jump_css_dismissed = [ ":host { font-size: 10px; flex: 1 1 auto !important; }" "button { text-align: left !important; padding: 2px 6px !important; " "white-space: nowrap !important; overflow: hidden !important; " "text-overflow: ellipsis !important; " "color: #c62828 !important; text-decoration: line-through !important; " "background: #fce4ec !important; border-color: #f4c2c2 !important; }" ] toggle_css = [ ":host { font-size: 10px; flex: 0 0 24px !important; }" "button { padding: 2px 4px !important; color: #888 !important; }" ] rows = [] for i, seg in enumerate(segs): label = ( f"{seg['start_time'].strftime('%b %d %H:%M:%S')} " f{seg['duration_s']:.0f}s · {seg['mean_step_freq_hz']:.2f} Hz" ) is_active = i == active_idx is_dismissed = bool(seg.get("dismissed", False)) if is_dismissed: btn_type = "light" css = jump_css_dismissed else: btn_type = "success" if is_active else "light" css = jump_css_active jump_btn = pn.widgets.Button( name=label, button_type=btn_type, margin=(1, 0), stylesheets=css, ) jump_btn.on_click(lambda e, idx=i: self.jump_to_walking_suggestion(idx)) toggle_btn = pn.widgets.Button( name="✕", button_type="light", width=24, margin=(1, 0), stylesheets=toggle_css, ) toggle_btn.on_click(lambda e, idx=i: self.toggle_walking_dismissed(idx)) rows.append( pn.Row(jump_btn, toggle_btn, sizing_mode="stretch_width", margin=(0, 0)) ) col.objects = rows