Source code for est.core.process.noise
from est.core.types import XASObject
from .process import Process
import scipy.signal
import logging
import numpy
_logger = logging.getLogger(__name__)
[docs]
def process_noise_savgol(
spectrum,
configuration,
overwrite=True,
callbacks=None,
output=None,
output_dict=None,
):
"""
:param spectrum: spectrum to process
:type: :class:`.Spectrum`
:param configuration: configuration of the pymca normalization
:type: dict
:param overwrite: False if we want to return a new Spectrum instance
:type: bool
:param callbacks: callback to execute.
:param output: list to store the result, needed for pool processing
:type: multiprocessing.manager.list
:param output_dict: key is input spectrum, value is index in the output
list.
:type: dict
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
_logger.debug(
"start noise with Savitsky-Golay on spectrum (%s, %s)"
% (spectrum.x, spectrum.y)
)
if "noise" in configuration:
configuration = configuration["noise"]
if "window_size" not in configuration:
raise ValueError("`window_size` should be specify. Missing in configuration")
else:
window_size = configuration["window_size"]
if "polynomial_order" not in configuration:
raise ValueError(
"`polynomial_order` should be specify. Missing in configuration"
)
else:
polynomial_order = configuration["polynomial_order"]
if spectrum.edge_step is None:
raise ValueError(
"edge_step is None. Unable to compute noise. (edge_step is determine in pre_edge. You must run it first)"
)
if spectrum.e0 is None:
raise ValueError(
"e0 is None. Unable to compute noise. (e0 is determine in pre_edge. You must run it first)"
)
# compute noise. This is always done over the full spectrum
# get e_min and e_max. Those will be
smooth_spectrum = scipy.signal.savgol_filter(
spectrum.mu, window_size, polynomial_order
)
noise = numpy.absolute(spectrum.mu - smooth_spectrum)
spectrum.noise_savgol = noise
# compute e_min and e_max. those are provided relative to the edge
e_min = configuration.get("e_min", None)
e_max = configuration.get("e_max", None)
if e_min is None:
e_min = spectrum.energy.min()
else:
e_min += spectrum.e0
if e_max is None:
e_max = spectrum.energy.max()
else:
e_max += spectrum.e0
spectrum.larch_dict["noise_e_min"] = e_min
spectrum.larch_dict["noise_e_max"] = e_max
mask = (spectrum.energy > e_min) & (spectrum.energy < (e_max))
if mask.any():
spectrum.raw_noise_savgol = numpy.mean(noise[mask])
spectrum.norm_noise_savgol = spectrum.raw_noise_savgol / spectrum.edge_step
else:
# Uses nan's instead of raising an exception. Otherwise we have much more failing workflows online (either E0 is wrong or the scan has not progressed past Emin, which is after the edge so the plots are fine).
spectrum.raw_noise_savgol = numpy.nan
spectrum.norm_noise_savgol = numpy.nan
if callbacks:
for callback in callbacks:
callback()
[docs]
class NoiseProcess(
Process,
name="noise",
input_names=["xas_obj"],
optional_input_names=["window_size", "polynomial_order", "e_min", "e_max"],
output_names=["xas_obj"],
):
[docs]
def run(self):
"""
:param xas_obj: object containing the configuration and spectra to process
:type: Union[:class:`.XASObject`, dict]
:return: spectra dict
:rtype: :class:`.XASObject`
"""
xas_obj = self.inputs.xas_obj
if xas_obj is None:
raise ValueError("xas_obj should be provided")
_xas_obj = self.getXasObject(xas_obj=xas_obj)
parameters = {
"window_size": self.get_input_value("window_size", 5),
"polynomial_order": self.get_input_value("polynomial_order", 2),
}
if not self.missing_inputs.e_min:
parameters["e_min"] = self.inputs.e_min
if not self.missing_inputs.e_max:
parameters["e_max"] = self.inputs.e_max
_xas_obj.configuration["noise"] = parameters
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
assert isinstance(xas_obj, XASObject)
n_s = len(xas_obj.spectra.data.flat)
for i_s, spectrum in enumerate(xas_obj.spectra):
process_noise_savgol(
spectrum=spectrum,
configuration=xas_obj.configuration,
callbacks=self.callbacks,
overwrite=True,
)
self.progress = i_s / n_s * 100.0
[docs]
def definition(self):
return "noise using Savitsky-Golay algorithm"
[docs]
@staticmethod
def program_name():
return "noise_savgol"