Source code for mnished.hydrograph_separation

"""
mnished.hydrograph_separation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Estimate reservoir timescales and initial storage depths from an observed
discharge time series, without running the hydrological model.

The pipeline has three stages:

1. **Spectral stage** (fallback / diagnostic) — fit a cascade of
   linear-reservoir transfer functions (product of Lorentzians) to the power
   spectral density of Q.  AIC selects the number of fast reservoirs.  This
   stage provides initial fast-component timescale estimates and serves as a
   fallback when the time-domain stage cannot resolve a timescale.

2. **Time-domain stage** (primary) — estimate the fast timescales directly
   from recession behaviour in the time domain (per-segment-minimum recession
   fitting).  These estimates replace the corresponding spectral values when
   available; the spectral values from stage 1 remain as fallbacks.

3. **Recession stage** — remove the fast components with a recursive digital
   filter, leaving a slowly varying residual.  Fit log(Q_residual) vs. time
   on dry-season recession segments to extract τ_karst and its initial storage.

The primary output is physically informed initial conditions and calibration
parameter priors for MNiShed, reducing spin-up requirements and improving
optimizer starting points.

References
----------
Maillet (1905) : multi-exponential recession decomposition of spring flows.
Young (1984)   : recursive estimation and modelling of nonstationary time series.
Jakeman & Hornberger (1993) : IHACRES transfer-function identification.
Kirchner (2009): catchments as simple dynamical systems — spectral approach.
Eckhardt (2005): recursive digital baseflow separation filter.
"""

import numpy as np
from scipy.optimize import curve_fit
from scipy.signal import csd, welch
from scipy.stats import linregress


[docs] class HydrographSeparation: """ Separate observed discharge into reservoir components and estimate timescales and initial storage depths. Parameters ---------- Q : array-like Observed specific discharge [mm/day]. dt : float Timestep [days]. Default 1.0. n_reservoirs : int or None Number of reservoirs. If None, selected by AIC on the spectral fit. The slowest reservoir (karst/deep) is always fitted by recession analysis regardless of this setting. max_reservoirs : int Maximum number of reservoirs to test when n_reservoirs is None. Default 4. precip : array-like or None Daily precipitation [mm/day], same length as Q. Used to mask recession periods; if None, only discharge decline is used. recession_min_duration : int Minimum qualifying recession length [days]. Default 10. recession_precip_threshold : float Maximum daily precipitation within a recession segment [mm/day]. Default 0.5. recession_antecedent_days : int Number of days immediately before a recession that must also be dry. Default 3. tau_deep : float or None Externally supplied timescale [days] for the deep (karst) reservoir. When provided, recession fitting is skipped for τ_karst and this value is used directly. Recommended when τ_karst >> T_record, since a decay too slow to observe cannot be reliably estimated from the discharge record alone. tau_fast : array-like or None Externally supplied timescales [days] for the fast reservoirs (all except the deepest), e.g. from a prior calibration run. When provided, spectral fitting is skipped. Note: spectral corner periods are 2πτ, not τ — a 29-day reservoir has its -3 dB corner at 182 days, and a 466-day soil reservoir at ~8 years, which may lie near or beyond the record length and be unreliable from spectral fitting alone. Attributes (populated after fit()) ------------------------------------ tau : ndarray Reservoir timescales [days], ordered slowest to fastest. H0 : ndarray Initial storage depths [mm], same order as tau. n_reservoirs_fitted : int Number of reservoirs selected (spectral fast + 1 karst). aic_scores : list of float AIC values for k = 1 … max_reservoirs-1 spectral fits. tau_karst : float Deep-reservoir timescale [days] from recession fitting. """ def __init__(self, Q, dt=1.0, n_reservoirs=None, max_reservoirs=4, precip=None, recession_min_duration=10, recession_precip_threshold=0.5, recession_antecedent_days=3, tau_deep=None, tau_fast=None): """ Parameters ---------- Q : array-like Observed specific discharge [mm/day]. dt : float, optional Timestep [days]. Default 1.0. n_reservoirs : int or None, optional Number of reservoirs. If None, selected by AIC. Default None. max_reservoirs : int, optional Maximum number of reservoirs to test when n_reservoirs is None. Default 4. precip : array-like or None, optional Daily precipitation [mm/day], same length as Q. Used to mask recession periods; if None, only discharge decline is used. recession_min_duration : int, optional Minimum qualifying recession length [days]. Default 10. recession_precip_threshold : float, optional Maximum daily precipitation within a recession segment [mm/day]. Default 0.5. recession_antecedent_days : int, optional Number of days before a recession that must also be dry. Default 3. tau_deep : float or None, optional Externally supplied deep-reservoir timescale [days]; skips recession fitting for that component when provided. tau_fast : array-like or None, optional Externally supplied fast-reservoir timescales [days]; skips spectral fitting when provided. """ self.Q = np.asarray(Q, dtype=float) self.dt = float(dt) self.n_reservoirs = n_reservoirs self.max_reservoirs = max_reservoirs self.precip = (np.asarray(precip, dtype=float) if precip is not None else None) self.recession_min_duration = recession_min_duration self.recession_precip_threshold = recession_precip_threshold self.recession_antecedent_days = recession_antecedent_days self.tau_deep = float(tau_deep) if tau_deep is not None else None # Externally supplied fast timescales [days], shortest first. # When provided, skip spectral fitting and use these directly. self.tau_fast = (np.sort(np.asarray(tau_fast, dtype=float)) if tau_fast is not None else None) # Results self.tau = None self.H0 = None self.n_reservoirs_fitted = None self.aic_scores = None self.tau_karst = None self.tau_karst_longterm = None # long-term absolute-time estimate (comparison) self._tau_spectral = None # fast timescales only (fastest first) self._Q_residual = None # Q after removing fast components self._Q_components_fast = None # list of fast-component time series self._psd_freqs = None self._psd = None self._tau_shallow_td = None # time-domain τ_shallow estimate self._tau_soil_td = None # time-domain τ_soil estimate # ------------------------------------------------------------------ # Public interface # ------------------------------------------------------------------
[docs] def fit(self): """Run the full separation pipeline. Returns self.""" self._compute_psd() self._fit_spectral_model() self._fit_timescales_td() self._separate_components() self._fit_recession() self._compute_initial_conditions() return self
[docs] def get_initial_conditions(self): """ Initial reservoir storage depths, ordered fastest to slowest (shallow first, karst last), matching the MNiShed reservoir index convention. Returns ------- dict ``{'H0': [H0_shallow, H0_soil, ..., H0_karst]}`` in mm. """ self._check_fitted() return {'H0': list(self.H0[::-1])}
[docs] def get_parameter_priors(self): """ Suggested initial values and bounds (in params.yml units) for the MNiShed log__recession_coeff_* calibration parameters. Bounds are ±0.5 in log10 around the estimated timescale — wide enough to allow the optimizer freedom, tight enough to stay physical. Returns ------- dict Keyed by MNiShed parameter name. Each value is a dict with ``'initial'``, ``'lower'``, ``'upper'`` in log10(days). """ self._check_fitted() all_names = ['log__recession_coeff_shallow', 'log__recession_coeff_soil', 'log__recession_coeff_karst'] tau_fast_to_slow = self.tau[::-1] # shallow first, karst last n = len(tau_fast_to_slow) # Always end with karst; fill fast slots from the front. # For 2 reservoirs: ['shallow', 'karst'] — skip soil. names = all_names[:n - 1] + [all_names[-1]] priors = {} for name, tau_i in zip(names, tau_fast_to_slow): if not np.isfinite(tau_i): # For τ_soil, fall back to the time-domain rolling-minimum # estimate (not used for LP separation but good enough as a # prior hint — still biased low, so bounds are left wide). if name == 'log__recession_coeff_soil' and self._tau_soil_td is not None: tau_i = self._tau_soil_td else: priors[name] = None # could not be estimated; keep params.yml defaults continue log_tau = np.log10(tau_i) priors[name] = { 'initial': round(float(log_tau), 3), 'lower': round(float(log_tau) - 0.5, 3), 'upper': round(float(log_tau) + 0.5, 3), } return priors
[docs] def summary(self): """Print a brief summary of fitted timescales and initial conditions.""" self._check_fitted() tau_f2s = self.tau[::-1] H0_f2s = self.H0[::-1] labels = ['shallow', 'soil', 'karst'] + [f'res{i}' for i in range(3, len(tau_f2s))] print('HydrographSeparation results') print(f' n_reservoirs : {self.n_reservoirs_fitted}') print(f' {"Reservoir":<10} {"τ (days)":>12} {"H₀ (mm)":>12}') print(f' {"-"*10} {"-"*12} {"-"*12}') for label, tau_i, H0_i in zip(labels, tau_f2s, H0_f2s): print(f' {label:<10} {tau_i:>12.1f} {H0_i:>12.1f}') if self.aic_scores is not None: print(f'\n AIC by fast-reservoir order: ' f'{[round(a, 1) for a in self.aic_scores]}') print('\n Time-domain estimates:') td_sh = (f'{self._tau_shallow_td:.1f} d' if self._tau_shallow_td is not None else 'not estimated') print(f' τ_shallow : {td_sh}') if self.n_reservoirs_fitted >= 3 or self._tau_soil_td is not None: td_so = (f'{self._tau_soil_td:.1f} d' if self._tau_soil_td is not None else 'not estimated (insufficient long dry-period recessions)') print(f' τ_soil : {td_so}') fallback_tau = 10.0 * len(self.Q) * self.dt is_fallback = abs(self.tau_karst / fallback_tau - 1.0) < 0.01 fb_note = ' ← fallback: no declining trend detected' if is_fallback else '' print(f'\n τ_karst (segment minima): {self.tau_karst:.1f} days{fb_note}') if self.tau_karst_longterm is not None and not is_fallback: agree = abs(np.log(self.tau_karst / self.tau_karst_longterm)) < 0.5 print(f' τ_karst (long-term slope): {self.tau_karst_longterm:.1f} days' f' {"← agree" if agree else "← disagree — check interannual trend"}')
# ------------------------------------------------------------------ # Stage 1a: time-domain τ estimation (primary for fast reservoirs) # ------------------------------------------------------------------ def _fit_recession_scale(self, Q_signal, min_dur, smooth_days, antecedent_days, precip_threshold=None, min_tau=1.0, max_tau=None, mode='within'): """ Estimate a recession timescale from recession-segment analysis. Both current callers (τ_shallow and τ_soil in :meth:`_fit_timescales_td`) use ``mode='within'``. τ_karst has its own dedicated pipeline in :meth:`_fit_recession`, which uses the same per-segment-minimum approach as ``mode='cross'`` but adds a long-term cross-check and reconciliation step not available here. Two modes: ``'within'`` (default) — for fast/intermediate timescales: Fit log(Q) vs relative time within each qualifying segment; take the median slope. Captures the LOCAL recession rate, which is dominated by the fastest reservoir still active over that duration. Use short segments (5-20 d) for τ_shallow; longer segments (≥15 d) on the rolling-minimum envelope for τ_soil. ``'cross'`` — for inter-annual timescales: Record the minimum Q within each segment and fit log(Q_min) vs absolute time across all segments. Captures the INTER-ANNUAL drainage trend that persists after fast reservoirs empty. Not called internally; available for external or diagnostic use. Parameters ---------- Q_signal : ndarray Discharge signal to analyse [mm/day]. min_dur : int Minimum qualifying segment length [days]. smooth_days : float Moving-average window [days] for the declining test only. antecedent_days : int Days before segment start that must also satisfy the precipitation gate. precip_threshold : float or None Precipitation gate [mm/day]. Defaults to self.recession_precip_threshold. min_tau : float Reject fits with τ < min_tau [days]. max_tau : float or None Reject fits with τ > max_tau [days]. mode : {'within', 'cross'} Regression strategy (see above). Returns ------- tau : float or None Estimated timescale [days], or None if fewer than three segments qualify or the slope is non-negative. n_segs : int Number of qualifying segments used. """ n = len(Q_signal) threshold = (precip_threshold if precip_threshold is not None else self.recession_precip_threshold) # Smooth signal for declining test only w = max(1, int(round(smooth_days / self.dt))) if w > 1: kernel = np.ones(w) / w Q_smooth = np.convolve(Q_signal, kernel, mode='same') else: Q_smooth = Q_signal.copy() Q_smooth = np.maximum(Q_smooth, 0.0) # Precipitation gate if self.precip is not None: dry = self.precip < threshold for lag in range(1, antecedent_days + 1): dry[lag:] &= dry[:-lag] else: dry = np.ones(n, dtype=bool) # Declining test on smoothed signal declining = np.zeros(n, dtype=bool) declining[:-1] = Q_smooth[1:] < Q_smooth[:-1] candidate = dry & declining # Collect qualifying segments mask = np.zeros(n, dtype=bool) i = 0 while i < n: if candidate[i]: j = i + 1 while j < n and candidate[j]: j += 1 if j - i >= min_dur: mask[i:j] = True i = j else: i += 1 if not np.any(mask): return None, 0 idx = np.where(mask)[0] breaks = np.where(np.diff(idx) > 1)[0] + 1 segs = np.split(idx, breaks) min_valid = max(3, min_dur // 3) if mode == 'within': slopes = [] for seg in segs: Q_seg = Q_signal[seg] valid = Q_seg > 0 if valid.sum() < min_valid: continue t_rel = np.arange(valid.sum()) * self.dt s, _, _, _, _ = linregress(t_rel, np.log(Q_seg[valid])) if s < -1e-12: slopes.append(s) n_segs = len(slopes) if n_segs < 3: return None, n_segs slope = float(np.median(slopes)) else: # 'cross': per-segment minimum vs absolute time t_min_list = [] Q_min_list = [] for seg in segs: Q_seg = Q_signal[seg] valid = Q_seg > 0 if valid.sum() < min_valid: continue min_pos = np.argmin(Q_seg[valid]) abs_idx = seg[valid][min_pos] t_min_list.append(abs_idx * self.dt) Q_min_list.append(float(Q_seg[valid][min_pos])) n_segs = len(t_min_list) if n_segs < 3: return None, n_segs slope, _, _, _, _ = linregress(np.array(t_min_list), np.log(np.array(Q_min_list))) if slope >= -1e-12: return None, n_segs tau = -1.0 / slope if tau < min_tau: return None, n_segs if max_tau is not None and tau > max_tau: return None, n_segs return tau, n_segs def _fit_timescales_td(self): """ Estimate fast-reservoir timescales via time-domain recession analysis and update self._tau_spectral with the results. Two stages of sequential peeling: 1. **τ_shallow** — within-segment log(Q) slope on raw Q with short (≥5 day) segments and a 1-day smooth. Accepts τ in [2, 200] days. 2. **τ_soil** — per-segment minimum on Q after removing the shallow component (low-pass at τ_shallow). Uses a moving-average window equal to τ_shallow to suppress residual fast noise, a relaxed precipitation threshold (2 mm/day — small rain barely recharges the soil), and an antecedent window ≈ τ_shallow to wait for the shallow reservoir to drain. Only attempted when n_reservoirs ≥ 3. Time-domain estimates replace the corresponding spectral values in self._tau_spectral when available; spectral values serve as fallbacks only if they are well-separated from the shallow scale (> 5×τ_shallow). Diagnostic attributes _tau_shallow_td and _tau_soil_td are set regardless of whether they replace spectral values. """ if self.tau_fast is not None: self._tau_shallow_td = None self._tau_soil_td = None return n_fast_target = (self.n_reservoirs - 1 if self.n_reservoirs is not None else self.max_reservoirs - 1) # ---- τ_shallow ------------------------------------------------------- # Within-segment median slope on short (5-15 d) recession segments. # Short segments capture the fast shallow reservoir; deeper reservoirs # act as a near-constant baseline and barely affect the slope. tau_sh, _ = self._fit_recession_scale( self.Q, min_dur=5, smooth_days=1.0, antecedent_days=self.recession_antecedent_days, min_tau=2.0, max_tau=200.0, mode='within', ) self._tau_shallow_td = tau_sh # ---- τ_soil (only when ≥ 3 reservoirs expected) --------------------- # Problem with LP filtering for τ_soil estimation: applying a low-pass # filter at τ_shallow creates a signal (Q_slow) whose FILTER IMPULSE # RESPONSE decays with timescale τ_shallow. Recession segments in Q_slow # that start within ~3×τ_shallow days of a rain event are still in the # filter-decay regime, so within-segment slopes give τ_eff ≈ τ_shallow, # not τ_soil. With Minnesota dry periods of only 20-35 days, most # segments start before the filter has fully settled. # # Fix: use a ROLLING MINIMUM over a 2×τ_shallow window. The rolling # minimum extracts the lower envelope of Q (slow baseflow), cleanly # suppressing storm pulses without creating a lingering exponential tail. # The resulting Q_rmin ≈ Q_soil + Q_karst essentially everywhere it is # declining, with no multi-week filter-decay artefact. # Within-segment slopes on Q_rmin give # τ_eff = (Q_soil/Q_rmin)/τ_soil + (Q_karst/Q_rmin)/τ_karst # Theoretically τ_eff > τ_soil (karst floor slows the apparent decay), # but in practice residual shallow contamination in early recession # segments biases τ_eff short. The min_tau = 8×τ_shallow guard rejects # the worst-contaminated segments; the result is a rough lower bound on # τ_soil, useful as a prior but not a precise estimate. self._tau_soil_td = None if n_fast_target >= 2 and tau_sh is not None: n_sig = len(self.Q) w_rmin = max(1, int(round(2.0 * tau_sh / self.dt))) Q_rmin = np.array([ float(np.min(self.Q[max(0, t - w_rmin + 1):t + 1])) for t in range(n_sig) ]) tau_so, _ = self._fit_recession_scale( Q_rmin, min_dur=15, smooth_days=tau_sh, antecedent_days=5, precip_threshold=2.0, min_tau=8.0 * tau_sh, mode='within', ) self._tau_soil_td = tau_so # ---- update _tau_spectral -------------------------------------------- # NOTE: _tau_soil_td is intentionally NOT added to _tau_spectral here. # The rolling-minimum estimate is still biased low (transition regime), # so including it in the LP separation would corrupt Q_residual and # degrade τ_karst. It is used only in get_parameter_priors(). n_sp = len(self._tau_spectral) td_taus = [] # Shallow position (fastest) if tau_sh is not None: td_taus.append(tau_sh) elif n_sp >= 1: td_taus.append(float(self._tau_spectral[0])) # Soil position (second fastest), only when ≥ 3 reservoirs requested if n_fast_target >= 2: if n_sp >= 2: # Accept spectral τ_soil only when it is well-separated from # τ_shallow — if spectral fitting collapsed both to the same # corner (common when seasonal peaks dominate the PSD), discard ref = td_taus[0] if td_taus else None sp_soil = float(self._tau_spectral[1]) if ref is not None and sp_soil > 5.0 * ref: td_taus.append(sp_soil) # else: drop to 2-reservoir initialisation if td_taus: self._tau_spectral = np.sort(np.array(td_taus)) # shortest first self.n_reservoirs_fitted = len(td_taus) + 1 # +1 for karst # ------------------------------------------------------------------ # Stage 1b: spectral fitting (fallback / diagnostic) # ------------------------------------------------------------------ def _compute_psd(self): """ Estimate the spectral target for Lorentzian fitting. When precipitation is provided, compute the transfer function magnitude |H(f)|² = |S_QP(f)|² / S_PP(f) (cross-spectrum / input PSD). This cancels the colour of the precipitation spectrum and gives a clean estimate of the linear-reservoir cascade response regardless of whether the input is white noise, intermittent, or seasonal. Without precipitation, fall back to the output PSD S_QQ(f), which conflates input colour with reservoir structure but is still useful when timescale separation is large. """ Q = self.Q.copy() Q[~np.isfinite(Q)] = np.nanmean(Q) nperseg = min(len(Q) // 4, 1024) kw = dict(fs=1.0 / self.dt, nperseg=nperseg, window='hann', detrend='constant') if self.precip is not None: P = self.precip.copy() P[~np.isfinite(P)] = 0.0 freqs, S_PP = welch(P, **kw) freqs, S_QP = csd(Q, P, **kw) # Transfer function magnitude squared; guard against near-zero S_PP safe_spp = np.where(S_PP > 0, S_PP, np.nan) H2 = np.abs(S_QP) ** 2 / safe_spp spectral = H2 else: freqs, spectral = welch(Q, **kw) # The above uses nperseg = T_record/4, which may not resolve the # slow-reservoir corner. Supplement with a full-record periodogram # (nperseg = N) for the low-frequency bins that are missing. The # periodogram is noisier but gives the only spectral estimates below # 4/T_record, where the soil or karst corners may sit. n = len(Q) if nperseg < n: kw_full = dict(fs=1.0 / self.dt, nperseg=n, window='hann', detrend='constant') if self.precip is not None: f2, S_PP2 = welch(P, **kw_full) f2, S_QP2 = csd(Q, P, **kw_full) safe2 = np.where(S_PP2 > 0, S_PP2, np.nan) spec2 = np.abs(S_QP2) ** 2 / safe2 else: f2, spec2 = welch(Q, **kw_full) # Use full-record estimate for frequencies below the Welch resolution f_cutover = 4.0 / (n * self.dt) low_mask = (f2 > 0) & (f2 < f_cutover) & np.isfinite(spec2) & (spec2 > 0) freqs = np.concatenate([f2[low_mask], freqs]) spectral = np.concatenate([spec2[low_mask], spectral]) # Drop DC and any non-finite bins mask = (freqs > 0) & np.isfinite(spectral) & (spectral > 0) self._psd_freqs = freqs[mask] self._psd = spectral[mask] @staticmethod def _log_cascade_psd(log_freqs, log_A, *log_taus): """ log PSD of a cascade of k linear reservoirs driven by white noise: log S(f) = log_A - Σ_i log(1 + (2π f τ_i)²) Parameters passed as (log_A, log_τ_1, ..., log_τ_k) so that scipy.optimize.curve_fit can treat them as a flat vector. """ freqs = np.exp(log_freqs) log_S = np.full_like(freqs, log_A) for log_tau in log_taus: tau = np.exp(log_tau) log_S -= np.log(1.0 + (2.0 * np.pi * freqs * tau) ** 2) return log_S def _fit_k_reservoirs(self, k): """ Fit k Lorentzians to log PSD. Returns ------- taus : ndarray or None Fitted timescales [days], sorted longest first. aic : float AIC of the fit (lower is better). """ log_f = np.log(self._psd_freqs) log_S = np.log(self._psd) # Spread initial τ guesses log-uniformly across the observable range T_record = len(self.Q) * self.dt tau_init = np.logspace(0, np.log10(T_record / 4.0), k) p0 = [float(np.mean(log_S))] + list(np.log(tau_init)) tau_lo = np.log(0.5) tau_hi = np.log(5.0 * T_record) bounds = ([-np.inf] + [tau_lo] * k, [ np.inf] + [tau_hi] * k) try: popt, _ = curve_fit(self._log_cascade_psd, log_f, log_S, p0=p0, bounds=bounds, maxfev=20000) except (RuntimeError, ValueError): return None, np.inf taus = np.sort(np.exp(popt[1:]))[::-1] # longest first resid = log_S - self._log_cascade_psd(log_f, *popt) n = len(log_S) rss = np.dot(resid, resid) aic = n * np.log(rss / n) + 2.0 * (k + 1) # k taus + amplitude return taus, aic def _fit_spectral_model(self): """ Determine fast-reservoir timescales (all reservoirs except the deepest). If tau_fast was supplied at construction, use it directly and skip spectral fitting. Spectral fitting works well when the system is a nearly pure cascade (small exfiltration fractions so most water reaches the deep reservoir) and when the corner periods 2πτ_i fall within the observable frequency band. For real data with seasonal forcing and short records relative to τ_soil, externally supplied values from a prior calibration run are more reliable. """ if self.tau_fast is not None: self._tau_spectral = self.tau_fast # already sorted self.n_reservoirs_fitted = len(self.tau_fast) + 1 # +1 for deep self.aic_scores = None return if self.n_reservoirs is not None: n_fast_max = self.n_reservoirs - 1 else: n_fast_max = self.max_reservoirs - 1 aic_scores = [] tau_by_k = [] for k in range(1, n_fast_max + 1): taus, aic = self._fit_k_reservoirs(k) aic_scores.append(aic) tau_by_k.append(taus) self.aic_scores = aic_scores if self.n_reservoirs is not None: best_k = self.n_reservoirs - 1 else: best_k = int(np.argmin(aic_scores)) + 1 taus_best = tau_by_k[best_k - 1] if taus_best is None: # curve_fit failed for the selected k; fall back to 1 fast reservoir for k_try in range(1, n_fast_max + 1): if tau_by_k[k_try - 1] is not None: taus_best = tau_by_k[k_try - 1] best_k = k_try break else: taus_best = np.array([1.0]) best_k = 1 self._tau_spectral = np.sort(taus_best) # fastest first self.n_reservoirs_fitted = best_k + 1 # +1 for karst # ------------------------------------------------------------------ # Stage 2: component separation # ------------------------------------------------------------------ def _apply_lowpass(self, signal, tau): """ Forward recursive low-pass filter with timescale τ [days]. Returns the slowly varying component of signal. """ alpha = np.exp(-self.dt / tau) result = np.empty_like(signal) result[0] = signal[0] for t in range(1, len(signal)): result[t] = alpha * result[t - 1] + (1.0 - alpha) * signal[t] return result def _separate_components(self): """ Peel fast components from Q one by one (fastest first). Each pass: apply low-pass filter at τ_i, subtract → fast component. Residual after all passes is the slow (karst) signal. """ Q = self.Q.copy() Q[~np.isfinite(Q)] = np.nanmean(Q) components = [] for tau_i in self._tau_spectral: # shortest τ first Q_slow = self._apply_lowpass(Q, tau_i) components.append(Q - Q_slow) # fast component at this τ Q = Q_slow self._Q_components_fast = components # list: fastest component first self._Q_residual = np.maximum(Q, 0.0) # slow residual for recession fit # ------------------------------------------------------------------ # Stage 3: recession fitting for τ_karst # ------------------------------------------------------------------ def _recession_mask(self): """ Boolean mask of timesteps within qualifying recession segments: Q_residual monotonically declining, precipitation absent, minimum duration satisfied. """ n = len(self._Q_residual) # Precipitation gate if self.precip is not None: dry = self.precip < self.recession_precip_threshold for lag in range(1, self.recession_antecedent_days + 1): dry[lag:] &= dry[:-lag] else: dry = np.ones(n, dtype=bool) # Discharge declining declining = np.zeros(n, dtype=bool) declining[:-1] = self._Q_residual[1:] < self._Q_residual[:-1] candidate = dry & declining # Enforce minimum run length mask = np.zeros(n, dtype=bool) i = 0 while i < n: if candidate[i]: j = i + 1 while j < n and candidate[j]: j += 1 if j - i >= self.recession_min_duration: mask[i:j] = True i = j else: i += 1 return mask def _fit_recession(self): """ Estimate τ_karst using per-segment minimum-flow analysis, cross-checked against a long-term absolute-time regression on all recession timesteps. Primary method — per-segment minima: For each qualifying recession segment, record the minimum Q_residual value and its absolute time position. The minimum represents the most-depleted state within that segment, after fast reservoirs have substantially drained. OLS on log(Q_min) vs absolute time gives a slope -1/τ_karst. Cross-check — long-term slope: Fit log(Q_residual) vs absolute time across all recession timesteps. This averages over more data points and is more robust to soil contamination of the per-segment estimate (unremoved soil components in Q_residual shorten the per-segment estimate because per-segment minima track the seasonal soil cycle rather than the interannual karst trend). Reconciliation: When both estimates are available and they disagree by more than factor 2 (|log ratio| > log 2 ≈ 0.69), the LONGER τ is adopted as the final tau_karst. Physically, soil contamination always biases the per-segment estimate SHORT, so the longer of the two is a better lower-bound on τ_karst. Both raw estimates are stored (tau_karst and tau_karst_longterm). If tau_deep was supplied at construction, skip fitting and use it. """ if self.tau_deep is not None: self.tau_karst = self.tau_deep self.tau_karst_longterm = self.tau_deep return mask = self._recession_mask() fallback_tau = 10.0 * len(self.Q) * self.dt # ---- long-term slope (always computed for comparison) ---- if np.any(mask): idx = np.where(mask)[0] Q_dry = self._Q_residual[idx] pos = Q_dry > 0 if pos.sum() >= 5: slope_lt, _, _, _, _ = linregress(idx[pos] * self.dt, np.log(Q_dry[pos])) self.tau_karst_longterm = (fallback_tau if slope_lt >= -1e-12 else -1.0 / slope_lt) else: self.tau_karst_longterm = fallback_tau else: self.tau_karst_longterm = fallback_tau if not np.any(mask): self.tau_karst = fallback_tau return # ---- per-segment minima ---- idx = np.where(mask)[0] breaks = np.where(np.diff(idx) > 1)[0] + 1 segs = np.split(idx, breaks) t_min_list = [] Q_min_list = [] for seg in segs: Q_seg = self._Q_residual[seg] valid = Q_seg > 0 if valid.sum() < self.recession_min_duration: continue min_pos = np.argmin(Q_seg[valid]) abs_idx = seg[valid][min_pos] t_min_list.append(abs_idx * self.dt) Q_min_list.append(Q_seg[valid][min_pos]) if len(t_min_list) < 3: self.tau_karst = self.tau_karst_longterm return t_min = np.array(t_min_list) logQ_m = np.log(np.array(Q_min_list)) slope, _, _, _, _ = linregress(t_min, logQ_m) tau_segmin = fallback_tau if slope >= -1e-12 else -1.0 / slope # Reconcile: prefer the LONGER estimate when there is strong disagreement. # Per-segment minimum is biased short when Q_residual still contains a # soil component whose seasonal drainage dominates the segment minima. tau_lt = self.tau_karst_longterm if (tau_lt < fallback_tau and abs(np.log(tau_segmin / tau_lt)) > np.log(2.0)): self.tau_karst = max(tau_segmin, tau_lt) else: self.tau_karst = tau_segmin # ------------------------------------------------------------------ # Stage 4: initial conditions # ------------------------------------------------------------------ def _compute_initial_conditions(self): """ H₀_i = Q_i(t = 0) × τ_i for each reservoir. tau and H0 are stored slowest-first (karst, …, shallow). When fewer fast reservoirs were estimated than n_reservoirs specifies (e.g., τ_soil not identifiable from recession analysis), the missing intermediate reservoirs are padded with H₀ = 0 and τ = NaN so that get_initial_conditions() returns the expected number of entries. """ H0_karst = float(self._Q_residual[0]) * self.tau_karst H0_fast = [max(float(Q_c[0]), 0.0) * tau_i for Q_c, tau_i in zip(self._Q_components_fast, self._tau_spectral)] # Pad with NaN/0 for any fast reservoirs that could not be estimated n_fast_needed = (self.n_reservoirs - 1 if self.n_reservoirs is not None else len(self._tau_spectral)) n_missing = n_fast_needed - len(H0_fast) if n_missing > 0: # Insert unestimated reservoirs between the fastest and the karst. # tau = NaN signals "unknown"; H0 = 0 is a conservative start. H0_fast = [0.0] * n_missing + H0_fast tau_pad = [float('nan')] * n_missing + list(self._tau_spectral[::-1]) else: tau_pad = list(self._tau_spectral[::-1]) # Slowest first self.tau = np.array([self.tau_karst] + tau_pad) self.H0 = np.array([H0_karst] + H0_fast) # ------------------------------------------------------------------ # Utility # ------------------------------------------------------------------ def _check_fitted(self): if self.tau is None: raise RuntimeError('Call fit() before accessing results.')