#! /usr/bin/python3
########################################
# Then, methought, the air grew denser #
# - Edgar Allan Poe #
# THE RAVEN #
########################################
# Started by A. Wickert
# 25 July 2019
# Updated slightly by J. Jones
# 08 Oct 2019
# Significant Update by A. Wickert
# October 2022
# CLI added by A. Wickert
# November 2023
import argparse
import math
import sys
import warnings
import matplotlib.dates as mdates
import numpy as np
import pandas as pd
import yaml
from matplotlib import pyplot as plt
try:
import numba as _numba
_numba_available = True
except ImportError:
_numba_available = False
# c_p / L_f: water's specific heat divided by the latent heat of fusion.
# c_p = 4186 J kg⁻¹ °C⁻¹, L_f = 334 000 J kg⁻¹ → ≈ 0.01253 °C⁻¹
# Gives mm SWE melted per mm of rain per °C of rain temperature.
_CP_LF = 4186.0 / 334000.0
if _numba_available:
@_numba.jit(nopython=True, cache=True)
def _jit_run(P_arr, ET_arr, T_arr, T_min_arr, T_max_arr,
H_init, H_snow_init, fgi_init, H_deficit_carry_init,
tau_arr, b_arr, H_ref_arr, f_dis_in, junction_arr,
leakance_R_arr, H_threshold_arr, Hmax_arr,
f_tile_arr, tau_tile_arr, H_tile_init,
multipath_thr_arr, multipath_tau_arr,
melt_factor, snow_insulation_k, fgi_decay_coeff, fdd_threshold,
direct_runoff_fraction, wp_soil, wp_soil_sigma, et_alpha, dt,
has_snowpack, use_fgi, use_rain_on_snow, use_et_reservoir_draw,
use_direct_runoff, has_trange):
"""JIT-compiled daily time loop replacing Buckets.run()/update().
Replicates update()/_compute_snowpack()/_update_fgi()/
_draw_et_from_reservoirs() exactly for the common case (no PDM,
no et_water_stress). Falls back to the Python loop otherwise.
Two tile-drain mechanisms are supported (independently or together):
- Constant-fraction bypass (``f_tile_arr``/``tau_tile_arr``):
a fraction of recession outflow is routed through a downstream
fast linear sub-reservoir, regardless of storage state.
- Multipath threshold-activated parallel drain
(``multipath_thr_arr``/``multipath_tau_arr``): a second linear
outflow path operates from the same storage but only when the
reservoir depth exceeds the threshold. ``multipath_tau_arr[r] <= 0``
disables the multipath drain for reservoir *r*.
junction_arr encoding: 0 = fraction, 1 = leakance, 2 = threshold.
"""
n_steps = len(P_arr)
n_res = len(tau_arr)
Q_out = np.empty(n_steps)
SWE_out = np.empty(n_steps)
H_sub_out = np.empty(n_steps)
H_res_out = np.empty((n_steps, n_res))
H_res = H_init.copy()
H_snow_cur = H_snow_init
fgi_cur = fgi_init
H_deficit_carry = H_deficit_carry_init
# Local copy of f_to_discharge — frozen-ground overrides res[0] temporarily
f_dis = f_dis_in.copy()
# Per-reservoir cascade working arrays
H_to_next = np.zeros(n_res)
H_deficit_res = np.zeros(n_res)
H_tile = H_tile_init.copy()
for step in range(n_steps):
P_t = P_arr[step]
ET_t = ET_arr[step]
T_t = 0.0
# Read temperature when needed by snowpack or FGI
if has_snowpack or use_fgi:
T_t = T_arr[step]
# Skip step if any required forcing is NaN.
# T NaN only triggers skip when snowpack is active (matches Python).
skip = math.isnan(P_t) or math.isnan(ET_t)
if has_snowpack and not skip:
skip = math.isnan(T_t)
if skip:
Q_out[step] = np.nan
SWE_out[step] = H_snow_cur
H_sub_total = 0.0
for r in range(n_res):
H_sub_total += H_res[r] + H_tile[r]
H_res_out[step, r] = H_res[r]
H_sub_out[step] = H_sub_total
continue
# ── Snowpack ──────────────────────────────────────────────────
sp_infiltrated = 0.0
sp_deficit = 0.0
excess_dd = 0.0
if has_snowpack:
if use_et_reservoir_draw:
sp_recharge = P_t + H_deficit_carry
else:
sp_recharge = P_t - ET_t + H_deficit_carry
# Snowpack.recharge(sp_recharge)
if sp_recharge >= 0.0:
if T_t <= 0.0:
H_snow_cur += sp_recharge
# sp_infiltrated stays 0
else:
sp_infiltrated = sp_recharge
else:
# Sublimation; deficit if snow insufficient
if H_snow_cur > -sp_recharge:
H_snow_cur += sp_recharge
else:
sp_deficit = sp_recharge + H_snow_cur
H_snow_cur = 0.0
# sp_infiltrated stays 0
# Snowpack.melt(dt, P)
if T_t > 0.0:
pdd = melt_factor * T_t * dt
ros_P = P_t if use_rain_on_snow else 0.0
ros = _CP_LF * T_t * ros_P
total_m = pdd + ros
if total_m <= H_snow_cur:
actual_melt = total_m
else:
actual_melt = H_snow_cur
if melt_factor > 0.0:
excess_dd = (total_m - actual_melt) / melt_factor
sp_infiltrated += actual_melt
H_snow_cur -= actual_melt
# Temporarily store snowpack deficit as carry for res[0] recharge
H_deficit_carry_for_res0 = sp_deficit
else:
H_deficit_carry_for_res0 = H_deficit_carry
# ── FGI update ────────────────────────────────────────────────
f0_frozen = f_dis[0] # save calibrated value before possible override
if use_fgi and not math.isinf(fdd_threshold):
T_eff = T_t * math.exp(-snow_insulation_k * H_snow_cur)
if has_trange:
T_max_t = T_max_arr[step]
T_min_t = T_min_arr[step]
DTR = T_max_t - T_min_t
# NaN comparisons evaluate False → A_t = 1.0 when data absent
if DTR > 0.0 and T_max_t > 0.0:
f_above = T_max_t / DTR
if f_above > 1.0:
f_above = 1.0
A_t = 1.0 - (1.0 - fgi_decay_coeff) * f_above
else:
A_t = 1.0
else:
A_t = fgi_decay_coeff
new_fgi = A_t * fgi_cur - T_eff - excess_dd
if new_fgi < 0.0 or math.isnan(new_fgi):
new_fgi = 0.0
fgi_cur = new_fgi
if fgi_cur > fdd_threshold:
f_dis[0] = 1.0 # frozen ground: all drainage → direct runoff
# ── Reservoir cascade ─────────────────────────────────────────
qi = 0.0
for r in range(n_res):
# --- Recharge ---
H_res_excess = 0.0
H_res_deficit = 0.0
if r == 0:
if has_snowpack:
_rech = sp_infiltrated + H_deficit_carry_for_res0
else:
if use_et_reservoir_draw:
_rech = P_t + H_deficit_carry_for_res0
else:
_rech = P_t - ET_t + H_deficit_carry_for_res0
if use_direct_runoff and _rech > 0.0:
q_direct = _rech * direct_runoff_fraction
qi += q_direct
_rech -= q_direct
else:
_rech = H_to_next[r - 1] + H_deficit_res[r - 1]
new_H = H_res[r] + _rech
if new_H < 0.0:
H_res_deficit = new_H # negative
H_res[r] = 0.0
elif (not math.isinf(Hmax_arr[r])) and new_H > Hmax_arr[r]:
H_res_excess = new_H - Hmax_arr[r]
H_res[r] = Hmax_arr[r]
else:
H_res[r] = new_H
H_deficit_res[r] = H_res_deficit
# --- Discharge ---
b_r = b_arr[r]
H0_r = H_res[r]
# Threshold junction: recession only above H_threshold
if junction_arr[r] == 2:
H_eff = H0_r - H_threshold_arr[r]
if H_eff < 0.0:
H_eff = 0.0
else:
H_eff = H0_r
if b_r == 1.0 or H_eff <= 0.0:
dH = H_eff * (1.0 - math.exp(-dt / tau_arr[r]))
else:
# Exact integration: H(t+dt) = [H^(1-b) + (b-1)dt/tau_eff]^(1/(1-b))
tau_eff = tau_arr[r] * (H_ref_arr[r] ** (b_r - 1.0))
arg = H_eff ** (1.0 - b_r) + (b_r - 1.0) * dt / tau_eff
H_new_r = arg ** (1.0 / (1.0 - b_r)) if arg > 0.0 else 0.0
dH = H_eff - H_new_r
if dH < 0.0:
dH = 0.0
# Partition dH: leakance applies to non-bottom reservoirs only
is_bottom = (r == n_res - 1)
if junction_arr[r] == 1 and not is_bottom:
# Q_leak = min(dH, max(0, H_this - H_next) / R)
diff = H0_r - H_res[r + 1] # H_res[r+1] pre-recharge/discharge
if diff < 0.0:
diff = 0.0
Q_leak = diff / leakance_R_arr[r]
if Q_leak > dH:
Q_leak = dH
H_to_next[r] = Q_leak
H_exfiltrated = dH - Q_leak
else:
H_exfiltrated = dH * f_dis[r]
H_to_next[r] = dH * (1.0 - f_dis[r])
H_discharge = H_res_excess + H_exfiltrated
H_res[r] -= dH
qi += H_discharge
# Tile drain: intercept f_tile fraction of H_to_next,
# route through a linear (b=1) sub-reservoir, discharge to stream.
if f_tile_arr[r] > 0.0:
tile_in = f_tile_arr[r] * H_to_next[r]
H_to_next[r] -= tile_in
H_tile[r] += tile_in
tile_dH = H_tile[r] * (1.0 - math.exp(-dt / tau_tile_arr[r]))
H_tile[r] -= tile_dH
qi += tile_dH
# Multipath drainage: a parallel fast path from this reservoir
# direct to stream, active when storage exceeds the threshold.
# Applied after primary recession via operator splitting
# (first-order accurate; acceptable at daily dt). Bypasses the
# junction/f_to_discharge partition.
if multipath_tau_arr[r] > 0.0:
H_above_mp = H_res[r] - multipath_thr_arr[r]
if H_above_mp > 0.0:
dH_mp = H_above_mp * (
1.0 - math.exp(-dt / multipath_tau_arr[r]))
H_res[r] -= dH_mp
qi += dH_mp
# Restore calibrated f_to_discharge[0] (undo frozen-ground override)
f_dis[0] = f0_frozen
# Carry deficit from bottom reservoir to next timestep
H_deficit_carry = H_deficit_res[n_res - 1]
# ── ET draw from reservoirs ───────────────────────────────────
if use_et_reservoir_draw:
demand0 = et_alpha * ET_t
avail0 = H_res[0] if H_res[0] > 0.0 else 0.0
actual0 = demand0 if demand0 <= avail0 else avail0
H_res[0] -= actual0
# Condensation (negative ET) that overtops the cap runs off.
if H_res[0] > Hmax_arr[0]:
qi += H_res[0] - Hmax_arr[0]
H_res[0] = Hmax_arr[0]
if n_res >= 2:
demand1 = (1.0 - et_alpha) * ET_t
if wp_soil > 0.0:
if wp_soil_sigma > 0.0:
# Gaussian CDF: demand scaled; draw from full H
f_avail = 0.5 * (1.0 + math.erf(
(H_res[1] - wp_soil)
/ (wp_soil_sigma * math.sqrt(2.0))))
demand1 = demand1 * f_avail
avail1 = H_res[1] if H_res[1] > 0.0 else 0.0
else:
# Hard threshold: no draw below wp_soil
if H_res[1] <= wp_soil:
demand1 = 0.0
avail1 = 0.0
else:
avail1 = H_res[1] - wp_soil
else:
avail1 = H_res[1] if H_res[1] > 0.0 else 0.0
actual1 = demand1 if demand1 <= avail1 else avail1
H_res[1] -= actual1
if H_res[1] > Hmax_arr[1]:
qi += H_res[1] - Hmax_arr[1]
H_res[1] = Hmax_arr[1]
# ── Record outputs ────────────────────────────────────────────
Q_out[step] = qi
SWE_out[step] = H_snow_cur
H_sub_total = 0.0
for r in range(n_res):
H_sub_total += H_res[r] + H_tile[r]
H_res_out[step, r] = H_res[r]
H_sub_out[step] = H_sub_total
return (Q_out, SWE_out, H_sub_out, H_res_out, H_res, H_tile,
H_snow_cur, fgi_cur, H_deficit_carry)
[docs]
class Reservoir(object):
"""
Generic reservoir. Accepts new water (recharge), and sends it to other
reservoirs and/or out of the system (discharge) at a rate that is
proportional to the amount of water held in the reservoir.
"""
[docs]
def __init__(self, recession_coeff, f_to_discharge=1., Hmax=np.inf, pdm_H0=None,
H0=0., f_tile=0.0, tau_tile=None,
junction_type='fraction', leakance_R=None, H_threshold=0.0,
multipath_threshold=None, multipath_timescale=None):
"""
Initialize a reservoir.
Parameters
----------
recession_coeff : float
Recession coefficient. For a linear reservoir (recession
exponent b = 1) this equals the true e-folding timescale, in
days. For b > 1 it is not a timescale — its units are
day·mm^(b-1) and the actual mean residence time depends on
storage level; use :meth:`mean_residence_time` to obtain a
physically comparable timescale at a given reference discharge.
f_to_discharge : float, optional
Fraction of water lost each time step that exits as river
discharge. The remainder (1 - f_to_discharge) infiltrates to
the next-deeper reservoir. Default 1.0 (all to discharge).
Used only when junction_type is 'fraction' or 'threshold'.
Hmax : float, optional
Maximum water depth the reservoir can hold. Default np.inf.
pdm_H0 : float or None, optional
Characteristic storage depth [mm] for the probability-distributed
model (PDM) of saturation-excess overland flow. Storage capacity
is assumed exponentially distributed across the catchment with
mean pdm_H0; the saturated fraction when reservoir depth is H is
f_sat = 1 - exp(-H / pdm_H0). That fraction of each positive
recharge pulse is shed immediately as overland flow. Mutually
exclusive with a finite Hmax. Default None (PDM off).
H0 : float, optional
Initial water depth at the start of the simulation. Default 0.
f_tile : float, optional
Fraction of subsurface drainage (H_to_next) that is diverted
to a fast tile-drain sub-reservoir instead of passing to the
next-deeper reservoir. The tile sub-reservoir drains directly to
stream with e-folding time tau_tile. This is a *fractional-bypass*
tile representation: a constant fraction of recession outflow is
routed through a downstream fast linear reservoir, regardless of
current storage. Default 0 (no tile drainage). Requires tau_tile
when > 0.
For a *threshold-activated parallel drainage* representation,
in which a second outflow path from the same storage activates
only above a water-table depth, use ``multipath_threshold`` /
``multipath_timescale`` instead.
tau_tile : float or None, optional
E-folding residence time [days] of the tile-drain sub-reservoir
(used with ``f_tile``). Typical values: 3–21 days for
agricultural tile systems. Required when f_tile > 0; ignored
when f_tile == 0. Default None.
multipath_threshold : float or None, optional
Storage depth [mm] above which a *parallel* fast drainage path
from this reservoir activates. The parallel path drains directly
to stream with e-folding timescale ``multipath_timescale`` and is
added to the discharge alongside the primary recession. The
outflow is
Q_multipath = max(0, H - multipath_threshold) / multipath_timescale.
Below the threshold this term is exactly zero, so the reservoir
behaves identically to the no-multipath case. Above it, a second
linear drain operates in parallel with the primary recession,
giving a two-timescale response: slow matrix drainage at low
storage and faster combined drainage at high storage.
Physical analogue: a tile-drain system installed at a fixed depth
that activates only once the water table rises above the drain
elevation. Contrast with ``f_tile``/``tau_tile``, which is a
constant-fraction fast-routing path through a separate downstream
reservoir regardless of storage.
Both ``multipath_threshold`` and ``multipath_timescale`` must be
provided together, or both left as None (default = multipath
disabled).
multipath_timescale : float or None, optional
E-folding timescale [days] of the parallel multipath drain.
Typical values: 3–21 days for agricultural tile systems
represented this way. Required together with
``multipath_threshold``; ignored when it is None. Default None.
junction_type : str, optional
How drainage is partitioned between river discharge and the
next-deeper reservoir. Options:
``'fraction'`` (default)
Fixed split: ``f_to_discharge`` fraction exits as stream
discharge; remainder infiltrates. Current default behaviour.
``'leakance'``
Head-difference driven flow to the next reservoir, physically
representing Darcy flow through a confining unit (e.g. a shale
layer). ``Q_leak = max(H_this - H_next, 0) / leakance_R``,
capped at the total drainage ``dH``. The remainder goes to
stream. Requires ``leakance_R``.
``'threshold'``
Dead-storage threshold: the recession law is applied only to
``max(H - H_threshold, 0)``; water below ``H_threshold``
never drains. The above-threshold drainage splits by
``f_to_discharge`` as in the ``'fraction'`` case. Models a
stream-aquifer connection that activates only when the water
table exceeds the streambed elevation.
leakance_R : float or None, optional
Leakance resistance [days]. ``Q_leak = ΔH / leakance_R``.
Required when junction_type is ``'leakance'``. Default None.
H_threshold : float, optional
Dead-storage threshold depth [mm]. Recession is applied only
to ``max(H - H_threshold, 0)``. Used when junction_type is
``'threshold'``. Default 0.0 (no threshold; full storage drains).
Raises
------
ValueError
If recession_coeff <= 0, f_to_discharge < 0 or > 1, Hmax < 0,
pdm_H0 <= 0, f_tile < 0 or > 1, f_tile > 0 with no tau_tile,
junction_type is unrecognised, junction_type is 'leakance'
with no leakance_R, multipath_threshold < 0,
multipath_timescale <= 0, or only one of the two multipath
parameters is provided.
"""
self.Hwater = H0
self.Hmax = Hmax
self.pdm_H0 = pdm_H0
self.recession_coeff = recession_coeff
self.f_to_discharge = f_to_discharge
self.junction_type = junction_type
self.leakance_R = leakance_R
self.H_threshold = H_threshold
# Initialized here so all instance attributes exist before
# recharge() and discharge() are first called
self.H_excess = 0.
self.H_deficit = 0.
self.H_exfiltrated = 0.
self.H_to_next = 0.
self.H_discharge = 0.
# Fast-path components of H_discharge, recorded per step for
# diagnostics / BMI flux partition (already included in H_discharge).
self.H_tile = 0.
self.H_multipath = 0.
# Check values and note whether they are reasonable
if recession_coeff <= 0:
raise ValueError("recession_coeff must be > 0.")
if f_to_discharge < 0:
raise ValueError("Negative f_to_discharge not possible.")
elif f_to_discharge > 1:
raise ValueError("f_to_discharge: Cannot discharge >100% of water.")
elif f_to_discharge == 0:
warnings.warn("All water infiltrates when f_to_discharge is 0:"+
" you may have created a\n"+
"redundant pass-through water-storage layer",
stacklevel=2)
if Hmax < 0:
raise ValueError("Hmax must be >= 0 (and >0 makes more sense)")
if pdm_H0 is not None and pdm_H0 <= 0:
raise ValueError("pdm_H0 must be > 0")
if f_tile < 0 or f_tile > 1:
raise ValueError("f_tile must be in [0, 1]")
if f_tile > 0 and tau_tile is None:
raise ValueError("tau_tile must be provided when f_tile > 0")
_valid_junctions = ('fraction', 'leakance', 'threshold')
if junction_type not in _valid_junctions:
raise ValueError(f"junction_type must be one of {_valid_junctions}")
if junction_type == 'leakance' and leakance_R is None:
raise ValueError("leakance_R must be provided when junction_type='leakance'")
self.f_tile = f_tile
if f_tile > 0.0 and tau_tile is not None:
self.tile_res = Reservoir(tau_tile, f_to_discharge=1.0)
else:
self.tile_res = None
# Multipath: threshold-activated parallel drain (distinct from the
# constant-fraction f_tile/tau_tile bypass; see __init__ docstring).
# Both must be set together or both None.
if (multipath_threshold is None) ^ (multipath_timescale is None):
raise ValueError(
"multipath_threshold and multipath_timescale must be set "
"together (both numbers) or both left as None.")
if multipath_threshold is not None:
if multipath_threshold < 0.0:
raise ValueError("multipath_threshold must be >= 0.")
if multipath_timescale <= 0.0:
raise ValueError("multipath_timescale must be > 0.")
self.multipath_threshold = multipath_threshold
self.multipath_timescale = multipath_timescale
# Power-law recession: Q = (H/τ) * (H/recession_H_ref)^(recession_exponent-1).
# recession_exponent=1 (default) recovers the linear reservoir exactly.
# recession_H_ref is the storage at which τ has its usual linear meaning [mm].
self.recession_exponent = 1.0
self.recession_H_ref = 1.0
@property
def has_multipath(self):
"""True when both multipath parameters are configured (drain active)."""
return (self.multipath_threshold is not None
and self.multipath_timescale is not None)
[docs]
def recharge(self, H):
"""
Add or remove water from the reservoir.
Recharge H can be positive (net water input, e.g. P > ET) or
negative (net deficit, e.g. ET > P). Sets H_excess if the
reservoir overflows Hmax, or H_deficit if more water is removed
than the reservoir holds.
Parameters
----------
H : float
Depth of water added (positive) or removed (negative).
Raises
------
ValueError
If Hwater is already negative before recharge is applied.
"""
# Extra water above a maximum cap
self.H_excess = 0.
# Water that this layer cannot hold and cannot be passed to a deeper layer
self.H_deficit = 0.
# ERROR if water is less than 0 -- may be able to remove
# this check later
if self.Hwater < 0:
raise ValueError("Hwater in reservoir < 0; non-physical")
# PDM: exponential distribution of storage capacities.
# Saturated fraction of catchment sheds positive recharge immediately
# as saturation-excess overland flow before the remainder enters storage.
if self.pdm_H0 is not None and H > 0:
f_sat = 1.0 - np.exp(-self.Hwater / self.pdm_H0)
self.H_excess = f_sat * H
H = (1.0 - f_sat) * H
# What if more water is lost during "recharge" than exists in reservoir?
# Create a deficit and bring Hwater to 0
if self.Hwater + H < 0:
self.H_deficit += self.Hwater + H
self.Hwater = 0.
# What if more water is added than maximum reservoir capacity?
# Mark excess (straight to runoff) and bring Hwater to Hmax
elif self.Hwater + H > self.Hmax:
self.H_excess += self.Hwater + H - self.Hmax
self.Hwater = self.Hmax
# Otherwise, we're in a range in which 0 <= H <= Hmax
# Yay! Things are easier!
else:
self.Hwater += H
[docs]
def discharge(self, dt, H_next=None):
"""
Discharge water from the reservoir over one time step.
Computes water lost by the recession law, partitions it between
river discharge (H_exfiltrated) and infiltration to the next-deeper
reservoir (H_to_next) according to junction_type, and adds
overflow from recharge() (H_excess) to H_discharge.
Parameters
----------
dt : float
Time step duration (same units as recession_coeff; typically days).
H_next : float or None, optional
Current water depth of the next-deeper reservoir [mm]. Used
only when junction_type is ``'leakance'`` to compute the
head-difference driven flux. Ignored for other junction types.
"""
b = self.recession_exponent
H0 = self.Hwater
# For threshold junction: recession applies only above H_threshold.
# Water below H_threshold is permanent dead storage.
H_eff = max(0.0, H0 - self.H_threshold) if self.junction_type == 'threshold' else H0
if b == 1.0 or H_eff <= 0.0:
dH = H_eff * (1 - np.exp(-dt / self.recession_coeff))
else:
# Exact integration of dH/dt = -(H/recession_coeff)·(H/H_ref)^(b-1)
# = -H^b / (recession_coeff · H_ref^(b-1))
# Substituting u = H^(1-b): du/dt = (b-1)/tau_eff
# => H(t+dt) = [H0^(1-b) + (b-1)·dt/tau_eff]^(1/(1-b))
tau_eff = self.recession_coeff * self.recession_H_ref ** (b - 1.0)
H_new = (H_eff ** (1.0 - b) + (b - 1.0) * dt / tau_eff) ** (1.0 / (1.0 - b))
dH = H_eff - max(0.0, H_new)
# Partition dH between stream discharge and infiltration to next reservoir.
if self.junction_type == 'leakance' and H_next is not None:
# Head-difference driven flux through confining unit (e.g., shale layer).
# Q_leak = max(H_this - H_next, 0) / R, capped at available drainage dH.
Q_leak = min(dH, max(0.0, H0 - H_next) / self.leakance_R)
self.H_to_next = Q_leak
self.H_exfiltrated = dH - Q_leak
else:
# 'fraction' and 'threshold': fixed f_to_discharge split.
self.H_exfiltrated = dH * self.f_to_discharge
self.H_to_next = dH * (1 - self.f_to_discharge)
self.H_discharge = self.H_excess + self.H_exfiltrated
self.Hwater -= dH
# Reset the per-step fast-path components (recorded for the BMI flux
# partition; they are also folded into H_discharge below).
self.H_tile = 0.0
self.H_multipath = 0.0
# Tile drainage: divert f_tile of H_to_next to the fast sub-reservoir.
# The remainder continues to the next-deeper reservoir as normal.
if self.tile_res is not None:
tile_in = self.f_tile * self.H_to_next
self.H_to_next -= tile_in
self.tile_res.recharge(tile_in)
self.tile_res.discharge(dt)
self.H_tile = self.tile_res.H_discharge
self.H_discharge += self.H_tile
# Multipath drainage: a parallel fast path direct to stream, active
# only when storage exceeds multipath_threshold. Applied after the
# primary recession via operator splitting (first-order accurate in dt;
# acceptable for daily timesteps with τ_matrix >> dt). Bypasses the
# junction/f_to_discharge partition — goes straight to discharge.
if self.has_multipath:
H_above = self.Hwater - self.multipath_threshold
if H_above > 0.0:
dH_mp = H_above * (1.0 - np.exp(-dt / self.multipath_timescale))
self.Hwater -= dH_mp
self.H_multipath = dH_mp
self.H_discharge += dH_mp
[docs]
def mean_residence_time(self, Q_ref):
"""
Mean residence time [days] at a reference steady-state discharge.
For a nonlinear reservoir governed by
:math:`Q = (H/\\tau)\\cdot(H/H_{\\mathrm{ref}})^{b-1}
= H^{b}/\\tau_{\\mathrm{eff}}`, with effective constant
:math:`\\tau_{\\mathrm{eff}} = \\tau\\,H_{\\mathrm{ref}}^{\\,b-1}`,
the steady-state storage at discharge *Q_ref* is
:math:`H_{ss} = (Q_{\\mathrm{ref}} \\cdot \\tau_{\\mathrm{eff}})^{1/b}`,
giving
.. math::
\\mathrm{MRT} = \\frac{H_{ss}}{Q_{\\mathrm{ref}}} =
\\frac{\\tau_{\\mathrm{eff}}^{1/b}}{Q_{\\mathrm{ref}}^{\\,1 - 1/b}}
With the default :math:`H_{\\mathrm{ref}} = 1` this is
:math:`\\tau^{1/b} / Q_{\\mathrm{ref}}^{\\,1-1/b}`.
For a linear reservoir (*b* = 1) this reduces exactly to
:math:`\\tau`. For *b* > 1, MRT is smaller than :math:`\\tau`
whenever :math:`Q_{\\mathrm{ref}} > 1` mm/day, reflecting the
faster drainage at realistic operating storage depths.
Unlike :attr:`recession_coeff`, which equals a timescale only when b=1
(linear reservoir), MRT is a physically comparable timescale across
reservoirs with different recession exponents.
Parameters
----------
Q_ref : float
Representative steady-state discharge from this reservoir
[mm/day]. Use the long-term mean flux attributed to this
layer (e.g. mean annual discharge partitioned by reservoir).
Returns
-------
float
Mean residence time [days].
Raises
------
ValueError
If Q_ref <= 0.
"""
if Q_ref <= 0:
raise ValueError("Q_ref must be > 0.")
b = self.recession_exponent
if b == 1.0:
return self.recession_coeff
tau_eff = self.recession_coeff * self.recession_H_ref ** (b - 1.0)
return tau_eff ** (1.0 / b) / Q_ref ** (1.0 - 1.0 / b)
[docs]
class Snowpack(object):
"""
Snowpack reservoir driven by temperature.
Accumulates precipitation as snow when mean temperature is at or below
0 °C. Melts at a positive-degree-day rate when temperature is above 0 °C.
All melt is routed to the top subsurface reservoir as infiltration; direct
discharge to the river is not modeled.
Should precede the subsurface reservoir list in a watershed model.
The melt factor is a positive-degree-day factor [mm/°C/day].
"""
[docs]
def __init__(self, melt_factor=None):
"""
Initialize an empty snowpack.
Parameters
----------
melt_factor : float, optional
Positive-degree-day melt factor (mm SWE °C⁻¹ day⁻¹).
Can be set or updated later via set_melt_factor().
"""
self.Hwater = 0. # SWE
self.melt_factor = melt_factor
self.T = 0.
self.H_infiltrated = 0.
self.H_deficit = 0.
[docs]
def set_melt_factor(self, melt_factor):
"""
Set or update the positive-degree-day melt factor.
Parameters
----------
melt_factor : float
Melt rate per positive degree-day (mm SWE °C⁻¹ day⁻¹).
"""
self.melt_factor = melt_factor
[docs]
def set_temperature(self, T):
"""
Set the mean air temperature for the current time step.
Parameters
----------
T : float
Mean air temperature (°C).
"""
self.T = T
[docs]
def recharge(self, H):
"""
Apply net water input or deficit to the snowpack.
If T <= 0, positive H accumulates as snow (SWE). If T > 0,
positive H bypasses the snowpack and is passed directly to the
top subsurface reservoir via H_infiltrated. Negative H (ET > P)
is removed from the snowpack as sublimation; any remainder that
exceeds available SWE becomes H_deficit.
Parameters
----------
H : float
Net water depth for this time step (mm). Positive = input
(P - ET > 0); negative = deficit (ET - P > 0).
"""
self.H_deficit = 0. # Water deficit with neg ET; just this time step
# If positive recharge
if H >= 0:
if self.T <= 0:
self.Hwater += H
self.H_infiltrated = 0.
else:
# Incoming precip component; melt sums with this
# This is then directly passed to the first layer of the
# set of hydrological reservoirs
self.H_infiltrated = H
# If negative recharge: remove water from snowpack via sublimation.
# Any deficit beyond available SWE is passed down as H_deficit.
else:
# Sublimation (effectively) if snow present;
# Otherwise pass water deficit
if self.Hwater > -H:
self.Hwater += H
else:
self.H_deficit += H + self.Hwater
self.Hwater = 0
self.H_infiltrated = 0.
[docs]
def melt(self, dt, P=0.0):
"""
Compute positive-degree-day and rain-on-snow melt; update state.
Both terms are routed to H_infiltrated (→ top soil reservoir). If
total available energy exceeds the SWE present, the leftover is
returned as equivalent degree-days so the caller can credit it
toward frozen-soil thawing (FGI reduction) rather than losing it.
Rain-on-snow sensible heat: water arriving at T_mean > 0 °C
carries (c_p / L_f) · T · P mm SWE of latent-heat capacity.
Spring snowpacks are near-isothermal at 0 °C, so cold-content
corrections are negligible and the latent-heat term dominates.
References
----------
McCabe et al. (2007) doi:10.1175/BAMS-88-3-319
Würzer et al. (2016) doi:10.1175/JHM-D-15-0181.1
Parameters
----------
dt : float
Timestep [days].
P : float, optional
Raw liquid precipitation [mm/day]. Used to compute rain-on-snow
sensible-heat melt. Default 0 (PDD only).
Returns
-------
excess_dd : float
Leftover melt energy after the snowpack is exhausted, expressed
as degree-day equivalent [°C·day] = leftover mm SWE / melt_factor.
Zero when SWE is not fully depleted.
The melt factor (mm SWE °C⁻¹ day⁻¹) serves as the bridge
between the PDD snowmelt representation and the frozen ground
index (FGI): dividing excess melt depth (mm SWE) by melt_factor
recovers the equivalent thermal forcing in °C·day, which is the
currency the FGI uses. See Buckets._update_fgi().
"""
if self.T <= 0:
return 0.0
pdd_avail = self.melt_factor * self.T * dt # [mm SWE]
ros_avail = _CP_LF * self.T * P # [mm SWE] rain-on-snow
total_avail = pdd_avail + ros_avail
if total_avail <= self.Hwater:
actual_melt = total_avail
excess_dd = 0.0
else:
actual_melt = self.Hwater
# Leftover energy → °C·day equivalent for soil-thaw credit
excess_dd = (total_avail - actual_melt) / self.melt_factor
self.H_infiltrated += actual_melt
self.Hwater -= actual_melt
return excess_dd
[docs]
class Buckets(object):
"""
Incorporates a list of reservoirs into a linear hierarchy that sends water
either downwards or out to the surface. Reservoirs are ordered from top
(nearest Earth's surface) to bottom (deepest groundwater); this order
controls the direction of infiltration between layers.
MNiShed is designed as a daily-timestep model. This is a deliberate
design choice: the physical parameterisations — degree-day snowmelt,
Thornthwaite ET, and linear reservoir drainage — are climatological
approximations that are well-founded at daily resolution but lose physical
meaning at finer scales. The daily timestep is enforced in initialize().
"""
[docs]
def __init__(self, T_monthly_normals=None):
"""
Initialize the watershed model.
If using the ThornthwaiteChang2019 ET method, pass
T_monthly_normals here so that the thermal index I and exponent
a are computed once from climatological normals and remain fixed
throughout the simulation.
Parameters
----------
T_monthly_normals : array-like of length 12, optional
Long-term mean monthly temperatures (°C) used to compute the
Thornthwaite thermal index I and exponent a per Chang et al.
(2019), https://doi.org/10.1002/ird.2309. Required when
evapotranspiration_method is 'ThornthwaiteChang2019'.
"""
# Thornthwaite thermal index and exponent, per Chang et al. (2019)
# https://doi.org/10.1002/ird.2309
# I is climatologically imposed by the local normal temperature regime
# and must remain fixed during simulation (not recomputed each timestep).
if T_monthly_normals is not None:
self.Chang_I = self._compute_Chang_I(T_monthly_normals)
self.Chang_a = self._compute_Chang_a(self.Chang_I)
# Frozen ground index (Molnau & Bissell 1983). Disabled by default
# (threshold = inf); overridden by snowmelt.fdd_threshold in YAML or
# by run_and_score(fdd_threshold=).
self.fdd_threshold = np.inf # [°C·day]
self._fgi = 0.0 # current frozen ground index [°C·day]
def _compute_Chang_I(self, T_monthly_normals):
"""
Compute the Thornthwaite thermal index I from long-term monthly normal
temperatures, per Chang et al. (2019), Eq. 1.
https://doi.org/10.1002/ird.2309
Parameters
----------
T_monthly_normals : array-like, length 12
Long-term mean monthly temperatures (°C). Negative values are
treated as 0 per the Thornthwaite convention.
Returns
-------
I : float
Thermal index (dimensionless).
"""
Tn = np.maximum(T_monthly_normals, 0)
return np.sum((0.2 * Tn) ** 1.514)
def _compute_Chang_a(self, I):
"""
Compute the Thornthwaite exponent a from thermal index I, per
Chang et al. (2019), Eq. 1.
https://doi.org/10.1002/ird.2309
Parameters
----------
I : float
Thermal index, as returned by _compute_Chang_I.
Returns
-------
a : float
Thornthwaite exponent (dimensionless).
"""
return (6.75e-7 * I**3
- 7.71e-5 * I**2
+ 1.7912e-2 * I
+ 0.49239)
[docs]
def export_Hlist(self):
"""
Return the current water depths in all subsurface reservoirs.
Useful for checkpointing reservoir state between a spin-up run
and the main simulation, or for restarting a paused run.
Returns
-------
list of float
Water depth in each reservoir, ordered from shallowest
(index 0) to deepest.
"""
return [reservoir.Hwater for reservoir in self.reservoirs]
[docs]
def initialize(self, config_file=None, enforce_water_balance=None):
"""
Set up the model from a YAML configuration file.
Reads the configuration file, loads the input time series, builds
the reservoir stack, instantiates snowpack if temperature data are
present, optionally computes the water-year ET multiplier, and runs
any requested spin-up cycles. Part of the CSDMS Basic Model Interface.
Parameters
----------
config_file : str, optional
Path to the YAML configuration file. If None, all required
values must be set on the object directly before calling
update().
enforce_water_balance : str or None, optional
How to scale ET to close the water balance. When None (default),
the value is read from ``general: enforce_water_balance`` in the
YAML config, which itself defaults to ``'water-year'`` if absent.
Accepted values:
* ``'water-year'`` — Scale ET by a per-water-year multiplier so
that P - Q - ET = 0 over each water year.
* ``'global'`` — Scale ET by a single multiplier computed from
sum(P - Q_obs) / sum(ET_raw) over the full record. No
per-year overfitting; does not add hidden degrees of freedom
to AIC comparisons.
* ``'none'`` — Use raw ET without correction. Appropriate only
when supplying trusted measured ET (e.g. eddy covariance).
Using ``'none'`` with ThornthwaiteChang2019 will raise a
warning because Thornthwaite ET carries large systematic
biases.
"""
if config_file is None:
warnings.warn("No configuration file provided; all values needed "+
"for a model run therefore must be set independently.",
stacklevel=2)
# Parse YAML configuration file
# And assign variables except for optimization bounds and plotting
if config_file is not None:
try:
with open(config_file, "r") as yamlfile:
self.cfg = yaml.load(yamlfile, Loader=yaml.FullLoader)
except FileNotFoundError as e:
# Raise rather than sys.exit so library / BMI / notebook callers
# can handle a bad config; the CLI (main()) translates this into
# a clean exit code.
raise FileNotFoundError(
f"Config file not found: {config_file}") from e
except yaml.YAMLError as e:
raise yaml.YAMLError(
f"Could not parse config file {config_file}:\n{e}") from e
# Read input time series from the CSV path specified in the config
self.hydrodata = pd.read_csv(
self.cfg['timeseries']['datafile'],
parse_dates=['Date'])
# Set variables on reservoirs
# First, check if all reservoirs have the same length
for _key in self.cfg['reservoirs'].keys():
if len(self.cfg['reservoirs'][_key]) == \
len(self.cfg['initial_conditions']['water_reservoir_effective_depths__mm']):
pass
else:
raise ValueError(_key + ' within "reservoirs" contains a\n'+
'different number of entries, implying'+
'a different number of subsurface water\n'+
'reservoirs, than '+
'water_reservoir_effective_depths__mm'+
' within "initial_conditions".')
# If all are the same length, then we will assign a number of reservoirs
self.n_reservoirs = len(
self.cfg['initial_conditions']['water_reservoir_effective_depths__mm'])
# Using this, we will build a list of reservoir objects
# and initialize them based on the provided inputs
_pdm_H0 = self.cfg['reservoirs'].get('pdm_H0__mm',
[None] * self.n_reservoirs)
_f_tile = self.cfg['reservoirs'].get('tile_fractions',
[0.0] * self.n_reservoirs)
_tau_tile = self.cfg['reservoirs'].get('tile_residence_times__days',
[None] * self.n_reservoirs)
_recession_exp = self.cfg['reservoirs'].get('recession_exponents',
[1.0] * self.n_reservoirs)
_junction_types = self.cfg['reservoirs'].get('junction_types',
['fraction'] * self.n_reservoirs)
_leakance_R = self.cfg['reservoirs'].get('leakance_R__days',
[None] * self.n_reservoirs)
_H_threshold = self.cfg['reservoirs'].get('H_threshold__mm',
[0.0] * self.n_reservoirs)
_mp_threshold = self.cfg['reservoirs'].get('multipath_thresholds__mm',
[None] * self.n_reservoirs)
_mp_timescale = self.cfg['reservoirs'].get('multipath_timescales__days',
[None] * self.n_reservoirs)
self.reservoirs = [
Reservoir(
recession_coeff = self.cfg['reservoirs']['recession_coefficients'][i],
f_to_discharge = self.cfg['reservoirs']['exfiltration_fractions'][i],
Hmax = self.cfg['reservoirs']['maximum_effective_depths__mm'][i],
pdm_H0 = _pdm_H0[i],
H0 = self.cfg['initial_conditions'][
'water_reservoir_effective_depths__mm'][i],
f_tile = _f_tile[i],
tau_tile = _tau_tile[i],
junction_type = _junction_types[i],
leakance_R = _leakance_R[i],
H_threshold = _H_threshold[i] if _H_threshold[i] is not None else 0.0,
multipath_threshold = _mp_threshold[i],
multipath_timescale = _mp_timescale[i],
)
for i in range(self.n_reservoirs)]
for i, b_exp in enumerate(_recession_exp):
self.reservoirs[i].recession_exponent = float(b_exp)
# Check if bottom reservoir discharges all to river: conserve mass.
# But allow through with a warning in case the user wants a
# deep and non-discharging reservoir (although this could be set up
# explicitly too).
if self.reservoirs[-1].f_to_discharge < 1:
warnings.warn("f_to_discharge of bottom water-storage layer < 1.\n"+
"You are not conserving mass.", stacklevel=2)
# Set scalar variables based on yaml
self.melt_factor = self.cfg['snowmelt']['PDD_melt_factor']
self.snow_insulation_k = self.cfg['snowmelt'].get('snow_insulation_k', 0.0)
self.fgi_decay_coeff = self.cfg['snowmelt'].get('fgi_decay_coeff', 0.97)
self.fdd_threshold = self.cfg['snowmelt'].get('fdd_threshold', np.inf)
self.et_method = self.cfg['catchment']['evapotranspiration_method']
if self.et_method == 'ThornthwaiteChang2019' and not hasattr(self, 'Chang_I'):
n_years = len(self.hydrodata) / 365.25
if n_years < 20:
warnings.warn(
f"ThornthwaiteChang2019: monthly temperature normals were not provided "
f"and will be computed from the {n_years:.1f}-year input record. "
f"For short records this may not represent long-term climatology. "
f"Pass T_monthly_normals to Buckets() for reliable results.",
UserWarning, stacklevel=2,
)
T_normals = (self.hydrodata['Mean Temperature [C]']
.groupby(pd.DatetimeIndex(self.hydrodata['Date']).month)
.mean()
.reindex(range(1, 13))
.values)
self.Chang_I = self._compute_Chang_I(T_normals)
self.Chang_a = self._compute_Chang_a(self.Chang_I)
self.water_year_start_month = self.cfg['catchment']['water_year_start_month']
self.drainage_basin_area__km2 = self.cfg['catchment']['drainage_basin_area__km2']
self.baseflow_Q = self.cfg['catchment'].get('baseflow_Q', 0.0)
# Per-step flux-partition components, populated by update() and read
# by the BMI wrapper (mm/day). baseflow_Q itself is an output-layer
# term applied by run_and_score / the BMI, not by the cascade.
self._flux_direct_runoff = 0.0
self._flux_tile = 0.0
self._flux_multipath = 0.0
# Module enable/disable flags — read from config, default to on
# (except direct_runoff, which defaults to off).
_modules = self.cfg.get('modules', {})
self.use_snowpack = _modules.get('snowpack', True)
self.use_frozen_ground = _modules.get('frozen_ground', True)
self.use_rain_on_snow = _modules.get('rain_on_snow', True)
self.use_direct_runoff = _modules.get('direct_runoff', False)
self.use_dtr_fgi_decay = _modules.get('dtr_fgi_decay', True)
self.use_et_water_stress = _modules.get('et_water_stress', False)
self.use_et_reservoir_draw = _modules.get('et_reservoir_draw', False)
if self.use_et_water_stress and self.use_et_reservoir_draw:
warnings.warn(
"et_water_stress and et_reservoir_draw are mutually exclusive. "
"et_reservoir_draw will be used; et_water_stress ignored.",
UserWarning, stacklevel=2,
)
self.use_et_water_stress = False
self.et_scale = 1.0 # universal ET multiplier; default 1.0 (no-op); override via
# run_and_score(et_scale=) or as a free calibration parameter
# Fraction of ET_pot drawn from reservoir 0 (shallow); 1-et_alpha from reservoir 1.
# Read from general: et_alpha in config YAML; override via run_and_score(et_alpha=).
self.et_alpha = self.cfg['general'].get('et_alpha', 1.0)
# Wilting-point threshold for soil reservoir ET draw [mm].
# wp_soil_sigma > 0 enables the spatially variable (Gaussian CDF) form.
self.wp_soil = 0.0
self.wp_soil_sigma = 0.0
# Check if there is a mean temperature column for snowpack.
# If not, note that no snowpack processes will be included
self.has_snowpack = (self.use_snowpack and
'Mean Temperature [C]' in self.hydrodata.columns)
# Detect optional daily T_min / T_max for DTR-based FGI decay.
self._has_trange = (self.use_dtr_fgi_decay and
'Minimum Temperature [C]' in self.hydrodata.columns and
'Maximum Temperature [C]' in self.hydrodata.columns)
if self.has_snowpack:
# Instantiate snowpack
self.snowpack = Snowpack(self.melt_factor) # allow changes to melt factor later
elif 'Mean Temperature [C]' in self.hydrodata.columns and not self.use_snowpack:
pass # snowpack deliberately disabled via modules config
else:
warnings.warn('"Mean Temperature [C]" has not been set. '
'No snowpack processes will be simulated.', stacklevel=2)
# How many times to loop the full time series for the spin-up
# Maybe I should permit a more sophisticated spin-up at some point!
self.n_spin_up_cycles = self.cfg['general']['spin_up_cycles']
# Resolve enforce_water_balance: keyword argument takes precedence over YAML,
# which defaults to 'water-year' if the key is absent.
if enforce_water_balance is None:
enforce_water_balance = self.cfg['general'].get('enforce_water_balance',
'water-year')
# Normalize legacy boolean values from old YAML configs or API calls.
if enforce_water_balance is True:
enforce_water_balance = 'water-year'
elif enforce_water_balance is False:
enforce_water_balance = 'none'
self.enforce_water_balance = enforce_water_balance
# Fraction of positive daily recharge that bypasses the reservoir
# cascade and exits directly as runoff. Conceptually inspired by
# Hortonian (infiltration-excess) overland flow, but at a daily
# timestep rainfall intensity is unavailable, so the fraction cannot
# be a rigorous physical representation -- except in extreme events
# where intense rainfall dominates the daily total. In practice it
# acts as a calibrated fast-bypass fraction, off by default.
self.direct_runoff_fraction = self.cfg['general'].get(
'direct_runoff_fraction', 0.0)
# Initial conditions if resuming from prior run
if self.has_snowpack:
self.snowpack.Hwater = self.cfg['initial_conditions']['snowpack__mm_SWE']
# Reservoir H0 values are set in the list comprehension above.
# Enforce the daily timestep. MNiShed is a daily model by design:
# degree-day snowmelt, Thornthwaite ET, and linear reservoir drainage
# are all daily-scale parameterisations.
if (self.hydrodata['Date'].diff()[1:] == pd.Timedelta('1 day')).all():
self.dt = 1.
else:
raise ValueError(
"MNiShed requires a continuous daily time series "
"(exactly 1-day intervals throughout). Sub-daily or "
"irregular timesteps are not supported; the physical "
"parameterisations (degree-day snowmelt, Thornthwaite ET, "
"linear reservoir drainage) are daily-scale approximations."
)
# Compute specific discharge from data
self.hydrodata['Specific Discharge [mm/day]'] = (
self.hydrodata['Discharge [m^3/s]']
/ (self.drainage_basin_area__km2*1E3) * 86400)
# Create columns for model output
self.hydrodata['Specific Discharge (modeled) [mm/day]'] = pd.NA
self.hydrodata['Snowpack (modeled) [mm SWE]'] = pd.NA
self.hydrodata['Subsurface storage (modeled total) [mm]'] = pd.NA
self._store_depths = False
# Start out at first timestep
# Could modify this to pick up a run in the middle
# Or start at the beginning of a water year
# for example
self._timestep_i = self.hydrodata.index[0]
# Carry-over of any water deficit from the previous timestep that the
# deepest reservoir could not satisfy (ET > P + all storage). This is
# the unpaid debt passed forward one step; distinct from
# Reservoir.H_deficit and Snowpack.H_deficit, which are per-timestep only.
self.H_deficit_carry = 0.
# Compute the water years based on the input month for
# water-year rollover
self.compute_water_year()
# Compute ET, optionally scaling to close the water balance.
self.global_et_multiplier = 1.0 # default; overwritten below if global mode
if self.enforce_water_balance == 'global':
self.compute_global_ET_multiplier()
elif self.enforce_water_balance == 'water-year':
self.compute_ET_multiplier()
elif self.et_method == 'ThornthwaiteChang2019':
warnings.warn(
"enforce_water_balance='none' with ThornthwaiteChang2019: Thornthwaite ET "
"will not be rescaled to close the water balance. "
"Thornthwaite ET carries large systematic biases; omitting "
"the correction is likely to produce significant mass-balance "
"errors. Consider enforce_water_balance='water-year' or 'global', "
"or supply measured ET via evapotranspiration_method: datafile.",
stacklevel=2,
)
self.compute_ET()
# Model spin-up, if requested
for _ in range(self.n_spin_up_cycles):
self.run() # Spin-up; run() resets _timestep_i each call
[docs]
def compute_water_year(self):
"""
Assign a water-year label to each row in self.hydrodata.
Adds a 'Water Year' column. A water year begins on
water_year_start_month and is labelled by the calendar year in
which it ends. For example, with a start month of October (USGS
convention), 1 Oct 2020 – 30 Sep 2021 is water year 2021.
When water_year_start_month is 1 (January), the water year equals
the calendar year and no offset is applied.
"""
self.hydrodata['Water Year'] = pd.DatetimeIndex(self.hydrodata['Date']).year
if self.water_year_start_month > 1:
self.hydrodata['Water Year'] += (
pd.DatetimeIndex(self.hydrodata['Date']).month
>= self.water_year_start_month
)
[docs]
def compute_global_ET_multiplier(self, start=None, end=None):
"""
Compute a single ET scale factor to close the long-term water balance.
Computes scale = sum(P - Q_obs) / sum(ET_raw) over all days where
P, ET, and Q_obs are all finite, and stores it as self.global_et_multiplier.
Unlike compute_ET_multiplier(), which fits one multiplier per water
year, this uses a single ratio and does not overfit interannual
variability. Appropriate when enforce_water_balance is set to 'global'.
Parameters
----------
start : str or datetime-like, optional
Start of the window used to compute the multiplier (inclusive).
If None, uses the full record.
end : str or datetime-like, optional
End of the window used to compute the multiplier (inclusive).
If None, uses the full record.
"""
if self.et_method == 'datafile':
_raw_ET = np.asarray(self.hydrodata['Evapotranspiration [mm/day]'])
else:
_raw_ET = np.asarray(self.evapotranspiration_Chang2019())
# Mask to days where all three water-balance terms are finite so that
# P_sum, ET_sum, and Q_sum are computed over the same day-set.
mask = (self.hydrodata['Specific Discharge [mm/day]'].notna()
& self.hydrodata['Precipitation [mm/day]'].notna()
& np.isfinite(_raw_ET))
if start is not None:
mask &= self.hydrodata['Date'] >= pd.Timestamp(start)
if end is not None:
mask &= self.hydrodata['Date'] <= pd.Timestamp(end)
P_sum = float(self.hydrodata.loc[mask, 'Precipitation [mm/day]'].sum())
Q_sum = float(self.hydrodata.loc[mask, 'Specific Discharge [mm/day]'].sum())
ET_sum = float(_raw_ET[mask].sum())
if ET_sum <= 0:
warnings.warn("Global ET sum is zero or negative; multiplier set to 1.0.",
UserWarning, stacklevel=2)
self.global_et_multiplier = 1.0
else:
self.global_et_multiplier = (P_sum - Q_sum) / ET_sum
if self.global_et_multiplier <= 0:
warnings.warn(
f"Global ET multiplier = {self.global_et_multiplier:.3f} (≤ 0); "
"P − Q ≤ 0 over the water-balance window. Setting to 1.0.",
UserWarning, stacklevel=2)
self.global_et_multiplier = 1.0
[docs]
def compute_ET_multiplier(self):
"""
Compute per-water-year ET scaling factors to enforce water balance.
For each water year, computes the ratio of required ET (P - Q) to
measured or computed ET, and stores this as 'ET multiplier' in
self.hydrodata_WY_means. This multiplier is later applied in
compute_ET() to scale ET so that P - Q - ET = 0 over each water year.
"""
# Originally used "sum", but then used "mean" so the headers would
# still be sensible
self.hydrodata_WY_means = self.hydrodata.groupby(
self.hydrodata['Water Year']).mean(numeric_only=True)
# Not needed, but no real harm in calculating
self.hydrodata_WY_means['Runoff ratio'] = (
self.hydrodata_WY_means['Specific Discharge [mm/day]']
/ self.hydrodata_WY_means['Precipitation [mm/day]'])
_ET_required = -(self.hydrodata_WY_means['Specific Discharge [mm/day]'] -
self.hydrodata_WY_means['Precipitation [mm/day]'])
self.hydrodata_WY_means['ET multiplier'] = (
_ET_required / self.hydrodata_WY_means['Evapotranspiration [mm/day]'])
_bad_wy = self.hydrodata_WY_means.index[
self.hydrodata_WY_means['ET multiplier'] <= 0]
if len(_bad_wy) > 0:
warnings.warn(
f"ET multiplier <= 0 in water year(s) {list(_bad_wy)}. "
"Annual discharge exceeds precipitation for those years; "
"scaled ET will be zero or negative (water-generating). "
"Check gauge data or consider removing those years.",
stacklevel=2,
)
[docs]
def compute_ET(self):
"""
Build the ET time series used in the model.
Obtains raw daily ET from the input data file or the Thornthwaite–Chang
2019 equation (see evapotranspiration_Chang2019()). Five modes:
et_scale (default 1.0) is applied as a universal final multiplier in
every mode. Four modes determine the base ET before et_scale:
1. et_water_stress=True (or et_reservoir_draw + 'none'): base = raw ET.
et_scale is then the sole WB-closure mechanism.
2. enforce_water_balance='water-year' (stress off): base = raw ET ×
per-water-year multiplier so P - Q - ET ≈ 0 each year (before et_scale).
3. enforce_water_balance='global' (stress off): base = raw ET × single
full-record multiplier from sum(P - Q_obs) / sum(ET_raw).
4. enforce_water_balance='none' (stress off): base = raw ET.
In modes 2–4, et_scale=1.0 (the default) leaves existing behaviour
unchanged. Set et_scale ≠ 1 to add a calibrated offset from the
water-balance multiplier (e.g. to capture decade-specific land-cover
or climate sensitivity); this relaxes exact WB closure.
The result is stored as 'ET for model [mm/day]' in self.hydrodata.
"""
if self.et_method == 'datafile':
_raw_ET = self.hydrodata['Evapotranspiration [mm/day]']
elif self.et_method == 'ThornthwaiteChang2019':
_raw_ET = self.evapotranspiration_Chang2019()
else:
raise ValueError('evapotranspiration_method must be "datafile" or '+
'"ThornthwaiteChang2019".')
if self.use_et_water_stress or (self.use_et_reservoir_draw and
self.enforce_water_balance == 'none'):
# In stress modes, et_scale is the sole WB-closure mechanism;
# no mode-specific multiplier is applied before it.
_et_mode = np.asarray(_raw_ET)
elif self.enforce_water_balance == 'global':
_et_mode = np.asarray(_raw_ET) * self.global_et_multiplier
elif self.enforce_water_balance == 'water-year':
# Merge per-water-year multiplier into hydrodata, then apply.
# Drop any previous 'ET multiplier' column first so that calling
# compute_ET() more than once (e.g. after a module flag override)
# does not produce duplicate _x/_y suffix columns from pandas merge.
# Use .to_numpy() to multiply by position rather than pandas index
# so that any index reset from the merge cannot silently misalign rows.
if 'ET multiplier' in self.hydrodata.columns:
self.hydrodata = self.hydrodata.drop(columns=['ET multiplier'])
self.hydrodata = self.hydrodata.merge(
self.hydrodata_WY_means['ET multiplier'],
on='Water Year')
nan_wy = (self.hydrodata_WY_means['ET multiplier']
.index[self.hydrodata_WY_means['ET multiplier'].isna()]
.tolist())
if nan_wy:
warnings.warn(
f"ET multiplier is NaN for water year(s) {nan_wy} "
f"(no discharge observations). Raw ET used for those years "
f"(enforce_water_balance ineffective).",
UserWarning, stacklevel=2,
)
self.hydrodata['ET multiplier'] = (
self.hydrodata['ET multiplier'].fillna(1.0))
_et_mode = np.asarray(_raw_ET) * self.hydrodata['ET multiplier'].to_numpy()
else:
_et_mode = np.asarray(_raw_ET)
# et_scale (default 1.0) is a universal calibration multiplier applied
# after the mode-specific water-balance correction. Under stress modes
# it is the sole correction (mode factor = raw ET); under other modes it
# provides an additional degree of freedom — e.g. for decade-specific
# land-cover or climate sensitivity — at the cost of exact WB closure.
self.hydrodata['ET for model [mm/day]'] = _et_mode * self.et_scale
def _et_stress_factor(self):
"""
Water-availability multiplier applied to potential ET each time step.
Returns 1 - exp(-H_shallow / H0), where H_shallow is the current water
depth in the shallowest reservoir and H0 is its PDM characteristic
storage depth (pdm_H0). The multiplier is zero when the reservoir is
empty and approaches 1 as it fills, so actual ET = potential ET * factor.
Returns 1.0 (no stress) when et_water_stress is disabled or when the
shallow reservoir has no pdm_H0 set.
"""
if not self.use_et_water_stress:
return 1.0
H0 = self.reservoirs[0].pdm_H0
if H0 is None:
return 1.0
return 1.0 - np.exp(-max(self.reservoirs[0].Hwater, 0.0) / H0)
def _draw_et_from_reservoirs(self, ET_pot):
"""
Remove actual ET from reservoirs after cascade drainage.
ET_pot is partitioned between reservoir 0 (shallow) and reservoir 1
(soil) by et_alpha. Each draw is capped at available storage so
Hwater never goes negative; unmet demand is implicitly lost as
water-stress reduction of actual ET. A negative ET_pot represents
condensation/dew (a water input); if that input would push a
reservoir above its Hmax cap, the surplus is shed directly to
runoff rather than stored above the cap.
Parameters
----------
ET_pot : float
Potential ET for this time step [mm/day], already scaled by
et_scale or the global water-balance multiplier.
Returns
-------
float
Condensation surplus [mm] shed to runoff because it overtopped
a reservoir's Hmax. Zero in the usual (ET removal) case; the
caller adds it to the day's discharge.
"""
fractions = [self.et_alpha, 1.0 - self.et_alpha]
excess = 0.0
for i, frac in enumerate(fractions):
if i >= len(self.reservoirs):
break
demand = frac * ET_pot
H = self.reservoirs[i].Hwater
if i == 1 and self.wp_soil > 0.0:
# Soil reservoir: scale demand by fraction of catchment above WP.
if self.wp_soil_sigma > 0.0:
# Gaussian CDF: spatially variable WP — σ→0 recovers hard threshold.
f_avail = 0.5 * (1.0 + math.erf((H - self.wp_soil)
/ (self.wp_soil_sigma * math.sqrt(2.0))))
demand = demand * f_avail
else:
# Hard threshold: no ET extraction below wp_soil.
if H <= self.wp_soil:
continue
H = H - self.wp_soil # only water above WP is available
actual = min(demand, max(0.0, H))
self.reservoirs[i].Hwater -= actual
# Condensation (negative ET) that overtops the cap runs off.
if self.reservoirs[i].Hwater > self.reservoirs[i].Hmax:
excess += self.reservoirs[i].Hwater - self.reservoirs[i].Hmax
self.reservoirs[i].Hwater = self.reservoirs[i].Hmax
return excess
def _compute_snowpack(self, time_step):
"""
Update the snowpack for one timestep; return excess melt energy.
Sets temperature, applies net water input, then calls melt() with
the raw precipitation so rain-on-snow sensible heat is included.
Updates self.H_deficit_carry from the snowpack before returning.
Returns
-------
excess_dd : float
Leftover melt energy [°C·day] after SWE is fully depleted.
Pass to _update_fgi() to credit toward frozen-soil thawing.
"""
T = self.hydrodata['Mean Temperature [C]'][time_step]
P = self.hydrodata['Precipitation [mm/day]'][time_step]
self.snowpack.set_temperature(T)
if self.use_et_reservoir_draw:
_sp_recharge = P + self.H_deficit_carry
else:
_sp_recharge = (P
- self.hydrodata['ET for model [mm/day]'][time_step]
* self._et_stress_factor()
+ self.H_deficit_carry)
self.snowpack.recharge(_sp_recharge)
excess_dd = self.snowpack.melt(self.dt,
P=(P if self.use_rain_on_snow else 0.0))
self.H_deficit_carry = self.snowpack.H_deficit
return excess_dd
def _update_fgi(self, time_step, excess_dd=0.0):
"""
Update the frozen ground index; flag top reservoir as frozen if needed.
FGI(t) = max(0, A_t · FGI(t-1) - T_eff - excess_dd)
A_t = daily FGI decay coefficient (climate-dependent; see below)
T_eff = T_mean · exp(-snow_insulation_k · SWE)
T_mean < 0 → FGI rises (freezing degree-days accumulate)
T_mean > 0 → FGI falls (warm air thaws)
excess_dd → additional thaw credited from leftover snowmelt
energy [°C·day] = leftover mm SWE / melt_factor
DTR-based decay (when Minimum/Maximum Temperature columns present):
On days entirely below freezing (T_max ≤ 0), A_t = 1.0 — no
sub-daily thawing, FGI accumulates unimpeded. When the diurnal
cycle straddles 0°C (T_min < 0 < T_max), a fraction of the day
is above freezing and drives partial thawing:
f_above = T_max / (T_max - T_min) [linear diurnal cycle]
A_t = 1 - (1 - fgi_decay_coeff) · f_above
When f_above → 1, A_t → fgi_decay_coeff (M&B 0.97 maximum decay).
When f_above = 0, A_t = 1.0 (no passive decay).
This naturally gives near-unity A for continental climates (where
T_max rarely crosses 0 during cold spells) and M&B-like decay for
maritime climates with frequent freeze-thaw oscillations.
Fallback (T_min / T_max absent): A_t = fgi_decay_coeff (constant,
original M&B behaviour).
When FGI exceeds fdd_threshold, the top reservoir's f_to_discharge
is set to 1.0 so all drainage becomes direct runoff, simulating
frozen-soil blockage of deep infiltration.
References
----------
Molnau & Bissell (1983) https://westernsnowconference.org/bibliography/1983Molnau.pdf
(Western Snow Conference proceedings — original source for the
FGI formulation and the exponential snow insulation factor)
Shanley & Chalmers (1999) doi:10.1002/(SICI)1099-1085(199909)13:12/13
<1843::AID-HYP879>3.0.CO;2-G
Dunne & Black (1971) doi:10.1029/WR007i005p01160
Snow insulation parameterisation (exponential form, original: M&B 1983):
LISFLOOD: van der Knijff et al. (2010) doi:10.1080/02626660902852568
GSSHA: Downer & Ogden (2004) doi:10.1061/(ASCE)1084-0699(2004)9:3(254)
Parameters
----------
time_step : int
Current row index in self.hydrodata.
excess_dd : float, optional
Degree-day equivalent of leftover melt energy from
_compute_snowpack() [°C·day]. Reduces FGI alongside air
temperature. Default 0 (temperature-only FGI, per Molnau &
Bissell). Computed as leftover_mm_SWE / melt_factor, where
melt_factor (mm SWE °C⁻¹ day⁻¹) converts the residual melt
depth back to the degree-day units that the FGI operates in.
See Snowpack.melt(). Not scaled by snow insulation because
meltwater delivers heat directly to the soil surface.
Returns
-------
f0 : float
Calibrated f_to_discharge of the top reservoir, saved before any
frozen-ground override. Restore it after the discharge loop.
"""
f0 = self.reservoirs[0].f_to_discharge
if not self.use_frozen_ground or np.isinf(self.fdd_threshold):
return f0
if 'Mean Temperature [C]' not in self.hydrodata.columns:
raise ValueError(
"fdd_threshold is set but 'Mean Temperature [C]' is missing "
"from the input data. FGI requires temperature data."
)
T = self.hydrodata['Mean Temperature [C]'][time_step]
T_eff = T * np.exp(-self.snow_insulation_k * self.snowpack.Hwater)
if self._has_trange:
T_max = self.hydrodata['Maximum Temperature [C]'][time_step]
T_min = self.hydrodata['Minimum Temperature [C]'][time_step]
DTR = T_max - T_min
if DTR > 0 and T_max > 0:
f_above = min(1.0, T_max / DTR)
A_t = 1.0 - (1.0 - self.fgi_decay_coeff) * f_above
else:
A_t = 1.0 # entirely below freezing; no sub-daily thawing
else:
A_t = self.fgi_decay_coeff
self._fgi = max(0.0, A_t * self._fgi - T_eff - excess_dd)
if self._fgi > self.fdd_threshold:
self.reservoirs[0].f_to_discharge = 1.0
return f0
[docs]
def update(self, time_step=None):
"""
Advance the model by one time step.
Routes precipitation minus ET through the snowpack (if present) and
then through each subsurface reservoir in order from shallowest to
deepest. Stores modeled specific discharge, snowpack SWE, and total
subsurface storage in self.hydrodata for the current time step.
Part of the CSDMS Basic Model Interface.
Parameters
----------
time_step : int, optional
Index into self.hydrodata for the time step to update. If None,
uses and then increments the internal counter self._timestep_i.
"""
if time_step is None:
time_step = self._timestep_i
# Advance internal variable if external time step is not selected.
# This should be a different variable and therefore not
# modify the value of "time_step" by reference.
self._timestep_i += 1
# Skip timesteps with missing forcing: leave reservoir states unchanged
# and record NaN output so the scoring mask excludes these days.
_P = self.hydrodata['Precipitation [mm/day]'][time_step]
_ET = self.hydrodata['ET for model [mm/day]'][time_step]
_skip = not np.isfinite(_P) or not np.isfinite(_ET)
if not _skip and self.has_snowpack:
_skip = not np.isfinite(
self.hydrodata['Mean Temperature [C]'][time_step])
if _skip:
self.hydrodata.at[time_step,
'Specific Discharge (modeled) [mm/day]'] = np.nan
if self.has_snowpack:
self.hydrodata.at[time_step, 'Snowpack (modeled) [mm SWE]'] = (
self.snowpack.Hwater)
self.hydrodata.at[time_step,
'Subsurface storage (modeled total) [mm]'] = (
np.sum([res.Hwater for res in self.reservoirs])
+ np.sum([res.tile_res.Hwater for res in self.reservoirs
if res.tile_res is not None]))
self._flux_direct_runoff = np.nan
self._flux_tile = np.nan
self._flux_multipath = np.nan
return
excess_dd = self._compute_snowpack(time_step) if self.has_snowpack else 0.0
f0 = self._update_fgi(time_step, excess_dd)
qi = 0.0
for i in range(len(self.reservoirs)):
if i == 0:
if self.has_snowpack:
_recharge = (self.snowpack.H_infiltrated
+ self.H_deficit_carry)
else:
if self.use_et_reservoir_draw:
_recharge = (
self.hydrodata['Precipitation [mm/day]'][time_step] +
self.H_deficit_carry)
else:
_recharge = (
self.hydrodata['Precipitation [mm/day]'][time_step] -
self.hydrodata['ET for model [mm/day]'][time_step]
* self._et_stress_factor() +
self.H_deficit_carry)
# Hortonian-inspired bypass: fraction exits without entering reservoirs.
_q_direct = (max(0.0, _recharge) * self.direct_runoff_fraction
if self.use_direct_runoff else 0.0)
qi += _q_direct
self.reservoirs[i].recharge(_recharge - _q_direct)
else:
# Let water infiltrate to lower layers effectively
# instantaneously; this isn't quite realistic, but
# should be a simpler approach for parameter calibration
# (Plus, this is just the water that did exit that above
# container, which is already free to discharge, so this
# seems more self-consistent.)
# The amount of infiltrated water from above could be
# negative; this represents ET in excess of what the
# unsaturated zone ("soil zone"; top reservoir) holds.
# Deeper loss of water could be due to plants tapping into
# groundwater, direct lake evaporation, etc. -- or related
# to this model not being physical or distributed, so just
# needing to balance mass.
self.reservoirs[i].recharge(
self.reservoirs[i-1].H_to_next
+ self.reservoirs[i-1].H_deficit)
H_next = (self.reservoirs[i + 1].Hwater
if i + 1 < len(self.reservoirs) else None)
self.reservoirs[i].discharge(self.dt, H_next=H_next)
qi += self.reservoirs[i].H_discharge
self.reservoirs[0].f_to_discharge = f0
self.H_deficit_carry = self.reservoirs[-1].H_deficit
# Record per-step flux-partition components (mm/day) for diagnostics
# and BMI coupling. These are already reflected in the total
# specific discharge above; recording them changes no result.
self._flux_direct_runoff = _q_direct
self._flux_tile = sum(r.H_tile for r in self.reservoirs)
self._flux_multipath = sum(r.H_multipath for r in self.reservoirs)
if self.use_et_reservoir_draw:
qi += self._draw_et_from_reservoirs(
self.hydrodata['ET for model [mm/day]'][time_step])
self.hydrodata.at[time_step, 'Specific Discharge (modeled) [mm/day]'] = qi
if self.has_snowpack:
self.hydrodata.at[time_step, 'Snowpack (modeled) [mm SWE]'] = self.snowpack.Hwater
self.hydrodata.at[time_step, 'Subsurface storage (modeled total) [mm]'] = (
np.sum([res.Hwater for res in self.reservoirs])
+ np.sum([res.tile_res.Hwater for res in self.reservoirs
if res.tile_res is not None]))
if self._store_depths:
for _i, _res in enumerate(self.reservoirs):
self.hydrodata.at[time_step,
f'H_reservoir_{_i} (modeled) [mm]'] = _res.Hwater
[docs]
def evapotranspiration_Chang2019(self, Tmax=None, Tmin=None, photoperiod=None,
k=0.69):
"""
Modified daily Thornthwaite ET₀ equation.
Chang et al. (2019), Eq. 1–4. https://doi.org/10.1002/ird.2309
Parameters
----------
Tmax : array-like
Daily maximum temperature (°C).
Tmin : array-like
Daily minimum temperature (°C).
photoperiod : array-like
Photoperiod N (hours), computed from latitude and Julian day
per Allen et al. (1998), Eqs. 2–4 of Chang et al. (2019).
k : float
Calibration coefficient in the T_ef formula. Default 0.69,
recommended by Pereira & Pruitt (2004) for daily ET₀
(https://doi.org/10.1016/j.agrformet.2004.01.005).
Use 0.72 for monthly ET₀ per Camargo et al. (1999).
Returns
-------
ET0 : array-like
Daily reference evapotranspiration (mm day⁻¹).
"""
if Tmax is None:
Tmax = self.hydrodata['Maximum Temperature [C]']
if Tmin is None:
Tmin = self.hydrodata['Minimum Temperature [C]']
if photoperiod is None:
photoperiod = self.hydrodata['Photoperiod [hr]']
Tef = 0.5 * k * (3 * Tmax - Tmin)
C = photoperiod / 360.
quadratic = C * (-415.85 + 32.24 * Tef - 0.43 * Tef**2)
power_law = 16. * C * (10. * Tef / self.Chang_I) ** self.Chang_a
ET0 = np.where(np.isnan(Tef), np.nan,
np.where(Tef > 26, quadratic,
np.where(Tef > 0, power_law,
0.)))
return ET0
[docs]
def run(self, start=None, end=None, store_depths=False):
"""
Advance the model through time steps in self.hydrodata.
Resets the internal time counter before iterating, so run() is safe
to call more than once (e.g. spin-up then main run). Captures storage
at the start of the run for check_mass_balance(). Part of the CSDMS
Basic Model Interface.
Parameters
----------
start : str or datetime-like, optional
First date to simulate (inclusive). If None, starts from the
beginning of self.hydrodata.
end : str or datetime-like, optional
Last date to simulate (inclusive). If None, runs to the end of
self.hydrodata.
Notes
-----
When start or end is provided, only the matching rows are stepped
through; reservoir states at the boundaries carry in/out naturally,
making windowed runs chainable (e.g. spin-up on pre-decade data
followed by a scored run on the decade itself).
"""
self._timestep_i = self.hydrodata.index[0]
self._run_initial_storage = (
sum(res.Hwater for res in self.reservoirs)
+ (self.snowpack.Hwater if self.has_snowpack else 0.0)
)
if start is not None or end is not None:
date_mask = pd.Series(True, index=self.hydrodata.index)
if start is not None:
date_mask &= self.hydrodata['Date'] >= pd.Timestamp(start)
if end is not None:
date_mask &= self.hydrodata['Date'] <= pd.Timestamp(end)
_run_idx = self.hydrodata.index[date_mask]
else:
_run_idx = None
self._store_depths = store_depths
if store_depths:
for _i in range(len(self.reservoirs)):
self.hydrodata[f'H_reservoir_{_i} (modeled) [mm]'] = pd.NA
# JIT path: available when numba is installed, no PDM,
# and no et_water_stress (which requires pdm_H0 logic not in the JIT).
_can_jit = (
_numba_available
and all(r.pdm_H0 is None for r in self.reservoirs)
and not self.use_et_water_stress
)
if _can_jit:
_idx = _run_idx if _run_idx is not None else self.hydrodata.index
_hd = self.hydrodata.loc[_idx]
_needs_T = self.has_snowpack or (
self.use_frozen_ground and not np.isinf(self.fdd_threshold))
_T = (_hd['Mean Temperature [C]'].to_numpy(dtype=np.float64)
if _needs_T
else np.zeros(len(_idx), dtype=np.float64))
_Tmin = (_hd['Minimum Temperature [C]'].to_numpy(dtype=np.float64)
if self._has_trange
else np.full(len(_idx), np.nan))
_Tmax = (_hd['Maximum Temperature [C]'].to_numpy(dtype=np.float64)
if self._has_trange
else np.full(len(_idx), np.nan))
_jmap = {'fraction': 0, 'leakance': 1, 'threshold': 2}
(_Q, _SWE, _Hsub, _Hres_out, _finalH, _finalHTile, _finalSnow,
_finalFgi, _finalDC) = _jit_run(
_hd['Precipitation [mm/day]'].to_numpy(dtype=np.float64),
_hd['ET for model [mm/day]'].to_numpy(dtype=np.float64),
_T, _Tmin, _Tmax,
np.array([r.Hwater for r in self.reservoirs], dtype=np.float64),
self.snowpack.Hwater if self.has_snowpack else 0.0,
self._fgi,
self.H_deficit_carry,
np.array([r.recession_coeff for r in self.reservoirs], dtype=np.float64),
np.array([r.recession_exponent for r in self.reservoirs], dtype=np.float64),
np.array([r.recession_H_ref for r in self.reservoirs], dtype=np.float64),
np.array([r.f_to_discharge for r in self.reservoirs], dtype=np.float64),
np.array([_jmap[r.junction_type] for r in self.reservoirs], dtype=np.int64),
np.array([r.leakance_R if r.leakance_R is not None else np.inf
for r in self.reservoirs], dtype=np.float64),
np.array([r.H_threshold for r in self.reservoirs], dtype=np.float64),
np.array([r.Hmax for r in self.reservoirs], dtype=np.float64),
np.array([r.f_tile for r in self.reservoirs], dtype=np.float64),
np.array([r.tile_res.recession_coeff if r.tile_res is not None else 1.0
for r in self.reservoirs], dtype=np.float64),
np.array([r.tile_res.Hwater if r.tile_res is not None else 0.0
for r in self.reservoirs], dtype=np.float64),
np.array([r.multipath_threshold if r.has_multipath else 0.0
for r in self.reservoirs], dtype=np.float64),
np.array([r.multipath_timescale if r.has_multipath else 0.0
for r in self.reservoirs], dtype=np.float64),
self.melt_factor if self.has_snowpack else 1.0,
self.snow_insulation_k,
self.fgi_decay_coeff,
self.fdd_threshold,
self.direct_runoff_fraction,
self.wp_soil,
self.wp_soil_sigma,
self.et_alpha,
self.dt,
self.has_snowpack,
self.use_frozen_ground,
self.use_rain_on_snow,
self.use_et_reservoir_draw,
self.use_direct_runoff,
self._has_trange,
)
self.hydrodata.loc[_idx, 'Specific Discharge (modeled) [mm/day]'] = _Q
if self.has_snowpack:
self.hydrodata.loc[_idx, 'Snowpack (modeled) [mm SWE]'] = _SWE
self.hydrodata.loc[_idx, 'Subsurface storage (modeled total) [mm]'] = _Hsub
if store_depths:
for _i in range(len(self.reservoirs)):
self.hydrodata.loc[_idx, f'H_reservoir_{_i} (modeled) [mm]'] = _Hres_out[:, _i]
for _i, _h in enumerate(_finalH):
self.reservoirs[_i].Hwater = float(_h)
for _i, _r in enumerate(self.reservoirs):
if _r.tile_res is not None:
_r.tile_res.Hwater = float(_finalHTile[_i])
if self.has_snowpack:
self.snowpack.Hwater = float(_finalSnow)
self._fgi = float(_finalFgi)
self.H_deficit_carry = float(_finalDC)
elif _run_idx is not None:
for i in _run_idx:
self.update(time_step=i)
else:
for _ in self.hydrodata.index:
self.update()
[docs]
def finalize(self):
"""
Report model skill and display output plots.
Calls compute_NSE(verbose=True) to print the Nash–Sutcliffe Efficiency
to stdout, then calls plot() to display a time-series comparison of
observed and modeled specific discharge. Part of the CSDMS Basic Model
Interface.
"""
# Goodness of fit
# Add options to print and/or save values later
self.compute_NSE(verbose=True)
# Plot
# Add flag for plotting (or not) later
self.plot()
[docs]
def plot(self):
"""
Display a time-series comparison of precipitation and specific discharge.
Produces a dual-axis figure: precipitation as a bar chart on the left
axis and both observed and modeled specific discharge as line plots on
the right axis.
"""
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.xaxis.set_major_formatter(mdates.DateFormatter('%y-%m-%d'))
plt.xlabel('Date', fontsize=14)
plt.xticks(rotation=45, horizontalalignment='right')
plt.ylabel('Precipitation [mm/day]', fontsize=14, color='C0')
plt.bar(self.hydrodata['Date'].values,
height=self.hydrodata['Precipitation [mm/day]'].values/self.dt,
width=1., align='center', label='Precipitation [mm/day]',
linewidth=0, color='C0', alpha=0.5) # C0 is the default bar-plot color
plt.twinx()
plt.plot(self.hydrodata['Date'].values,
self.hydrodata['Specific Discharge [mm/day]'].values,
'royalblue', label='Data', linewidth=2, alpha=0.8)
plt.plot(self.hydrodata['Date'].values,
self.hydrodata['Specific Discharge (modeled) [mm/day]'].values,
'k', label='Model', linewidth=2, alpha=0.8)
plt.ylim(0, plt.ylim()[-1])
plt.legend(title='Specific Discharge', fontsize=11,
title_fontsize=11, labelcolor='linecolor')
plt.ylabel('Specific Discharge [mm/day]', fontsize=14, color='0.3')
plt.tight_layout()
plt.show()
[docs]
def check_mass_balance(self, time_step=None):
"""
Compute the mass-balance discrepancy at a given time step.
Compares cumulative inputs (P - ET) from the start of the record
through time_step with cumulative outputs (discharge) plus current
storage (snowpack + subsurface reservoirs) and any carried-over
deficit. Returns the excess mass still in the model; a value near
zero indicates good mass conservation.
Parameters
----------
time_step : int, optional
Row index in self.hydrodata at which to evaluate the balance.
Defaults to the last row.
Returns
-------
excess_mass_in_model : float
Excess water remaining in the model budget (mm). Should be ~0
for a mass-conserving run.
"""
if time_step is None:
time_step = self.hydrodata.index[-1]
# Additions equals discharge out; set up this way, and can check.
total_additions = \
self.hydrodata['Precipitation [mm/day]'][:time_step+1].sum() \
- self.hydrodata['ET for model [mm/day]'][:time_step+1].sum()
# Storage reservoirs; snowpack is 0 when not simulated
snow_storage = (self.hydrodata['Snowpack (modeled) [mm SWE]'][time_step]
if self.has_snowpack else 0.)
subsurface_storage = self.hydrodata['Subsurface storage (modeled total) [mm]'][time_step]
# Mass removal
outlet_discharge = self.hydrodata[
'Specific Discharge (modeled) [mm/day]'][:time_step+1].sum()
# Unpaid water deficit carried forward from the last timestep
deficit = self.H_deficit_carry
# Initial storage at the start of the last run() call (not at initialize()
# time, since spin-up changes storage before the scored run begins).
initial_storage = getattr(self, '_run_initial_storage', 0.0)
# Discrepancy: inputs = outputs + ΔS, so excess ≈ 0 when mass is conserved.
excess_mass_in_model = (outlet_discharge + subsurface_storage
+ snow_storage - total_additions + deficit
- initial_storage)
return excess_mass_in_model
[docs]
def compute_NSE(self, return_nse=True, verbose=False):
"""
Compute the Nash–Sutcliffe Efficiency of the discharge simulation.
Compares modeled and observed specific discharge for all time steps
where both values are non-missing. Stores the result as self.NSE.
Parameters
----------
return_nse : bool, optional
If True (default), return the NSE value.
verbose : bool, optional
If True, print the NSE value to stdout.
Returns
-------
NSE : float or None
Nash–Sutcliffe Efficiency coefficient. Returns None if
return_nse is False. A value of 1 indicates perfect agreement;
values below 0 indicate the model performs worse than the
observed-mean predictor.
"""
q_data = self.hydrodata['Specific Discharge [mm/day]']
q_model = self.hydrodata['Specific Discharge (modeled) [mm/day]']
# Calculate NSE
_realvalue = ~q_model.isna() & ~q_data.isna()
NSE_num = np.sum((q_model[_realvalue] - q_data[_realvalue])**2)
NSE_denom = np.sum((q_data[_realvalue] -
np.mean(q_data[_realvalue]))**2)
if verbose and np.sum(~_realvalue):
print("Excluded", np.sum(~_realvalue), "no-data points from NSE calculation")
self.NSE = 1 - NSE_num / NSE_denom
if verbose:
print("NSE:", self.NSE)
if return_nse:
return self.NSE
def main():
"""Command-line entry point: parse ``-y``/``--configfile`` and run the model."""
parser = argparse.ArgumentParser(
description='Pass the configuration file path to run MNiShed.')
parser.add_argument('-y', '--configfile', type=str,
help='YAML file from which all inputs are read.')
# Parse args if anything is passed.
# If nothing is passed, then print help and exit.
args = parser.parse_args(args=None if sys.argv[1:] else ['--help'])
b = Buckets()
try:
b.initialize(args.configfile)
except (FileNotFoundError, yaml.YAMLError) as e:
# initialize() raises on a bad config; at the CLI boundary, translate
# that into a clean stderr message and exit code 2 (no traceback).
print(f"\n{e}\n", file=sys.stderr)
sys.exit(2)
b.run()
b.finalize()
if __name__ == "__main__":
main()