Source code for est.tests.nxwriter

from typing import Dict
from time import sleep
import multiprocessing
from contextlib import contextmanager


import h5py
from numpy.typing import ArrayLike


[docs] def nxwriter( filename, scan: str, positioners: Dict[str, ArrayLike], detectors: Dict[str, ArrayLike], blocksize: int, period: float, ): """Write data in chunks of `blocksize` every `period` seconds""" with h5py.File(filename, "a") as h5f: h5f.attrs["NX_class"] = "NXroot" entry = h5f.create_group(scan) entry.attrs["NX_class"] = "NXentry" entry["title"] = "test EXAFS scan" instrument = entry.create_group("instrument") instrument.attrs["NX_class"] = "NXinstrument" measurement = entry.create_group("measurement") measurement.attrs["NX_class"] = "NXconnection" datasets = list() for name, values in positioners.items(): grp = instrument.create_group(name) grp.attrs["NX_class"] = "NXpositioner" dset = grp.create_dataset( "value", shape=(0,), dtype=values.dtype, maxshape=(None,) ) datasets.append((values, dset)) if name == "energy": dset.attrs["units"] = "eV" measurement[name] = h5py.SoftLink(dset.name) for name, values in detectors.items(): grp = instrument.create_group(name) grp.attrs["NX_class"] = "NXdetector" dset = grp.create_dataset( "data", shape=(0,), dtype=values.dtype, maxshape=(None,) ) datasets.append((values, dset)) measurement[name] = h5py.SoftLink(dset.name) ndone = 0 ndatasets = len(datasets) while ndone != ndatasets: ndone = 0 for values, dset in datasets: istart = dset.size istop = istart + blocksize data = values[istart:istop] if data.size == 0: ndone += 1 continue istop = istart + data.size dset.resize(istop, axis=0) dset[istart:istop] = data h5f.flush() print("NXwriter: flushed and sleep for", period, "sec") sleep(period)
[docs] @contextmanager def nxwriter_process(*args, timeout=3, **kw): p = multiprocessing.Process(target=nxwriter, args=args, kwargs=kw) p.start() try: yield finally: p.join(timeout) if p.is_alive(): p.kill() p.join()