Source code for pysight.nd_hist_generator.volume_gen

import attr
from attr.validators import instance_of
import pandas as pd
import numpy as np
import itertools
from typing import List, Generator


[docs]@attr.s(slots=True) class VolumeGenerator: """ Generate the list of volume chunks to be processed. Main method is "create_frame_slices", which returns a generator containing slice objects that signify the chunks of volumes to be processed simultaneously. Inputs: :param frames pd.DataFrame: Frames for the entire dataset. Should not contain a closing, right-edge, frame. :param data_shape tuple: Shape of the final n-dimensional array (from the Output object) :param MAX_BYTES_ALLOWED int: Number of bytes that can be held in RAM ("magic number") """ frames = attr.ib(validator=instance_of(pd.Series), repr=False) data_shape = attr.ib(validator=instance_of(tuple)) MAX_BYTES_ALLOWED = attr.ib(default=int(300e6), validator=instance_of(int)) num_of_frames = attr.ib(init=False) bytes_per_frames = attr.ib(init=False) full_frame_chunks = attr.ib(init=False) frame_slices = attr.ib(init=False) frame_per_chunk = attr.ib(init=False) num_of_chunks = attr.ib(init=False)
[docs] def create_frame_slices(self, create_slices=True) -> Generator: """ Main method for the pipeline. Returns a generator with slices that signify the start time and end time of all frames. :param create_slices bool: Used for testing, always keep true. """ self.bytes_per_frames = np.prod(self.data_shape[1:]) * 8 self.frame_per_chunk = int(max(1, self.MAX_BYTES_ALLOWED // self.bytes_per_frames)) self.num_of_frames = len(self.frames) self.num_of_chunks = int(max(1, len(self.frames) // self.frame_per_chunk)) self.full_frame_chunks = self.__grouper() if create_slices: self.frame_slices = self.__generate_frame_slices() return self.frame_slices
def __grouper(self) -> Generator: """ Chunk volume times into maximal-sized groups of values. grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx """ args = [iter(self.frames.values)] * self.frame_per_chunk return itertools.zip_longest(*args, fillvalue=np.nan) def __generate_frame_slices(self) -> Generator: if self.frame_per_chunk == 1: start, end = itertools.tee(self.full_frame_chunks) next(end, None) return (slice(s, e) for s, e in zip(start, end)) start_and_end = [] for chunk in self.full_frame_chunks: first, last = chunk[0], chunk[-1] if np.isnan(last): for val in reversed(chunk[:-1]): if val is not np.nan: last = val break start_and_end.append((first, last)) return (slice(np.uint64(t[0]), np.uint64(t[1])) for t in start_and_end)