# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (C) 2026 SWGY, Inc.
"""Endogenous feedback signals (network congestion, peer-departure share).
Both signals at time ``t`` are computed from the lagged state vector at ``t-1``.
Lagging keeps the model strictly IO-HMM-compatible: a household's transition
at step ``t`` depends only on inputs known *before* ``t``.
"""
from __future__ import annotations
import numpy as np
from iohmm_evac.types import IntArray, State
__all__ = ["congestion", "peer_share"]
def congestion(prev_state: IntArray, n_cap: int) -> float:
    """Return the lagged en-route load as a fraction of ``n_cap``, at most 1.

    The entries of ``prev_state`` (state codes at ``t-1``, compared against
    :class:`State` values) that equal ``State.ER`` are counted and the count
    is divided by ``n_cap``. Ratios above one are clamped to ``1.0``.

    Raises :class:`ValueError` when ``n_cap`` is zero or negative.
    """
    if n_cap <= 0:
        reason = "n_cap must be a positive integer"
        raise ValueError(reason)

    # Boolean mask selecting the households whose lagged state is en-route.
    on_road_mask = prev_state == State.ER
    load = int(np.count_nonzero(on_road_mask)) / n_cap

    # Saturate: once the en-route count exceeds ``n_cap`` the signal stays at 1.
    if load > 1.0:
        return 1.0
    return float(load)
def peer_share(prev_state: IntArray, evac_path: IntArray) -> float:
    """Return the fraction of households visibly evacuating at ``t-1``.

    A household counts as visible when its lagged state is ``State.ER``
    (en-route), or when it is ``State.SH`` (sheltered) and its ``evac_path``
    code marks it as sheltering *away* from home. ``evac_path`` uses the
    encoding 0=NONE, 1=AWAY, 2=HOME (matching :func:`encode_evac_path`).

    The denominator is the length of the first axis of ``prev_state``; an
    empty population yields ``0.0`` rather than dividing by zero.
    """
    total = prev_state.shape[0]
    if not total:
        return 0.0

    away_code = 1  # AWAY in the 0=NONE, 1=AWAY, 2=HOME encoding

    on_road = prev_state == State.ER
    sheltered = prev_state == State.SH
    went_away = evac_path == away_code

    # Sheltered-at-home households are not visible, so only the
    # sheltered-and-away subset joins the en-route households.
    seen = on_road | (sheltered & went_away)
    return float(int(np.count_nonzero(seen)) / total)