Source code for brian2.monitors.spikemonitor

"""
Module defining `EventMonitor` and `SpikeMonitor`.
"""

import numpy as np

from brian2.core.functions import timestep
from brian2.core.names import Nameable
from brian2.core.spikesource import SpikeSource
from brian2.core.variables import Variables
from brian2.groups.group import CodeRunner
from brian2.monitors.ratemonitor import RateMonitor
from brian2.units.allunits import hertz, second
from brian2.units.fundamentalunits import Quantity, check_units

__all__ = ["EventMonitor", "SpikeMonitor"]


[docs] class EventMonitor(RateMonitor): """ Record events from a `NeuronGroup` or another event source. The recorded events can be accessed in various ways: the attributes `~EventMonitor.i` and `~EventMonitor.t` store all the indices and event times, respectively. Alternatively, you can get a dictionary mapping neuron indices to event trains, by calling the `event_trains` method. Parameters ---------- source : `NeuronGroup`, `SpikeSource` The source of events to record. event : str The name of the event to record variables : str or sequence of str, optional Which variables to record at the time of the event (in addition to the index of the neuron). Can be the name of a variable or a list of names. record : bool, optional Whether or not to record each event in `i` and `t` (the `count` will always be recorded). Defaults to ``True``. when : str, optional When to record the events, by default records events in the same slot where the event is emitted. See :ref:`scheduling` for possible values. order : int, optional The priority of of this group for operations occurring at the same time step and in the same scheduling slot. Defaults to the order where the event is emitted + 1, i.e. it will be recorded directly afterwards. name : str, optional A unique name for the object, otherwise will use ``source.name+'_eventmonitor_0'``, etc. codeobj_class : class, optional The `CodeObject` class to run code with. See Also -------- SpikeMonitor """ invalidates_magic_network = False add_to_magic_network = True def __init__( self, source, event, variables=None, record=True, when=None, order=None, name="eventmonitor*", codeobj_class=None, ): if not isinstance(source, SpikeSource): raise TypeError( f"{self.__class__.__name__} can only monitor groups " "producing spikes (such as NeuronGroup), but the given " f"argument is of type {type(source)}." ) #: The source we are recording from self.source = source #: Whether to record times and indices of events self.record = record #: The array of event counts (length = size of target group) self.count = None del self.count # this is handled by the Variable mechanism if event not in source.events: if event == "spike": threshold_text = " Did you forget to set a 'threshold'?" else: threshold_text = "" raise ValueError( f"Recorded group '{source.name}' does not define an event " f"'{event}'.{threshold_text}" ) if when is None: if order is not None: raise ValueError("Cannot specify order if when is not specified.") # TODO: Would be nicer if there was a common way of accessing the # relevant object for NeuronGroup and SpikeGeneratorGroup if hasattr(source, "thresholder"): parent_obj = source.thresholder[event] else: parent_obj = source when = parent_obj.when order = parent_obj.order + 1 elif order is None: order = 0 #: The event that we are listening to self.event = event if variables is None: variables = {} elif isinstance(variables, str): variables = {variables} #: The additional variables that will be recorded self.record_variables = set(variables) for variable in variables: if variable not in source.variables: raise ValueError( f"'{variable}' is not a variable of the recorded group" ) if self.record: self.record_variables |= {"i", "t"} # Some dummy code so that code generation takes care of the indexing # and subexpressions code = [f"_to_record_{v} = _source_{v}" for v in sorted(self.record_variables)] code = "\n".join(code) self.codeobj_class = codeobj_class # Since this now works for general events not only spikes, we have to # pass the information about which variable to use to the template, # it can not longer simply refer to "_spikespace" eventspace_name = f"_{event}space" # Handle subgroups correctly start = getattr(source, "start", 0) stop = getattr(source, "stop", len(source)) source_N = getattr(source, "_source_N", len(source)) Nameable.__init__(self, name=name) self.variables = Variables(self) self.variables.add_reference(eventspace_name, source) for variable in self.record_variables: source_var = source.variables[variable] self.variables.add_reference(f"_source_{variable}", source, variable) self.variables.add_auxiliary_variable( f"_to_record_{variable}", dimensions=source_var.dim, dtype=source_var.dtype, ) self.variables.add_dynamic_array( variable, size=0, dimensions=source_var.dim, dtype=source_var.dtype, read_only=True, ) self.variables.add_arange("_source_idx", size=len(source)) self.variables.add_array( "count", size=len(source), dtype=np.int32, read_only=True, index="_source_idx", ) self.variables.add_constant("_source_start", start) self.variables.add_constant("_source_stop", stop) self.variables.add_constant("_source_N", source_N) self.variables.add_array( "N", size=1, dtype=np.int32, read_only=True, scalar=True ) record_variables = { varname: self.variables[varname] for varname in self.record_variables } template_kwds = { "eventspace_variable": source.variables[eventspace_name], "record_variables": record_variables, "record": self.record, } needed_variables = {eventspace_name} | self.record_variables CodeRunner.__init__( self, group=self, code=code, template="spikemonitor", name=None, # The name has already been initialized clock=source.clock, when=when, order=order, needed_variables=needed_variables, template_kwds=template_kwds, ) self.variables.create_clock_variables(self._clock, prefix="_clock_") self.add_dependency(source) self.written_readonly_vars = { self.variables[varname] for varname in self.record_variables } self._enable_group_attributes()
[docs] def resize(self, new_size): # Note that this does not set N, this has to be done in the template # since we use a restricted pointer to access it (which promises that # we only change the value through this pointer) for variable in self.record_variables: self.variables[variable].resize(new_size)
[docs] def reinit(self): """ Clears all recorded spikes """ raise NotImplementedError()
@property def it(self): """ Returns the pair (`i`, `t`). """ if not self.record: raise AttributeError( "Indices and times have not been recorded." "Set the record argument to True to record " "them." ) return self.i, self.t @property def it_(self): """ Returns the pair (`i`, `t_`). """ if not self.record: raise AttributeError( "Indices and times have not been recorded." "Set the record argument to True to record " "them." ) return self.i, self.t_ def _values_dict(self, first_pos, sort_indices, used_indices, var): sorted_values = self.state(var, use_units=False)[sort_indices] dim = self.variables[var].dim event_values = {} current_pos = 0 # position in the all_indices array for idx in range(len(self.source)): if current_pos < len(used_indices) and used_indices[current_pos] == idx: if current_pos < len(used_indices) - 1: event_values[idx] = Quantity( sorted_values[ first_pos[current_pos] : first_pos[current_pos + 1] ], dim=dim, ) else: event_values[idx] = Quantity( sorted_values[first_pos[current_pos] :], dim=dim ) current_pos += 1 else: event_values[idx] = Quantity([], dim=dim) return event_values
[docs] def values(self, var): """ Return a dictionary mapping neuron indices to arrays of variable values at the time of the events (sorted by time). Parameters ---------- var : str The name of the variable. Returns ------- values : dict Dictionary mapping each neuron index to an array of variable values at the time of the events Examples -------- >>> from brian2 import * >>> G = NeuronGroup(2, '''counter1 : integer ... counter2 : integer ... max_value : integer''', ... threshold='counter1 >= max_value', ... reset='counter1 = 0') >>> G.run_regularly('counter1 += 1; counter2 += 1') # doctest: +ELLIPSIS CodeRunner(...) >>> G.max_value = [50, 100] >>> mon = EventMonitor(G, event='spike', variables='counter2') >>> run(10*ms) >>> counter2_values = mon.values('counter2') >>> print(counter2_values[0]) [ 50 100] >>> print(counter2_values[1]) [100] """ if not self.record: raise AttributeError( "Indices and times have not been recorded." "Set the record argument to True to record " "them." ) indices = self.i[:] # We have to make sure that the sort is stable, otherwise our spike # times do not necessarily remain sorted. sort_indices = np.argsort(indices, kind="mergesort") used_indices, first_pos = np.unique(self.i[:][sort_indices], return_index=True) return self._values_dict(first_pos, sort_indices, used_indices, var)
[docs] def all_values(self): """ Return a dictionary mapping recorded variable names (including ``t``) to a dictionary mapping neuron indices to arrays of variable values at the time of the events (sorted by time). This is equivalent to (but more efficient than) calling `values` for each variable and storing the result in a dictionary. Returns ------- all_values : dict Dictionary mapping variable names to dictionaries which themselves are mapping neuron indicies to arrays of variable values at the time of the events. Examples -------- >>> from brian2 import * >>> G = NeuronGroup(2, '''counter1 : integer ... counter2 : integer ... max_value : integer''', ... threshold='counter1 >= max_value', ... reset='counter1 = 0') >>> G.run_regularly('counter1 += 1; counter2 += 1') # doctest: +ELLIPSIS CodeRunner(...) >>> G.max_value = [50, 100] >>> mon = EventMonitor(G, event='spike', variables='counter2') >>> run(10*ms) >>> all_values = mon.all_values() >>> print(all_values['counter2'][0]) [ 50 100] >>> print(all_values['t'][1]) [ 9.9] ms """ if not self.record: raise AttributeError( "Indices and times have not been recorded." "Set the record argument to True to record " "them." ) indices = self.i[:] sort_indices = np.argsort(indices, kind="mergesort") used_indices, first_pos = np.unique(self.i[:][sort_indices], return_index=True) all_values_dict = {} for varname in self.record_variables - {"i"}: all_values_dict[varname] = self._values_dict( first_pos, sort_indices, used_indices, varname ) return all_values_dict
[docs] def event_trains(self): """ Return a dictionary mapping neuron indices to arrays of event times. Equivalent to calling ``values('t')``. Returns ------- event_trains : dict Dictionary that stores an array with the event times for each neuron index. See Also -------- SpikeMonitor.spike_trains """ return self.values("t")
[docs] @check_units(bin_size=second) def binned_rate(self, bin_size): """ Return the event rates binned with the given bin size. Parameters ---------- bin_size : `Quantity` The size of the bins in seconds. Should be a multiple of dt. Returns ------- bins : `Quantity` The start time of the bins. binned_values : `Quantity` The binned rates as a 2D array (neurons × bins) in Hz. Notes ----- The returned bin times represent the **start** of each bin interval, not the center. This is consistent with how Brian2 records spike times and other temporal data. For example, a spike recorded at time `t` occurred during the interval `[t, t+dt)`. For plotting purposes, especially with larger bin sizes, you may want to use bin centers instead of bin starts for a more intuitive visualization. You can easily calculate the bin centers by adding half the bin size:: >> bins, rates = monitor.binned_rate(10*ms) >> bin_centers = bins + 10*ms / 2 >> plt.plot(bin_centers, rates) This adjustment is particularly helpful when the bins are large relative to the time scale of interest, as it better represents where the rate measurement applies within each time window. """ if (bin_size / self.clock.dt) % 1 > 1e-6: raise ValueError("bin_size has to be a multiple of dt.") # Get the total duration and number of bins bin_timesteps = timestep(bin_size, self.clock.dt) num_bins = int(self.clock.timestep // bin_timesteps) bin_starts_timesteps = (np.arange(num_bins)) * bin_timesteps bins = bin_starts_timesteps * self.clock.dt num_neurons = len(self.source) # Now we initialize the binned values array (neurons × bins) binned_values = np.zeros((num_neurons, num_bins)) if self.record and len(self.t) > 0: # Get the event times and indices event_times = self.t[:] event_timesteps = np.asarray(timestep(event_times, self.clock.dt)) bin_indices = event_timesteps // bin_timesteps np.add.at(binned_values, (self.i[:], bin_indices), 1) # Convert counts to rates (Hz) binned_values = binned_values / float(bin_size) return bins, Quantity(binned_values, dim=hertz.dim)
@property def num_events(self): """ Returns the total number of recorded events. """ return self.N[:] def __repr__(self): classname = self.__class__.__name__ return f"<{classname}, recording event '{self.event}' from '{self.group.name}'>"
[docs] def after_run(self): super().after_run() # In Cython runtime mode, we directly update the underlying dynamic array, # so the size attribute of the Variable does not get updated automatically for var in self.record_variables: try: self.variables[var].size = len(self.variables[var].get_value()) except NotImplementedError: pass # Does not apply to standalone mode
[docs] class SpikeMonitor(EventMonitor): """ Record spikes from a `NeuronGroup` or other spike source. The recorded spikes can be accessed in various ways (see Examples below): the attributes `~SpikeMonitor.i` and `~SpikeMonitor.t` store all the indices and spike times, respectively. Alternatively, you can get a dictionary mapping neuron indices to spike trains, by calling the `spike_trains` method. If you record additional variables with the ``variables`` argument, these variables can be accessed by their name (see Examples). Parameters ---------- source : (`NeuronGroup`, `SpikeSource`) The source of spikes to record. variables : str or sequence of str, optional Which variables to record at the time of the spike (in addition to the index of the neuron). Can be the name of a variable or a list of names. record : bool, optional Whether or not to record each spike in `i` and `t` (the `count` will always be recorded). Defaults to ``True``. when : str, optional When to record the events, by default records events in the same slot where the event is emitted. See :ref:`scheduling` for possible values. order : int, optional The priority of of this group for operations occurring at the same time step and in the same scheduling slot. Defaults to the order where the event is emitted + 1, i.e. it will be recorded directly afterwards. name : str, optional A unique name for the object, otherwise will use ``source.name+'_spikemonitor_0'``, etc. codeobj_class : class, optional The `CodeObject` class to run code with. Examples -------- >>> from brian2 import * >>> spikes = SpikeGeneratorGroup(3, [0, 1, 2], [0, 1, 2]*ms) >>> spike_mon = SpikeMonitor(spikes) >>> net = Network(spikes, spike_mon) >>> net.run(3*ms) >>> print(spike_mon.i[:]) [0 1 2] >>> print(spike_mon.t[:]) [ 0. 1. 2.] ms >>> print(spike_mon.t_[:]) [ 0. 0.001 0.002] >>> from brian2 import * >>> G = NeuronGroup(2, '''counter1 : integer ... counter2 : integer ... max_value : integer''', ... threshold='counter1 >= max_value', ... reset='counter1 = 0') >>> G.run_regularly('counter1 += 1; counter2 += 1') # doctest: +ELLIPSIS CodeRunner(...) >>> G.max_value = [50, 100] >>> mon = SpikeMonitor(G, variables='counter2') >>> net = Network(G, mon) >>> net.run(10*ms) >>> print(mon.i[:]) [0 0 1] >>> print(mon.counter2[:]) [ 50 100 100] """ def __init__( self, source, variables=None, record=True, when=None, order=None, name="spikemonitor*", codeobj_class=None, ): #: The array of spike counts (length = size of target group) self.count = None del self.count # this is handled by the Variable mechanism super().__init__( source, event="spike", variables=variables, record=record, when=when, order=order, name=name, codeobj_class=codeobj_class, ) @property def num_spikes(self): """ Returns the total number of recorded spikes. """ return self.num_events # We "re-implement" the following functions only to get more specific # doc strings (and to make sure that the methods are included in the # reference documentation for SpikeMonitor).
[docs] def spike_trains(self): """ Return a dictionary mapping neuron indices to arrays of spike times. Returns ------- spike_trains : dict Dictionary that stores an array with the spike times for each neuron index. Examples -------- >>> from brian2 import * >>> spikes = SpikeGeneratorGroup(3, [0, 1, 2], [0, 1, 2]*ms) >>> spike_mon = SpikeMonitor(spikes) >>> run(3*ms) >>> spike_trains = spike_mon.spike_trains() >>> spike_trains[1] array([ 1.]) * msecond """ return self.event_trains()
[docs] def values(self, var): """ Return a dictionary mapping neuron indices to arrays of variable values at the time of the spikes (sorted by time). Parameters ---------- var : str The name of the variable. Returns ------- values : dict Dictionary mapping each neuron index to an array of variable values at the time of the spikes. Examples -------- >>> from brian2 import * >>> G = NeuronGroup(2, '''counter1 : integer ... counter2 : integer ... max_value : integer''', ... threshold='counter1 >= max_value', ... reset='counter1 = 0') >>> G.run_regularly('counter1 += 1; counter2 += 1') # doctest: +ELLIPSIS CodeRunner(...) >>> G.max_value = [50, 100] >>> mon = SpikeMonitor(G, variables='counter2') >>> run(10*ms) >>> counter2_values = mon.values('counter2') >>> print(counter2_values[0]) [ 50 100] >>> print(counter2_values[1]) [100] """ return super().values(var)
[docs] def all_values(self): """ Return a dictionary mapping recorded variable names (including ``t``) to a dictionary mapping neuron indices to arrays of variable values at the time of the spikes (sorted by time). This is equivalent to (but more efficient than) calling `values` for each variable and storing the result in a dictionary. Returns ------- all_values : dict Dictionary mapping variable names to dictionaries which themselves are mapping neuron indicies to arrays of variable values at the time of the spikes. Examples -------- >>> from brian2 import * >>> G = NeuronGroup(2, '''counter1 : integer ... counter2 : integer ... max_value : integer''', ... threshold='counter1 >= max_value', ... reset='counter1 = 0') >>> G.run_regularly('counter1 += 1; counter2 += 1') # doctest: +ELLIPSIS CodeRunner(...) >>> G.max_value = [50, 100] >>> mon = SpikeMonitor(G, variables='counter2') >>> run(10*ms) >>> all_values = mon.all_values() >>> print(all_values['counter2'][0]) [ 50 100] >>> print(all_values['t'][1]) [ 9.9] ms """ return super().all_values()
def __repr__(self): classname = self.__class__.__name__ return f"<{classname}, recording from '{self.group.name}'>"