# Source code for est.core.split

import logging
import os
import posixpath
from contextlib import ExitStack
from contextlib import contextmanager
from typing import Generator
from typing import List
from typing import Optional

import h5py
import numpy
from silx.io import h5py_utils
from silx.utils.retry import RetryError

from .monotonic import split_piecewise_monotonic
from .sections import split_section_size

# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)


def split_bliss_scan(
    filename: str,
    scan_number: int,
    monotonic_channel: str,
    out_filename: str,
    subscan_size: Optional[int] = None,
    trim_n_points: Optional[int] = None,
    wait_finished: bool = True,
    counter_group: Optional[str] = None,
    **retry_args,
) -> List[str]:
    """Split a Bliss scan in subscans as determined by a channel which is
    monotonically increasing or decreasing in each subscan or determined by
    subscan size.

    :param filename: HDF5 file name containing the Bliss scan.
    :param scan_number: The Bliss scan number.
    :param monotonic_channel: HDF5 path relative to the scan group.
    :param out_filename: HDF5 file name to save subscans as a result of
        splitting the Bliss scan.
    :param subscan_size: Fixed-length subscan size. ``None`` or ``0`` means
        the subscans are determined from the monotonic channel instead.
    :param trim_n_points: Trim N points from the start and end of each subscan.
    :param wait_finished: Wait for the Bliss scan to be complete in HDF5.
    :param counter_group: Group with counters to determine the number of scan
        points.
    :param retry_timeout: Timeout of waiting for the Bliss scan to be complete
        in HDF5.
    :param retry_period: Check period of waiting for the Bliss scan to be
        complete in HDF5.
    :returns: HDF5 URLs of the subscans as a result of splitting the Bliss
        scan. Empty when the scan could not be opened and ``wait_finished``
        is ``False``.
    :raises ValueError: When ``subscan_size`` or ``trim_n_points`` is negative.
    """
    # Explicit exceptions instead of ``assert``: assertions are removed when
    # Python runs with ``-O`` and a negative value would then silently yield
    # wrong slices.
    if subscan_size is None:
        subscan_size = 0
    elif subscan_size < 0:
        raise ValueError(f"subscan_size must be >= 0, got {subscan_size}")
    if trim_n_points is None:
        trim_n_points = 0
    elif trim_n_points < 0:
        raise ValueError(f"trim_n_points must be >= 0, got {trim_n_points}")

    entry_name = f"{scan_number}.1"
    out_urls = []

    with _open_scan_entry(
        filename,
        entry_name,
        monotonic_channel,
        counter_group,
        wait_finished=wait_finished,
        **retry_args,
    ) as nxentry_in:
        if nxentry_in is None:
            # The scan could not be opened (only possible without waiting).
            return []

        # When waiting, the entry is only opened once "end_time" exists.
        finished = wait_finished or "end_time" in nxentry_in

        # For a running scan only use the points all counters already have.
        if finished:
            npoints = None
        else:
            npoints = _number_of_scan_points(nxentry_in, counter_group)

        if npoints is None:
            monotonic_values = nxentry_in[monotonic_channel][()]
        else:
            monotonic_values = nxentry_in[monotonic_channel][:npoints]

        if subscan_size:
            subscan_slices = split_section_size(monotonic_values, subscan_size)
        else:
            subscan_slices = split_piecewise_monotonic(monotonic_values)

        if not finished:
            # The last subscan of a running scan may still be growing.
            subscan_slices = subscan_slices[:-1]

        subscan_slices = _select_complete_subscans(
            nxentry_in, subscan_slices, counter_group
        )

        # Subscans are numbered from 1 in the output file.
        for subscan_number, subscan_slice in enumerate(subscan_slices, 1):
            out_url = _save_subscan(
                nxentry_in,
                scan_number,
                subscan_number,
                out_filename,
                subscan_slice,
                trim_n_points,
                **retry_args,
            )
            out_urls.append(out_url)

    return out_urls


def _number_of_scan_points(
    nxentry_in: h5py.Group, counter_group: Optional[str]
) -> Optional[int]:
    """Return the number of points that every counter in the group has.

    :param nxentry_in: Scan entry group.
    :param counter_group: Path of the counter group relative to the entry.
    :returns: The smallest ``size`` of the items in the counter group or
        ``None`` when no group is given, it does not exist or it is empty.
    """
    if not counter_group or counter_group not in nxentry_in:
        return None
    counters = nxentry_in[counter_group]
    sizes = [counters[name].size for name in counters]
    return min(sizes) if sizes else None


def _select_complete_subscans(
    nxentry_in: h5py.Group, subscan_slices: List[slice], counter_group: Optional[str]
) -> List[slice]:
    """Return the leading subscans for which all counters are complete.

    Selection stops at the first incomplete subscan, later ones are dropped
    even if they would be complete.
    """
    complete_subscan_slices = []
    for subscan_slice in subscan_slices:
        if not _subscan_is_complete(nxentry_in, subscan_slice, counter_group):
            break
        complete_subscan_slices.append(subscan_slice)
    return complete_subscan_slices


def _subscan_is_complete(
    nxentry_in: h5py.Group, subscan_slice: slice, counter_group: Optional[str]
) -> bool:
    """Check that every counter has enough points to cover the subscan slice.

    :returns: ``True`` without counter group (nothing to check), ``False``
        when the counter group is missing or empty.
    """
    if not counter_group:
        return True
    if counter_group not in nxentry_in:
        return False

    if subscan_slice.step and subscan_slice.step < 0:
        # A descending slice includes index ``start`` (see `_slice_dataset`)
        # so the counters need at least ``start + 1`` points.
        nexpected = subscan_slice.start + 1
    else:
        # An ascending slice excludes index ``stop``.
        nexpected = subscan_slice.stop

    counters = nxentry_in[counter_group]
    has_enough = [counters[name].size >= nexpected for name in counters]
    return bool(has_enough) and all(has_enough)


@contextmanager
def _open_scan_entry(
    filename: str,
    entry_name: str,
    *paths: Optional[str],
    wait_finished: bool = False,
    **retry_args,
) -> Generator[Optional[h5py.Group], None, None]:
    """Open a scan entry for reading and yield it.

    :param filename: HDF5 file name containing the scan.
    :param entry_name: Name of the scan entry in the file.
    :param paths: Paths relative to the entry that need to exist
        (``None`` values are ignored).
    :param wait_finished: Wait for ``"end_time"`` to exist in the entry.
    :param retry_args: Retry options. Defaults: ``retry_period=0.5`` and
        ``retry_timeout`` 60 when waiting for the scan to finish, else 10.
    :yields: The entry group, or ``None`` when opening failed and
        ``wait_finished`` is ``False`` (a warning is logged).
    :raises Exception: Opening errors are re-raised when ``wait_finished``
        is ``True``.
    """
    retry_args.setdefault("retry_period", 0.5)
    retry_args.setdefault("retry_timeout", 60 if wait_finished else 10)

    # Only guard the *opening* with try/except. Wrapping the ``yield`` itself
    # would catch exceptions raised by the caller's ``with`` body and yield a
    # second time, which contextlib turns into
    # "RuntimeError: generator didn't stop after throw()".
    with ExitStack() as stack:
        try:
            nxentry_in = stack.enter_context(
                _open_scan_retry(
                    filename,
                    entry_name,
                    *paths,
                    wait_finished=wait_finished,
                    **retry_args,
                )
            )
        except Exception as ex:
            if wait_finished:
                raise
            _logger.warning("%s::%s not complete (%s)", filename, entry_name, ex)
            nxentry_in = None
        yield nxentry_in


@h5py_utils.retry_contextmanager()
def _open_scan_retry(
    filename: str,
    entry_name: str,
    *paths: Optional[str],
    wait_finished: bool = False,
) -> Generator[Optional[h5py.Group], None, None]:
    """Open the scan entry, signalling a retry while it is not ready.

    Failures raise `RetryError`, except for a finished scan that lacks one of
    the required paths: that raises `RuntimeError` directly since retrying
    cannot help.
    """
    required_paths = [path for path in paths if path is not None]
    with h5py_utils.File(filename) as nxroot_in:
        retry = True
        try:
            nxentry_in = nxroot_in[entry_name]
            if wait_finished:
                # Fails (and triggers a retry) as long as the scan is running.
                _ = nxentry_in["end_time"]
                for path in required_paths:
                    if path not in nxentry_in:
                        retry = False
                        raise RuntimeError(
                            f"Scan {entry_name!r} from {filename!r} is finished but {path!r} does not exist."
                        )
            # Make sure all required items can actually be accessed.
            for path in required_paths:
                _ = nxentry_in[path]
        except Exception as e:
            if retry:
                raise RetryError from e
            raise
        yield nxentry_in


def _save_subscan(
    nxentry_in: h5py.Group,
    scan_number: int,
    subscan_number: int,
    out_filename: str,
    subscan_slice: slice,
    trim_n_points: int,
    **retry_args,
) -> str:
    """Save one subscan as entry ``"{scan_number}.{subscan_number}"``.

    An already existing entry is left untouched (a warning is logged).

    :param retry_args: Options for opening the output file
        (default ``retry_timeout=60``).
    :returns: URL ``silx://{out_filename}::/{entry_name}`` of the subscan.
    """
    entry_name = f"{scan_number}.{subscan_number}"
    out_url = f"silx://{out_filename}::/{entry_name}"

    dirname = os.path.dirname(out_filename)
    if dirname:
        os.makedirs(dirname, exist_ok=True)

    retry_args.setdefault("retry_timeout", 60)
    with h5py_utils.open_item(
        out_filename, "/", mode="a", track_order=True, **retry_args
    ) as nxroot_out:
        if entry_name in nxroot_out:
            _logger.warning("%s::/%s already exists", out_filename, entry_name)
            return out_url
        nxentry_out = nxroot_out.create_group(entry_name)
        _save_subgroup(nxentry_in, nxentry_out, subscan_slice, trim_n_points)
    return out_url


def _save_subgroup(
    group_in: h5py.Group, group_out: h5py.Group, dim0_slice: slice, trim_n_points: int
) -> None:
    """Recursively copy a group, slicing datasets along the first dimension.

    * Attributes of groups and datasets are copied.
    * Soft links are re-created (see `_relative_link`).
    * Datasets with more than one element are sliced and trimmed; datasets
      that cannot be sliced are skipped with a warning.
    * Other datasets are copied as they are.
    """
    group_out.attrs.update(group_in.attrs)
    for name in group_in:
        link = group_in.get(name, getlink=True)
        if isinstance(link, h5py.SoftLink):
            target = _relative_link(group_in, link.path, group_out)
            group_out[name] = h5py.SoftLink(path=target)
            continue

        h5item = group_in[name]

        if isinstance(h5item, h5py.Group):
            _save_subgroup(
                h5item, group_out.create_group(name), dim0_slice, trim_n_points
            )
            continue

        if isinstance(h5item, h5py.Dataset):
            if h5item.size > 1:
                try:
                    data = _slice_dataset(h5item, dim0_slice)
                except Exception:
                    _logger.warning(
                        "%s with shape %s cannot be sliced by %s",
                        h5item.name,
                        h5item.shape,
                        dim0_slice,
                    )
                    continue
                if trim_n_points:
                    data = data[trim_n_points:-trim_n_points]
            else:
                data = h5item[()]
            dset_out = group_out.create_dataset(name, data=data)
            dset_out.attrs.update(h5item.attrs)
            continue

        _logger.warning("%s of type %s is not supported", h5item.name, type(h5item))


def _relative_link(
    org_parent: h5py.Group, link_target: str, new_parent: h5py.Group
) -> str:
    """Return the soft link target to use below ``new_parent``.

    An absolute target is re-rooted so that its position relative to
    ``new_parent`` equals its position relative to ``org_parent``. A relative
    target is returned unchanged.

    HDF5 paths always use ``"/"`` so `posixpath` is used instead of
    `os.path`, which would produce backslashes on Windows.
    """
    if not posixpath.isabs(link_target):
        # `posixpath.relpath` would resolve this against the working
        # directory of the process, which has no meaning for HDF5 paths.
        return link_target
    rel_link_target = posixpath.relpath(link_target, org_parent.name)
    return posixpath.normpath(posixpath.join(new_parent.name, rel_link_target))


def _slice_dataset(h5dataset: h5py.Dataset, dim0_slice: slice) -> numpy.ndarray:
    """Read a slice of the first dimension of a dataset.

    For a descending slice (negative step) index ``start`` is included and
    index ``stop`` is excluded; the data is returned in descending index
    order.

    :raises ValueError: When the dataset does not contain all requested
        points.
    """
    step = dim0_slice.step or 1  # a slice without step has step 1
    expected_size = len(range(dim0_slice.start, dim0_slice.stop, step))

    if step < 0:
        # Read the covered index range in ascending order, then reverse
        # and stride in memory.
        first = dim0_slice.stop + 1
        last = dim0_slice.start + 1
        data = h5dataset[first:last]
        if len(data) != last - first:
            raise ValueError("slice does not have the expected size")
        data = data[::step]
    else:
        data = h5dataset[dim0_slice]

    if len(data) != expected_size:
        raise ValueError("slice does not have the expected size")
    return data