Episode 4: Oddball Paradigms
In this episode, you will learn how to conduct an auditory oddball paradigm experiment using g.Pype.
Note
This page is still under development. Until we have the step-by-step instructions ready, please refer to the code example below.
File example_paradigm_aep_oddball.py – View file on GitHub
"""Auditory oddball paradigm example for g.Pype.

The script builds one pipeline in which:

* a signal source (generator by default, optionally a real amplifier) is
  band-pass and notch filtered,
* the filtered signal, the presenter triggers and the keyboard events are
  merged and shown in a time series scope,
* the filtered signal is passed through two ``Trigger`` nodes (target and
  non-target) whose outputs feed a trigger scope,
* the *unfiltered* source signal, the triggers and the keyboard events are
  merged and written to ``AEPOddball.csv``.
"""
from pathlib import Path
import os

import gpype as gp

# The paradigm definition is looked up relative to this script.
parent_dir = os.path.dirname(os.path.abspath(__file__))
paradigm = os.path.join(parent_dir, "paradigms", "AEPOddball.xml")

# Acquisition settings shared by the generator / amplifier nodes.
sampling_rate = 250
channel_count = 8

# Trigger values used to tell target from non-target stimuli.
# NOTE(review): presumably these must match the IDs sent by the paradigm
# file - confirm against AEPOddball.xml.
id_target = 1
id_nontarget = 2

if __name__ == "__main__":

    # Create main application & pipeline
    app = gp.MainApp()
    p = gp.Pipeline()

    # Amplifier: BCI Core-8 (uncomment if you want to use it)
    # amp = gp.BCICore8()

    # Amplifier: g.Nautilus (uncomment if you want to use it)
    # amp = gp.GNautilus(sampling_rate=sampling_rate,
    #                    channel_count=channel_count)

    # Signal generator (active by default; comment it out when one of the
    # amplifiers above is used instead)
    amp = gp.Generator(
        sampling_rate=sampling_rate,
        channel_count=channel_count,
        signal_frequency=10,
        signal_amplitude=15,
        signal_shape="sine",
        noise_amplitude=10,
    )

    # Bandpass from 1 to 30 Hz
    bandpass = gp.Bandpass(f_lo=1, f_hi=30)

    # Bandstop (notch) filters for 50 and 60 Hz
    notch50 = gp.Bandstop(f_lo=48, f_hi=52)
    notch60 = gp.Bandstop(f_lo=58, f_hi=62)

    # Presenter trigger receiver
    trig_receiver = gp.UDPReceiver()

    # Trigger nodes, one per stimulus class
    # NOTE(review): time_pre / time_post are presumably seconds before and
    # after the trigger - confirm in the gp.Trigger documentation.
    trig_node_target = gp.Trigger(
        time_pre=0.2, time_post=0.7, target=id_target
    )
    trig_node_nontarget = gp.Trigger(
        time_pre=0.2, time_post=0.7, target=id_nontarget
    )

    # Keyboard capture
    key_capture = gp.Keyboard()

    # Time series scope
    # NOTE(review): the marker channels assume the router output holds the
    # signal channels first, then the trigger channel (index channel_count),
    # then the keyboard channel (index channel_count + 1); 77 is presumably
    # the key code of "M" - confirm.
    mk = gp.TimeSeriesScope.Markers
    markers = [
        mk(
            color="#ff0000",
            label="Target",
            channel=channel_count,
            value=id_target,
        ),
        mk(
            color="#00aa00",
            label="Nontarget",
            channel=channel_count,
            value=id_nontarget,
        ),
        mk(
            color="#0000ff", label="M Key", channel=channel_count + 1, value=77
        ),
    ]
    scope = gp.TimeSeriesScope(
        amplitude_limit=50, time_window=10, markers=markers
    )

    # Trigger scope
    trig_scope = gp.TriggerScope(
        amplitude_limit=5, plots=["target", "nontarget", "target-nontarget"]
    )

    # Merge signals for scope and data saving
    router_scope = gp.Router(
        input_selector=[gp.Router.ALL, gp.Router.ALL, gp.Router.ALL]
    )
    router_raw = gp.Router(
        input_selector=[gp.Router.ALL, gp.Router.ALL, gp.Router.ALL]
    )

    # File writer; the output file is named after the paradigm file
    filename = Path(paradigm).stem
    writer = gp.FileWriter(file_name=f"{filename}.csv")

    # connect amplifier to filter nodes
    p.connect(amp, bandpass)
    p.connect(bandpass, notch50)
    p.connect(notch50, notch60)

    # merge scope data (filtered signal + triggers + keyboard)
    p.connect(notch60, router_scope["in1"])
    p.connect(trig_receiver, router_scope["in2"])
    p.connect(key_capture, router_scope["in3"])

    # merge raw data (unfiltered signal + triggers + keyboard)
    p.connect(amp, router_raw["in1"])
    p.connect(trig_receiver, router_raw["in2"])
    p.connect(key_capture, router_raw["in3"])

    # connect inputs of time series scope and file writer
    p.connect(router_scope, scope)
    p.connect(router_raw, writer)

    # connect trigger nodes and trigger scope
    p.connect(notch60, trig_node_target[gp.Constants.Defaults.PORT_IN])
    p.connect(trig_receiver, trig_node_target[gp.Trigger.PORT_TRIGGER])
    p.connect(notch60, trig_node_nontarget[gp.Constants.Defaults.PORT_IN])
    p.connect(trig_receiver, trig_node_nontarget[gp.Trigger.PORT_TRIGGER])
    p.connect(trig_node_target, trig_scope["target"])
    p.connect(trig_node_nontarget, trig_scope["nontarget"])

    # Create paradigm presenter and add widgets to the main app
    presenter = gp.ParadigmPresenter(paradigm)
    app.add_widget(presenter)
    app.add_widget(scope)
    app.add_widget(trig_scope)

    # start pipeline and main app
    p.start()
    try:
        app.run()
    finally:
        # Stop the pipeline even if the application loop raises or exits,
        # so that a started pipeline is never left running.
        p.stop()