Coverage for o2/simulation_runner.py: 95%

55 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1import traceback 

2from concurrent.futures import Future, ProcessPoolExecutor, as_completed 

3from dataclasses import dataclass 

4from datetime import datetime 

5from typing import TYPE_CHECKING, Optional, TypeAlias 

6 

7from prosimos.simulation_engine import run_simpy_simulation 

8from prosimos.simulation_stats_calculator import ( 

9 KPIMap, 

10 LogInfo, 

11 ResourceKPI, 

12) 

13 

14from o2.models.settings import Settings 

15from o2.util.indented_printer import print_l3 

16from o2.util.logger import info 

17 

# KPI maps keyed by a string (presumably the task id -- confirm against Prosimos).
TaskKPIs: TypeAlias = dict[str, KPIMap]

# Resource KPI objects keyed by a string (presumably the resource id -- confirm).
ResourceKPIs: TypeAlias = dict[str, ResourceKPI]

# Shape of the first element of the tuple returned by ``run_simpy_simulation``:
# global KPIs, per-task KPIs, per-resource KPIs and two datetimes
# (presumably simulation start and end -- confirm against Prosimos).
SimulationKPIs: TypeAlias = tuple[KPIMap, TaskKPIs, ResourceKPIs, datetime, datetime]

# What ``SimulationRunner.run_simulation`` hands back to callers: the three KPI
# collections plus the simulation's ``LogInfo``.
RunSimulationResult: TypeAlias = tuple[KPIMap, TaskKPIs, ResourceKPIs, LogInfo]

23 

24 

25if TYPE_CHECKING: 

26 from o2.models.state import State 

27 

28 

@dataclass(frozen=True)
class MedianResult:
    """Immutable pairing of one simulation run with its total cycle time.

    ``SimulationRunner.run_simulation_median`` collects one instance per run,
    orders them by ``total_cycle_time`` and returns the ``result`` of the
    middle one.
    """

    # Seconds between the earliest enablement and the latest completion of the run.
    total_cycle_time: float
    # The untouched result tuple of the run this cycle time was measured on.
    result: RunSimulationResult

35 

36 

class SimulationRunner:
    """Helper class to run simulations and return the results."""

    _executor: Optional[ProcessPoolExecutor] = None
    """Process Pool Executor that can be used to evaluate the median calculation in parallel.

    Makes sense esp. when not using parallel evaluation of actions, so at least
    the median calculation is sped up. Set Settings.MAX_THREADS_MEDIAN_CALCULATION
    to a value > 1 to enable this.
    """

    @staticmethod
    def run_simulation(state: "State") -> RunSimulationResult:
        """Run a single simulation for ``state`` and return its results.

        Args:
            state: The state whose ``to_sim_diff_setup()`` provides the
                simulation setup.

        Returns:
            A tuple ``(global_kpis, task_kpis, resource_kpis, log_info)``.

        Raises:
            Exception: Any error raised while building the setup or running
                the simulation is reported via ``print_l3`` (and, if
                ``Settings.SHOW_SIMULATION_ERRORS`` is set, logged with its
                traceback) and then re-raised unchanged.
        """
        try:
            setup = state.to_sim_diff_setup()
            result = run_simpy_simulation(setup, None, None)
            assert result is not None
            assert isinstance(result, tuple)

            simulation_kpis: SimulationKPIs = result[0]  # type: ignore
            log_info = result[1]
            # The two trailing datetimes of the KPI tuple are not needed here.
            global_kpis, task_kpis, resource_kpis, _, _ = simulation_kpis

            return global_kpis, task_kpis, resource_kpis, log_info

        except Exception as e:
            print_l3(f"Error in simulation: {e}")
            if Settings.SHOW_SIMULATION_ERRORS:
                info(traceback.format_exc())
            # Bare raise re-raises the active exception without rebinding it.
            raise

    @staticmethod
    def _to_median_result(result: RunSimulationResult) -> "MedianResult":
        """Wrap a simulation result together with its total cycle time.

        The total cycle time is the number of seconds between the earliest
        ``enabled_datetime`` and the latest ``completed_datetime`` over all
        events of all traces in the result's log info. If the log contains no
        events, ``log_info.started_at`` / ``log_info.ended_at`` are used instead.
        """
        _, _, _, log_info = result

        first_enablement = min(
            (event.enabled_datetime for trace in log_info.trace_list for event in trace.event_list),
            default=log_info.started_at,
        )
        last_completion = max(
            (event.completed_datetime for trace in log_info.trace_list for event in trace.event_list),
            default=log_info.ended_at,
        )
        total_cycle_time = (last_completion - first_enablement).total_seconds()
        return MedianResult(total_cycle_time, result)

    @staticmethod
    def run_simulation_median(state: "State") -> RunSimulationResult:
        """Run multiple simulations and return the median simulation's results.

        ``Settings.NUMBER_OF_SIMULATION_FOR_MEDIAN`` simulations are executed,
        ordered by their total cycle time, and the result at index
        ``len(results) // 2`` is returned (the upper median for an even count).

        The runs are submitted to a shared, lazily created
        ``ProcessPoolExecutor`` when ``Settings.DISABLE_PARALLEL_EVALUATION``
        is false and ``Settings.MAX_THREADS_MEDIAN_CALCULATION`` is greater
        than 1; otherwise they are executed one after another in this process.

        Args:
            state: The state to simulate.

        Returns:
            The result tuple of the run with the median total cycle time.

        Raises:
            IndexError: If no simulation was run, i.e.
                ``Settings.NUMBER_OF_SIMULATION_FOR_MEDIAN`` is less than 1.
            Exception: Whatever ``run_simulation`` raises for a failing run.
        """
        number_of_runs = Settings.NUMBER_OF_SIMULATION_FOR_MEDIAN
        results: list[MedianResult]

        if not Settings.DISABLE_PARALLEL_EVALUATION and Settings.MAX_THREADS_MEDIAN_CALCULATION > 1:
            if SimulationRunner._executor is None:
                info(
                    f"Starting ProcessPoolExecutor with "
                    f"{Settings.MAX_THREADS_MEDIAN_CALCULATION} workers for median calculation"
                )
                SimulationRunner._executor = ProcessPoolExecutor(
                    max_workers=Settings.MAX_THREADS_MEDIAN_CALCULATION
                )
            executor = SimulationRunner._executor
            futures: list[Future[RunSimulationResult]] = [
                executor.submit(SimulationRunner.run_simulation, state) for _ in range(number_of_runs)
            ]
            # Completion order is irrelevant because the results are sorted below.
            results = [SimulationRunner._to_median_result(future.result()) for future in as_completed(futures)]
        else:
            results = [
                SimulationRunner._to_median_result(SimulationRunner.run_simulation(state))
                for _ in range(number_of_runs)
            ]

        results.sort(key=lambda x: x.total_cycle_time)
        median_result = results[len(results) // 2]
        return median_result.result

    @staticmethod
    def close_executor() -> None:
        """Shut down the shared executor, if any, and forget it.

        Waits for pending work to finish. A later call to
        ``run_simulation_median`` creates a fresh executor when needed.
        """
        if SimulationRunner._executor is not None:
            SimulationRunner._executor.shutdown(wait=True)
            SimulationRunner._executor = None

132 SimulationRunner._executor = None