Episode 3: Sending UDP Streams

In this episode, you learn how to send EEG data through a UDP (User Datagram Protocol) interface using g.Pype.

Note

This page is still under development. Until we have the step-by-step instructions ready, please refer to the code example below.

File example_basic_udp_send.py (send UDP stream)View file on GitHub

 1"""
 2Basic UDP Send Example - Network Data Transmission via UDP Protocol
 3
 4This example demonstrates how to stream data over IP networks using the UDP
 5(User Datagram Protocol) for real-time BCI data transmission. UDP provides
 6low-latency, connectionless communication ideal for streaming applications
 7where speed is more important than guaranteed delivery.
 8
 9What this example shows:
10- Generating synthetic 8-channel EEG-like signals with noise
11- Capturing keyboard events as experimental markers
12- Combining signal and event data using Router
13- Streaming combined data over UDP network protocol
14- Headless operation (no GUI) for dedicated streaming servers
15
16Expected behavior:
17When you run this example:
18- UDP packets are sent to the configured network destination
19- Data includes 8 signal channels + 1 event channel (9 total)
20- Keyboard events are transmitted as numerical markers
21- Console shows "Pipeline is running. Press enter to stop."
22- Network clients can receive the UDP stream for analysis
23
24Network streaming details:
25- Protocol: UDP (User Datagram Protocol)
26- Port: Configurable in UDPSender
27- Data format: Binary packed multi-channel samples
28- Packet rate: 250 Hz (one packet per sample frame)
29- Destination: Broadcast or specific IP address
30
31Real-world applications:
32- Low-latency BCI control systems
33- Real-time signal monitoring across networks
34- Distributed processing (send to analysis computers)
35- Integration with custom analysis software
36- Multi-computer BCI setups
37- Remote data logging and backup systems
38
39UDP vs other protocols:
40- UDP: Fast, low-latency, no connection overhead (used here)
41- TCP: Reliable but higher latency (use for file transfer)
42- LSL: Specialized for neuroscience (use for research integration)
43- WebSockets: Browser-based applications
44
45Network configuration:
46- Sender: This g.Pype application (data source)
47- Receiver: Custom application or example_basic_udp_receive.py
48- Firewall: May need to allow UDP traffic on specified port
49- Local network: Works on same subnet by default
50
51Usage:
52    1. Configure network settings in UDPSender if needed
53    2. Run: python example_basic_udp_send.py
54    3. Use example_basic_udp_receive.py or custom client to receive
55    4. Press arrow keys to send event markers over network
56    5. Press Enter in console to stop streaming
57
58Note:
59    UDP is ideal for real-time applications where occasional packet loss
60    is acceptable in exchange for minimal latency and overhead.
61"""
62import gpype as gp
63
64fs = 250  # Sampling frequency in Hz
65
66if __name__ == "__main__":
67    # Create processing pipeline (no GUI needed for UDP streaming)
68    p = gp.Pipeline()
69
70    # Generate synthetic 8-channel EEG-like signals
71    source = gp.Generator(
72        sampling_rate=fs,
73        channel_count=8,  # 8 EEG channels
74        signal_frequency=10,  # 10 Hz alpha rhythm
75        signal_amplitude=10,  # Signal strength
76        signal_shape="sine",  # Clean sine waves
77        noise_amplitude=10,
78    )  # Background noise
79
80    # Capture keyboard input as event markers
81    keyboard = gp.Keyboard()  # Arrow keys -> event codes
82
83    # Combine signal data (8 channels) + keyboard events (1 channel)
84    router = gp.Router(input_selector=[gp.Router.ALL, gp.Router.ALL])
85
86    # UDP sender for low-latency network streaming
87    sender = gp.UDPSender()  # Streams to configured UDP destination
88
89    # Connect processing chain: signals + events -> UDP network stream
90    p.connect(source, router["in1"])  # Signal data -> Router input 1
91    p.connect(keyboard, router["in2"])  # Event data -> Router input 2
92    p.connect(router, sender)  # Combined data -> UDP stream
93
94    # Start headless UDP streaming operation
95    p.start()  # Begin UDP data transmission
96    input("Pipeline is running. Press enter to stop.")  # Wait for user
97    p.stop()  # Stop streaming and cleanup

File example_basic_udp_receive.py (standalone UDP receiver)View file on GitHub

  1"""
  2Basic UDP Receive Example - Network Data Reception and Visualization
  3
  4This example demonstrates how to receive and visualize data transmitted over
  5UDP networks in real-time. It complements example_basic_udp_send.py by showing
  6the receiving side of UDP communication, creating a standalone visualization
  7application for UDP data streams.
  8
  9What this example shows:
 10- UDP socket programming for real-time data reception
 11- Multi-channel time-series visualization using PyQtGraph
 12- Binary data unpacking from network packets
 13- EEG display with channel stacking
 14- Continuous visualization in real-time scope
 15
 16Expected behavior:
 17When you run this example:
 18- Opens UDP socket on localhost:56000 for incoming data
 19- Displays real-time visualization window
 20- Shows incoming multi-channel data as scrolling time-series plots
 21- Updates display at ~25 Hz for smooth real-time visualization
 22- Handles network timing variations and packet buffering
 23
 24Workflow with UDP Send example:
 251. Run this script first (starts UDP receiver and visualization)
 262. Run example_basic_udp_send.py (connects and streams data)
 273. Press arrow keys in the send example to see event markers
 284. Observe real-time data updates in the visualization
 29
 30Network configuration:
 31- Protocol: UDP (User Datagram Protocol)
 32- IP Address: 127.0.0.1 (localhost)
 33- Port: 56000 (configurable)
 34- Data format: Binary packed float64 arrays
 35- Packet size: Configurable frame size × channel count × 8 bytes
 36
 37Real-world applications:
 38- Real-time BCI data monitoring and quality assessment
 39- Network-based signal analysis and processing
 40- Distributed BCI systems with remote visualization
 41- Integration with custom data acquisition hardware
 42- Multi-computer BCI setups for specialized processing
 43- Remote data logging and backup systems
 44
 45Usage:
 46    1. Run: python example_basic_udp_receive.py
 47    2. Visualization window opens and waits for UDP data
 48    3. Run example_basic_udp_send.py to start data transmission
 49    4. Close window to stop reception
 50
 51Prerequisites:
 52    - pyqtgraph (pip install pyqtgraph)
 53    - PySide6 (pip install PySide6)
 54    - Active UDP data source on the configured port
 55
 56Note:
 57    UDP provides fast, low-latency communication ideal for real-time BCI
 58    applications where speed is prioritized over guaranteed packet delivery.
 59"""
 60
 61import socket
 62import numpy as np
 63import pyqtgraph as pg
 64from pyqtgraph.Qt import QtWidgets, QtCore
 65from PySide6.QtGui import QPalette, QColor
 66import sys
 67
 68# Network and display configuration constants
 69UDP_IP = "127.0.0.1"  # Listen on localhost
 70UDP_PORT = 56000  # UDP port for incoming data
 71FRAME_SIZE = 1  # Samples per UDP packet
 72CHANNEL_COUNT = 9  # Total channels (8 signals + 1 events)
 73SAMPLING_RATE = 250  # Expected sampling rate in Hz
 74TIME_WINDOW = 10  # Seconds of data to display
 75AMPLITUDE_LIMIT = 50  # µV - amplitude scaling for display
 76
 77# Calculated constants for data handling
 78MAX_POINTS = int(TIME_WINDOW * SAMPLING_RATE)  # Total points in buffer
 79BUFFER_SIZE = 65536  # UDP receive buffer size
 80
 81
 82class UDPTimeScope(QtWidgets.QMainWindow):
 83    """
 84    Real-time UDP data visualization application.
 85
 86    Creates a time-series display for multi-channel data received
 87    via UDP. Implements circular buffering and real-time plotting
 88    with PyQtGraph for smooth visualization of streaming data.
 89    """
 90
 91    def __init__(self):
 92        super().__init__()
 93        self.setWindowTitle("UDP Time Series Scope")
 94
 95        # Get system colors for professional appearance
 96        palette = self.palette()
 97        self.foreground_color = palette.color(QPalette.ColorRole.WindowText)
 98        self.background_color = palette.color(QPalette.ColorRole.Window)
 99
100        # Create main plot widget with scientific visualization styling
101        self.plot_widget = pg.PlotWidget()
102        self.setCentralWidget(self.plot_widget)
103        self.plot_item = self.plot_widget.getPlotItem()
104        self.plot_item.showGrid(x=True, y=True, alpha=0.3)  # Subtle grid
105        # Disable mouse interaction for fixed view
106        self.plot_item.getViewBox().setMouseEnabled(x=False, y=False)
107        self.plot_widget.setBackground(self.background_color)
108
109        # Configure axes labels and ranges for EEG-style display
110        self.plot_item.setLabels(left="Channels", bottom="Time (s)")
111        self.plot_item.setYRange(0, CHANNEL_COUNT)  # Stack channels vertically
112
113        # Create channel labels (CH1, CH2, etc.) positioned at channel centers
114        self.plot_item.getAxis("left").setTicks(
115            [
116                [
117                    (CHANNEL_COUNT - i - 0.5, f"CH{i + 1}")
118                    for i in range(CHANNEL_COUNT)  # noqa: E501
119                ]
120            ]
121        )
122        self.plot_item.setXRange(-0.5, TIME_WINDOW + 0.5)
123
124        # Create individual plot curves for each channel
125        self.curves = []
126        for _ in range(CHANNEL_COUNT):
127            # Each channel gets its own curve with consistent styling
128            curve = self.plot_item.plot(
129                pen=pg.mkPen(QColor(self.foreground_color), width=1)
130            )  # noqa: E501
131            self.curves.append(curve)
132
133        # Initialize data buffers for circular buffering
134        self.t_vec = np.arange(MAX_POINTS) / SAMPLING_RATE  # Time vector
135        # Circular buffer for multi-channel data storage
136        self.data_buffer = np.zeros((MAX_POINTS, CHANNEL_COUNT))
137        self.sample_index = 0  # Current position in circular buffer
138
139        # Set up UDP socket for non-blocking data reception
140        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
141        self.sock.bind((UDP_IP, UDP_PORT))  # Bind to specified address/port
142        self.sock.setblocking(False)  # Non-blocking for real-time operation
143
144        # Set up timer for regular plot updates (independent of data rate)
145        self.timer = QtCore.QTimer()
146        self.timer.timeout.connect(self.update_plot)
147        self.timer.start(40)  # ~25 Hz refresh rate for smooth visualization
148
149        self._last_second = None  # Track time axis updates
150
151    def update_plot(self):
152        """
153        Main update loop called by timer for real-time visualization.
154
155        Handles UDP packet reception, data buffering, and plot updates.
156        Runs at ~25 Hz for smooth visualization independent of data rate.
157        """
158        # Receive and process all available UDP packets
159        try:
160            while True:
161                # Non-blocking receive - get packet if available
162                packet, _ = self.sock.recvfrom(BUFFER_SIZE)
163
164                # Unpack binary data: convert bytes to float64 array
165                frame = np.frombuffer(packet, dtype=np.float64)
166                frame = frame.reshape((FRAME_SIZE, CHANNEL_COUNT))
167
168                # Store in circular buffer with proper indexing
169                idx = self.sample_index + np.arange(FRAME_SIZE)
170                idx %= MAX_POINTS  # Wrap around for circular buffering
171                self.data_buffer[idx, :] = frame
172                self.sample_index += FRAME_SIZE
173
174        except BlockingIOError:
175            # No data available - continue with plot update
176            pass
177
178        # Update visualization with decimated data for performance
179        # Decimate data based on window width to avoid oversampling display
180        N = max(1, int(MAX_POINTS / self.width()))
181        display = self.data_buffer[::N]  # Take every Nth sample
182        t_disp = self.t_vec[::N]  # Corresponding time points
183
184        # Update each channel curve with normalized and offset data
185        for i, curve in enumerate(self.curves):
186            # Stack channels vertically with amplitude scaling
187            offset = CHANNEL_COUNT - i - 0.5  # Vertical position
188            # Normalize amplitude and add channel offset
189            curve.setData(t_disp, display[:, i] / AMPLITUDE_LIMIT / 2 + offset)
190
191        # Update time axis labels for scrolling display
192        cur_second = int(
193            np.floor((self.sample_index % MAX_POINTS) / SAMPLING_RATE)
194        )  # noqa: E501
195        if cur_second != self._last_second:
196            time_window = TIME_WINDOW
197            if self.sample_index > MAX_POINTS:
198                # Scrolling mode: calculate proper time labels
199                ticks = []
200                for i in range(int(np.floor(time_window)) + 1):
201                    tick_val = (
202                        np.mod(i - (cur_second + 1), time_window)
203                        + cur_second
204                        + 1
205                    )  # noqa: E501
206                    offset = (
207                        np.floor(self.sample_index / MAX_POINTS - 1)
208                        * time_window
209                    )  # noqa: E501
210                    tick_val += offset
211                    tick_label = f"{tick_val:.0f}"
212                    ticks.append((i, tick_label))
213            else:
214                # Initial filling mode: simple sequential labels
215                ticks = [
216                    (i, f"{i:.0f}" if i <= cur_second else "")
217                    for i in range(int(np.floor(time_window)) + 1)
218                ]
219
220            # Apply new tick labels to time axis
221            self.plot_item.getAxis("bottom").setTicks([ticks])
222            self._last_second = cur_second
223
224
225if __name__ == "__main__":
226    # Create Qt application for GUI event loop
227    app = QtWidgets.QApplication(sys.argv)
228
229    # Create and configure main window
230    window = UDPTimeScope()
231    window.resize(1000, 500)  # Set reasonable window size
232    window.show()
233
234    # Start application event loop (blocks until window closes)
235    sys.exit(app.exec())