"""Tools to manipulate stumulus."""
import h5py
import numpy as np
from spikelib.io import load_matstim
from spikelib.utils import check_groups
def _count_repeat_runs(repeated):
    """Group consecutive repeated-frame indices into runs.

    Parameters
    ----------
    repeated : iterable of int
        Increasing indices of the sync rows flagged as repeated.

    Returns
    -------
    dict
        Maps the first index of every run of consecutive indices to the
        number of indices in that run.
    """
    runs = {}
    run_start = previous = None
    for index in map(int, repeated):
        if previous is None or index - previous > 1:
            # A gap larger than one frame starts a new run.
            run_start = index
            runs[run_start] = 1
        else:
            runs[run_start] += 1
        previous = index
    return runs


def _runs_to_stim_index(runs):
    """Re-express run starts as indices of the original stimulus.

    Each run start is reduced by the number of repeated frames counted in
    the runs that precede it.

    Parameters
    ----------
    runs : dict
        Run start (index in the sync file) -> run length.

    Returns
    -------
    corrected : dict
        Run start (index in the original stimulus) -> run length.
    n_extra : int
        Total number of repeated frames over all runs.
    """
    corrected = {}
    n_extra = 0
    for start in sorted(runs):
        corrected[start - n_extra] = runs[start]
        n_extra += runs[start]
    return corrected, n_extra


def _insert_repeated_frames(stim, corrected_repeated, n_extra):
    """Build a copy of `stim` with the repeated frames inserted.

    Parameters
    ----------
    stim : array_like
        Original stimulus, indexed by frame along the first axis.
    corrected_repeated : dict
        Stimulus frame index -> number of extra copies of that frame.
    n_extra : int
        Sum of the values of `corrected_repeated`.

    Returns
    -------
    numpy.ndarray
        uint8 array with ``stim.shape[0] + n_extra`` frames.

    Raises
    ------
    ValueError
        If a repeated frame index is not a valid index of `stim`.
    """
    n_frames = stim.shape[0]
    starts = sorted(corrected_repeated)
    if starts and starts[-1] >= n_frames:
        raise ValueError(
            'Repeated frame at stimulus index {} but the stimulus only '
            'has {} frames'.format(starts[-1], n_frames))

    new_shape = (n_frames + n_extra,) + tuple(stim.shape[1:])
    new_stim = np.empty(new_shape, dtype=np.uint8)

    # Segment boundaries: every run start opens a segment.  A leading
    # segment starting at 0 is added only when no run already starts
    # there; otherwise the run at frame 0 would be applied twice.
    if not starts or starts[0] != 0:
        starts = [0] + starts
    ends = starts[1:] + [n_frames]

    delay = 0  # number of extra frames already inserted
    for kstart, kend in zip(starts, ends):
        n_rep = corrected_repeated.get(kstart, 0)
        # Extra copies of the first frame of the segment ...
        new_stim[kstart+delay:kstart+delay+n_rep, ...] = \
            stim[kstart:kstart+1, ...]
        delay += n_rep
        # ... followed by the untouched segment itself.
        new_stim[kstart+delay:kend+delay, ...] = stim[kstart:kend, ...]
    return new_stim


def correct_checkerboard(stimpath, syncpath, repeatedpath, outputpath,
                         matvar='stim', output_group='/',
                         output_dataset='checkerboard',
                         ):
    """Create a new stimulus with all repeated frames.

    Take a checkerboard stimulus, add every repeated frame found in an
    experiment and save the result to a new stim file.

    Parameters
    ----------
    stimpath : str
        path to original stim file (.mat).
    syncpath : str
        path to the synchronization file with start and end time for
        checkerboard (txt format). Only the first column is used; it is
        compared with the values in `repeatedpath`.
    repeatedpath : str
        path to file with all repeated times of experiment
        (txt format).
    outputpath : str
        path to save stim to hdf5 file. The file is opened in append
        mode.
    matvar : str, optional
        forwarded to `load_matstim` as `matvar` (presumably the name of
        the variable inside the .mat file).
    output_group : str, optional
        hdf5 group that holds the dataset. A trailing '/' is appended
        when missing.
    output_dataset : str, optional
        name of the dataset inside `output_group`. If it already exists
        its content is overwritten in place (this presumably requires
        the stored shape to match the new stimulus).

    Raises
    ------
    ValueError
        If a repeated frame maps to an index outside the stimulus.

    Notes
    -----
    scipy.io.loadmat reads a matfile and maintains the matlab axis order
    to access the array, e.g. shape (y,x,channel,frame) = (35,35,3,72000)
    while python should be (frame,y,x,channel) = (72000,35,35,3); for
    this reason the output file keeps the python format.

    The times are matched by exact value, so both text files are
    expected to hold identical representations of the same timestamps.

    The attributes 'nrepeated', 'repeated' and 'dim' are stored on
    `output_group`.

    https://eli.thegreenplace.net/2015/memory-layout-of-multi-dimensional-arrays/
    http://scikit-image.org/docs/dev/user_guide/numpy_images.html
    """
    stim = load_matstim(stimpath, matvar=matvar)
    print('Shape for checkerboar file: {}'.format(stim.shape))

    # ndmin=2 keeps a one-row sync file two-dimensional so the column
    # selection below does not fail.
    sync_frame = np.loadtxt(syncpath, ndmin=2)
    repeated_frame = np.loadtxt(repeatedpath)
    repeated = np.where(np.isin(sync_frame[:, 0], repeated_frame))[0]

    # Find repeated position and number of repetitions
    counter = _count_repeat_runs(repeated)
    sort_keys = sorted(counter)
    print('Repeated frame {}'.format(sort_keys))

    # Correct the positions so they index the original stim
    corrected_repeated, corrected_sum = _runs_to_stim_index(counter)

    new_stim = _insert_repeated_frames(stim, corrected_repeated,
                                       corrected_sum)
    del stim, sync_frame, repeated_frame

    if not output_group.endswith('/'):
        output_group = output_group + '/'

    with h5py.File(outputpath, 'a') as f:
        check_groups(f, [output_group])
        if output_dataset in f[output_group]:
            f[output_group+output_dataset][...] = new_stim
        else:
            f.create_dataset(output_group+output_dataset,
                             data=new_stim,
                             dtype=np.uint8,
                             # one frame per chunk
                             chunks=(1,) + tuple(new_stim.shape[1:]),
                             compression='gzip',
                             shuffle=False,
                             )
        f[output_group].attrs['nrepeated'] = corrected_sum
        repeated_str = ','.join(map(str, sort_keys))
        f[output_group].attrs['repeated'] = repeated_str
        f[output_group].attrs['dim'] = u'(nframe,ysize,xsize,nchannel)'