Source code for est.core.process.larch.xftf

"""wrapper to the larch xftf process"""

import logging
from importlib.metadata import version as get_version
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Sequence
from typing import Union

import numpy
from larch.symboltable import Group
from larch.xafs.xafsft import xftf

from ...types.spectrum import Spectrum
from ...types.xasobject import XASObject
from ..base import Process

_logger = logging.getLogger(__name__)


[docs] def process_spectr_xftf( spectrum: Spectrum, configuration: Dict[str, Any], overwrite: bool = True, callbacks: Optional[Sequence[Callable[[], None]]] = None, ) -> Spectrum: """ :param spectrum: spectrum to process. :param configuration: configuration of the pymca normalization. :param overwrite: `False` if we want to return a new Spectrum instance. :param callbacks: callbacks to execute after processing. :return: processed spectrum """ _logger.debug("start xftf on spectrum (%s, %s)", spectrum.x, spectrum.y) # if kmax is not provided take default value kmax = configuration.get("kmax", None) if kmax is None: if spectrum.k.size: configuration["kmax"] = float(max(spectrum.k)) * 0.9 else: configuration["kmax"] = numpy.nan kmin = configuration.get("kmin", None) if kmin is None: if spectrum.k.size: configuration["kmin"] = float(min(spectrum.k)) else: configuration["kmin"] = numpy.nan xftf_kwargs = {} for config_key in ( "kmin", "kmax", "kweight", "dk", "dk2", "with_phase", "window", "rmax_out", "nfft", "kstep", ): if config_key in configuration: xftf_kwargs[config_key] = configuration[config_key] if config_key == "kweight": xftf_kwargs["kw"] = configuration[config_key] res_group = Group() mask = numpy.isfinite(spectrum.chi) with_phase = xftf_kwargs.get("with_phase", False) try: xftf(k=spectrum.k[mask], chi=spectrum.chi[mask], group=res_group, **xftf_kwargs) except (ValueError, IndexError) as e: if sum(mask) >= 10: raise _logger.warning( "Larch xftf failed (%s). Less than 10 valid data points so return empty result.", e, ) res_group.r = numpy.array([]) res_group.chir = numpy.array([]) res_group.chir_mag = numpy.array([]) res_group.chir_re = numpy.array([]) res_group.chir_im = numpy.array([]) if with_phase: res_group.chir_pha = numpy.array([]) if not overwrite: spectrum = spectrum.model_copy(deep=True) spectrum.ft.radius = res_group.r spectrum.ft.intensity = res_group.chir_mag spectrum.ft.real = res_group.chir_re spectrum.ft.imaginary = res_group.chir_im if with_phase: spectrum.ft.phase = res_group.chir_pha else: spectrum.ft.phase = None # handle chi(x) * k**k_weight plot with r max if spectrum.k is not None and spectrum.chi is not None: if "kweight" in xftf_kwargs: kweight = xftf_kwargs["kweight"] else: kweight = 0 spectrum.chi_weighted_k = spectrum.chi * (spectrum.k**kweight) spectrum.larch_dict.xftf_k_weight = kweight spectrum.larch_dict.xftf_k_min = configuration["kmin"] spectrum.larch_dict.xftf_k_max = configuration["kmax"] if callbacks: for callback in callbacks: callback() return configuration, spectrum
[docs] def larch_xftf( xas_obj: Union[XASObject, dict], **optional_inputs ) -> Optional[XASObject]: process = Larch_xftf(inputs={"xas_obj": xas_obj, **optional_inputs}) process.run() return process.get_output_value("xas_obj", None)
[docs] class Larch_xftf( Process, input_names=["xas_obj"], optional_input_names=["xftf_config"], output_names=["xas_obj"], ): """Fourier transform of the XAS fine-structure."""
[docs] def run(self): xas_obj = self.getXasObject(xas_obj=self.inputs.xas_obj) self._advancement.reset(max_=xas_obj.n_spectrum) self._advancement.startProcess() self._pool_process(xas_obj=xas_obj) self._advancement.endProcess() self.outputs.xas_obj = xas_obj
def _pool_process(self, xas_obj): xftf_config = self.get_input_value("xftf_config", dict()) n_s = len(xas_obj.spectra.data.flat) for i_s, spectrum in enumerate(xas_obj.spectra): process_spectr_xftf( spectrum=spectrum, configuration=xftf_config, callbacks=self.callbacks, overwrite=True, ) self.progress = i_s / n_s * 100.0
[docs] def definition(self) -> str: return "xftf calculation"
[docs] def program_version(self) -> str: return get_version("larch")
[docs] @staticmethod def program_name() -> str: return "larch_xftf"