Source code for est.core.process.noise

import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Sequence

import numpy
import scipy.signal

from ..types.spectrum import Spectrum
from ..types.xasobject import XASObject
from .base import Process
from .handle_nan import array_undo_mask

_logger = logging.getLogger(__name__)


[docs] def process_noise_savgol( spectrum: Spectrum, configuration: Dict[str, Any], overwrite: bool = True, callbacks: Optional[Sequence[Callable[[], None]]] = None, ) -> Spectrum: """ :param spectrum: spectrum to process. :param configuration: configuration of the pymca normalization. :param overwrite: `False` if we want to return a new Spectrum instance. :param callbacks: callbacks to execute after processing. :return: processed spectrum. """ _logger.debug( "start noise with Savitsky-Golay on spectrum (%s, %s)", spectrum.x, spectrum.y ) if "noise" in configuration: configuration = configuration["noise"] if "window_size" not in configuration: raise ValueError("`window_size` should be specify. Missing in configuration") else: window_size = configuration["window_size"] if "polynomial_order" not in configuration: raise ValueError( "`polynomial_order` should be specify. Missing in configuration" ) else: polynomial_order = configuration["polynomial_order"] if spectrum.e0 is None: raise ValueError( "e0 is None. Unable to compute noise. (e0 is determine in pre_edge. You must run it first)" ) mask = numpy.isfinite(spectrum.mu) # compute noise. This is always done over the full spectrum # get e_min and e_max. Those will be try: smooth_spectrum = scipy.signal.savgol_filter( spectrum.mu[mask], window_size, polynomial_order ) except ValueError as e: if sum(mask) >= 10: raise _logger.warning( "savgol_filter failed (%s). Less than 10 valid data points so return empty result.", e, ) mask = numpy.array([]) smooth_spectrum = numpy.array([]) smooth_spectrum = array_undo_mask(smooth_spectrum, mask) noise = numpy.absolute(spectrum.mu - smooth_spectrum) # compute e_min and e_max. those are provided relative to the edge e_min = configuration.get("e_min", None) e_max = configuration.get("e_max", None) if e_min is None: if spectrum.energy.size == 0: e_min = numpy.nan else: e_min = spectrum.energy.min() else: e_min += spectrum.e0 if e_max is None: if spectrum.energy.size == 0: e_max = numpy.nan else: e_max = spectrum.energy.max() else: e_max += spectrum.e0 if not overwrite: spectrum = spectrum.model_copy(deep=True) spectrum.noise_savgol = noise spectrum.noise_e_min = e_min spectrum.noise_e_max = e_max if mask.size: mask = mask & (spectrum.energy > e_min) & (spectrum.energy < e_max) if mask.any(): if spectrum.edge_step is None: raise ValueError( "edge_step is None. Unable to compute noise. (edge_step is determine in pre_edge. You must run it first)" ) spectrum.raw_noise_savgol = numpy.mean(noise[mask]) spectrum.norm_noise_savgol = spectrum.raw_noise_savgol / spectrum.edge_step else: # Uses nan's instead of raising an exception. Otherwise we have much more failing # workflows online (either E0 is wrong or the scan has not progressed past Emin, # which is after the edge so the plots are fine). spectrum.raw_noise_savgol = numpy.nan spectrum.norm_noise_savgol = numpy.nan if callbacks: for callback in callbacks: callback() return spectrum
[docs] class NoiseProcess( Process, input_names=["xas_obj"], optional_input_names=["window_size", "polynomial_order", "e_min", "e_max"], output_names=["xas_obj"], ):
[docs] def run(self): xas_obj = self.getXasObject(xas_obj=self.inputs.xas_obj) parameters = { "window_size": self.get_input_value("window_size", 5), "polynomial_order": self.get_input_value("polynomial_order", 2), } if not self.missing_inputs.e_min: parameters["e_min"] = self.inputs.e_min if not self.missing_inputs.e_max: parameters["e_max"] = self.inputs.e_max xas_obj.configuration["noise"] = parameters self.progress = 0.0 self._pool_process(xas_obj=xas_obj) self.progress = 100.0 self.outputs.xas_obj = xas_obj
def _pool_process(self, xas_obj: XASObject): n_s = len(xas_obj.spectra.data.flat) for i_s, spectrum in enumerate(xas_obj.spectra): process_noise_savgol( spectrum=spectrum, configuration=xas_obj.configuration, callbacks=self.callbacks, overwrite=True, ) self.progress = i_s / n_s * 100.0
[docs] def definition(self) -> str: return "noise using Savitsky-Golay algorithm"
[docs] @staticmethod def program_name() -> str: return "noise_savgol"