Source code for est.core.monotonic

from typing import List
from typing import Sequence
from typing import Tuple

import numpy


def split_piecewise_monotonic(values: Sequence[float]) -> List[slice]:
    """Return the disjoint slices representing the monotonic sections.

    The provided values are assumed to be piecewise monotonic, which means
    that they consist of sections that are either monotonically increasing
    (∀ i ≤ j, x[i] ≤ x[j]) or monotonically decreasing (∀ i ≤ j, x[i] ≥ x[j]).
    Note that a list of identical values is both monotonically increasing
    and decreasing.

    Increasing sections are returned as slices with step ``1``. Decreasing
    sections are returned as slices with step ``-1`` that traverse the section
    backwards, so indexing ``values`` with any returned slice yields
    non-decreasing values.

    :param values: sequence supporting ``len``, integer indexing and slicing.
    :returns: one slice per monotonic section, in order of appearance. An
        empty input gives an empty list.
    """
    # Greedily cut `values` into monotonic runs. Each run ends at the value
    # just before the first non-zero slope that changes sign (the "pivot").
    section_sizes = []
    section_slopes = []
    values_left = values
    while len(values_left):
        section_size, section_slope = _first_monotonic_size(values_left)
        section_sizes.append(section_size)
        section_slopes.append(section_slope)
        values_left = values_left[section_size:]

    if not section_sizes:
        return []

    # A `nan` slope (single-value section) compares False and therefore maps
    # to step -1; a zero slope (flat section) maps to step 1.
    monotone_step = [1 if slope >= 0 else -1 for slope in section_slopes]
    monotone_start = [0]
    monotone_stop = []
    # Plain Python integers so the resulting slices do not carry numpy scalars.
    stops = numpy.cumsum(section_sizes).tolist()

    for section_index, stop in enumerate(stops[:-1]):
        section_slope = section_slopes[section_index]
        next_section_slope = section_slopes[section_index + 1]

        # The pivot index is the index at which either
        #  - slope>=0 with the previous index and slope<0 with the next index
        #  - slope<=0 with the previous index and slope>0 with the next index
        # It is the last value of the greedy run, so keeping it in this
        # section is always valid.
        pivot_index = stop - 1
        slope_after = values[pivot_index + 1] - values[pivot_index]

        if values[pivot_index - 1] == values[pivot_index]:
            # Plateau before the turn: the pivot ■ fits the next section
            #
            #     ● ─ ■
            #    /     \
            #   ●       ●
            #
            prefers_next = True
        else:
            # The pivot goes where its adjacent slope matches the section's
            # mean slope best; a tie goes to the next section.
            #
            #       ■                 ■
            #     ●  \               / ●
            #    /    ●             ●   \
            #   ●      \           /     ●
            #  pivot -> next      pivot -> this
            #
            slope_before = values[pivot_index] - values[pivot_index - 1]
            diff_slope_before = abs(section_slope - slope_before)
            diff_slope_after = abs(next_section_slope - slope_after)
            prefers_next = diff_slope_after <= diff_slope_before

        # The next section has no direction of its own when it is flat
        # (slope 0) or a single value (slope nan).
        next_has_direction = next_section_slope > 0 or next_section_slope < 0
        if next_has_direction:
            # Prepending the pivot keeps the next section monotonic only when
            # the step from the pivot points in that section's direction.
            compatible = (next_section_slope > 0) == (slope_after > 0)
        else:
            compatible = True

        if prefers_next and compatible:
            boundary = pivot_index
            if not next_has_direction:
                # The step from the pivot is the only slope that defines the
                # direction of the extended next section.
                monotone_step[section_index + 1] = 1 if slope_after > 0 else -1
        else:
            boundary = stop

        monotone_stop.append(boundary)
        monotone_start.append(boundary)

    # The last section always runs to the end of the data.
    monotone_stop.append(stops[-1])

    return [
        _create_slice(start, stop, step)
        for start, stop, step in zip(monotone_start, monotone_stop, monotone_step)
    ]
def _create_slice(start: int, stop: int, step: int): if step >= 0: return slice(start, stop, step) assert start >= 0 assert stop >= 0 start, stop = stop, start start -= 1 if stop == 0: stop = None else: stop -= 1 return slice(start, stop, step) def _mean_nonzero(values: Sequence[float]) -> float: """Average non-zero value. Returns `nan` for an empty list and `0.` when all values are zero. """ values = numpy.asarray(values) if not values.size: return numpy.nan non_zero = values != 0 if non_zero.any(): return numpy.mean(values[non_zero]) return 0.0 def _first_monotonic_size(values: Sequence[float]) -> Tuple[int, int]: """Determine the length of the first monotonically increasing or decreasing slice starting from the first value. """ slopes = numpy.diff(values) maxsize = len(values) if maxsize < 3: return maxsize, _mean_nonzero(slopes) slope_signs = numpy.sign(slopes) first_slope_sign = slope_signs[0] for value_idx, slope_sign in enumerate(slope_signs[1:], 1): if slope_sign and first_slope_sign and slope_sign != first_slope_sign: # Non-zero slope changed sign: end of monotonic series return value_idx + 1, _mean_nonzero(slopes[:value_idx]) if not first_slope_sign: first_slope_sign = slope_sign return maxsize, _mean_nonzero(slopes)