Source code for pysight.main

"""
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = Hagai Har-Gil
"""

from typing import MutableMapping, Any, Optional
import pickle
import logging
import pathlib
import sys
import enum

import matplotlib
import pandas as pd
import colorama
import numpy as np
import toml

from pysight.ascii_list_file_parser.file_io import ReadMeta, ascii_parsing
from pysight.nd_hist_generator.allocation import Allocate
from pysight.nd_hist_generator.outputs import OutputParser, PySightOutput
from pysight.nd_hist_generator.movie import Movie
from pysight.nd_hist_generator.gating import GatedDetection
from pysight.nd_hist_generator.photon_df import PhotonDF
from pysight.nd_hist_generator.tag_bits import ParseTAGBits
from pysight.nd_hist_generator.line_signal_validators.validation_tools import (
    SignalValidator,
)
from pysight.nd_hist_generator.line_signal_validators.add_bidir_lines import (
    add_bidir_lines,
)
from pysight.gui.gui_helpers import verify_input
from pysight.gui.config_parser import Config
from pysight.nd_hist_generator.volume_gen import VolumeGenerator
from pysight.binary_list_file_parser.binary_parser import (
    binary_parsing,
)
from pysight.read_lst import ReadData
from pysight.nd_hist_generator.deinterleave import Deinterleave


logging.basicConfig(
    stream=sys.stdout,
    # filename=str(pathlib.Path('.') / "general.log"),
    # filemode="w",
    format="%(levelname)s :: %(filename)s :: %(asctime)s :: %(message)s",
    level=logging.INFO,
)
matplotlib.rcParams["backend"] = "TkAgg"
colorama.init()


class RunType(enum.Enum):
    GUI = enum.auto()
    SINGLE = enum.auto()
    BATCH = enum.auto()


def main_data_readout(config: MutableMapping[str, Any]) -> Optional[PySightOutput]:
    """
    Main function that reads the lst file and processes its data.
    Should not be run independently - only from other "run_X" functions.

    :param dict config: Loaded configuration file as a dictionary.

    :return PySightOutput: An object containing the relevant data,\
    if "memory" option was checked in the GUI.
    """
    if config["outputs"]["data_filename"].endswith(".lst"):
        relevant_columns, dict_of_data, lst_metadata, fill_frac = _read_lst_file(config)
    else:
        logging.info(f"Reading file {config['outputs']['data_filename']}...")
        with open(config["outputs"]["data_filename"], "rb") as f:
            dict_of_data = pickle.load(f)
        lst_metadata = dict()
        relevant_columns = ["abs_time"]
        fill_frac = config["advanced"]["fill_frac"]

    validated_data = SignalValidator(
        dict_of_data=dict_of_data,
        num_of_frames=config["image"]["num_of_frames"],
        binwidth=float(config["advanced"]["binwidth"]),
        use_sweeps=config["advanced"]["sweeps_as_lines"],
        delay_between_frames=float(config["advanced"]["frame_delay"]),
        data_to_grab=relevant_columns,
        line_freq=config["advanced"]["line_freq"],
        num_of_lines=config["image"]["x_pixels"],
        bidir=config["advanced"]["bidir"],
        bidir_phase=config["advanced"]["phase"],
        image_soft=config["image"]["imaging_software"],
        laser_freq=config["advanced"]["reprate"],
        lst_metadata=lst_metadata,
    )

    validated_data.run()

    photon_df = PhotonDF(
        dict_of_data=validated_data.dict_of_data,
        interleaved=config["advanced"]["interleaved"],
    )
    photons = photon_df.run()
    tag_bit_parser = ParseTAGBits(
        dict_of_data=validated_data.dict_of_data,
        photons=photons,
        use_tag_bits=config["tagbits"]["tag_bits"],
        bits_dict=config["tagbits"],
    )

    if not config["advanced"]["bidir"]:
        validated_data.dict_of_data = add_bidir_lines(validated_data.dict_of_data)
    analyzed_struct = Allocate(
        bidir=config["advanced"]["bidir"],
        tag_offset=config["advanced"]["tag_offset"],
        laser_freq=float(config["advanced"]["reprate"]),
        binwidth=float(config["advanced"]["binwidth"]),
        tag_pulses=int(config["advanced"]["tag_pulses"]),
        phase=config["advanced"]["phase"],
        keep_unidir=config["advanced"]["keep_unidir"],
        flim=config["advanced"]["flim"],
        censor=config["advanced"]["censor"],
        dict_of_data=validated_data.dict_of_data,
        df_photons=tag_bit_parser.gen_df(),
        tag_freq=float(config["advanced"]["tag_freq"]),
        tag_to_phase=True,
        deinterleave=config["advanced"]["interleaved"],
    )
    analyzed_struct.run()
    data_for_movie = analyzed_struct.df_photons

    del photons
    del photon_df
    if config["advanced"]["interleaved"]:
        logging.warning(
            """Deinterleaving a data channel is currently highly experimental and
            is supported only on data in the PMT1 channel. Inexperienced users
            are highly advised not to use it."""
        )
        deinter = Deinterleave(
            photons=analyzed_struct.df_photons,
            reprate=config["advanced"]["reprate"],
            binwidth=config["advanced"]["binwidth"],
        )
        data_for_movie = deinter.run()
    # Determine type and shape of wanted outputs, and open the file pointers there
    outputs = OutputParser(
        num_of_frames=len(validated_data.dict_of_data["Frames"]),
        flim_downsampling_time=config["advanced"]["flim_downsampling_time"],
        output_dict=config["outputs"],
        filename=config["outputs"]["data_filename"],
        x_pixels=config["image"]["x_pixels"],
        y_pixels=config["image"]["y_pixels"],
        z_pixels=config["image"]["z_pixels"] if analyzed_struct.tag_interp_ok else 1,
        channels=data_for_movie.index.levels[0],
        binwidth=config["advanced"]["binwidth"],
        reprate=config["advanced"]["reprate"],
        lst_metadata=lst_metadata,
        debug=config["advanced"]["debug"],
    )
    outputs.run()

    line_delta = validated_data.line_delta
    del validated_data
    if config["advanced"]["gating"]:
        logging.warning(
            "Gating is currently not implemented. Please contact package authors."
        )
        # gated = GatedDetection(
        #     raw=analyzed_struct.df_photons, reprate=config['advanced']['reprate'], binwidth=config['advanced']['binwidth']
        # )
        # gated.run()
    # Create a movie object
    volume_chunks = VolumeGenerator(
        frames=analyzed_struct.dict_of_data["Frames"], data_shape=outputs.data_shape
    )
    frame_slices = volume_chunks.create_frame_slices()
    final_movie = Movie(
        data=data_for_movie,
        frames=analyzed_struct.dict_of_data["Frames"],
        frame_slices=frame_slices,
        num_of_frame_chunks=volume_chunks.num_of_chunks,
        reprate=float(config["advanced"]["reprate"]),
        name=config["outputs"]["data_filename"],
        data_shape=outputs.data_shape,
        binwidth=float(config["advanced"]["binwidth"]),
        bidir=config["advanced"]["bidir"],
        fill_frac=fill_frac,
        outputs=outputs.outputs,
        censor=config["advanced"]["censor"],
        mirror_phase=config["advanced"]["phase"],
        lines=analyzed_struct.dict_of_data["Lines"],
        channels=data_for_movie.index.levels[0],
        flim=config["advanced"]["flim"] or config["advanced"]["interleaved"],
        flim_downsampling_space=config["advanced"]["flim_downsampling_space"],
        flim_downsampling_time=config["advanced"]["flim_downsampling_time"],
        lst_metadata=lst_metadata,
        line_delta=int(line_delta),
        tag_as_phase=True,
        tag_freq=float(config["advanced"]["tag_freq"]),
        image_soft=config["image"]["imaging_software"],
        frames_per_chunk=volume_chunks.frames_per_chunk,
    )

    final_movie.run()
    if "memory" in outputs.outputs:
        if "flim" in outputs.outputs:
            flim = final_movie.flim
        else:
            flim = None
        pysight_output = PySightOutput(
            photons=data_for_movie,
            summed_mem=final_movie.summed_mem,
            stack=final_movie.stack,
            channels=data_for_movie.index.levels[0],
            data_shape=outputs.data_shape,
            flim=flim,
            config=config,
        )
        return pysight_output


def _read_lst_file(config: MutableMapping[str, Any]):
    """Read out the data in the .lst file. This function won't be called
    if PySight was called on already-parsed data, i.e. data which came
    from a source other than a multiscaler.
    """
    cur_file = ReadMeta(
        filename=config["outputs"]["data_filename"],
        input_start=config["inputs"]["start"],
        input_stop1=config["inputs"]["stop1"],
        input_stop2=config["inputs"]["stop2"],
        input_stop3=config["inputs"]["stop3"],
        input_stop4=config["inputs"]["stop4"],
        input_stop5=config["inputs"]["stop5"],
        binwidth=config["advanced"]["binwidth"],
        use_sweeps=config["advanced"]["sweeps_as_lines"],
        mirror_phase=config["advanced"]["phase"],
    )
    cur_file.run()
    raw_data_obj = ReadData(
        filename=config["outputs"]["data_filename"],
        start_of_data_pos=cur_file.start_of_data_pos,
        timepatch=cur_file.timepatch,
        is_binary=cur_file.is_binary,
        debug=config["advanced"]["debug"],
    )
    raw_data = raw_data_obj.read_lst()
    if cur_file.is_binary:
        relevant_columns, dict_of_data = binary_parsing(cur_file, raw_data, config)
    else:
        relevant_columns, dict_of_data = ascii_parsing(cur_file, raw_data, config)
    lst_metadata = cur_file.lst_metadata
    fill_frac = (
        config["advanced"]["fill_frac"]
        if cur_file.fill_fraction == -1.0
        else cur_file.fill_fraction
    )
    return relevant_columns, dict_of_data, lst_metadata, fill_frac


def mp_main_data_readout(config: MutableMapping[str, Any]):
    """
    Wrapper for main_data_readout that
    wraps it with a try block. To be used with the
    multiprocessing run option.
    """
    try:
        out = main_data_readout(config)
    except Exception:
        pass
    else:
        return out


def interpret_runtype(fname: str):
    """Read the given file name and interpret what type of run the user wanted.
    If the file name is an existing list file or npz file - simply run PySight
    on it.
    If the file is a glob pattern - run the glob, find all files, and run
    PySight on them.
    If the file is empty - open the GUI and let the user choose it by itself.

    Parameters
    ----------
    :param str fname: The user's chosen file to parse
    """
    if fname is None or fname == "":
        return RunType.GUI
    elif "*" in fname:
        return RunType.BATCH
    else:
        return RunType.SINGLE


[docs]def run(cfg_file: str = None) -> Optional[PySightOutput]: """ Run PySight. :param str cfg_file: Optionally supply an existing configuration filename. Otherwise a GUI will open. :return PySightOutput: Object containing raw and processed data """ from pysight.gui.gui_main import GuiAppLst if cfg_file: with open(cfg_file, "r") as f: config: MutableMapping[str, Any] = toml.load(f) else: gui = GuiAppLst() gui.root.mainloop() config = Config.from_gui(gui).config_data verify_input(config) return main_data_readout(config)
[docs]def run_batch_lst( foldername: str, glob_str: str = "*.lst", recursive: bool = False, cfg_file: str = "", ) -> pd.DataFrame: """ Run PySight on all list files in the folder :param str foldername: - Main folder to run the analysis on. :param str glob_str: String for the `glob` function to filter list files :param bool recursive: Whether the search should be recursive. :param str cfg_file: Name of config file to use :return pd.DataFrame: Record of analyzed data """ from pysight.gui.gui_main import GuiAppLst path = pathlib.Path(foldername) num_of_files = 0 if not path.exists(): raise UserWarning(f"Folder {foldername} doesn't exist.") if recursive: all_lst_files = path.rglob(glob_str) logging.info(f"Running PySight on the following files:") for file in list(all_lst_files): logging.info(str(file)) num_of_files += 1 all_lst_files = path.rglob(glob_str) else: all_lst_files = path.glob(glob_str) logging.info(f"Running PySight on the following files:") for file in list(all_lst_files): logging.info(str(file)) num_of_files += 1 all_lst_files = path.glob(glob_str) data_columns = ["fname", "done", "error"] data_record = pd.DataFrame( np.zeros((num_of_files, 3)), columns=data_columns ) # store result of PySight try: with open(cfg_file, "r") as f: config = toml.load(f) config["outputs"]["data_filename"] = ".lst" except (TypeError, FileNotFoundError): gui = GuiAppLst() gui.root.mainloop() gui.filename.set(".lst") # no need to choose a list file config = Config.from_gui(gui).config_data verify_input(config) try: for idx, lst_file in enumerate(all_lst_files): config["outputs"]["data_filename"] = str(lst_file) data_record.loc[idx, "fname"] = str(lst_file) try: main_data_readout(config) except BaseException as e: logging.warning( f"File {str(lst_file)} returned an error. Moving onwards." ) data_record.loc[idx, "done"] = False data_record.loc[idx, "error"] = repr(e) else: data_record.loc[idx, "done"] = True data_record.loc[idx, "error"] = None except TypeError as e: logging.error(repr(e)) logging.info(f"Summary of batch processing:\n{data_record}") return data_record
[docs]def mp_batch( foldername, glob_str="*.lst", recursive=False, n_proc=None, cfg_file: str = "" ): """ Run several instances of PySight using the multiprocessing module. :param str foldername: Folder to scan :param str glob_str: Glob string to filter files :param bool recursive: Whether to scan subdirectories as well :param int n_proc: Number of processes to use (None means all) :param str cfg_file: Configuration file name """ import multiprocessing as mp from copy import deepcopy from pysight.gui.gui_main import GuiAppLst path = pathlib.Path(foldername) if not path.exists(): raise UserWarning(f"Folder {foldername} doesn't exist.") try: with open(cfg_file, "r") as f: config = toml.load(f) config["outputs"]["data_filename"] = ".lst" except (TypeError, FileNotFoundError): gui = GuiAppLst() gui.root.mainloop() gui.filename.set(".lst") # no need to choose a list file config = Config.from_gui(gui).config_data verify_input(config) if recursive: all_lst_files = path.rglob(glob_str) else: all_lst_files = path.glob(glob_str) all_cfgs = [] logging.info(f"Running PySight on the following files:") for file in all_lst_files: logging.info(str(file)) config["outputs"]["data_filename"] = str(file) all_cfgs.append(deepcopy(config)) with mp.Pool(n_proc) as pool: pool.map(mp_main_data_readout, all_cfgs)
if __name__ == "__main__": out = run()