Source code for est.io.io

import logging
from datetime import datetime
from typing import Optional, Union

import numpy
from silx.io.url import DataUrl
from silx.utils.enum import Enum
from silx.io.h5py_utils import File as HDF5File
from silx.io.dictdump import dicttoh5, dicttonx

from est import settings
from est.units import ur
from est.core.types import Spectra
from est.core.types import dimensions as dimensions_mod
from est.io.utils import get_data_from_url
from est.io.utils import get_est_data
from est.io.utils import get_ascii_data
from est.io.utils.ascii import split_ascii_url


_logger = logging.getLogger(__name__)


class InputType(Enum):
    ascii_spectrum = "ascii"  # single or multi scan, single spectrum
    hdf5_spectra = "hdf5"  # multi scan, multi spectra


def load_data(
    data_url: DataUrl,
    name: str,
    dimensions: dimensions_mod.DimensionsType,
    columns_names: Optional[dict] = None,
    energy_unit=ur.eV,
    timeout=settings.DEFAULT_READ_TIMEOUT,
) -> Union[None, dict, numpy.ndarray, Spectra]:
    """
    Load data from a url, handling the different schemes
    (silx, fabio, numpy, PyMca, xraylarch).

    :param DataUrl data_url: silx DataUrl with the path to the data
    :param str name: name of the data to load. Should be in
                     ('spectra', 'energy', 'configuration')
    :param Union[None,dict] columns_names: names of the columns to pick for
                                           .dat files. Expects the keys 'mu'
                                           and 'energy' to be registered
    :return: the loaded data
    :rtype: Union[None,dict,numpy.ndarray,Spectra]
    """
    if data_url is None:
        return None
    assert isinstance(data_url, DataUrl)
    scheme = data_url.scheme().lower()
    if scheme in ("ascii", "spec", "pymca", "pymca5", "larch", "xraylarch"):
        return get_ascii_data(
            data_url, name, columns_names=columns_names, energy_unit=energy_unit
        )
    if scheme == "numpy":
        return _move_axes_to_standard(numpy.load(data_url.file_path()), dimensions)
    if scheme == "est":
        assert name == "spectra"
        spectra = get_est_data(data_url, retry_timeout=timeout)
        return Spectra(energy=spectra[0].energy, spectra=spectra)
    if not data_url.is_valid():
        _logger.warning("invalid url for %s: %s", name, data_url)
        return
    try:
        data = get_data_from_url(data_url, retry_timeout=timeout)
    except ValueError as e:
        _logger.error(e)
        return
    if name == "spectra":
        if data.ndim == 1:
            return data.reshape(data.shape[0], 1, 1)
        elif data.ndim == 3:
            return _move_axes_to_standard(data, dimensions=dimensions)
    return data


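# Minimal usage sketch for load_data, assuming an HDF5 file "spectra.h5" with a
# 3D dataset at "/scan1/spectra" (both names are hypothetical placeholders).
# The dimensions value below is also an assumption for illustration: it must
# describe the axes order of the acquisition so the spectra can be transposed
# to the standard order.
def _example_load_spectra():
    url = DataUrl(
        file_path="spectra.h5",
        data_path="/scan1/spectra",
        scheme="silx",
    )
    # name="spectra" triggers the reshaping / axes standardization branch
    return load_data(
        url,
        name="spectra",
        dimensions=(0, 1, 2),  # hypothetical: adapt to the acquisition layout
    )

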
def _move_axes_to_standard(spectra, dimensions: dimensions_mod.DimensionsType):
    if isinstance(spectra, Spectra):
        spectra.data = dimensions_mod.transform_to_standard(spectra.data, dimensions)
    elif isinstance(spectra, numpy.ndarray):
        spectra = dimensions_mod.transform_to_standard(spectra, dimensions)
    return spectra


def read_xas(information, timeout=settings.DEFAULT_READ_TIMEOUT):
    """
    Read the spectra, energy and configuration from the provided input
    information.

    :param InputInformation information:
    :return: (spectra, energy, configuration)
    """
    _spectra_url = _get_url(original_url=information.spectra_url, name="spectra")
    _energy_url = _get_url(original_url=information.channel_url, name="energy")
    _config_url = information.config_url
    if isinstance(_config_url, str) and _config_url == "":
        _config_url = None
    if not (_config_url is None or isinstance(_config_url, DataUrl)):
        raise TypeError("given input for configuration is invalid")

    # build the column names
    columns_names = {}
    for url, col_name in zip(
        [information.spectra_url, information.channel_url, information.mu_ref_url],
        ["mu", "energy", "monitor"],
    ):
        if url is not None:
            columns_names[col_name] = split_ascii_url(url)["col_name"]
        else:
            columns_names[col_name] = None

    # this should be extractable and done in the InputInformation class
    spectra = load_data(
        _spectra_url,
        name="spectra",
        dimensions=information.dimensions,
        columns_names=columns_names,
        energy_unit=information.energy_unit,
        timeout=timeout,
    )
    energy = load_data(
        _energy_url,
        name="energy",
        dimensions=information.dimensions,
        columns_names=columns_names,
        energy_unit=information.energy_unit,
        timeout=timeout,
    )
    configuration = load_data(
        _config_url,
        name="configuration",
        dimensions=information.dimensions,
        columns_names=columns_names,
        energy_unit=information.energy_unit,
        timeout=timeout,
    )

    if energy is None:
        raise ValueError("Unable to load energy from {}".format(_energy_url))
    if not energy.ndim == 1:
        raise ValueError("Energy / channel is not 1D")
    if energy.shape[0] > spectra.shape[0]:
        energy = energy[: spectra.shape[0]]
        _logger.warning("energy has more values than spectra: clip energy")
    if not energy.shape[0] == spectra.shape[0]:
        _logger.warning(
            "Energy / channel and the first dimension of spectra have "
            "inconsistent lengths (%s vs %s)"
            % (energy.shape[0], spectra.shape[0])
        )
        if energy.shape[0] < spectra.shape[0]:
            spectra = spectra[: energy.shape[0]]
        else:
            energy = energy[: spectra.shape[0]]
    not_strictly_increasing = numpy.diff(energy) <= 0
    if not_strictly_increasing.any():
        _logger.warning("Energy is not strictly increasing: sort data by energy")
        idx = numpy.argsort(energy)
        energy = energy[idx]
        spectra = spectra[idx]
    has_duplicates = numpy.diff(energy) == 0
    if has_duplicates.any():
        _logger.warning("Energy has duplicate values: remove duplicates")
        energy, idx = numpy.unique(energy, return_index=True)
        spectra = spectra[idx]
    return spectra, energy * information.energy_unit, configuration


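# Hedged sketch of read_xas. The InputInformation import path and constructor
# signature below are assumptions; any object exposing the attributes accessed
# above (spectra_url, channel_url, config_url, mu_ref_url, dimensions,
# energy_unit) can be passed instead. File and dataset names are placeholders.
def _example_read_xas():
    from est.io.information import InputInformation  # hypothetical path

    information = InputInformation(
        spectra_url=DataUrl(
            file_path="spectra.h5", data_path="/scan1/spectra", scheme="silx"
        ),
        channel_url=DataUrl(
            file_path="spectra.h5", data_path="/scan1/energy", scheme="silx"
        ),
    )
    spectra, energy, configuration = read_xas(information)
    return spectra, energy, configuration

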
def _get_url(original_url, name):
    url_ = original_url
    if isinstance(url_, str):
        try:
            url_ = DataUrl(path=url_)
        except Exception:
            url_ = DataUrl(file_path=url_, scheme="PyMca")
    if not isinstance(url_, DataUrl):
        raise TypeError("given input for {} is invalid ({})".format(name, url_))
    return url_


def write_xas_proc(
    h5_file,
    entry,
    process,
    results,
    processing_order,
    data_path="/",
    overwrite=True,
):
    """
    Write a xas :class:`.Process` into a .h5 file

    :param str h5_file: path to the hdf5 file
    :param str entry: entry name
    :param process: the executed process
    :type: :class:`.Process`
    :param results: process result data
    :type: numpy.ndarray
    :param processing_order: processing order of the treatment
    :type: int
    :param data_path: path where to store the data
    :type: str
    """
    process_name = "xas_process_" + str(processing_order)
    # write the default xas process information
    with HDF5File(h5_file, "a") as h5f:
        nx_entry = h5f.require_group("/".join((data_path, entry)))
        nx_entry.attrs["NX_class"] = "NXentry"
        nx_process = nx_entry.require_group(process_name)
        nx_process.attrs["NX_class"] = "NXprocess"
        if overwrite:
            for key in (
                "program",
                "version",
                "date",
                "processing_order",
                "class_instance",
                "ft",
            ):
                if key in nx_process:
                    del nx_process[key]
        nx_process["program"] = process.program_name()
        nx_process["version"] = process.program_version()
        nx_process["date"] = datetime.now().replace(microsecond=0).isoformat()
        nx_process["processing_order"] = numpy.int32(processing_order)
        _class = process.__class__
        nx_process["class_instance"] = ".".join((_class.__module__, _class.__name__))
        nx_data = nx_entry.require_group("data")
        nx_data.attrs["NX_class"] = "NXdata"
        nx_data.attrs["signal"] = "data"
        nx_process_path = nx_process.name

    if isinstance(results, numpy.ndarray):
        data_ = {"data": results}
    else:
        data_ = results

    def get_interpretation(my_data):
        """Return the hdf5 'interpretation' attribute for this type of data"""
        if isinstance(my_data, numpy.ndarray):
            if my_data.ndim == 1:
                return "spectrum"
            elif my_data.ndim in (2, 3):
                return "image"
        return None

    # save results
    def save_key(key_path, value, attrs):
        """Save the given value to the associated path. Handles numpy arrays
        and dictionaries.
        """
        if attrs is not None:
            assert value is None, "can save either a value or attributes, not both"
        if value is not None:
            assert attrs is None, "can save either a value or attributes, not both"
        key_path = key_path.replace(".", "/")
        # dictionaries are dumped with dicttoh5
        if isinstance(value, dict):
            h5_path = "/".join((entry, process_name, key_path))
            dicttoh5(
                value,
                h5file=h5_file,
                h5path=h5_path,
                update_mode="replace",
                mode="a",
            )
        else:
            with HDF5File(h5_file, "a") as h5f:
                nx_process = h5f.require_group(nx_process_path)
                if attrs is None:
                    if key_path in nx_process:
                        del nx_process[key_path]
                    try:
                        nx_process[key_path] = value
                    except TypeError as e:
                        _logger.warning(
                            "Unable to write at {}, reason is {}".format(
                                str(key_path), str(e)
                            )
                        )
                    else:
                        interpretation = get_interpretation(value)
                        if interpretation:
                            nx_process[key_path].attrs[
                                "interpretation"
                            ] = interpretation
                else:
                    for key, value in attrs.items():
                        try:
                            nx_process[key_path].attrs[key] = value
                        except Exception as e:
                            _logger.warning(e)

    for key, value in data_.items():
        if isinstance(key, tuple):
            key_path = "/".join(("results", key[0]))
            save_key(key_path=key_path, value=None, attrs={key[1]: value})
        else:
            key_path = "/".join(("results", str(key)))
            save_key(key_path=key_path, value=value, attrs=None)

    if process.getConfiguration() is not None:
        h5_path = "/".join((nx_process_path, "configuration"))
        dicttoh5(
            process.getConfiguration(),
            h5file=h5_file,
            h5path=h5_path,
            update_mode="add",
            mode="a",
        )


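# write_xas_proc also accepts dict results. A plain key is stored as a dataset
# under "results/<key>", while a (name, attribute) tuple key is stored as an
# HDF5 attribute on "results/<name>". A hedged sketch (file, entry and process
# are placeholders; process must provide program_name(), program_version() and
# getConfiguration()). The dataset key must come before its attribute key so
# the target dataset exists when the attribute is written.
def _example_write_xas_proc_attrs(h5_file, entry, process):
    results = {
        "mu": numpy.random.rand(16, 4, 4),  # -> results/mu dataset
        ("mu", "units"): "a.u.",  # -> attribute "units" on results/mu
    }
    write_xas_proc(
        h5_file=h5_file,
        entry=entry,
        process=process,
        results=results,
        processing_order=1,
    )

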
def write_xas(
    h5_file,
    entry,
    energy,
    mu,
    sample=None,
    start_time=None,
    data_path="/",
    title=None,
    definition=None,
    overwrite=True,
):
    """
    Write raw data in NeXus format

    :param str h5_file: path to the hdf5 file
    :param str entry: entry name
    :param sample: definition of the sample
    :type: :class:`.Sample`
    :param energy: beam energy (1D)
    :type: numpy.ndarray
    :param mu: beam absorption (2D)
    :type: numpy.ndarray
    :param start_time:
    :param str data_path:
    :param str title: experiment title
    :param str definition: experiment definition
    """
    h5path = "/".join((data_path, entry))
    nx_dict = {
        "@NX_class": "NXentry",
        "monochromator": {
            "@NX_class": "NXmonochromator",
            "energy": energy,
            "energy@interpretation": "spectrum",
            "energy@NX_class": "NXdata",
            "energy@unit": "eV",
        },
        "absorbed_beam": {
            "@NX_class": "NXdetector",
            "data": mu,
            "data@interpretation": "image",
            "data@NX_class": "NXdata",
        },
        "data": {
            "@NX_class": "NXdata",
            ">energy": "../monochromator/energy",
            ">absorbed_beam": "../absorbed_beam/data",
        },
        "start_time": start_time,
        "title": title,
        "definition": definition,
    }
    if overwrite:
        mode = "w"
        update_mode = "replace"
    else:
        mode = "a"
        update_mode = "add"
    dicttonx(nx_dict, h5_file, h5path=h5path, mode=mode, update_mode=update_mode)


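# In the dicttonx mapping above, "@name" entries become HDF5 attributes and
# ">name" entries become soft links, so the NXdata group points at the
# monochromator and detector datasets without duplicating them. A read-back
# sketch using the file and entry written by the __main__ demo below (h5py is
# assumed to be available, which it is since silx depends on it):
def _example_read_back(h5_file="test_xas_123.h5"):
    import h5py

    with h5py.File(h5_file, "r") as h5f:
        energy = h5f["/scan1/data/energy"][()]  # resolved through the soft link
        mu = h5f["/scan1/data/absorbed_beam"][()]
    return energy, mu

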
def write_spectrum_saving_pt(h5_file, entry, obj, overwrite=True):
    """Save the current status of an est object

    :param str h5_file: path to the hdf5 file
    :param str entry: entry name, used as the h5 path
    :param obj: object to save
    """
    dicttoh5(obj, h5file=h5_file, h5path=entry, update_mode="replace", mode="a")


def get_xasproc(h5_file, entry):
    """
    Return the list of all NXprocess groups existing under the given entry

    :param str h5_file: hdf5 file
    :param str entry: data location
    :return:
    :rtype: list
    """

    def copy_nx_xas_process(h5_group):
        """copy base information from nx_xas_process"""
        res = {}
        res["_h5py_path"] = h5_group.name
        relevant_keys = (
            "program",
            "version",
            "data",
            "parameters",
            "processing_order",
            "configuration",
            "class_instance",
            "plots",
        )
        from silx.io.dictdump import h5todict

        for key in h5_group.keys():
            # for now we don't want to copy the numpy array (data)
            if key in relevant_keys:
                if key == "configuration":
                    config_path = "/".join((h5_group.name, "configuration"))
                    res[key] = h5todict(h5_file, config_path, asarray=False)
                elif key == "plots":
                    plots_grp = h5_group["plots"]
                    res[key] = {}
                    for plot_key in plots_grp.keys():
                        res[key][plot_key] = dict(plots_grp[plot_key].attrs.items())
                else:
                    res[key] = h5_group[key][...]
        return res

    res = []
    with HDF5File(h5_file, "a") as h5f:
        try:
            root_group = h5f[entry]
        except KeyError:
            _logger.warning(entry + " does not exist in " + h5_file)
        else:
            for key in root_group.keys():
                elmt = root_group[key]
                if hasattr(elmt, "attrs") and "NX_class" in elmt.attrs:
                    if elmt.attrs["NX_class"] == "NXprocess":
                        nx_xas_proc = copy_nx_xas_process(elmt)
                        if len(nx_xas_proc) == 0:
                            _logger.warning(
                                "one xas process was not readable "
                                "from the hdf5 file at: " + key
                            )
                        else:
                            res.append(nx_xas_proc)
    return res


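# Usage sketch: list the processes written by write_xas_proc for one entry.
# The file and entry names are the ones used in the __main__ demo below; the
# printed keys come from copy_nx_xas_process above.
def _example_list_processes():
    for proc in get_xasproc("test_xas_123.h5", entry="scan1"):
        print(proc["_h5py_path"], proc.get("program"), proc.get("processing_order"))

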
if __name__ == "__main__":
    import os

    from est.core.process.pymca.normalization import PyMca_normalization
    from est.core.process.pymca.exafs import PyMca_exafs
    from est.core.types import Sample

    h5_file = "test_xas_123.h5"
    if os.path.exists(h5_file):
        os.remove(h5_file)

    sample = Sample(name="mysample")
    data = numpy.random.rand(256 * 20 * 10)
    data = data.reshape((256, 20, 10))
    process_data = numpy.random.rand(256 * 20 * 10).reshape((256, 20, 10))
    energy = numpy.linspace(start=3.25, stop=3.69, num=256)

    write_xas(h5_file=h5_file, entry="scan1", sample=sample, energy=energy, mu=data)

    process_norm = PyMca_normalization()
    write_xas_proc(
        h5_file=h5_file,
        entry="scan1",
        process=process_norm,
        results=process_data,
        processing_order=1,
    )
    process_exafs = PyMca_exafs()
    process_data2 = numpy.random.rand(256 * 20 * 10).reshape((256, 20, 10))
    write_xas_proc(
        h5_file=h5_file,
        entry="scan1",
        process=process_exafs,
        results=process_data2,
        processing_order=2,
    )