Episode 3: Sending UDP Streams
In this episode, you learn how to send EEG data through a UDP (User Datagram Protocol) interface using g.Pype.
Note
This page is still under development. Until we have the step-by-step instructions ready, please refer to the code example below.
File example_basic_udp_send.py (send UDP stream) – View file on GitHub
1"""
2Basic UDP Send Example - Network Data Transmission via UDP Protocol
3
4This example demonstrates how to stream data over IP networks using the UDP
5(User Datagram Protocol) for real-time BCI data transmission. UDP provides
6low-latency, connectionless communication ideal for streaming applications
7where speed is more important than guaranteed delivery.
8
9What this example shows:
10- Generating synthetic 8-channel EEG-like signals with noise
11- Capturing keyboard events as experimental markers
12- Combining signal and event data using Router
13- Streaming combined data over UDP network protocol
14- Headless operation (no GUI) for dedicated streaming servers
15
16Expected behavior:
17When you run this example:
18- UDP packets are sent to the configured network destination
19- Data includes 8 signal channels + 1 event channel (9 total)
20- Keyboard events are transmitted as numerical markers
21- Console shows "Pipeline is running. Press enter to stop."
22- Network clients can receive the UDP stream for analysis
23
24Network streaming details:
25- Protocol: UDP (User Datagram Protocol)
26- Port: Configurable in UDPSender
27- Data format: Binary packed multi-channel samples
28- Packet rate: 250 Hz (one packet per sample frame)
29- Destination: Broadcast or specific IP address
30
31Real-world applications:
32- Low-latency BCI control systems
33- Real-time signal monitoring across networks
34- Distributed processing (send to analysis computers)
35- Integration with custom analysis software
36- Multi-computer BCI setups
37- Remote data logging and backup systems
38
39UDP vs other protocols:
40- UDP: Fast, low-latency, no connection overhead (used here)
41- TCP: Reliable but higher latency (use for file transfer)
42- LSL: Specialized for neuroscience (use for research integration)
43- WebSockets: Browser-based applications
44
45Network configuration:
46- Sender: This g.Pype application (data source)
47- Receiver: Custom application or example_basic_udp_receive.py
48- Firewall: May need to allow UDP traffic on specified port
49- Local network: Works on same subnet by default
50
51Usage:
52 1. Configure network settings in UDPSender if needed
53 2. Run: python example_basic_udp_send.py
54 3. Use example_basic_udp_receive.py or custom client to receive
55 4. Press arrow keys to send event markers over network
56 5. Press Enter in console to stop streaming
57
58Note:
59 UDP is ideal for real-time applications where occasional packet loss
60 is acceptable in exchange for minimal latency and overhead.
61"""
62import gpype as gp
63
64fs = 250 # Sampling frequency in Hz
65
66if __name__ == "__main__":
67 # Create processing pipeline (no GUI needed for UDP streaming)
68 p = gp.Pipeline()
69
70 # Generate synthetic 8-channel EEG-like signals
71 source = gp.Generator(
72 sampling_rate=fs,
73 channel_count=8, # 8 EEG channels
74 signal_frequency=10, # 10 Hz alpha rhythm
75 signal_amplitude=10, # Signal strength
76 signal_shape="sine", # Clean sine waves
77 noise_amplitude=10,
78 ) # Background noise
79
80 # Capture keyboard input as event markers
81 keyboard = gp.Keyboard() # Arrow keys -> event codes
82
83 # Combine signal data (8 channels) + keyboard events (1 channel)
84 router = gp.Router(input_selector=[gp.Router.ALL, gp.Router.ALL])
85
86 # UDP sender for low-latency network streaming
87 sender = gp.UDPSender() # Streams to configured UDP destination
88
89 # Connect processing chain: signals + events -> UDP network stream
90 p.connect(source, router["in1"]) # Signal data -> Router input 1
91 p.connect(keyboard, router["in2"]) # Event data -> Router input 2
92 p.connect(router, sender) # Combined data -> UDP stream
93
94 # Start headless UDP streaming operation
95 p.start() # Begin UDP data transmission
96 input("Pipeline is running. Press enter to stop.") # Wait for user
97 p.stop() # Stop streaming and cleanup
File example_basic_udp_receive.py (standalone UDP receiver) – View file on GitHub
1"""
2Basic UDP Receive Example - Network Data Reception and Visualization
3
4This example demonstrates how to receive and visualize data transmitted over
5UDP networks in real-time. It complements example_basic_udp_send.py by showing
6the receiving side of UDP communication, creating a standalone visualization
7application for UDP data streams.
8
9What this example shows:
10- UDP socket programming for real-time data reception
11- Multi-channel time-series visualization using PyQtGraph
12- Binary data unpacking from network packets
13- EEG display with channel stacking
14- Continuous visualization in real-time scope
15
16Expected behavior:
17When you run this example:
18- Opens UDP socket on localhost:56000 for incoming data
19- Displays real-time visualization window
20- Shows incoming multi-channel data as scrolling time-series plots
21- Updates display at ~25 Hz for smooth real-time visualization
22- Handles network timing variations and packet buffering
23
24Workflow with UDP Send example:
251. Run this script first (starts UDP receiver and visualization)
262. Run example_basic_udp_send.py (connects and streams data)
273. Press arrow keys in the send example to see event markers
284. Observe real-time data updates in the visualization
29
30Network configuration:
31- Protocol: UDP (User Datagram Protocol)
32- IP Address: 127.0.0.1 (localhost)
33- Port: 56000 (configurable)
34- Data format: Binary packed float64 arrays
35- Packet size: Configurable frame size × channel count × 8 bytes
36
37Real-world applications:
38- Real-time BCI data monitoring and quality assessment
39- Network-based signal analysis and processing
40- Distributed BCI systems with remote visualization
41- Integration with custom data acquisition hardware
42- Multi-computer BCI setups for specialized processing
43- Remote data logging and backup systems
44
45Usage:
46 1. Run: python example_basic_udp_receive.py
47 2. Visualization window opens and waits for UDP data
48 3. Run example_basic_udp_send.py to start data transmission
49 4. Close window to stop reception
50
51Prerequisites:
52 - pyqtgraph (pip install pyqtgraph)
53 - PySide6 (pip install PySide6)
54 - Active UDP data source on the configured port
55
56Note:
57 UDP provides fast, low-latency communication ideal for real-time BCI
58 applications where speed is prioritized over guaranteed packet delivery.
59"""
60
61import socket
62import numpy as np
63import pyqtgraph as pg
64from pyqtgraph.Qt import QtWidgets, QtCore
65from PySide6.QtGui import QPalette, QColor
66import sys
67
68# Network and display configuration constants
69UDP_IP = "127.0.0.1" # Listen on localhost
70UDP_PORT = 56000 # UDP port for incoming data
71FRAME_SIZE = 1 # Samples per UDP packet
72CHANNEL_COUNT = 9 # Total channels (8 signals + 1 events)
73SAMPLING_RATE = 250 # Expected sampling rate in Hz
74TIME_WINDOW = 10 # Seconds of data to display
75AMPLITUDE_LIMIT = 50 # µV - amplitude scaling for display
76
77# Calculated constants for data handling
78MAX_POINTS = int(TIME_WINDOW * SAMPLING_RATE) # Total points in buffer
79BUFFER_SIZE = 65536 # UDP receive buffer size
80
81
82class UDPTimeScope(QtWidgets.QMainWindow):
83 """
84 Real-time UDP data visualization application.
85
86 Creates a time-series display for multi-channel data received
87 via UDP. Implements circular buffering and real-time plotting
88 with PyQtGraph for smooth visualization of streaming data.
89 """
90
91 def __init__(self):
92 super().__init__()
93 self.setWindowTitle("UDP Time Series Scope")
94
95 # Get system colors for professional appearance
96 palette = self.palette()
97 self.foreground_color = palette.color(QPalette.ColorRole.WindowText)
98 self.background_color = palette.color(QPalette.ColorRole.Window)
99
100 # Create main plot widget with scientific visualization styling
101 self.plot_widget = pg.PlotWidget()
102 self.setCentralWidget(self.plot_widget)
103 self.plot_item = self.plot_widget.getPlotItem()
104 self.plot_item.showGrid(x=True, y=True, alpha=0.3) # Subtle grid
105 # Disable mouse interaction for fixed view
106 self.plot_item.getViewBox().setMouseEnabled(x=False, y=False)
107 self.plot_widget.setBackground(self.background_color)
108
109 # Configure axes labels and ranges for EEG-style display
110 self.plot_item.setLabels(left="Channels", bottom="Time (s)")
111 self.plot_item.setYRange(0, CHANNEL_COUNT) # Stack channels vertically
112
113 # Create channel labels (CH1, CH2, etc.) positioned at channel centers
114 self.plot_item.getAxis("left").setTicks(
115 [
116 [
117 (CHANNEL_COUNT - i - 0.5, f"CH{i + 1}")
118 for i in range(CHANNEL_COUNT) # noqa: E501
119 ]
120 ]
121 )
122 self.plot_item.setXRange(-0.5, TIME_WINDOW + 0.5)
123
124 # Create individual plot curves for each channel
125 self.curves = []
126 for _ in range(CHANNEL_COUNT):
127 # Each channel gets its own curve with consistent styling
128 curve = self.plot_item.plot(
129 pen=pg.mkPen(QColor(self.foreground_color), width=1)
130 ) # noqa: E501
131 self.curves.append(curve)
132
133 # Initialize data buffers for circular buffering
134 self.t_vec = np.arange(MAX_POINTS) / SAMPLING_RATE # Time vector
135 # Circular buffer for multi-channel data storage
136 self.data_buffer = np.zeros((MAX_POINTS, CHANNEL_COUNT))
137 self.sample_index = 0 # Current position in circular buffer
138
139 # Set up UDP socket for non-blocking data reception
140 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
141 self.sock.bind((UDP_IP, UDP_PORT)) # Bind to specified address/port
142 self.sock.setblocking(False) # Non-blocking for real-time operation
143
144 # Set up timer for regular plot updates (independent of data rate)
145 self.timer = QtCore.QTimer()
146 self.timer.timeout.connect(self.update_plot)
147 self.timer.start(40) # ~25 Hz refresh rate for smooth visualization
148
149 self._last_second = None # Track time axis updates
150
151 def update_plot(self):
152 """
153 Main update loop called by timer for real-time visualization.
154
155 Handles UDP packet reception, data buffering, and plot updates.
156 Runs at ~25 Hz for smooth visualization independent of data rate.
157 """
158 # Receive and process all available UDP packets
159 try:
160 while True:
161 # Non-blocking receive - get packet if available
162 packet, _ = self.sock.recvfrom(BUFFER_SIZE)
163
164 # Unpack binary data: convert bytes to float64 array
165 frame = np.frombuffer(packet, dtype=np.float64)
166 frame = frame.reshape((FRAME_SIZE, CHANNEL_COUNT))
167
168 # Store in circular buffer with proper indexing
169 idx = self.sample_index + np.arange(FRAME_SIZE)
170 idx %= MAX_POINTS # Wrap around for circular buffering
171 self.data_buffer[idx, :] = frame
172 self.sample_index += FRAME_SIZE
173
174 except BlockingIOError:
175 # No data available - continue with plot update
176 pass
177
178 # Update visualization with decimated data for performance
179 # Decimate data based on window width to avoid oversampling display
180 N = max(1, int(MAX_POINTS / self.width()))
181 display = self.data_buffer[::N] # Take every Nth sample
182 t_disp = self.t_vec[::N] # Corresponding time points
183
184 # Update each channel curve with normalized and offset data
185 for i, curve in enumerate(self.curves):
186 # Stack channels vertically with amplitude scaling
187 offset = CHANNEL_COUNT - i - 0.5 # Vertical position
188 # Normalize amplitude and add channel offset
189 curve.setData(t_disp, display[:, i] / AMPLITUDE_LIMIT / 2 + offset)
190
191 # Update time axis labels for scrolling display
192 cur_second = int(
193 np.floor((self.sample_index % MAX_POINTS) / SAMPLING_RATE)
194 ) # noqa: E501
195 if cur_second != self._last_second:
196 time_window = TIME_WINDOW
197 if self.sample_index > MAX_POINTS:
198 # Scrolling mode: calculate proper time labels
199 ticks = []
200 for i in range(int(np.floor(time_window)) + 1):
201 tick_val = (
202 np.mod(i - (cur_second + 1), time_window)
203 + cur_second
204 + 1
205 ) # noqa: E501
206 offset = (
207 np.floor(self.sample_index / MAX_POINTS - 1)
208 * time_window
209 ) # noqa: E501
210 tick_val += offset
211 tick_label = f"{tick_val:.0f}"
212 ticks.append((i, tick_label))
213 else:
214 # Initial filling mode: simple sequential labels
215 ticks = [
216 (i, f"{i:.0f}" if i <= cur_second else "")
217 for i in range(int(np.floor(time_window)) + 1)
218 ]
219
220 # Apply new tick labels to time axis
221 self.plot_item.getAxis("bottom").setTicks([ticks])
222 self._last_second = cur_second
223
224
225if __name__ == "__main__":
226 # Create Qt application for GUI event loop
227 app = QtWidgets.QApplication(sys.argv)
228
229 # Create and configure main window
230 window = UDPTimeScope()
231 window.resize(1000, 500) # Set reasonable window size
232 window.show()
233
234 # Start application event loop (blocks until window closes)
235 sys.exit(app.exec())