Source code for pysight.nd_hist_generator.line_signal_validators.mscan

"""
__author__ = Hagai Hargil
"""
from typing import Dict, Tuple
import pandas as pd
import numpy as np
import attr
from itertools import chain


[docs]@attr.s(slots=True) class MScanLineValidator: sig_val = attr.ib() # SignalValidator def __getattr__(self, item): return getattr(self.sig_val, item)
[docs] def run(self) -> Tuple[Dict, np.uint64]: """ Interpolate MScan-specific line signals :return: Dictionary containing the data and the mean difference between subsequent lines """ lines = self.dict_of_data['Lines'].loc[:, 'abs_time'].copy() rel_idx, delta = self.__calc_line_parameters(lines=lines) lines, rel_idx, delta = self.__filter_extra_lines(lines=lines, delta=delta) if len(rel_idx) > 0: # missing lines, not just extra theo_lines = self.__gen_line_model(lines=lines, m=delta) lines = self.__diff_vec_analysis(lines=lines, y=theo_lines, delta=delta) lines = self.__finalize_lines(lines=lines, delta=delta) if self.bidir: lines = self.sig_val.add_phase_to_bidir_lines(lines=lines) self.dict_of_data['Lines'] = pd.DataFrame(lines, dtype=np.uint64, columns=['abs_time']) return self.dict_of_data, delta
def __calc_line_parameters(self, lines: pd.Series) -> Tuple[np.ndarray, np.uint64]: """ Generate general parameters of the given acquisition """ rel_idx = np.where(np.abs(lines.diff().pct_change(periods=1)) > self.change_thresh)[0] delta = np.uint64(lines.drop(rel_idx).reindex(np.arange(len(lines))).interpolate().diff().mean()) return rel_idx[::2], delta def __filter_extra_lines(self, lines: pd.Series, delta: np.uint64) -> Tuple[pd.Series, pd.Series, np.uint64]: """ Kick out excess line signals :param lines: :param delta: :return: Tuple of valid lines, missing lines and new delta of lines """ diffs = lines.diff() rel_idx = np.where(np.abs(diffs.pct_change(periods=1)) > self.change_thresh)[0] recurring = np.where(np.diff(rel_idx) == 1)[0] idx_to_keep = np.ones_like(rel_idx, dtype=bool) for idx, _ in enumerate(recurring[1:], 1): try: if recurring[idx] - recurring[idx-1] == 1: idx_to_keep[recurring[idx] + 1] = False except IndexError: pass rel_idx_new = pd.Series(rel_idx[idx_to_keep][::2], dtype=np.uint64) missing_lines = [] extra_lines = [] for idx in rel_idx_new: if diffs[idx] < (delta/2): # excess lines extra_lines.append(idx) else: missing_lines.append(idx) valid_lines = lines.drop(extra_lines).reset_index(drop=True) delta = np.uint64(valid_lines.drop(missing_lines).diff().mean()) # Get rid of lines that came after the last frame num_of_extra_lines = len(valid_lines) % self.num_of_lines valid_lines = valid_lines[:-num_of_extra_lines] return valid_lines, pd.Series(missing_lines), delta def __gen_line_model(self, lines: pd.Series, m: np.uint64) -> np.ndarray: """ Using linear approximation generate a model for the "correct" line signal """ const = lines.iloc[0] x = np.arange(start=0, stop=len(lines), dtype=np.uint64) y = m * x + const if len(lines) > 1500: # correct simulated lines idx_range = np.arange(1500, len(lines), step=1500, dtype=np.uint64) for idx in idx_range: x = np.arange(0, len(lines)-idx, dtype=np.uint64) y[idx:] = m * x + lines.iloc[idx] # MScan's lines are evenly separated first_diff = np.uint64(lines.iloc[2:110].diff()[1::2].median()) delta_diff = np.int32((m - first_diff) / 2) if first_diff < m: y[1::2] -= delta_diff y[::2] += delta_diff else: y[1::2] += delta_diff y[::2] -= delta_diff return y def __diff_vec_analysis(self, y: np.ndarray, lines: pd.Series, delta: np.uint64) -> pd.Series: diff_vec = np.abs(np.subtract(y, lines, dtype=np.int64)) missing_val = np.where(diff_vec > delta / 20)[0] while missing_val.shape[0] > 0: if np.abs(diff_vec[missing_val[0]] - delta)/delta < 0.1: # double line lines = np.concatenate((lines[:missing_val[0]], lines[missing_val[0]+1:])) else: lines = np.concatenate((lines[:missing_val[0]], np.atleast_1d(y[missing_val[0]]), lines[missing_val[0]:])) # Restart the loop y = self.__gen_line_model(pd.Series(lines), delta) diff_vec = np.abs(np.subtract(y, lines, dtype=np.int64)) missing_val = np.where(diff_vec > delta / 20)[0] return pd.Series(lines) def __finalize_lines(self, lines, delta) -> pd.Series: """ Sample the lines so that they're "silent" between frames """ lines_between_frames = int(np.rint(self.frame_delay / delta)) start_of_frame_idx = np.arange(start=0, stop=len(lines), step=self.num_of_lines + lines_between_frames, dtype=np.uint64) end_of_frame_idx = start_of_frame_idx + self.num_of_lines exact_lines = [lines[slice(start, end)] for start, end in zip(start_of_frame_idx, end_of_frame_idx)] exact_lines = np.array(list(chain.from_iterable(exact_lines)), dtype=np.uint64) self.sig_val.num_of_frames = len(end_of_frame_idx) return pd.Series(exact_lines, dtype=np.uint64)