Source code for est.core.types.spectrum

from typing import Union
from typing import Iterable
import logging

import numpy
import pint
from est.units import ur

_logger = logging.getLogger(__name__)


[docs] class Spectrum: """ Core object to be used to store larch and pymca results. Larch is using 'Group' to store the results and adds members to this group according to the different treatment. Pymca is using a dictionary to store the results. This class has to adpat to both behaviors and the different naming convention as well. :param numpy.ndarray (1D) energy: beam energy :param numpy.ndarray (1D) mu: beam absorption :param int x: x index on the spectra :param int y: y index on the spectra """ _MU_KEY = "Mu" _ENERGY_KEY = "Energy" _NORMALIZED_MU_KEY = "NormalizedMu" _NORMALIZED_ENERGY_KEY = "NormalizedEnergy" _POST_EDGE_KEY = "post_edge" _EDGE_STEP_KEY = "edge_step" _CHI_KEY = "Chi" _K_KEY = "K" _FT_KEY = "FT" _EDGE_KEY = "Edge" _NORMALIZED_BACKGROUND_KEY = "NormalizedBackground" _X_POS_KEY = "XPos" _Y_POS_KEY = "YPos" _PYMCA_DICT_KEY = "pymca_dict" _LARCH_DICT_KEY = "larch_dict" def __init__( self, energy: Union[None, numpy.ndarray] = None, mu: Union[None, numpy.ndarray] = None, x: Union[None, int] = None, y: Union[None, int] = None, ): super().__init__() if energy is not None: assert isinstance(energy, (numpy.ndarray, pint.Quantity)) if isinstance(energy, numpy.ndarray): energy = energy * ur.eV self.__x = x self.__y = y # properties self.__energy = None self.__mu = None self.__chi = None self.__k_values = None self.__normalized_mu = None self.__flatten_mu = None self.__normalized_energy = None self.__pre_edge = None self.__post_edge = None self.__edge_step = None self.__e0 = None self.__noise_savgol = None self.__norm_noise_savgol = None self.__raw_noise_savgol = None self.__other_parameters = {} self.ft = {} self.__pymca_dict = {} # pymca dict use to store some processing information specific to pymca self.__larch_dict = {} # TODO: this should be removed as the item setting as a dict has been removed self.__key_mapper = { self._MU_KEY: self.__class__.mu, self._ENERGY_KEY: self.__class__.energy, self._NORMALIZED_MU_KEY: self.__class__.normalized_mu, self._NORMALIZED_ENERGY_KEY: self.__class__.normalized_energy, self._POST_EDGE_KEY: self.__class__.post_edge, self._NORMALIZED_BACKGROUND_KEY: self.__class__.pre_edge, self._FT_KEY: self.__class__.ft, self._EDGE_KEY: self.__class__.e0, self._EDGE_STEP_KEY: self.__class__.edge_step, self._CHI_KEY: self.__class__.chi, self._K_KEY: self.__class__.k, self._PYMCA_DICT_KEY: self.__class__.pymca_dict, self._LARCH_DICT_KEY: self.__class__.larch_dict, } self.energy = energy self.mu = mu @property def energy(self) -> Union[None, numpy.ndarray]: """Energy in eV. :note: cannot be a Quantity because uses directly by xraylarch and pymca """ return self.__energy @energy.setter @ur.wraps(None, (None, ur.eV), strict=False) def energy(self, energy): self.__energy = energy @property def mu(self) -> Union[None, numpy.ndarray]: return self.__mu @mu.setter def mu(self, mu: numpy.ndarray): assert isinstance(mu, numpy.ndarray) or mu is None self.__mu = mu @property def x(self) -> Union[None, int]: return self.__x @property def y(self) -> Union[None, int]: return self.__y @property def chi(self) -> Union[None, numpy.ndarray]: return self.__chi @chi.setter def chi(self, chi: numpy.ndarray): self.__chi = chi @property def k(self) -> Union[None, numpy.ndarray]: return self.__k_values @k.setter def k(self, k: numpy.ndarray): self.__k_values = k @property def normalized_mu(self) -> Union[None, numpy.ndarray]: return self.__normalized_mu @normalized_mu.setter def normalized_mu(self, mu: numpy.ndarray): assert isinstance(mu, numpy.ndarray) or mu is None self.__normalized_mu = mu @property def flatten_mu(self) -> Union[None, numpy.ndarray]: return self.__flatten_mu @flatten_mu.setter def flatten_mu(self, mu: numpy.ndarray): assert isinstance(mu, numpy.ndarray) or mu is None self.__flatten_mu = mu @property def normalized_energy(self) -> Union[None, numpy.ndarray]: return self.__normalized_energy @normalized_energy.setter def normalized_energy(self, energy: Union[numpy.ndarray, pint.Quantity]): if not (isinstance(energy, (numpy.ndarray, pint.Quantity)) or energy is None): raise TypeError( f"energy is expected to be None or a numpy array or a Quantity. Not {type(energy)}." ) self.__normalized_energy = energy @property def pre_edge(self) -> Union[None, numpy.ndarray]: return self.__pre_edge @pre_edge.setter def pre_edge(self, value: numpy.ndarray): self.__pre_edge = value @property def post_edge(self) -> Union[None, numpy.ndarray]: return self.__post_edge @post_edge.setter def post_edge(self, value: numpy.ndarray): self.__post_edge = value @property def edge_step(self): return self.__edge_step @edge_step.setter def edge_step(self, value): self.__edge_step = value @property def e0(self) -> Union[None, numpy.ndarray]: return self.__e0 @e0.setter def e0(self, e0: numpy.ndarray): self.__e0 = e0 @property def noise_savgol(self): return self.__noise_savgol @noise_savgol.setter def noise_savgol(self, values): self.__noise_savgol = values @property def raw_noise_savgol(self): return self.__raw_noise_savgol @raw_noise_savgol.setter def raw_noise_savgol(self, noise): self.__raw_noise_savgol = noise @property def norm_noise_savgol(self): return self.__norm_noise_savgol @norm_noise_savgol.setter def norm_noise_savgol(self, values): self.__norm_noise_savgol = values @property def ft(self): return self.__ft @ft.setter def ft(self, ft): if isinstance(ft, _FT): self.__ft = ft else: self.__ft = _FT(ddict=ft) @property def r(self) -> Union[None, numpy.ndarray]: # this alias is needed for larch return self.__ft["FTRadius"] @r.setter def r(self, value: numpy.ndarray): # this alias is needed for larch self.__ft["FTRadius"] = value @property def chir_mag(self) -> Union[None, numpy.ndarray]: # this alias is needed for larch return self.__ft["FTIntensity"] @chir_mag.setter def chir_mag(self, value: numpy.ndarray): # this alias is needed for larch self.__ft["FTIntensity"] = value @property def pymca_dict(self): return self.__pymca_dict @pymca_dict.setter def pymca_dict(self, ddict: dict): assert isinstance(ddict, dict) self.__pymca_dict = ddict @property def larch_dict(self): return self.__larch_dict @larch_dict.setter def larch_dict(self, ddict: dict): assert isinstance(ddict, dict) self.__larch_dict = ddict @property def shape(self) -> tuple: _energy_len = 0 if self.__energy is not None: _energy_len = len(self.__energy) _mu_len = 0 if self.__mu is not None: _mu_len = len(self.__mu) return (_energy_len, _mu_len)
[docs] def extra_keys(self) -> tuple: return tuple(self.__other_parameters.keys())
[docs] def load_from_dict(self, ddict: dict): assert isinstance(ddict, dict) def value_is_none(value): if hasattr(value, "decode"): value = value.decode("UTF-8") if isinstance(value, str): return value == "None" else: return value is None for key, value in ddict.items(): if key in self.__key_mapper: prop = self.__key_mapper[key] if value_is_none(value=value): prop.fset(self, None) else: prop.fset(self, value) else: _logger.warning(f"Unable to set value for key {key}. Will be ignored") return self
[docs] @staticmethod def from_dict(ddict: dict): x_pos = None y_pos = None if Spectrum._X_POS_KEY in ddict: x_pos = ddict.pop(Spectrum._X_POS_KEY) if Spectrum._Y_POS_KEY in ddict: y_pos = ddict.pop(Spectrum._Y_POS_KEY) spectrum = Spectrum(x=x_pos, y=y_pos) return spectrum.load_from_dict(ddict=ddict)
[docs] def to_dict(self) -> dict: res = { self._X_POS_KEY: self.x, self._Y_POS_KEY: self.y, self._MU_KEY: self.mu, self._ENERGY_KEY: self.energy, self._FT_KEY: self.ft.to_dict(), self._NORMALIZED_MU_KEY: ( "None" if self.normalized_mu is None else self.normalized_mu ), self._NORMALIZED_ENERGY_KEY: ( "None" if self.normalized_energy is None else self.normalized_energy ), self._POST_EDGE_KEY: "None" if self.post_edge is None else self.post_edge, self._NORMALIZED_BACKGROUND_KEY: ( "None" if self.pre_edge is None else self.pre_edge ), self._EDGE_KEY: "None" if self.e0 is None else self.e0, self._CHI_KEY: "None" if self.chi is None else self.chi, self._K_KEY: "None" if self.k is None else self.k, self._PYMCA_DICT_KEY: self.pymca_dict, self._LARCH_DICT_KEY: self.larch_dict, } res.update(self.__other_parameters) return res
def __str__(self): def add_info(str_, attr): assert hasattr(self, attr) sub_str = "- " + attr + ": " + str(getattr(self, attr)) + "\n" return str_ + sub_str main_info = "" for info in ( "energy", "mu", "normalized_mu", "normalized_signal", "normalized_energy", ): main_info = add_info(str_=main_info, attr=info) def add_third_info(str_, key): sub_str = ("- " + key + ": " + str(self[key])) + "\n" return str_ + sub_str for key in self.__other_parameters: main_info = add_third_info(str_=main_info, key=key) return main_info
[docs] def update(self, obj): """ Update the contained values from the given obj. :param obj: :type obj: Union[XASObject, dict] """ if isinstance(obj, Spectrum): _obj = obj.to_dict() else: _obj = obj assert isinstance(_obj, dict) for key, value in _obj.items(): if isinstance(value, str) and value == "None": self[key] = None else: self[key] = value
[docs] def get_missing_keys(self, keys: Iterable) -> tuple: """Return missing keys on the spectrum""" missing = [] for key in keys: if key not in self or self[key] is None: missing.append(key) return tuple(missing)
[docs] def keys(self) -> list: keys = list(self.__other_parameters.keys()) keys += list(self.__key_mapper.keys()) return keys
[docs] def copy(self): return Spectrum.from_dict(self.to_dict())
def _force_indexes(self, x, y): """This is protected because it might change display and the indexes should be defined during Spectra or Spectrum construction once for all""" self.__x = x self.__y = y
class _FT: _RADIUS_KEY = "FTRadius" _INTENSITY_KEY = "FTIntensity" _IMAGINARY_KEY = "FTImaginary" def __init__(self, ddict): self.__radius = numpy.nan self.__intensity = numpy.nan self.__imaginary = numpy.nan self.__other_parameters = {} self.__key_mapper = { self._RADIUS_KEY: self.__class__.radius, self._INTENSITY_KEY: self.__class__.intensity, self._IMAGINARY_KEY: self.__class__.imaginary, } if ddict is not None: for key, values in ddict.items(): self[key] = values @property def radius(self): return self.__radius @radius.setter def radius(self, radius): self.__radius = radius @property def intensity(self): return self.__intensity @intensity.setter def intensity(self, intensity): self.__intensity = intensity @property def imaginary(self): return self.__imaginary @imaginary.setter def imaginary(self, imaginery): self.__imaginary = imaginery def __getitem__(self, key): """Need for pymca compatibility""" if key in self.__key_mapper: return self.__key_mapper[key].fget(self) else: return self.__other_parameters[key] def __setitem__(self, key, value): """Need for pymca compatibility""" if key in self.__key_mapper: self.__key_mapper[key].fset(self, value) else: self.__other_parameters[key] = value def __contains__(self, item): return item in self.__key_mapper or item in self.__other_parameters def to_dict(self) -> dict: res = { self._RADIUS_KEY: self.radius, self._INTENSITY_KEY: self.intensity, self._IMAGINARY_KEY: self.imaginary, } res.update(self.__other_parameters) return res def get_missing_keys(self, keys: Iterable) -> tuple: """Return missing keys on the spectrum""" missing = [] for key in keys: if key not in self: missing.append(key) return tuple(missing)