Source code for est.core.process.pymca.ft

"""wrapper to pymca `ft` process"""

import logging
import numpy
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from est.core.process.process import Process
from est.core.types import XASObject, Spectrum

_logger = logging.getLogger(__name__)


def process_spectr_ft(
    spectrum,
    configuration,
    overwrite=True,
    callbacks=None,
    output=None,
    output_dict=None,
):
    """
    Compute the pymca fourier transform of a single spectrum.

    The signal stored under ``"EXAFSNormalized"`` in ``spectrum.pymca_dict``
    is restricted to the ``[KMin, KMax]`` window (bounds read from
    ``spectrum.pymca_dict``, defaulting to the full range of ``spectrum.k``)
    and passed to ``XASClass.fourierTransform``. The result is stored in
    ``spectrum.ft``.

    :param spectrum: spectrum to process
    :type: :class:`.Spectrum`
    :param configuration: configuration of the pymca normalization
    :type: dict
    :param overwrite: False if we want to return a new Spectrum instance
    :type: bool
    :param callbacks: iterable of callables, each one is called without
                      argument once the transform has been computed.
    :param output: list to store the result, needed for pool processing
    :type: multiprocessing.manager.list
    :param output_dict: key is input spectrum, value is index in the output
                        list. Required when `output` is provided.
    :type: dict
    :return: processed spectrum. ``(None, None)`` is returned when energy,
             mu, chi or the "EXAFSNormalized" signal is missing.
    :rtype: tuple (configuration, spectrum)
    """
    # lazy %-style arguments: the message is only built if DEBUG is enabled
    _logger.debug(
        "start fourier transform on spectrum (%s, %s)", spectrum.x, spectrum.y
    )
    pymca_xas = XASClass()
    if spectrum.energy is None or spectrum.mu is None:
        _logger.error(
            "Energy and or Mu is/are not specified, unable to compute exafs"
        )
        return None, None

    if configuration is not None:
        pymca_xas.setConfiguration(configuration)
    pymca_xas.setSpectrum(energy=spectrum.energy, mu=spectrum.mu)

    if spectrum.chi is None:
        _logger.warning(
            "exafs has not been processed yet, unable to process "
            "fourier transform"
        )
        return None, None

    if "EXAFSNormalized" not in spectrum.pymca_dict:
        _logger.warning("ft window need to be defined first")
        return None, None

    # column 0: k values, column 1: signal to transform
    windowed = numpy.zeros((spectrum.chi.size, 2), float)
    windowed[:, 0] = spectrum.k
    windowed[:, 1] = spectrum.pymca_dict["EXAFSNormalized"]

    # when no bound is stored fall back on the full k range
    k_min = spectrum.pymca_dict.get("KMin", spectrum.k.min())
    k_max = spectrum.pymca_dict.get("KMax", spectrum.k.max())

    # keep only the points inside the [k_min, k_max] window
    in_window = (windowed[:, 0] >= k_min) & (windowed[:, 0] <= k_max)
    windowed = windowed[in_window, :]

    if windowed.size == 0:
        # nothing left to transform: keep the expected keys, filled with NaN
        ft = {
            "FTImaginary": numpy.nan,
            "FTIntensity": numpy.nan,
            "FTRadius": numpy.nan,
        }
    else:
        ft = pymca_xas.fourierTransform(
            windowed[:, 0],
            windowed[:, 1],
            kMin=k_min,
            kMax=k_max,
        )
        assert "FTIntensity" in ft
        assert "FTRadius" in ft
        assert ft["FTRadius"] is not None
        assert ft["FTIntensity"] is not None

    if callbacks:
        for callback in callbacks:
            callback()

    # `output_dict` is keyed by the input spectrum: remember it before a
    # copy is possibly created below.
    input_spectrum = spectrum
    if not overwrite:
        spectrum = Spectrum.from_dict(spectrum.to_dict())
    spectrum.ft = ft

    if output is not None:
        assert output_dict is not None
        output[output_dict[input_spectrum]] = spectrum
    return configuration, spectrum
def pymca_ft(xas_obj, **optional_inputs):
    """
    Run the :class:`PyMca_ft` process on `xas_obj` and return its output.

    :param xas_obj: object containing the configuration and spectra to process
    :type: Union[XASObject, dict]
    :param optional_inputs: extra inputs forwarded as is to the process
    :return: value of the "xas_obj" output of the process
             (None if the process did not provide it)
    """
    task_inputs = {"xas_obj": xas_obj}
    task_inputs.update(optional_inputs)

    ft_process = PyMca_ft(inputs=task_inputs)
    ft_process.run()

    result = ft_process.get_output_value("xas_obj", None)
    return result
class PyMca_ft(
    Process,
    name="ft",
    input_names=["xas_obj"],
    output_names=["xas_obj"],
    optional_input_names=["ft"],
):
    """
    Process applying `process_spectr_ft` to each spectrum of a XAS object.

    Input ``xas_obj`` is mandatory, input ``ft`` is an optional
    configuration stored under the ``"FT"`` key of the XAS object
    configuration.
    """

    def set_properties(self, properties):
        """
        Store the pymca settings when `properties` provides them.

        :param properties: properties to read; only the "_pymcaSettings"
                           key is used.
        :type: dict
        """
        if "_pymcaSettings" in properties:
            self._settings = properties["_pymcaSettings"]

    def run(self):
        """
        Compute the fourier transform of all the spectra of the ``xas_obj``
        input and set the processed object as the ``xas_obj`` output.

        :raises ValueError: if the ``xas_obj`` input is None
        """
        xas_obj = self.inputs.xas_obj
        if xas_obj is None:
            raise ValueError("xas_obj should be provided")
        _xas_obj = self.getXasObject(xas_obj=xas_obj)

        ft_configuration = self.inputs.ft
        if ft_configuration:
            self.setConfiguration(ft_configuration)
            _xas_obj.configuration["FT"] = ft_configuration

        self._advancement.reset(max_=_xas_obj.n_spectrum)
        self._advancement.startProcess()
        self._pool_process(xas_obj=_xas_obj)
        self._advancement.endProcess()

        # sanity check on the first spectrum: the transform must be available
        first_spectrum = _xas_obj.spectra.data.flat[0]
        assert hasattr(first_spectrum, "ft")
        assert hasattr(first_spectrum.ft, "intensity")
        assert hasattr(first_spectrum.ft, "imaginary")
        self.outputs.xas_obj = _xas_obj

    def _pool_process(self, xas_obj):
        """
        Process the spectra one after the other, in place, and update
        `progress` (percentage) after each of them.

        :param xas_obj: object containing the configuration and spectra
        :type: XASObject
        """
        assert isinstance(xas_obj, XASObject)
        n_s = len(xas_obj.spectra.data.flat)
        # count from 1 so that the progress is the share of spectra already
        # processed and reaches 100 once the last one is done
        for n_done, spectrum in enumerate(xas_obj.spectra, start=1):
            process_spectr_ft(
                spectrum=spectrum,
                configuration=xas_obj.configuration,
                callbacks=self.callbacks,
                overwrite=True,
            )
            self.progress = n_done / n_s * 100.0

    def definition(self):
        """Return a short description of the process."""
        return "fourier transform"

    def program_version(self):
        """Return the version of PyMca5, the library doing the computation."""
        import PyMca5

        return PyMca5.version()

    @staticmethod
    def program_name():
        """Return the name identifying this process."""
        return "pymca_ft"