Source code for chiller_sim.simulation.simulator

from __future__ import annotations

from collections.abc import Generator
from typing import TYPE_CHECKING

import numpy as np
from numpy.typing import NDArray

from chiller_sim.layout.grid import ChillerLayout
from chiller_sim.layout.wind import WindConditions, WindFn
from chiller_sim.physics.ambient_temp import AmbientTempFn
from chiller_sim.physics.cop import CopFn
from chiller_sim.physics.degradation import DegradationFn
from chiller_sim.physics.gaussian_plume import GaussianPlumeModel
from chiller_sim.physics.load import LoadFn
from chiller_sim.physics.ramp import RampFn
from chiller_sim.simulation.results import InitialState, OptimizeResult, SimulationResult

if TYPE_CHECKING:
    from chiller_sim.simulation.builder import SimulatorBuilder


[docs] class Simulator: """Chiller array simulator that optimises active-chiller selection each time step."""
[docs] def __init__( self, builder: SimulatorBuilder, layout: ChillerLayout, initial_wind: WindConditions, model: GaussianPlumeModel, load_fn: LoadFn, cop_fn: CopFn, degradation_fn: DegradationFn, ramp_fn: RampFn, wind_fn: WindFn | None, ambient_temp_k: float | None, ambient_temp_fn: AmbientTempFn | None, min_savings_kw: float, ) -> None: """Initialise the simulator and precompute the interaction matrix.""" self._builder = builder self._layout = layout self._model = model self._load_fn = load_fn self._cop_fn = cop_fn self._degradation_fn = degradation_fn self._ramp_fn = ramp_fn self._wind_fn = wind_fn self._ambient_temp_k = ambient_temp_k self._ambient_temp_fn = ambient_temp_fn self._min_savings_kw = min_savings_kw # Precompute interaction matrix for initial wind self._current_wind = initial_wind self._interaction_matrix = model.compute_interaction_matrix( layout.positions_m, initial_wind ) # Optimizer state — reset at start of each stream()/simulate() call self._is_first_call = True self._active_mask: NDArray[np.bool_] = np.zeros(layout.num_chillers, dtype=bool) self._time_since_start: NDArray[np.float64] = np.zeros(layout.num_chillers) self._last_optimize_time_hours: float | None = None
# Builder re-entry: allow sim.with_wind(...).build()
[docs] def with_wind(self, speed_m_per_s: float, angle_deg: float) -> SimulatorBuilder: """Delegate to builder for re-configuration with new static wind.""" return self._builder.with_wind(speed_m_per_s=speed_m_per_s, angle_deg=angle_deg)
[docs] def with_wind_fn(self, fn: WindFn) -> SimulatorBuilder: """Delegate to builder for re-configuration with a time-varying wind plugin.""" return self._builder.with_wind_fn(fn)
[docs] def with_ambient_temp(self, temp_k: float) -> SimulatorBuilder: """Delegate to builder for re-configuration with a new constant ambient temperature.""" return self._builder.with_ambient_temp(temp_k=temp_k)
[docs] def with_ambient_temp_fn(self, fn: AmbientTempFn) -> SimulatorBuilder: """Delegate to builder for re-configuration with a time-varying ambient temperature.""" return self._builder.with_ambient_temp_fn(fn)
def _get_ambient_temp(self, time_hours: float) -> float: if self._ambient_temp_fn is not None: return self._ambient_temp_fn(time_hours) return self._ambient_temp_k # type: ignore[return-value] def _get_wind(self, time_hours: float) -> WindConditions: if self._wind_fn is not None: speed, angle = self._wind_fn(time_hours) return WindConditions(speed_m_per_s=speed, angle_deg=angle) return self._current_wind def _update_wind_if_changed(self, time_hours: float) -> None: new_wind = self._get_wind(time_hours) if new_wind != self._current_wind: self._current_wind = new_wind self._interaction_matrix = self._model.compute_interaction_matrix( self._layout.positions_m, new_wind ) def _evaluate_work( self, active_mask: NDArray[np.bool_], time_since_start: NDArray[np.float64], load_kw: float, ambient_temp_k: float, use_steady_state_ramp: bool = False, ) -> tuple[float, NDArray[np.float64], NDArray[np.float64]]: """Return (total_work_kw, cop_array, temp_rise_array).""" n = len(active_mask) n_active = int(active_mask.sum()) if n_active == 0: return float("inf"), np.zeros(n), np.zeros(n) ramp_factors = ( np.ones(n) if use_steady_state_ramp else np.array([self._ramp_fn(t) for t in time_since_start]) ) effective_caps = np.array( [ self._layout.max_cooling_kw * self._degradation_fn(self._layout.ages_years[i]) * ramp_factors[i] for i in range(n) ] ) if load_kw > effective_caps[active_mask].sum(): return float("inf"), np.zeros(n), np.zeros(n) # Distribute load in proportion to each active chiller's effective capacity active_caps = effective_caps[active_mask] loads = load_kw * active_caps / active_caps.sum() temp_rise = active_mask.astype(np.float64) @ self._interaction_matrix cop_array = np.array( [ self._cop_fn( self._layout.base_cop, temp_rise[i], ambient_temp_k, age_years=float(self._layout.ages_years[i]), ) for i in range(n) ] ) cop_array = np.maximum(cop_array, 1e-6) cop_array[~active_mask] = 0.0 total_work = float(np.sum(loads / cop_array[active_mask])) return total_work, cop_array, temp_rise def _greedy_optimize( self, load_kw: float, ambient_temp_k: float, ) -> tuple[NDArray[np.bool_], NDArray[np.float64]]: n = self._layout.num_chillers # On the first call, start all-on at steady-state ramp throughout the # entire greedy loop — ensures custom ramp_fn values do not affect # first-call behavior, matching the spec guarantee. is_first = self._is_first_call if is_first: active_mask = np.ones(n, dtype=bool) time_since_start = np.full(n, np.inf) self._is_first_call = False else: active_mask = self._active_mask.copy() time_since_start = self._time_since_start.copy() current_work, _, _ = self._evaluate_work( active_mask, time_since_start, load_kw, ambient_temp_k, use_steady_state_ramp=is_first, ) improved = True while improved: improved = False best_work = current_work best_mask: NDArray[np.bool_] | None = None best_times: NDArray[np.float64] | None = None for i in range(n): candidate_mask = active_mask.copy() candidate_mask[i] = not candidate_mask[i] # Activation is evaluated at steady-state COP (time → ∞) so # the startup ramp does not deter adding capacity that is # beneficial at equilibrium. Deactivation uses real elapsed # times to measure the immediate saving accurately. candidate_eval_times = time_since_start.copy() if candidate_mask[i]: # activating candidate_eval_times[i] = np.inf candidate_work, _, _ = self._evaluate_work( candidate_mask, candidate_eval_times, load_kw, ambient_temp_k, use_steady_state_ramp=is_first, ) savings = current_work - candidate_work if savings >= self._min_savings_kw and candidate_work < best_work: best_work = candidate_work best_mask = candidate_mask # Real state: newly activated chillers begin their ramp at 0. best_times = time_since_start.copy() if candidate_mask[i]: # activating best_times[i] = 0.0 if best_mask is not None: active_mask = best_mask time_since_start = best_times # type: ignore[assignment] current_work = best_work improved = True return active_mask, time_since_start
[docs] def optimize( self, time_hours: float, load_kw: float | None = None, ) -> OptimizeResult: """Run one greedy optimization step and return detailed results.""" if self._last_optimize_time_hours is not None: elapsed = time_hours - self._last_optimize_time_hours if elapsed > 0: self._time_since_start[self._active_mask] += elapsed self._time_since_start[~self._active_mask] = 0.0 self._update_wind_if_changed(time_hours) ambient_temp_k = self._get_ambient_temp(time_hours) resolved_load = load_kw if load_kw is not None else self._load_fn(time_hours) # Baseline: all chillers on at steady-state ramp n = self._layout.num_chillers all_on = np.ones(n, dtype=bool) steady_times = np.full(n, np.inf) baseline_work, _, _ = self._evaluate_work( all_on, steady_times, resolved_load, ambient_temp_k, use_steady_state_ramp=True ) was_first_call = self._is_first_call prev_active = self._active_mask.copy() active_mask, time_since_start = self._greedy_optimize(resolved_load, ambient_temp_k) # Report work at steady-state capacity for chillers just activated this step # (matching the optimizer's own evaluation), so the result reflects equilibrium # performance rather than the first instant of ramp-up. eval_times = time_since_start.copy() if not was_first_call: newly_activated = active_mask & ~prev_active eval_times[newly_activated] = np.inf total_work, cop_array, temp_rise = self._evaluate_work( active_mask, eval_times, resolved_load, ambient_temp_k, use_steady_state_ramp=was_first_call, ) savings_fraction = ( (baseline_work - total_work) / baseline_work if baseline_work > 0 else 0.0 ) # Persist state for next optimize() call self._active_mask = active_mask self._time_since_start = time_since_start self._last_optimize_time_hours = time_hours return OptimizeResult( time_hours=time_hours, load_kw=resolved_load, active_mask=active_mask, total_work_kw=total_work, baseline_work_kw=baseline_work, savings_fraction=savings_fraction, cop_array=cop_array, temp_rise_array=temp_rise, )
def _reset_state(self, initial_state: InitialState | None) -> None: """Reset internal optimizer state at start of stream()/simulate().""" n = self._layout.num_chillers if initial_state is not None: self._active_mask = initial_state.active_mask.copy() self._time_since_start = initial_state.time_since_start_hours.copy() self._is_first_call = False else: self._active_mask = np.zeros(n, dtype=bool) self._time_since_start = np.zeros(n) self._is_first_call = True self._last_optimize_time_hours = None
[docs] def stream( self, duration_hours: float, time_step_hours: float = 1.0, initial_time_hours: float = 0.0, initial_state: InitialState | None = None, ) -> Generator[OptimizeResult, None, None]: """Yield one OptimizeResult per time step over the requested duration.""" self._reset_state(initial_state) t = initial_time_hours while t < initial_time_hours + duration_hours - 1e-9: result = self.optimize(time_hours=t) yield result t += time_step_hours
[docs] def simulate( self, duration_hours: float, time_step_hours: float = 1.0, initial_time_hours: float = 0.0, initial_state: InitialState | None = None, ) -> SimulationResult: """Run the full simulation and return all steps collected into a SimulationResult.""" steps = list( self.stream( duration_hours=duration_hours, time_step_hours=time_step_hours, initial_time_hours=initial_time_hours, initial_state=initial_state, ) ) return SimulationResult(steps=steps)