Source code for ibllib.io.extractors.ephys_passive

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Author: Niccolò Bonacchi
# @Date: Monday, September 7th 2020, 11:51:17 am
import json
import logging
from pathlib import Path
from packaging import version
import scipy.signal

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from ibldsp.utils import sync_timestamps

import ibllib.io.raw_data_loaders as rawio
from ibllib.io.extractors import ephys_fpga
from ibllib.io.extractors.base import BaseExtractor
import ibllib.io.extractors.passive_plotting as passive_plots

log = logging.getLogger("ibllib")

FRAME_FS = 60  # Sampling freq of the ipad screen, in Hertz
PATH_FIXTURES_V7 = Path(ephys_fpga.__file__).parent / "ephys_sessions"
PATH_FIXTURES_V8 = Path(ephys_fpga.__file__).parent / 'ephys_sessions' / 'passiveChoiceWorld_trials_fixtures.pqt'


# ------------------------------------------------------------------
# Loader utils
# ------------------------------------------------------------------
def load_task_replay_fixtures(
    session_path: Path,
    task_collection: str = 'raw_passive_data',
    settings: dict | None = None,
    task_version: version.Version | None = None,
    adjust: bool = True
) -> pd.DataFrame:
    """
    Load the expected task replay sequence for a passive session.

    Parameters
    ----------
    session_path: Path
        The path to a session
    task_collection: str
        The collection containing task data
    settings: dict, optional
        A dictionary containing the session settings.
    task_version: version.Version, optional
        The iblrig version used for the session.
    adjust: bool, default True
        If True, adjust the fixture to account for the pre-8.29.0 bug where the first gabor was blank.

    Returns
    -------
    task_replay: pd.DataFrame
        A dataframe containing the expected task replay sequence.
    """
    if settings is None:
        settings = rawio.load_settings(session_path, task_collection=task_collection)
    if task_version is None:
        pars = map(settings.get, ['IBLRIG_VERSION', 'IBLRIG_VERSION_TAG'])
        task_version = next((k for k in pars if k is not None), '0.0.0')
        if task_version == '':
            task_version = settings.get('PARAMS', {}).get('IBLRIG_VERSION', '0.0.0')
        task_version = version.parse(task_version)

    task_protocol = settings.get('PYBPOD_PROTOCOL', '')
    if task_version >= version.parse('8.0.0') or 'passiveChoiceWorldIndependent' in task_protocol:
        task_replay = _load_v8_fixture_df(settings)
    else:
        task_replay = _load_v7_fixture_df(settings)

    # For all versions before 8.29.0, there was a bug where the first gabor stimulus was blank and the
    # following gabor stimuli showed the parameters from the previous gabor trial.
    if task_version <= version.parse('8.29.0') and adjust:
        # Shift the parameters of all gabor stimuli by one trial
        idx = task_replay.index[task_replay['stim_type'] == 'G'].values
        task_replay.iloc[idx[1:], :] = task_replay.iloc[idx[:-1], :]
        # Remove the first gabor stimulus from the template
        idx = task_replay.index[task_replay['stim_type'] == 'G'][0]
        task_replay = task_replay.drop(idx)

    # There was a bug in iblrig versions 8.0 to 8.27.3 where the tone was played instead of the noise.
    # See commit https://github.com/int-brain-lab/iblrig/commit/54d803b73de89173debd3003a55e0e4a3d8965f7
    if version.parse('8.0.0') <= task_version <= version.parse('8.27.3'):
        idx = task_replay.index[task_replay['stim_type'] == 'N'].values
        task_replay.loc[idx, 'stim_type'] = 'T'

    return task_replay
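
# --- Illustrative usage (sketch, not part of the extraction pipeline) ------------------------------
# A minimal example of loading the expected replay sequence. The session path below is hypothetical;
# any session folder containing a 'raw_passive_data' collection with task settings would do.
def _example_load_task_replay_fixtures():
    session_path = Path("/data/Subjects/example_subject/2020-01-01/001")  # hypothetical path
    replay = load_task_replay_fixtures(session_path, task_collection='raw_passive_data')
    # Each row is one expected stimulus; 'stim_type' is 'G' (gabor), 'V' (valve), 'T' (tone) or 'N' (noise),
    # and the gabor rows additionally carry 'position', 'contrast' and 'phase'.
    print(replay[['stim_type', 'position', 'contrast', 'phase']].head())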


def _load_v8_fixture_df(settings: dict) -> pd.DataFrame:
    """
    Load the task replay fixture for iblrig v8 sessions.

    Parameters
    ----------
    settings: dict
        A dictionary containing the session settings.

    Returns
    -------
    replay_trials: pd.DataFrame
        A dataframe containing the expected task replay sequence.
    """
    all_trials = pd.read_parquet(PATH_FIXTURES_V8)
    pars = map(settings.get, ['SESSION_TEMPLATE_ID', 'PREGENERATED_SESSION_NUM'])
    session_id = next((k for k in pars if k is not None), None)
    replay_trials = all_trials[all_trials['session_id'] == session_id].copy()

    # In some cases the task replay is repeated; in these cases repeat the replay trials table as needed
    num_total_stims = settings.get('NUM_STIM_PRESENTATIONS', len(replay_trials))
    num_repeats = int(np.ceil(num_total_stims / len(replay_trials)))
    if num_repeats > 1:
        log.warning(f"Session template {session_id} has only {len(replay_trials)} trials. "
                    f"The passive session was likely run with a higher NUM_STIM_PRESENTATIONS. "
                    f"Repeating to reach {num_total_stims} presentations.")
        replay_trials = pd.concat([replay_trials] * num_repeats, ignore_index=True).iloc[:num_total_stims]

    # Rename stim_phase to phase for consistency
    replay_trials = replay_trials.rename(columns={'stim_phase': 'phase'})

    return replay_trials


def _load_v7_fixture_df(settings: dict) -> pd.DataFrame:
    """
    Load the task replay fixture for sessions with iblrig versions less than v8.

    Parameters
    ----------
    settings: dict
        A dictionary containing the session settings.

    Returns
    -------
    replay_trials: pd.DataFrame
        A dataframe containing the expected task replay sequence.
    """
    pars = map(settings.get, ['PRELOADED_SESSION_NUM', 'PREGENERATED_SESSION_NUM', 'SESSION_TEMPLATE_ID'])
    session_id = next((k for k in pars if k is not None), None)
    session_order = settings.get('SESSION_ORDER', None)
    if session_order:
        assert settings["SESSION_ORDER"][settings["SESSION_IDX"]] == session_id

    gabor_params = np.load(PATH_FIXTURES_V7.joinpath(f"session_{session_id}_passive_pcs.npy"))
    replay_sequence = np.load(PATH_FIXTURES_V7.joinpath(f"session_{session_id}_passive_stimIDs.npy"))
    stim_delays = np.load(PATH_FIXTURES_V7.joinpath(f"session_{session_id}_passive_stimDelays.npy"))

    replay_trials = pd.DataFrame(columns=['session_id', 'stim_delay', 'stim_type', 'position', 'contrast', 'phase'])
    replay_trials['session_id'] = [session_id] * len(replay_sequence)
    replay_trials['stim_delay'] = stim_delays
    replay_trials['stim_type'] = replay_sequence
    gabor_idx = replay_trials['stim_type'] == 'G'
    replay_trials.loc[gabor_idx, 'position'] = gabor_params[:, 0]
    replay_trials.loc[gabor_idx, 'contrast'] = gabor_params[:, 1]
    replay_trials.loc[gabor_idx, 'phase'] = gabor_params[:, 2]

    return replay_trials


def _load_passive_stim_meta() -> dict:
    """
    Load the passive stimulus metadata fixture file.

    Returns
    -------
    dict
        A dictionary containing the passive stimulus metadata.
    """
    path_fixtures = Path(ephys_fpga.__file__).parent.joinpath("ephys_sessions")
    with open(path_fixtures.joinpath("passive_stim_meta.json"), "r") as f:
        meta = json.load(f)

    return meta


def _load_spacer_info() -> dict:
    """
    Load the spacer template used to detect passive periods.
    Returns
    -------
    dict:
        A dictionary containing the spacer template information
    """
    meta = _load_passive_stim_meta()
    info = {
        't_quiet': meta['VISUAL_STIM_0']['delay_around'],
        'template': np.array(meta['VISUAL_STIM_0']['ttl_frame_nums'], dtype=np.float32) / FRAME_FS,
        'jitter': 3 / FRAME_FS,
        'n_spacers': np.sum(np.array(meta['STIM_ORDER']) == 0)
    }

    return info


def _load_rf_mapping(session_path: Path, task_collection: str = 'raw_passive_data') -> np.ndarray:
    """
    Load the receptive field mapping stimulus frames from a passive session.

    Parameters
    ----------
    session_path: Path
        The path to a session
    task_collection: str, default 'raw_passive_data'
        The collection containing the task data.

    Returns
    -------
    frames: np.ndarray
        The frames of the receptive field mapping stimulus.
    """
    file = session_path.joinpath(task_collection, "_iblrig_RFMapStim.raw.bin")
    frame_array = np.fromfile(file, dtype="uint8")
    meta = _load_passive_stim_meta()
    mkey = "VISUAL_STIM_" + {v: k for k, v in meta["VISUAL_STIMULI"].items()}["receptive_field_mapping"]
    y_pix, x_pix, _ = meta[mkey]["stim_file_shape"]
    frames = np.transpose(np.reshape(frame_array, [y_pix, x_pix, -1], order="F"), [2, 1, 0])

    return frames
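
# --- Illustrative usage (sketch) --------------------------------------------------------------------
# Shows what the spacer fixture looks like; useful when debugging the spacer detection below.
def _example_spacer_info():
    info = _load_spacer_info()
    # 'template' holds the spacer TTL times in seconds, 'n_spacers' the number of spacers expected,
    # and 't_quiet' the quiet period around each spacer
    print(f"{info['n_spacers']} spacers expected, template of {info['template'].size} TTL frames, "
          f"{info['t_quiet']}s quiet period around each spacer")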


# ------------------------------------------------------------------
# 1/3 Define start and end times of the 3 passive periods
# ------------------------------------------------------------------
def extract_passive_periods(
    session_path: Path,
    sync_collection: str = 'raw_ephys_data',
    sync: dict | None = None,
    sync_map: dict | None = None,
    tmin: float | None = None,
    tmax: float | None = None
) -> pd.DataFrame:
    """
    Extract passive protocol periods from a session.

    Parameters
    ----------
    session_path: Path
        The path to a session
    sync_collection : str, default 'raw_ephys_data'
        The collection containing the sync data.
    sync : dict, optional
        Preloaded sync dictionary. If None, it is loaded via `ephys_fpga.get_sync_and_chn_map`.
    sync_map : dict, optional
        Channel map for sync. Loaded if None.
    tmin, tmax : float, optional
        Time window to restrict the extraction.

    Returns
    -------
    pd.DataFrame
        A dataframe containing the passive periods.
        Contains rows: 'start', 'stop' and columns: 'passiveProtocol', 'spontaneousActivity', 'RFM', 'taskReplay'
    """
    # Load the sync data if not already available
    if sync is None or sync_map is None:
        sync, sync_map = ephys_fpga.get_sync_and_chn_map(session_path, sync_collection)

    fttl = ephys_fpga.get_sync_fronts(sync, sync_map["frame2ttl"], tmin=tmin, tmax=tmax)
    fttl = ephys_fpga._clean_frame2ttl(fttl, display=False)
    tmax = tmax or sync['times'][-1]
    tmin = tmin or 0

    t_start_passive, t_starts, t_ends = _get_spacer_times(fttl['times'], tmin, tmax)

    t_starts_col = np.insert(t_starts, 0, t_start_passive)
    t_ends_col = np.insert(t_ends, 0, t_ends[-1])

    passive_periods = pd.DataFrame(
        [t_starts_col, t_ends_col],
        index=["start", "stop"],
        columns=["passiveProtocol", "spontaneousActivity", "RFM", "taskReplay"],
    )

    return passive_periods
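
# --- Illustrative usage (sketch) --------------------------------------------------------------------
# The session path is hypothetical; with real data this returns a 2 x 4 dataframe of period intervals.
def _example_extract_passive_periods():
    session_path = Path("/data/Subjects/example_subject/2020-01-01/001")  # hypothetical path
    periods = extract_passive_periods(session_path, sync_collection='raw_ephys_data')
    # Rows are 'start'/'stop'; columns are 'passiveProtocol', 'spontaneousActivity', 'RFM', 'taskReplay'
    print(periods.loc['start', 'RFM'], periods.loc['stop', 'RFM'])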


def _get_spacer_times(ttl_signal: np.ndarray, tmin: float, tmax: float, thresh: float = 3.2):
    """
    Find the times of spacer onsets/offsets in the ttl_signal.

    Parameters
    ----------
    ttl_signal : np.ndarray
        The frame2ttl signal containing the spacers
    tmin : float
        The time of the start of the passive period
    tmax : float
        The time of the end of the passive period
    thresh : float, optional
        The threshold that the fttl signal, convolved with the spacer template, must exceed to detect
        a spacer, by default 3.2

    Returns
    -------
    t_start_passive : float
        The start time of the passive protocol (onset of the first spacer).
    t_starts : np.ndarray
        The start time of each passive period (the end of each spacer).
    t_ends : np.ndarray
        The end time of each passive period (the start of the next spacer, or tmax for the last period).
    """
    spacer_info = _load_spacer_info()
    spacer_model = spacer_info['jitter'] + np.diff(spacer_info['template'])[2:-2]

    # Diff the ttl signal to compare to the spacer model
    dttl = np.diff(ttl_signal)
    # Remove diffs larger and smaller than the max and min diff in the model to clean up the signal
    dttl[dttl > np.max(spacer_model)] = 0
    dttl[dttl < np.min(spacer_model)] = 0
    # Convolve the cleaned diff ttl signal with the spacer model
    conv_dttl = np.correlate(dttl, spacer_model, mode="same")

    # Find the peaks in the convolved signal that pass over a certain threshold
    idxs_spacer_middle, _ = scipy.signal.find_peaks(conv_dttl, height=thresh)
    # Find the duration of the spacer model
    spacer_around = int(np.floor(len(spacer_model) / 2))
    is_valid = np.zeros((idxs_spacer_middle.size), dtype=bool)
    # Assert that before and after the peak the spacing between ttls is going up and down respectively
    for i, t in enumerate(idxs_spacer_middle):
        before = np.all(np.diff(dttl[t - spacer_around:t]) >= 0)
        after = np.all(np.diff(dttl[t:t + spacer_around]) <= 0)
        is_valid[i] = before and after

    idxs_spacer_middle = idxs_spacer_middle[is_valid]

    # Pull out spacer times (middle)
    ts_spacer_middle = ttl_signal[idxs_spacer_middle]

    # Put beginning/end of spacer times into an array
    spacer_length = np.max(spacer_info['template'])
    spacer_times = np.zeros(shape=(ts_spacer_middle.shape[0], 2))
    for i, t in enumerate(ts_spacer_middle):
        spacer_times[i, 0] = t - (spacer_length / 2) - spacer_info['t_quiet']
        spacer_times[i, 1] = t + (spacer_length / 2) + spacer_info['t_quiet']

    # Check the correct number of spacers was found
    if spacer_info['n_spacers'] != np.size(spacer_times) / 2:
        error_nspacer = True
        # NOTE THIS ONLY WORKS FOR THE CASE WHERE THE SIGNAL IS EXTRACTED ACCORDING TO BPOD SIGNAL
        # Sometimes the first spacer is truncated. Assess whether the first spacer is undetected, and if so
        # launch another spacer detection on the truncated fttl with a lower threshold value.
        # Note: take *3 for some margin
        if (spacer_times[0][0] > tmin + (spacer_info['template'][-1] + spacer_info['jitter']) * 3 and
                (np.size(spacer_times) / 2) == spacer_info['n_spacers'] - 1):
            # Truncate signals
            fttl_t = ttl_signal[np.where(ttl_signal < spacer_times[0][0])]
            conv_dttl_t = conv_dttl[np.where(ttl_signal < spacer_times[0][0])]
            ddttl = np.diff(np.diff(fttl_t))
            # Find spacer location
            # NB: cannot re-use the same algo for spacer detection as conv peaks towards spacer end
            # 1. Find time point at which conv rises above a given threshold value
            thresh_alt = 2.0
            idx_nearend_spacer = int(np.where((conv_dttl_t[1:-2] < thresh_alt) & (conv_dttl_t[2:-1] > thresh_alt))[0])
            ddttl = ddttl[0:idx_nearend_spacer]
            # 2. Find time point before this, for which the fttl diff increases/decreases (this is the middle of the spacer)
            indx_middle = np.where((ddttl[0:-1] > 0) & (ddttl[1:] < 0))[0]
            if len(indx_middle) == 1:
                # 3. Add 1/2 spacer to the middle index to get the spacer end index
                spacer_around = int(np.floor(len(spacer_info['template']) / 2))
                idx_end = int(indx_middle + spacer_around) + 1
                spacer_times = np.insert(spacer_times, 0, np.array([ttl_signal[0], ttl_signal[idx_end]]), axis=0)
                error_nspacer = False

        if error_nspacer:
            raise ValueError(
                f'The number of expected spacers ({spacer_info["n_spacers"]}) '
                f'is different from the number found on the raw '
                f'trace ({int(np.size(spacer_times) / 2)})'
            )

    spacer_times = np.r_[spacer_times.flatten(), tmax]
    return spacer_times[0], spacer_times[1::2], spacer_times[2::2]
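
# --- Toy illustration of the matched-filter idea in `_get_spacer_times` (synthetic data) ------------
# The diffs of the frame2ttl times are correlated with the diffs of the spacer template; the
# correlation peaks where the inter-TTL spacing pattern matches the spacer, which is then taken as the
# spacer middle. The values here are synthetic, not real frame2ttl data.
def _example_spacer_matched_filter():
    spacer_model = np.diff(_load_spacer_info()['template'])[2:-2]
    dttl = np.r_[np.zeros(20), spacer_model, np.zeros(20)]  # one clean spacer embedded in silence
    conv = np.correlate(dttl, spacer_model, mode='same')
    print(int(np.argmax(conv)))  # peaks near the middle of the embedded spacer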


# ------------------------------------------------------------------
# 2/3 RFMapping stimuli
# ------------------------------------------------------------------
def extract_rfmapping(
    session_path: Path,
    sync_collection: str = 'raw_ephys_data',
    task_collection: str = 'raw_passive_data',
    sync: dict | None = None,
    sync_map: dict | None = None,
    trfm: np.ndarray | None = None
) -> np.ndarray:
    """
    Extract the receptive field mapping stimulus times from a passive session.

    Parameters
    ----------
    session_path: Path
        The path to a session
    sync_collection : str, default 'raw_ephys_data'
        The collection containing the sync data.
    task_collection: str, default 'raw_passive_data'
        The collection containing the passive task data.
    sync : dict, optional
        Preloaded sync dictionary. If None, it is loaded via `ephys_fpga.get_sync_and_chn_map`.
    sync_map : dict, optional
        Channel map for sync. Loaded if None.
    trfm: np.ndarray, optional
        The start and end times of the receptive field mapping period. If None, extracted from passive periods.

    Returns
    -------
    np.ndarray
        The times of each frame presented during the receptive field mapping stimulus.
    """
    if sync is None or sync_map is None:
        sync, sync_map = ephys_fpga.get_sync_and_chn_map(session_path, sync_collection)
    if trfm is None:
        passivePeriods_df = extract_passive_periods(session_path, sync_collection, sync=sync, sync_map=sync_map)
        trfm = passivePeriods_df.RFM.values

    # Get the ttl onset times from the RF mapping stimulus frames
    frames = _load_rf_mapping(Path(session_path), task_collection)
    # Extract the ttl trace from the lower right corner
    ttl_trace = frames[:, 0, 0]
    # Extract the onset times of the ttls
    rf_times_on = np.where(np.diff(np.abs(ttl_trace)) > 0)[0] / FRAME_FS

    # Get the ttl onset times as detected on the frame2ttl
    fttl = ephys_fpga.get_sync_fronts(sync, sync_map["frame2ttl"], tmin=trfm[0], tmax=trfm[1])
    fttl = ephys_fpga._clean_frame2ttl(fttl)
    fttl_times_on = np.where(np.diff(fttl["times"]) <= 1)[0]
    # Ensure that all the polarities are the same for the selected times
    flips = fttl['polarities'][fttl_times_on]
    fttl_times_on = fttl_times_on[flips == np.median(flips)]

    fcn, *_ = sync_timestamps(rf_times_on, fttl['times'][fttl_times_on])
    rf_times = fcn(np.arange(frames.shape[0]) / FRAME_FS)

    return rf_times
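
# --- Illustrative usage (sketch) --------------------------------------------------------------------
# The session path is hypothetical; with real data this returns one timestamp per RF mapping frame,
# expressed on the clock of the sync source.
def _example_extract_rfmapping():
    session_path = Path("/data/Subjects/example_subject/2020-01-01/001")  # hypothetical path
    rf_times = extract_rfmapping(session_path, sync_collection='raw_ephys_data',
                                 task_collection='raw_passive_data')
    print(rf_times.shape)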

# ------------------------------------------------------------------
# 3/3 Task replay
# ------------------------------------------------------------------
def extract_task_replay(
    session_path: Path,
    sync_collection: str = 'raw_ephys_data',
    task_collection: str = 'raw_passive_data',
    settings: dict | None = None,
    sync: dict | None = None,
    sync_map: dict | None = None,
    treplay: np.ndarray | None = None
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Extract the task replay stimuli from a passive session.

    Parameters
    ----------
    session_path: Path
        The path to a session
    sync_collection : str, default 'raw_ephys_data'
        The collection containing the sync data.
    task_collection: str, default 'raw_passive_data'
        The collection containing the passive task data.
    settings: dict, optional
        A dictionary containing the session settings. If None, it is loaded via `rawio.load_settings`.
    sync: dict, optional
        The sync data from the fpga
    sync_map: dict, optional
        The map of sync channels
    treplay: np.ndarray, optional
        The start and end times of the task replay period. If None, extracted from passive periods.

    Returns
    -------
    gabor_df: pd.DataFrame
        A dataframe containing the gabor stimulus times and properties.
    stim_df: pd.DataFrame
        A dataframe containing the valve, tone and noise stimulus times.
    """
    if sync is None or sync_map is None:
        sync, sync_map = ephys_fpga.get_sync_and_chn_map(session_path, sync_collection)
    if treplay is None:
        passivePeriods_df = extract_passive_periods(session_path, sync_collection, sync=sync, sync_map=sync_map)
        treplay = passivePeriods_df.taskReplay.values

    if settings is None:
        settings = rawio.load_settings(session_path, task_collection=task_collection)

    pars = map(settings.get, ['IBLRIG_VERSION', 'IBLRIG_VERSION_TAG'])
    task_version = next((k for k in pars if k is not None), '0.0.0')
    if task_version == '':
        task_version = settings.get('PARAMS', {}).get('IBLRIG_VERSION', '0.0.0')
    task_version = version.Version(task_version)

    # Load in the expected task replay structure
    replay_trials = load_task_replay_fixtures(session_path=session_path, task_collection=task_collection,
                                              settings=settings, task_version=task_version)

    # Extract the gabor events, uses the ttls on the frame2ttl channel
    try:
        try:
            fttl = ephys_fpga.get_sync_fronts(sync, sync_map["frame2ttl"], tmin=treplay[0], tmax=treplay[1])
            fttl = ephys_fpga._clean_frame2ttl(fttl)
            gabor_df = _extract_passive_gabor(fttl, replay_trials)
        except AssertionError:
            # There are a few sessions < 8.29.0 where the first gabor was successfully shown. In these cases
            # we re-extract without adjusting the task replay fixture to remove the first gabor.
            log.warning('Attempting to extract gabor stimuli without adjusting for first blank gabor...')
            replay_trials = load_task_replay_fixtures(session_path=session_path, task_collection=task_collection,
                                                      settings=settings, task_version=task_version, adjust=False)
            fttl = ephys_fpga.get_sync_fronts(sync, sync_map["frame2ttl"], tmin=treplay[0], tmax=treplay[1])
            fttl = ephys_fpga._clean_frame2ttl(fttl)
            gabor_df = _extract_passive_gabor(fttl, replay_trials)
    except Exception as e:
        log.error(f'Failed to extract gabor stimuli from passive session. {e}')
        gabor_df = None

    try:
        # Extract the valve events, uses the ttls on the bpod channel
        bpod = ephys_fpga.get_sync_fronts(sync, sync_map["bpod"], tmin=treplay[0], tmax=treplay[1])
        valve_df = _extract_passive_valve(bpod, replay_trials)

        # Extract the audio events, uses the ttls on the audio channel
        audio = ephys_fpga.get_sync_fronts(sync, sync_map["audio"], tmin=treplay[0], tmax=treplay[1])
        tone_df, noise_df = _extract_passive_audio(audio, replay_trials, task_version)

        max_len = max(len(tone_df), len(valve_df), len(noise_df))

        # Build the stimulus dataframe
        stim_df = pd.DataFrame({
            'valveOn': valve_df['start'].reindex(range(max_len)),
            'valveOff': valve_df['stop'].reindex(range(max_len)),
            'toneOn': tone_df['start'].reindex(range(max_len)),
            'toneOff': tone_df['stop'].reindex(range(max_len)),
            'noiseOn': noise_df['start'].reindex(range(max_len)),
            'noiseOff': noise_df['stop'].reindex(range(max_len)),
        })
    except Exception as e:
        log.error(f'Failed to extract valve/audio stimuli from passive session. {e}')
        stim_df = None

    if gabor_df is not None and stim_df is not None:
        # Build the full task replay dataframe and order by start time
        full_df = pd.concat([gabor_df[['stim_type', 'start', 'stop']], valve_df, tone_df, noise_df])
        # Sort by start time and check if it matches the replay trials
        full_df = full_df.sort_values(by='start').reset_index(drop=True)
        if not version.parse('6.2.5') <= task_version <= version.parse('6.5.3'):
            assert np.array_equal(full_df['stim_type'].values, replay_trials['stim_type'].values), \
                "The extracted sequence does not match the expected task replay sequence."
        gabor_df = gabor_df.drop(columns=['stim_type'])
    elif gabor_df is None and stim_df is not None:
        # Build the full task replay dataframe and order by start time, excluding gabors
        full_df = pd.concat([valve_df, tone_df, noise_df])
        replay_trials = replay_trials[replay_trials['stim_type'] != 'G'].reset_index(drop=True)
        # Sort by start time and check if it matches the replay trials
        full_df = full_df.sort_values(by='start').reset_index(drop=True)
        if not version.parse('6.2.5') <= task_version <= version.parse('6.5.3'):
            assert np.array_equal(full_df['stim_type'].values, replay_trials['stim_type'].values), \
                "The extracted sequence does not match the expected task replay sequence."

    return gabor_df, stim_df
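
# --- Illustrative usage (sketch) --------------------------------------------------------------------
# The session path is hypothetical; with real data this returns the gabor table and the
# valve/tone/noise stimulus table for the task replay period.
def _example_extract_task_replay():
    session_path = Path("/data/Subjects/example_subject/2020-01-01/001")  # hypothetical path
    gabor_df, stim_df = extract_task_replay(session_path)
    # gabor_df: one row per gabor with 'start', 'stop', 'position', 'contrast', 'phase'
    # stim_df: 'valveOn'/'valveOff', 'toneOn'/'toneOff', 'noiseOn'/'noiseOff' columns
    print(len(gabor_df), len(stim_df))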


def _extract_passive_gabor(fttl: dict, replay_trials: pd.DataFrame) -> pd.DataFrame:
    """
    Extract the gabor stimulus times from a passive session.

    Parameters
    ----------
    fttl: dict
        The frame2ttl sync dictionary containing 'times' and 'polarities'.
    replay_trials: pd.DataFrame
        A dataframe containing the expected task replay sequence.

    Returns
    -------
    gabor_df: pd.DataFrame
        A dataframe containing the gabor stimulus times and properties.
    """
    ttl_signal = fttl['times']
    dttl = np.diff(ttl_signal)
    n_expected_gabor = (replay_trials['stim_type'] == 'G').sum()

    # Split the intervals into two alternating groups. One group corresponds to the ttl pulses,
    # the other to the intervals between pulses; we don't know which is which yet.
    even_gaps = dttl[2:-2:2]
    odd_gaps = dttl[3:-2:2]
    # Find the min and max of both groups
    diff0 = (np.min(even_gaps), np.max(even_gaps))
    diff1 = (np.min(odd_gaps), np.max(odd_gaps))
    # Find which group has the highest values, these correspond to the intervals between pulses
    if max(diff0 + diff1) in diff0:
        intervals = diff0
    elif max(diff0 + diff1) in diff1:
        intervals = diff1
    # Our upper threshold for detecting ttl pulses is then the min of the intervals group
    thresh = intervals[0]

    # Find the onset of the pulses
    idx_start_stims = np.where((dttl < thresh - 0.05) & (dttl > 0.1))[0]
    # Get the offset times
    idx_end_stims = idx_start_stims + 1

    start_times = ttl_signal[idx_start_stims]
    end_times = ttl_signal[idx_end_stims]

    if len(start_times) == n_expected_gabor + 1:
        # Sometimes an extra pulse is detected at the start; if this corresponds
        # to the start of the fttl signal we remove it
        if start_times[0] == fttl['times'][0]:
            start_times = start_times[1:]
            end_times = end_times[1:]

    assert start_times.size == n_expected_gabor, \
        f"Wrong number of Gabor stimuli detected: {start_times.size} / {n_expected_gabor}"

    # Check length of presentation of stim is within 150ms of expected
    assert np.allclose(end_times - start_times, 0.3, atol=0.15), "Some Gabor presentation lengths seem wrong."
    # if not np.allclose(end_times - start_times, 0.3, atol=0.15):
    #     log.warning("Some Gabor presentation lengths seem wrong.")

    # Build our gabor dataframe that also contains the stimulus properties
    gabor_df = (
        replay_trials
        .loc[replay_trials["stim_type"] == "G", ["stim_type", "position", "contrast", "phase"]]
        .assign(start=start_times, stop=end_times)
        [["stim_type", "start", "stop", "position", "contrast", "phase"]]
        .reset_index(drop=True)
    )

    return gabor_df


def _extract_passive_valve(bpod: dict, replay_trials: pd.DataFrame) -> pd.DataFrame:
    """
    Extract the valve stimulus times from a passive session.

    Parameters
    ----------
    bpod: dict
        The bpod sync dictionary containing 'times' and 'polarities'.
    replay_trials: pd.DataFrame
        A dataframe containing the expected task replay sequence.

    Returns
    -------
    valve_df: pd.DataFrame
        A dataframe containing the valve stimulus times.
    """
    n_expected_valve = (replay_trials['stim_type'] == 'V').sum()

    # All high fronts == valve open times and low fronts == valve close times
    valveOn_times = bpod["times"][bpod["polarities"] > 0]
    valveOff_times = bpod["times"][bpod["polarities"] < 0]

    assert len(valveOn_times) == n_expected_valve, (f"Wrong number of valve ONSET times: "
                                                    f"{len(valveOn_times)} / {n_expected_valve}")
    assert len(valveOff_times) == n_expected_valve, (f"Wrong number of valve OFFSET times: "
                                                     f"{len(valveOff_times)} / {n_expected_valve}")
    assert len(bpod["times"]) == n_expected_valve * 2, "Wrong number of valve FRONTS detected"

    # Check all values are within bpod tolerance of 100µs
    assert np.allclose(valveOff_times - valveOn_times, valveOff_times[1] - valveOn_times[1], atol=0.001), \
        "Some valve outputs are longer or shorter than others"

    valve_df = (
        replay_trials.loc[replay_trials["stim_type"] == "V", ["stim_type"]]
        .assign(start=valveOn_times, stop=valveOff_times)
        [["start", "stop", "stim_type"]]
        .reset_index(drop=True)
    )

    return valve_df


def _extract_passive_audio(
    audio: dict,
    replay_trials: pd.DataFrame,
    rig_version: version.Version
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Extract the audio stimulus times from a passive session.

    Two dataframes are returned, one for tones and one for noise stimuli.

    Parameters
    ----------
    audio: dict
        The audio sync dictionary containing 'times' and 'polarities'.
    replay_trials: pd.DataFrame
        A dataframe containing the expected task replay sequence.
    rig_version: version.Version
        The version of iblrig used for the session.

    Returns
    -------
    tone_df: pd.DataFrame
        A dataframe containing the tone stimulus times.
    noise_df: pd.DataFrame
        A dataframe containing the noise stimulus times.
    """
    # Get all sound onsets and offsets
    soundOn_times = audio["times"][audio["polarities"] > 0]
    soundOff_times = audio["times"][audio["polarities"] < 0]

    n_expected_tone = (replay_trials['stim_type'] == 'T').sum()
    n_expected_noise = (replay_trials['stim_type'] == 'N').sum()
    n_expected_audio = n_expected_tone + n_expected_noise

    if version.parse('6.2.5') <= rig_version <= version.parse('6.5.3'):
        pulse_diff = soundOff_times - soundOn_times
        keep = pulse_diff < 1
        soundOn_times = soundOn_times[keep]
        soundOff_times = soundOff_times[keep]
        log.warning('Detected 6.2.5 <= iblrig version <= 6.5.3, removing extra audio pulses '
                    'longer than 1s and relaxing checks')
        if not len(soundOn_times) == n_expected_audio:
            log.warning(f"Wrong number of sound ONSETS: {len(soundOn_times)} / {n_expected_audio}")
        if not len(soundOff_times) == n_expected_audio:
            log.warning(f"Wrong number of sound OFFSETS: {len(soundOff_times)} / {n_expected_audio}")
    else:
        assert len(soundOn_times) == n_expected_audio, (f"Wrong number of sound ONSETS: "
                                                        f"{len(soundOn_times)} / {n_expected_audio}")
        assert len(soundOff_times) == n_expected_audio, (f"Wrong number of sound OFFSETS: "
                                                         f"{len(soundOff_times)} / {n_expected_audio}")

    pulse_diff = soundOff_times - soundOn_times
    # Tone is ~100ms so check if diff <= 0.3
    tone_mask = pulse_diff <= 0.3
    # Noise is ~500ms so check if diff > 0.3
    noise_mask = pulse_diff > 0.3

    toneOn_times, toneOff_times = soundOn_times[tone_mask], soundOff_times[tone_mask]
    noiseOn_times, noiseOff_times = soundOn_times[noise_mask], soundOff_times[noise_mask]

    # Fixed delays from soundcard ~500µs
    assert np.allclose(toneOff_times - toneOn_times, 0.1, atol=0.02, equal_nan=True), \
        "Some tone lengths seem wrong."
    assert np.allclose(noiseOff_times - noiseOn_times, 0.5, atol=0.02, equal_nan=True), \
        "Some noise lengths seem wrong."
    if version.parse('6.2.5') <= rig_version <= version.parse('6.5.3'):
        # We pad the values with NaNs to match the expected lengths
        toneOn_times = np.r_[toneOn_times, np.full((n_expected_tone - len(toneOn_times)), np.nan)]
        toneOff_times = np.r_[toneOff_times, np.full((n_expected_tone - len(toneOff_times)), np.nan)]
        noiseOn_times = np.r_[noiseOn_times, np.full((n_expected_noise - len(noiseOn_times)), np.nan)]
        noiseOff_times = np.r_[noiseOff_times, np.full((n_expected_noise - len(noiseOff_times)), np.nan)]

    assert len(toneOn_times) == len(toneOff_times) == n_expected_tone, "Wrong number of tones detected"
    assert len(noiseOn_times) == len(noiseOff_times) == n_expected_noise, "Wrong number of noise detected"

    tone_df = (
        replay_trials.loc[replay_trials["stim_type"] == "T", ["stim_type"]]
        .assign(start=toneOn_times, stop=toneOff_times)
        [["start", "stop", "stim_type"]]
        .reset_index(drop=True)
    )

    noise_df = (
        replay_trials.loc[replay_trials["stim_type"] == "N", ["stim_type"]]
        .assign(start=noiseOn_times, stop=noiseOff_times)
        [["start", "stop", "stim_type"]]
        .reset_index(drop=True)
    )

    return tone_df, noise_df
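
# --- Toy illustration of the duration-based tone/noise split in `_extract_passive_audio` ------------
# Tones are ~100 ms and noise bursts ~500 ms, so a 0.3 s threshold on the on/off duration separates
# the two groups. The durations here are synthetic.
def _example_tone_noise_split():
    pulse_diff = np.array([0.1, 0.5, 0.1, 0.5])  # synthetic pulse durations in seconds
    print(pulse_diff <= 0.3)  # True for tones, False for noise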


# ------------------------------------------------------------------
# Main extractor
# ------------------------------------------------------------------
class PassiveChoiceWorld(BaseExtractor):
    save_names = (
        "_ibl_passivePeriods.intervalsTable.csv",
        "_ibl_passiveRFM.times.npy",
        "_ibl_passiveGabor.table.csv",
        "_ibl_passiveStims.table.csv",
    )
    var_names = (
        "passivePeriods_df",
        "passiveRFM_times",
        "passiveGabor_df",
        "passiveStims_df",
    )

    def _extract(
        self,
        sync_collection: str = 'raw_ephys_data',
        task_collection: str = 'raw_passive_data',
        sync: dict | None = None,
        sync_map: dict | None = None,
        settings: dict | None = None,
        plot: bool = False,
        **kwargs
    ) -> tuple:
        # Load in the sync and sync map if not provided
        if sync is None or sync_map is None:
            sync, sync_map = ephys_fpga.get_sync_and_chn_map(self.session_path, sync_collection)

        # Load in the task settings
        if settings is None:
            settings = rawio.load_settings(self.session_path, task_collection=task_collection)

        # Get the start and end times of this protocol
        if (protocol_number := kwargs.get('protocol_number')) is not None:  # look for spacer
            # The spacers are TTLs generated by Bpod at the start of each protocol
            bpod = ephys_fpga.get_sync_fronts(sync, sync_map['bpod'])
            tmin, tmax = ephys_fpga.get_protocol_period(self.session_path, protocol_number, bpod)
        else:
            tmin = tmax = None

        # Get the timing of the passive periods
        passivePeriods_df = extract_passive_periods(self.session_path, sync_collection=sync_collection, sync=sync,
                                                    sync_map=sync_map, tmin=tmin, tmax=tmax)
        trfm = passivePeriods_df.RFM.values
        treplay = passivePeriods_df.taskReplay.values

        try:  # RFMapping
            passiveRFM_times = extract_rfmapping(self.session_path, sync_collection=sync_collection,
                                                 task_collection=task_collection, sync=sync, sync_map=sync_map,
                                                 trfm=trfm)
        except Exception as e:
            log.error(f"Failed to extract RFMapping datasets: {e}")
            passiveRFM_times = None

        skip_replay = settings.get('SKIP_EVENT_REPLAY', False)

        if not skip_replay:
            try:
                (passiveGabor_df, passiveStims_df,) = extract_task_replay(
                    self.session_path, sync_collection=sync_collection, task_collection=task_collection,
                    settings=settings, sync=sync, sync_map=sync_map, treplay=treplay)
            except Exception as e:
                log.error(f"Failed to extract task replay stimuli: {e}")
                passiveGabor_df, passiveStims_df = (None, None)
        else:
            # If we don't have task replay then we set the treplay intervals to NaN in our passivePeriods_df dataset
            passiveGabor_df, passiveStims_df = (None, None)
            passivePeriods_df.taskReplay = np.nan

        if plot:
            f, ax = plt.subplots(1, 1)
            f.suptitle("/".join(str(self.session_path).split("/")[-5:]))
            passive_plots.plot_sync_channels(sync=sync, sync_map=sync_map, ax=ax)
            passive_plots.plot_passive_periods(passivePeriods_df, ax=ax)
            passive_plots.plot_rfmapping(passiveRFM_times, ax=ax)
            passive_plots.plot_gabor_times(passiveGabor_df, ax=ax)
            passive_plots.plot_stims_times(passiveStims_df, ax=ax)
            plt.show()

        data = (
            passivePeriods_df,  # _ibl_passivePeriods.intervalsTable.csv
            passiveRFM_times,   # _ibl_passiveRFM.times.npy
            passiveGabor_df,    # _ibl_passiveGabor.table.csv
            passiveStims_df,    # _ibl_passiveStims.table.csv
        )

        # Set save names to None if data not extracted - these will not be saved or registered
        self.save_names = tuple(None if y is None else x for x, y in zip(self.save_names, data))

        return data
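
# --- Illustrative usage of the extractor class (sketch) ---------------------------------------------
# The session path is hypothetical, and this assumes the usual BaseExtractor `extract(save=...)`
# wrapper around `_extract`; `protocol_number` would only be passed when several protocols share one
# sync recording.
def _example_run_passive_extractor():
    session_path = Path("/data/Subjects/example_subject/2020-01-01/001")  # hypothetical path
    extractor = PassiveChoiceWorld(session_path)
    data, paths = extractor.extract(save=False)  # assumed (outputs, files) return; files is None when save=False
    passivePeriods_df, passiveRFM_times, passiveGabor_df, passiveStims_df = data
    print(passivePeriods_df)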