Source code for pysight.nd_hist_generator.line_signal_validators.scanimage

"""
__author__ = Hagai Hargil
"""
from typing import Dict, Tuple
import pandas as pd
import numpy as np
import attr
from itertools import tee


[docs]@attr.s(slots=True) class ScanImageLineValidator: sig_val = attr.ib() # SignalValidator def __getattr__(self, item): return getattr(self.sig_val, item)
[docs] def run(self) -> Tuple[Dict, np.uint64]: """ Interpolate SI-specific line signals :return: Dictionary containing the data and the mean difference between subsequent lines """ lines = self.dict_of_data['Lines'].loc[:, 'abs_time'].copy() lines_mat, rel_idx, end_of_frames_idx, last_idx_of_row, rel_idx_non_end_frame = \ self.__calc_line_parameters(lines=lines) if lines_mat.shape[0] == 1: # single frame lines = pd.Series(lines_mat[0, :self.num_of_lines], dtype=np.uint64) delta = np.uint64(lines.diff().mean()) else: theo_lines, delta = self.__gen_line_model(lines=lines, rel_idx=rel_idx, end_of_frames_idx=end_of_frames_idx) lines_mat = self.__diff_mat_analysis(y=theo_lines, lines_mat=lines_mat, last_idx=last_idx_of_row, delta=delta, rel_idx=rel_idx_non_end_frame) lines = self.__finalize_lines(lines_mat=lines_mat) if self.bidir: lines = self.sig_val.add_phase_to_bidir_lines(lines=lines) self.dict_of_data['Lines'] = pd.DataFrame(lines, dtype=np.uint64, columns=['abs_time']) return self.dict_of_data, delta
def __gen_line_model(self, lines: pd.Series, rel_idx: np.ndarray, end_of_frames_idx: np.ndarray): """ Using linear approximation generate a model for the "correct" line signal """ rel_idx_first_frame = rel_idx[rel_idx < self.num_of_lines] lines_first_frame = lines.iloc[:self.num_of_lines].drop(rel_idx_first_frame) # Create a model: y = mx + const const = lines_first_frame[0] m = np.uint64(lines_first_frame.diff().median()) x = np.arange(start=0, stop=self.num_of_lines, dtype=np.uint64) y = m * x + const y = np.tile(y, (end_of_frames_idx.shape[0], 1)) y[1:, :] -= y[1, 0] y = np.concatenate((y, np.zeros_like(y)), axis=1) return y, m def __diff_mat_analysis(self, y: np.ndarray, lines_mat: np.ndarray, last_idx: np.ndarray, delta: int, rel_idx: np.ndarray) -> np.ndarray: """ Check for missing\extra lines in the matrix of lines :param diff_mat: np.ndarray :param last_idx: last index of relevant lines in the frame :return: """ diff_mat = np.abs(np.subtract(lines_mat, y, dtype=np.int64)) missing_vals_rows, missing_vals_cols = np.where(diff_mat > delta / 20) frames_to_correct = np.where(last_idx != self.num_of_lines)[0] for frame_num in frames_to_correct: num_of_missing_lines = last_idx[frame_num] - self.num_of_lines if num_of_missing_lines < 0: for miss in missing_vals_cols[missing_vals_rows == frame_num]: lines_mat[frame_num, :] = np.concatenate((lines_mat[frame_num, :miss], np.atleast_1d(y[frame_num, miss]), lines_mat[frame_num, miss:-1])) elif num_of_missing_lines > 0: cur_missing_cols = missing_vals_cols.copy() while cur_missing_cols.shape[0] > 0: lines_mat[frame_num, :] = np.concatenate((lines_mat[frame_num, :missing_vals_cols[0]], np.atleast_1d(y[frame_num, missing_vals_cols[0]]), lines_mat[frame_num, missing_vals_cols[0]:-1])) diff_line = np.abs(np.subtract(lines_mat[frame_num, :], y[frame_num, :], dtype=np.int64)) cur_missing_cols = np.where(diff_line > delta / 20)[0] return lines_mat def __finalize_lines(self, lines_mat: np.ndarray) -> pd.Series: """ Add back the start-of-frame times to the lines """ frames_mat = np.array(self.dict_of_data['Frames']).reshape((len(self.dict_of_data['Frames']), 1)) frames_mat = np.tile(frames_mat, (1, self.num_of_lines)) lines_mat = lines_mat[:, :self.num_of_lines] + frames_mat return pd.Series(lines_mat.ravel()) def __calc_line_parameters(self, lines: pd.Series) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Generate general parameters of the given acquisition """ change = np.abs(lines.diff().pct_change(periods=1)) rel_idx = np.where(change > self.change_thresh)[0] div, mod = divmod(len(lines), self.num_of_lines) if len(rel_idx) == 0 and mod != 0 and div == 0: raise UserWarning('Data contained incomplete signals only from the first frame.') end_of_frames = change > 5 end_of_frames_idx = np.where(end_of_frames)[0] # scanimage specific rel_idx_non_end_frame = np.where(change <= self.change_thresh)[0] idx_list = [slice(st, sp) for st, sp in self.__pairwise([0] + list(end_of_frames_idx))] lines_mat = np.zeros((len(idx_list), self.num_of_lines * 2), dtype=np.uint64) last_idx_of_row = np.zeros((end_of_frames_idx.shape[0]), dtype=np.int32) for cur_row, cur_slice in enumerate(idx_list): last_idx = cur_slice.stop-cur_slice.start lines_mat[cur_row, :last_idx] = lines.values[cur_slice] last_idx_of_row[cur_row] = last_idx start_time_of_frames = lines[end_of_frames_idx].values start_time_of_frames = np.concatenate((np.array([0], dtype=np.uint64), start_time_of_frames[:-1])) \ .reshape((end_of_frames_idx.shape[0], 1)) lines_mat -= start_time_of_frames if 'Frames' not in self.dict_of_data: self.dict_of_data['Frames'] = pd.DataFrame(start_time_of_frames.ravel(), columns=['abs_time']) for row, last_idx in enumerate(last_idx_of_row): lines_mat[row, last_idx:] = 0 return lines_mat, rel_idx, end_of_frames_idx, last_idx_of_row, rel_idx_non_end_frame @staticmethod def __pairwise(iterable): """From itertools: s -> (s0,s1), (s1,s2), (s2, s3), ...""" a, b = tee(iterable) next(b, None) return zip(a, b)