Source code for est.tests.test_acquisition

import time
import pytest

try:
    import larch
except ImportError:
    larch = None

from est.io.utils import get_data_from_url
from .nxwriter import nxwriter_process
from .test_example_workflows import assert_execution


[docs] @pytest.mark.parametrize("period", [0.01, 0.1, 0.5]) def test_get_hdf5_data(period, spectrum_cu_from_larch, tmpdir): filename = str(tmpdir / "data.h5") scan = "1.1" npoints = len(spectrum_cu_from_larch.energy) positioners = {"energy": spectrum_cu_from_larch.energy} detectors = {"mu": spectrum_cu_from_larch.mu} npoints = spectrum_cu_from_larch.energy.size blocksize = max(int(npoints / 10), 1) est_time = npoints / blocksize * period tmax = time.time() + est_time + 5 print("Estimated scan time", est_time, "s") urls = [ f"silx://{filename}::/{scan}/measurement/energy", f"silx://{filename}::/{scan}/measurement/mu", ] with nxwriter_process(filename, scan, positioners, detectors, blocksize, period): nprogress = 0 while nprogress != npoints: data = [get_data_from_url(url, retry_timeout=3) for url in urls] progress = [len(arr) for arr in data] print("Reader point progress", progress) nprogress = min(progress) if time.time() > tmax: raise TimeoutError time.sleep(period / 10)
[docs] @pytest.mark.skipif(larch is None, reason="larch is not installed") @pytest.mark.parametrize("period", [0.01, 0.1, 0.5]) def test_live_example_bm23(period, example_bm23, spectrum_cu_from_larch, tmpdir): filename = str(tmpdir / "data.h5") scan = "1.1" npoints = len(spectrum_cu_from_larch.energy) positioners = {"energy": spectrum_cu_from_larch.energy} detectors = {"mu": spectrum_cu_from_larch.mu} npoints = spectrum_cu_from_larch.energy.size blocksize = max(int(npoints / 10), 1) est_time = npoints / blocksize * period tmax = time.time() + est_time + 5 print("Estimated scan time", est_time, "s") input_information = { "channel_url": f"silx://{filename}::/{scan}/measurement/energy", "spectra_url": f"silx://{filename}::/{scan}/measurement/mu", "energy_unit": "electron_volt", } with nxwriter_process(filename, scan, positioners, detectors, blocksize, period): nprogress = 0 while nprogress != npoints: try: result = assert_execution( example_bm23, list(), input_information, tmpdir ) except RuntimeError: pass # Task error else: spectrum = next(iter(result["xas_obj"].spectra.data.flat)) progress = [result["xas_obj"].energy.size, spectrum.mu.size] print("Reader point progress", progress) nprogress = min(progress) if time.time() > tmax: break time.sleep(period / 10)