Source code for est.core.process.larch.pre_edge

"""wrapper to the larch pre-edge process"""

from est.core.types import Spectrum, XASObject
from est.core.process.process import Process
from larch.xafs.pre_edge import pre_edge
import logging
from larch.symboltable import Group

_logger = logging.getLogger(__name__)


def process_spectr_pre_edge(
    spectrum,
    configuration,
    overwrite=True,
    callbacks=None,
    output=None,
    output_dict=None,
):
    """
    Run the larch ``pre_edge`` computation on a single spectrum and store
    the results (``e0``, ``normalized_mu``, ``flatten_mu``, ``pre_edge``,
    ``post_edge``, ``edge_step``) on the spectrum.

    :param spectrum: spectrum to process
    :type: :class:`.Spectrum`
    :param configuration: configuration of the larch pre-edge. A truthy
                          ``"e0"`` entry takes priority over
                          ``spectrum.e0``; the other recognised keys are
                          forwarded as keyword arguments to larch
                          ``pre_edge``.
    :type: dict
    :param overwrite: False if we want to return a new Spectrum instance.
                      In that case the input spectrum is left untouched.
    :type: bool
    :param callbacks: callbacks to execute once the spectrum is processed.
                      Each one is called without argument.
    :param output: list to store the result, needed for pool processing.
                   Not used by this function body.
    :type: multiprocessing.manager.list
    :param output_dict: key is: input spectrum, value is index in the
                        output list. Not used by this function body.
    :type: dict
    :return: ``(configuration, spectrum)``, or ``(None, None)`` when the
             spectrum has no energy or no mu.
    :rtype: tuple
    """
    # Validate the type before touching any attribute of `spectrum`.
    assert isinstance(spectrum, Spectrum)
    # Lazy %-args: the message is only formatted if DEBUG is enabled.
    _logger.debug("start pre_edge on spectrum (%s, %s)", spectrum.x, spectrum.y)

    if spectrum.energy is None or spectrum.mu is None:
        _logger.error(
            "Energy and or Mu is/are not specified, unable to " "compute pre edge"
        )
        return None, None

    # Copy *before* any assignment so that the caller's spectrum is never
    # modified when overwrite is False.
    # NOTE(review): `from_dict` is called with a `spectrum=` keyword exactly
    # as before - confirm this matches the Spectrum.from_dict signature.
    if not overwrite:
        spectrum = Spectrum.from_dict(spectrum=spectrum)

    # A configured e0 wins over the one already carried by the spectrum.
    spectrum.e0 = configuration.get("e0", None) or spectrum.e0

    # Only forward the options explicitly present in the configuration so
    # larch keeps its own defaults for everything else.
    opts = {
        opt_name: configuration[opt_name]
        for opt_name in (
            "z",
            "edge",
            "pre1",
            "pre2",
            "norm1",
            "nnorm",
            "nvict",
            "step",
            "make_flat",
            "norm2",
            "order",
            "leexiang",
            "tables",
            "fit_erfc",
        )
        if opt_name in configuration
    }

    # larch writes its results as attributes of the provided group.
    res_group = Group()
    pre_edge(
        energy=spectrum.energy,
        mu=spectrum.mu,
        group=res_group,
        e0=spectrum.e0,
        **opts
    )

    spectrum.e0 = res_group.e0
    spectrum.normalized_mu = res_group.norm
    spectrum.flatten_mu = res_group.flat
    spectrum.pre_edge = res_group.pre_edge
    spectrum.post_edge = res_group.post_edge
    spectrum.edge_step = res_group.edge_step

    if callbacks:
        for callback in callbacks:
            callback()

    return configuration, spectrum
def larch_pre_edge(xas_obj, **optional_inputs):
    """
    Functional shortcut: instantiate a :class:`Larch_pre_edge` task with the
    given inputs, execute it and hand back its ``xas_obj`` output.

    :param xas_obj: object containing the configuration and spectra to process
    :type: Union[XASObject, dict]
    :param optional_inputs: additional task inputs, forwarded unchanged
                            next to ``xas_obj``
    :return: value of the task ``xas_obj`` output (``None`` is the
             fallback passed to ``get_output_value``)
    :rtype: XASObject
    """
    task_inputs = {"xas_obj": xas_obj}
    task_inputs.update(optional_inputs)

    task = Larch_pre_edge(inputs=task_inputs)
    task.run()
    return task.get_output_value("xas_obj", None)
class Larch_pre_edge(
    Process,
    name="pre_edge",
    input_names=["xas_obj"],
    optional_input_names=["pre_edge_config"],
    output_names=["xas_obj"],
):
    """
    Task applying the larch pre-edge computation to every spectrum of an
    XAS object.

    Inputs: ``xas_obj`` (required), ``pre_edge_config`` (optional dict,
    defaults to an empty dict). Output: ``xas_obj``.
    """

    def run(self):
        """
        Process all spectra of the ``xas_obj`` input and publish the result
        on the ``xas_obj`` output.

        :raises ValueError: if the ``xas_obj`` input is None
        """
        xas_obj = self.inputs.xas_obj
        if xas_obj is None:
            raise ValueError("xas_obj should be provided")
        self._xas_obj = self.getXasObject(xas_obj=xas_obj)
        _xas_obj = self._xas_obj

        self.progress = 0.0
        self._pool_process(xas_obj=_xas_obj)
        self.progress = 100.0
        self.outputs.xas_obj = _xas_obj

    def _pool_process(self, xas_obj):
        """
        Run :func:`process_spectr_pre_edge` on each spectrum, in place, and
        update ``self.progress`` after each one.

        :param xas_obj: object whose spectra are processed
        :type: XASObject
        """
        assert isinstance(xas_obj, XASObject)
        pre_edge_config = self.get_input_value("pre_edge_config", dict())
        n_s = len(xas_obj.spectra.data.flat)
        # Count from 1: progress reflects the number of spectra *completed*,
        # so the first processed spectrum no longer reports 0 %.
        for n_done, spectrum in enumerate(xas_obj.spectra, start=1):
            process_spectr_pre_edge(
                spectrum=spectrum,
                configuration=pre_edge_config,
                callbacks=self.callbacks,
                overwrite=True,
            )
            self.progress = n_done / n_s * 100.0

    def definition(self):
        """Return a short human readable description of the task."""
        return "pre_edge calculation"

    def program_version(self):
        """Return the larch version string reported by ``larch.version``."""
        # Imported lazily, as in the original implementation.
        import larch.version

        return larch.version.version_data()["larch"]

    @staticmethod
    def program_name():
        """Return the name identifying this program."""
        return "larch_pre_edge"