Source code for mnished.bmi

"""
mnished.bmi
~~~~~~~~~~~~~~~
CSDMS Basic Model Interface (BMI) wrapper for MNiShed.

Wraps the :class:`~mnished.Buckets` class so that MNiShed can be
driven by any CSDMS-compliant framework or coupled online with other BMI
models (e.g. a channel-routing or sediment-transport model receiving daily
streamflow from a gauged watershed).

References
----------
Peckham, S.D., Hutton, E.W.H., and Norris, B. (2013). A component-based
approach to integrated modeling in the geosciences: The design of CSDMS.
Computers & Geosciences, 53, 3-12. https://doi.org/10.1016/j.cageo.2012.04.002
"""

import numpy as np
import pandas as pd

try:
    from bmipy import Bmi
except ImportError as exc:
    raise ImportError(
        "bmipy is required for the BMI wrapper. "
        "Install it with:  pip install 'MNiShed[bmi]'"
    ) from exc

from .mnished import Buckets

# ---------------------------------------------------------------------------
# CSDMS Standard Names and variable metadata
# ---------------------------------------------------------------------------

_INPUT_VAR_NAMES = (
    "atmosphere_water__liquid_equivalent_precipitation_rate",
    "atmosphere_bottom_air__temperature",
    "atmosphere_bottom_air__time_min_of_temperature",
    "atmosphere_bottom_air__time_max_of_temperature",
    "land_surface_water__uncorrected_evapotranspiration_volume_flux",
)

_OUTPUT_VAR_NAMES = (
    "land_surface_water__runoff_volume_flux",
    "channel_exit_water_x-section__volume_flow_rate",
    "snowpack__liquid_equivalent_depth",
    "subsurface_water__depth",
    "land_surface_water__evapotranspiration_volume_flux",
    "land_surface_water__direct_runoff_volume_flux",
    "land_surface_water__baseflow_volume_flux",
    "land_surface_water__tile_drain_volume_flux",
    "land_surface_water__multipath_drain_volume_flux",
    "land_surface__frozen_ground_index",
    "subsurface_water_reservoir_0__depth",
    "subsurface_water_reservoir_1__depth",
    "subsurface_water_reservoir_2__depth",
    "subsurface_water_reservoir_3__depth",
    "subsurface_water_reservoir_4__depth",
    "subsurface_water_reservoir_5__depth",
    "subsurface_water_reservoir_6__depth",
    "subsurface_water_reservoir_7__depth",
    "subsurface_water_reservoir_8__depth",
    "subsurface_water_reservoir_9__depth",
)

# Hard cap: reservoirs 0–9 are declared as BMI output variables.
# To support more than 10 reservoirs, add names to _OUTPUT_VAR_NAMES,
# entries to _VAR_UNITS and _RESERVOIR_DEPTH_NAMES, and raise this constant.
_BMI_MAX_RESERVOIRS = 10

_ALL_VAR_NAMES = frozenset(_INPUT_VAR_NAMES + _OUTPUT_VAR_NAMES)

_VAR_UNITS = {
    "atmosphere_water__liquid_equivalent_precipitation_rate": "mm d-1",
    "atmosphere_bottom_air__temperature":                    "degC",
    "atmosphere_bottom_air__time_min_of_temperature":        "degC",
    "atmosphere_bottom_air__time_max_of_temperature":        "degC",
    "land_surface_water__uncorrected_evapotranspiration_volume_flux": "mm d-1",
    "land_surface_water__runoff_volume_flux":                "mm d-1",
    "channel_exit_water_x-section__volume_flow_rate":        "m3 s-1",
    "snowpack__liquid_equivalent_depth":                     "mm",
    "subsurface_water__depth":                               "mm",
    "land_surface_water__evapotranspiration_volume_flux":    "mm d-1",
    "land_surface_water__direct_runoff_volume_flux":         "mm d-1",
    "land_surface_water__baseflow_volume_flux":              "mm d-1",
    "land_surface_water__tile_drain_volume_flux":            "mm d-1",
    "land_surface_water__multipath_drain_volume_flux":       "mm d-1",
    "land_surface__frozen_ground_index":                     "degC d",
    "subsurface_water_reservoir_0__depth":                   "mm",
    "subsurface_water_reservoir_1__depth":                   "mm",
    "subsurface_water_reservoir_2__depth":                   "mm",
    "subsurface_water_reservoir_3__depth":                   "mm",
    "subsurface_water_reservoir_4__depth":                   "mm",
    "subsurface_water_reservoir_5__depth":                   "mm",
    "subsurface_water_reservoir_6__depth":                   "mm",
    "subsurface_water_reservoir_7__depth":                   "mm",
    "subsurface_water_reservoir_8__depth":                   "mm",
    "subsurface_water_reservoir_9__depth":                   "mm",
}

# MNiShed DataFrame column name for each input variable
_INPUT_COLUMNS = {
    "atmosphere_water__liquid_equivalent_precipitation_rate": "Precipitation [mm/day]",
    "atmosphere_bottom_air__temperature":             "Mean Temperature [C]",
    "atmosphere_bottom_air__time_min_of_temperature": "Minimum Temperature [C]",
    "atmosphere_bottom_air__time_max_of_temperature": "Maximum Temperature [C]",
    "land_surface_water__uncorrected_evapotranspiration_volume_flux":
        "Evapotranspiration [mm/day]",
}

# Ordered reservoir depth variable names (index = reservoir index in model)
_RESERVOIR_DEPTH_NAMES = (
    "subsurface_water_reservoir_0__depth",
    "subsurface_water_reservoir_1__depth",
    "subsurface_water_reservoir_2__depth",
    "subsurface_water_reservoir_3__depth",
    "subsurface_water_reservoir_4__depth",
    "subsurface_water_reservoir_5__depth",
    "subsurface_water_reservoir_6__depth",
    "subsurface_water_reservoir_7__depth",
    "subsurface_water_reservoir_8__depth",
    "subsurface_water_reservoir_9__depth",
)


def _check_var(name: str) -> None:
    if name not in _ALL_VAR_NAMES:
        raise KeyError(
            f"Unknown variable {name!r}. "
            f"Available: {sorted(_ALL_VAR_NAMES)}"
        )


# ---------------------------------------------------------------------------
# BMI class
# ---------------------------------------------------------------------------

[docs] class BmiMNiShed(Bmi): """ CSDMS Basic Model Interface wrapper for MNiShed. Wraps a :class:`~mnished.Buckets` instance so that MNiShed can participate in a CSDMS-compliant coupling framework. Two usage modes are supported: **File-driven** (standard MNiShed workflow) — the YAML config points to a CSV containing all forcing data; the framework steps through the record by calling :meth:`update` repeatedly:: from mnished import BmiMNiShed import numpy as np bmi = BmiMNiShed() bmi.initialize("config.yml") while bmi.get_current_time() < bmi.get_end_time(): bmi.update() bmi.finalize() **Online coupled** — an upstream model provides forcing each step via :meth:`set_value` before calling :meth:`update`:: bmi.initialize("config.yml") bmi.set_value( "atmosphere_water__liquid_equivalent_precipitation_rate", np.array([5.2]) ) bmi.set_value("atmosphere_bottom_air__temperature", np.array([3.1])) bmi.update() q = np.empty(1, dtype=np.float64) bmi.get_value("land_surface_water__runoff_volume_flux", q) print(f"Discharge: {q[0]:.3f} mm d-1") Notes ----- **Specific discharge**: ``land_surface_water__runoff_volume_flux`` is area-normalised specific discharge in mm d⁻¹, not volumetric flux. To convert to volumetric discharge Q [m³ s⁻¹]: .. code-block:: python Q_m3s = q_mm_d * area_km2 * 1e3 / 86400 where ``area_km2 = bmi._model.drainage_basin_area__km2``. **Difference from run_and_score discharge**: the streaming BMI reports the raw per-step reservoir-cascade discharge. It does *not* apply the two output-layer post-processing steps that :func:`mnished.calibration.run_and_score` performs on the full series: Nash-cascade flow routing (``routing_K``), which is an inherently batch convolution unavailable to a per-step interface, and the constant regional baseflow term (``baseflow_Q``). Baseflow is instead exposed as its own output (``land_surface_water__baseflow_volume_flux``) so a coupler can add it explicitly. A configuration calibrated with routing and/or baseflow will therefore not reproduce its scored hydrograph through the BMI unless the coupler reapplies those terms. **get_value_ptr**: MNiShed stores scalar state as Python floats, not numpy arrays. :meth:`get_value_ptr` therefore returns a fresh length-1 array rather than a live pointer into model memory. Values in the returned array do not update when the model advances; call :meth:`get_value` after each :meth:`update` call to retrieve current values. **Per-reservoir depths**: ``subsurface_water_reservoir_0__depth`` through ``subsurface_water_reservoir_9__depth`` correspond to reservoirs 0–9 (shallowest to deepest) in the configuration. Depths for reservoir indices that do not exist in the current configuration are returned as ``np.nan``. **Optional input variables**: the five input Standard Names are always declared. Temperature and ET inputs will raise :exc:`KeyError` from :meth:`set_value` if the corresponding column is absent from the loaded time series. **Evapotranspiration, input vs. output**: the input ``…__uncorrected_evapotranspiration_volume_flux`` is the ET forcing as supplied (Thornthwaite–Chang, or a user series in ``datafile`` mode), *before* water-balance correction. The output ``…__evapotranspiration_volume_flux`` is that forcing *after* a bulk multiplier — one constant, or one per water year — closes P − Q − ET over the record: the post-correction ET *target*, which equals the ET actually removed except under ``et_reservoir_draw`` / ``et_water_stress``, where storage availability reduces it further. The multiplier is a coarse stand-in for moisture limitation, not a physical potential-to-actual conversion; input and output coincide only when ``enforce_water_balance='none'``. ``uncorrected`` is an MNiShed-specific name (CSDMS has no pre-/post-correction modifier). """ def __init__(self) -> None: self._model: "Buckets | None" = None self._current_time: float = 0.0 self._end_time: float = 0.0 # ----------------------------------------------------------------------- # Lifecycle # -----------------------------------------------------------------------
[docs] def initialize(self, config_file: str) -> None: """ Load a MNiShed YAML configuration file and prepare the model. Reads the configuration, loads the input CSV time series, builds the reservoir stack, and runs spin-up cycles. After this call, :meth:`update` steps through the record one day at a time. Parameters ---------- config_file : str Path to a MNiShed YAML configuration file. """ self._model = Buckets() self._model.initialize(config_file) n = len(self._model.reservoirs) if n > _BMI_MAX_RESERVOIRS: raise ValueError( f"This model configuration has {n} reservoirs, but the BMI " f"wrapper declares outputs for at most {_BMI_MAX_RESERVOIRS} " f"(subsurface_water_reservoir_0__depth through " f"subsurface_water_reservoir_{_BMI_MAX_RESERVOIRS - 1}__depth). " f"To support more reservoirs, add names to _OUTPUT_VAR_NAMES, " f"_VAR_UNITS, and _RESERVOIR_DEPTH_NAMES in mnished/bmi.py " f"and raise _BMI_MAX_RESERVOIRS." ) # Buckets.initialize() ends with _timestep_i at end-of-record if # spin_up_cycles > 0 (spin-up is run via run(), which exhausts the # index). Reset to the start so BMI update() steps from row 0. self._model._timestep_i = self._model.hydrodata.index[0] self._current_time = 0.0 self._end_time = float(len(self._model.hydrodata))
[docs] def update(self) -> None: """ Advance the model by one day. If :meth:`set_value` was called for any input variable before this step, those values override the CSV values for the current row. """ self._model.update() self._current_time += self._model.dt
[docs] def update_until(self, time: float) -> None: """ Advance the model until ``get_current_time() >= time``. Parameters ---------- time : float Target time [days since start of record]. """ while self._current_time < time: self.update()
[docs] def finalize(self) -> None: """ Release internal resources. Discards the :class:`~mnished.Buckets` instance. Does not call :meth:`~mnished.Buckets.finalize` on the inner model (which would trigger an NSE print and a plot pop-up). """ self._model = None
# ----------------------------------------------------------------------- # Component information # -----------------------------------------------------------------------
[docs] def get_component_name(self) -> str: return "MNiShed"
[docs] def get_input_item_count(self) -> int: return len(_INPUT_VAR_NAMES)
[docs] def get_output_item_count(self) -> int: return len(_OUTPUT_VAR_NAMES)
[docs] def get_input_var_names(self): return _INPUT_VAR_NAMES
[docs] def get_output_var_names(self): return _OUTPUT_VAR_NAMES
# ----------------------------------------------------------------------- # Variable information # -----------------------------------------------------------------------
[docs] def get_var_grid(self, name: str) -> int: _check_var(name) return 0
[docs] def get_var_type(self, name: str) -> str: _check_var(name) return "float64"
[docs] def get_var_units(self, name: str) -> str: _check_var(name) return _VAR_UNITS[name]
[docs] def get_var_itemsize(self, name: str) -> int: _check_var(name) return 8 # float64 = 8 bytes
[docs] def get_var_nbytes(self, name: str) -> int: _check_var(name) return 8 # scalar: 1 element × 8 bytes
[docs] def get_var_location(self, name: str) -> str: _check_var(name) return "node"
# ----------------------------------------------------------------------- # Time # -----------------------------------------------------------------------
[docs] def get_start_time(self) -> float: return 0.0
[docs] def get_end_time(self) -> float: return self._end_time
[docs] def get_current_time(self) -> float: return self._current_time
[docs] def get_time_step(self) -> float: return 1.0
[docs] def get_time_units(self) -> str: return "d"
# ----------------------------------------------------------------------- # Grid — scalar (rank 0, size 1, grid_id 0) # -----------------------------------------------------------------------
[docs] def get_grid_rank(self, grid_id: int) -> int: if grid_id != 0: raise ValueError(f"Unknown grid_id {grid_id!r}; only grid 0 exists.") return 0
[docs] def get_grid_size(self, grid_id: int) -> int: if grid_id != 0: raise ValueError(f"Unknown grid_id {grid_id!r}; only grid 0 exists.") return 1
[docs] def get_grid_type(self, grid_id: int) -> str: if grid_id != 0: raise ValueError(f"Unknown grid_id {grid_id!r}; only grid 0 exists.") return "scalar"
# Rank-0 grids carry no shape, spacing, origin, coordinate arrays, # or connectivity — raise NotImplementedError for all such methods. def get_grid_shape(self, grid_id: int, shape: np.ndarray) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no shape array.") def get_grid_spacing(self, grid_id: int, spacing: np.ndarray) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no spacing array.") def get_grid_origin(self, grid_id: int, origin: np.ndarray) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no origin array.") def get_grid_x(self, grid_id: int, x: np.ndarray) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no coordinate arrays.") def get_grid_y(self, grid_id: int, y: np.ndarray) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no coordinate arrays.") def get_grid_z(self, grid_id: int, z: np.ndarray) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no coordinate arrays.") def get_grid_node_count(self, grid_id: int) -> int: raise NotImplementedError("Scalar grid (rank 0) has no node count.") def get_grid_edge_count(self, grid_id: int) -> int: raise NotImplementedError("Scalar grid (rank 0) has no edge count.") def get_grid_face_count(self, grid_id: int) -> int: raise NotImplementedError("Scalar grid (rank 0) has no face count.") def get_grid_edge_nodes( self, grid_id: int, edge_nodes: np.ndarray ) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no connectivity.") def get_grid_face_nodes( self, grid_id: int, face_nodes: np.ndarray ) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no connectivity.") def get_grid_face_edges( self, grid_id: int, face_edges: np.ndarray ) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no connectivity.") def get_grid_nodes_per_face( self, grid_id: int, nodes_per_face: np.ndarray ) -> np.ndarray: raise NotImplementedError("Scalar grid (rank 0) has no connectivity.") # ----------------------------------------------------------------------- # Internal helpers # ----------------------------------------------------------------------- def _scalar_output(self, name: str) -> float: """Return the scalar value of an output variable after the last step.""" m = self._model idx = m._timestep_i - 1 first = m.hydrodata.index[0] if name == "land_surface_water__runoff_volume_flux": if idx < first: return np.nan val = m.hydrodata.at[idx, "Specific Discharge (modeled) [mm/day]"] return float(val) if not pd.isna(val) else np.nan if name == "channel_exit_water_x-section__volume_flow_rate": if idx < first: return np.nan val = m.hydrodata.at[idx, "Specific Discharge (modeled) [mm/day]"] if pd.isna(val): return np.nan # mm→m (×1e-3) × km²→m² (×1e6) / day→s (×86400) = ×1e3/86400 return float(val) * m.drainage_basin_area__km2 * 1e3 / 86400.0 if name == "snowpack__liquid_equivalent_depth": return float(m.snowpack.Hwater) if m.has_snowpack else 0.0 if name == "subsurface_water__depth": if idx < first: return np.nan val = m.hydrodata.at[idx, "Subsurface storage (modeled total) [mm]"] return float(val) if not pd.isna(val) else np.nan if name == "land_surface_water__evapotranspiration_volume_flux": # Model evapotranspiration flux after water-balance scaling # (storage-stress reduction, where enabled, applies separately). if idx < first: return np.nan val = m.hydrodata.at[idx, "ET for model [mm/day]"] return float(val) if not pd.isna(val) else np.nan # Flux-partition components of the total discharge, recorded by the # most recent update(). baseflow is the constant regional-import term # (mm/day); it is not part of the reservoir cascade, so the BMI keeps # it separate and does NOT add it to the land_surface_water__runoff_ # volume_flux total (a coupler adds it explicitly). Note this differs # from run_and_score, which DOES fold baseflow_Q (and Nash routing) # into its scored discharge as an output-layer post-process; see the # class docstring's "Difference from run_and_score discharge". if name == "land_surface_water__direct_runoff_volume_flux": return np.nan if idx < first else float(m._flux_direct_runoff) if name == "land_surface_water__baseflow_volume_flux": return np.nan if idx < first else float(m.baseflow_Q) if name == "land_surface_water__tile_drain_volume_flux": return np.nan if idx < first else float(m._flux_tile) if name == "land_surface_water__multipath_drain_volume_flux": return np.nan if idx < first else float(m._flux_multipath) if name == "land_surface__frozen_ground_index": return np.nan if idx < first else float(m._fgi) for i, rname in enumerate(_RESERVOIR_DEPTH_NAMES): if name == rname: if i < len(m.reservoirs): return float(m.reservoirs[i].Hwater) return np.nan raise KeyError(f"Unknown output variable: {name!r}") # ----------------------------------------------------------------------- # Getters # -----------------------------------------------------------------------
[docs] def get_value(self, name: str, dest: np.ndarray) -> np.ndarray: """ Copy the current scalar value of *name* into *dest* and return it. For input variables, returns the value at the pending row (the value that will be consumed by the next :meth:`update` call). For output variables, returns the value written by the most recent :meth:`update` call. Parameters ---------- name : str CSDMS Standard Name. dest : numpy.ndarray Pre-allocated length-1 array of dtype float64. Returns ------- numpy.ndarray *dest*, filled in place. """ _check_var(name) if name in _INPUT_VAR_NAMES: col = _INPUT_COLUMNS[name] idx = self._model._timestep_i if (col in self._model.hydrodata.columns and idx in self._model.hydrodata.index): val = self._model.hydrodata.at[idx, col] dest[0] = float(val) if not pd.isna(val) else np.nan else: dest[0] = np.nan else: dest[0] = self._scalar_output(name) return dest
[docs] def get_value_ptr(self, name: str) -> np.ndarray: """ Return a length-1 float64 array containing the current value. Returns a snapshot, not a live pointer — see class docstring. """ _check_var(name) dest = np.empty(1, dtype=np.float64) return self.get_value(name, dest)
[docs] def get_value_at_indices( self, name: str, dest: np.ndarray, inds: np.ndarray ) -> np.ndarray: """Get value at specific indices (scalar: only index 0 is valid).""" if np.any(np.asarray(inds) != 0): raise IndexError("Scalar variable has only index 0.") return self.get_value(name, dest)
# ----------------------------------------------------------------------- # Setters # -----------------------------------------------------------------------
[docs] def set_value(self, name: str, src: np.ndarray) -> None: """ Override an input variable for the current (next) timestep. Writes ``src[0]`` into the hydrodata DataFrame at the pending row (``_timestep_i``), replacing the value read from the CSV. The written value is consumed by the next :meth:`update` call. This is the online-coupling path; in file-driven mode this method need not be called. Parameters ---------- name : str CSDMS Standard Name of an input variable. src : numpy.ndarray Length-1 float64 array containing the new value. Raises ------ KeyError If *name* is not a recognised input variable, or if the corresponding DataFrame column does not exist in the loaded time series. """ if name not in _INPUT_VAR_NAMES: raise KeyError( f"{name!r} is not a settable input variable. " f"Settable inputs: {_INPUT_VAR_NAMES}" ) col = _INPUT_COLUMNS[name] if col not in self._model.hydrodata.columns: raise KeyError( f"Column {col!r} is not present in the loaded time series. " f"Add it to the CSV or do not call set_value for {name!r}." ) self._model.hydrodata.at[self._model._timestep_i, col] = float(src[0])
[docs] def set_value_at_indices( self, name: str, inds: np.ndarray, src: np.ndarray ) -> None: """Set value at specific indices (scalar: only index 0 is valid).""" if np.any(np.asarray(inds) != 0): raise IndexError("Scalar variable has only index 0.") self.set_value(name, src)