Source code for gpype.frontend.widgets.spectrum_scope

import threading
import time

import ioiocore as ioc
import numpy as np
import pyqtgraph as pg
from PySide6.QtCore import QEvent, QObject, Qt
from PySide6.QtGui import QColor, QFont, QPalette

from ...backend.core.i_port import IPort
from ...common.constants import Constants
from .base.scope import Scope

PORT_IN = ioc.Constants.Defaults.PORT_IN


[docs] class SpectrumScope(Scope): """Frequency domain visualization widget for spectral analysis. Displays real-time frequency spectrum of input signals with configurable amplitude limits and averaging. Shows multiple channels with automatic scaling and frequency axis labeling. """ DEFAULT_AMPLITUDE_LIMIT = 50 DEFAULT_NUM_AVERAGES = 10
[docs] class Configuration(Scope.Configuration):
[docs] class Keys(Scope.Configuration.Keys): AMPLITUDE_LIMIT = "amplitude_limit" NUM_AVERAGES = "num_averages"
[docs] class KeysOptional: HIDDEN_CHANNELS = "hidden_channels"
[docs] def __init__( self, amplitude_limit: float = None, num_averages: int = None, hidden_channels: list = None, **kwargs, ): """Initialize the SpectrumScope widget. Args: amplitude_limit (float, optional): Maximum amplitude for display scaling. Defaults to 50. num_averages (int, optional): Number of spectra to average. Defaults to 10. hidden_channels (list, optional): List of channel indices to hide. Defaults to empty list. **kwargs: Additional arguments passed to parent classes. """ if amplitude_limit is None: amplitude_limit = self.DEFAULT_AMPLITUDE_LIMIT if num_averages is None: num_averages = self.DEFAULT_NUM_AVERAGES if amplitude_limit > 5e3 or amplitude_limit < 1: raise ValueError("amplitude_limit without reasonable range.") if hidden_channels is None: hidden_channels = [] input_ports = [IPort.Configuration(name=PORT_IN)] Scope.__init__( self, input_ports=input_ports, amplitude_limit=amplitude_limit, name="Spectrum Scope", hidden_channels=hidden_channels, num_averages=num_averages, **kwargs, ) self._max_points: int = None self._data_buffer: np.ndarray = None self._display_buffer: np.ndarray = None self._plot_index: int = 0 self._buffer_full: bool = False self._sample_index: int = 0 self._start_time = time.time() self._update_counts = 0 self._step_counts = 0 self._step_rate = 0 self._lock = threading.Lock() self._new_data = False self._rate_label = None p = self.widget.palette() self._foreground_color = p.color(QPalette.ColorRole.WindowText) self._background_color = p.color(QPalette.ColorRole.Window)
[docs] def setup( self, data: dict[str, np.ndarray], port_context_in: dict[str, dict] ) -> dict[str, dict]: """Set up the spectrum scope with frequency vector and channels. Args: data (dict): Initial data dictionary. port_context_in (dict): Input port context information. Returns: dict: Output port context from parent setup. Raises: ValueError: If required parameters are missing or invalid. """ c = port_context_in[PORT_IN] sampling_rate = c.get(Constants.Keys.SAMPLING_RATE) if sampling_rate is None: raise ValueError("sampling rate must be provided.") channel_count = c.get(Constants.Keys.CHANNEL_COUNT) if channel_count is None: raise ValueError("channel count must be provided.") frame_size = c.get(Constants.Keys.FRAME_SIZE) if frame_size is None: raise ValueError("frame size must be provided.") if frame_size <= 1: raise ValueError("frame size must be greater than 1.") self._f_vec = np.fft.rfftfreq(frame_size, 1 / sampling_rate) hidden_channels = self.config[ self.Configuration.KeysOptional.HIDDEN_CHANNELS ] self._channel_vec = [ i for i in range(channel_count) if i not in hidden_channels ] self._channel_count = len(self._channel_vec) self._frame_size = frame_size self._sampling_rate = sampling_rate self._data_buffer: list = [] self._num_averages = self.config[self.Configuration.Keys.NUM_AVERAGES] self._display_buffer = np.zeros((frame_size, self._channel_count)) self._new_data = False self._start_time = time.time() return super().setup(data, port_context_in)
def _update(self): """Update the spectrum display with current averaged data. Called periodically by the widget timer to refresh the frequency domain visualization with averaged spectral data. """ if not self._new_data: return # Set up UI elements. Note that this has to be done in the main Qt # thread (like this) ylim = (0, self._channel_count) if self._curves is None: # Create curves [self.add_curve() for _ in range(self._channel_count)] amp_lim = self.config[self.Configuration.Keys.AMPLITUDE_LIMIT] yl = f"EEG Amplitudes (0 ... {amp_lim} µV)" self.set_labels(x_label="Frequency (Hz)", y_label=yl) ticks = [ ( self._channel_count - i - 0.5, f"CH{self._channel_vec[i] + 1}", ) for i in range(self._channel_count) ] self._plot_item.getAxis("left").setTicks([ticks]) self._plot_item.setYRange(*ylim) with self._lock: if not self._data_buffer: return self._display_buffer = np.mean( np.stack(self._data_buffer, axis=2), axis=2 ) self._display_buffer = np.abs(self._display_buffer) self._new_data = False ch_lim_key = self.Configuration.Keys.AMPLITUDE_LIMIT ch_lim = self.config[ch_lim_key] for i in range(len(self._channel_vec)): d = self._channel_count - i - 0.5 self._curves[i].setData( self._f_vec, self._display_buffer[:, self._channel_vec[i]] / ch_lim / 2 + d, antialias=False, ) # update xlim fw = self._f_vec[-1] margin = fw * 0.0125 xlim = (-margin, fw + margin) self._plot_item.setXRange(*xlim)
[docs] def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Process incoming FFT data and update buffer for averaging. Args: data (dict): Dictionary containing FFT amplitude data. Returns: dict: Unchanged input data (pass-through). """ with self._lock: fft_input = data[PORT_IN] self._data_buffer.append(fft_input) if len(self._data_buffer) > self._num_averages: self._data_buffer.pop(0) self._new_data = True