Coverage for o2/simulation_runner.py: 95%
55 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1import traceback
2from concurrent.futures import Future, ProcessPoolExecutor, as_completed
3from dataclasses import dataclass
4from datetime import datetime
5from typing import TYPE_CHECKING, Optional, TypeAlias
7from prosimos.simulation_engine import run_simpy_simulation
8from prosimos.simulation_stats_calculator import (
9 KPIMap,
10 LogInfo,
11 ResourceKPI,
12)
14from o2.models.settings import Settings
15from o2.util.indented_printer import print_l3
16from o2.util.logger import info
18TaskKPIs: TypeAlias = dict[str, KPIMap]
19ResourceKPIs: TypeAlias = dict[str, ResourceKPI]
20SimulationKPIs: TypeAlias = tuple[KPIMap, TaskKPIs, ResourceKPIs, datetime, datetime]
22RunSimulationResult: TypeAlias = tuple[KPIMap, TaskKPIs, ResourceKPIs, LogInfo]
25if TYPE_CHECKING:
26 from o2.models.state import State
29@dataclass(frozen=True)
30class MedianResult:
31 """A result of a simulation run, to be used for median calculation."""
33 total_cycle_time: float
34 result: RunSimulationResult
37class SimulationRunner:
38 """Helper class to run simulations and return the results."""
40 _executor: Optional[ProcessPoolExecutor] = None
41 """Process Pool Executor that can be used to evaluate the median calculation in parallel.
43 Makes sense esp. when not using parallel evaluation of actions, so at least
44 the median calculation is sped up. Set Settings.MAX_THREADS_MEDIAN_CALCULATION
45 to a value > 1 to enable this.
46 """
48 @staticmethod
49 def run_simulation(state: "State") -> RunSimulationResult:
50 """Run simulation and return the results."""
51 try:
52 setup = state.to_sim_diff_setup()
53 result = run_simpy_simulation(setup, None, None)
54 assert result is not None
55 assert isinstance(result, tuple)
57 simulation_kpis: SimulationKPIs = result[0] # type: ignore
58 log_info = result[1]
59 (
60 global_kpis,
61 task_kpis,
62 resource_kpis,
63 _,
64 _,
65 ) = simulation_kpis
67 return global_kpis, task_kpis, resource_kpis, log_info
69 except Exception as e:
70 print_l3(f"Error in simulation: {e}")
71 if Settings.SHOW_SIMULATION_ERRORS:
72 info(traceback.format_exc())
73 raise e
75 @staticmethod
76 def run_simulation_median(state: "State") -> RunSimulationResult:
77 """Run multiple simulations and return the median simulation's results."""
78 results: list[MedianResult] = []
79 if not Settings.DISABLE_PARALLEL_EVALUATION and Settings.MAX_THREADS_MEDIAN_CALCULATION > 1:
80 if SimulationRunner._executor is None:
81 info(
82 f"Starting ProcessPoolExecutor with "
83 f"{Settings.MAX_THREADS_MEDIAN_CALCULATION} workers for median calculation"
84 )
85 SimulationRunner._executor = ProcessPoolExecutor(
86 max_workers=Settings.MAX_THREADS_MEDIAN_CALCULATION
87 )
88 futures: list[Future[RunSimulationResult]] = []
89 for _ in range(Settings.NUMBER_OF_SIMULATION_FOR_MEDIAN):
90 futures.append(SimulationRunner._executor.submit(SimulationRunner.run_simulation, state))
92 results: list[MedianResult] = []
93 for future in as_completed(futures):
94 result = future.result()
95 _, _, _, log_info = result
97 first_enablement = min(
98 [event.enabled_datetime for trace in log_info.trace_list for event in trace.event_list],
99 default=log_info.started_at,
100 )
101 last_completion = max(
102 [event.completed_datetime for trace in log_info.trace_list for event in trace.event_list],
103 default=log_info.ended_at,
104 )
105 total_cycle_time = (last_completion - first_enablement).total_seconds()
106 results.append(MedianResult(total_cycle_time, result))
107 else:
108 for _ in range(Settings.NUMBER_OF_SIMULATION_FOR_MEDIAN):
109 result = SimulationRunner.run_simulation(state)
110 _, _, _, log_info = result
112 first_enablement = min(
113 [event.enabled_datetime for trace in log_info.trace_list for event in trace.event_list],
114 default=log_info.started_at,
115 )
116 last_completion = max(
117 [event.completed_datetime for trace in log_info.trace_list for event in trace.event_list],
118 default=log_info.ended_at,
119 )
120 total_cycle_time = (last_completion - first_enablement).total_seconds()
121 results.append(MedianResult(total_cycle_time, result))
123 results.sort(key=lambda x: x.total_cycle_time)
124 median_result = results[len(results) // 2]
125 return median_result.result
127 @staticmethod
128 def close_executor() -> None:
129 """Close the executor."""
130 if SimulationRunner._executor is not None:
131 SimulationRunner._executor.shutdown(wait=True)
132 SimulationRunner._executor = None