Coverage for o2/agents/simulated_annealing_agent.py: 94%

77 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1import math 

2import random 

3from typing import Optional 

4 

5from typing_extensions import override 

6 

7from o2.agents.agent import ( 

8 ACTION_CATALOG, 

9 ACTION_CATALOG_BATCHING_ONLY, 

10 ACTION_CATALOG_LEGACY, 

11 Agent, 

12 NoNewBaseSolutionFoundError, 

13) 

14from o2.models.solution import Solution 

15from o2.pareto_front import FRONT_STATUS 

16from o2.store import SolutionTry, Store 

17from o2.util.indented_printer import print_l1, print_l2, print_l3 

18from o2.util.logger import debug 

19 

# Factor applied to the base evaluation's distance from the origin
# (sqrt(pareto_x**2 + pareto_y**2)) when the initial temperature is
# auto-estimated by SimulatedAnnealingAgent.
DISTANCE_MULTIPLIER = 4

21 

22 

class SimulatedAnnealingAgent(Agent):
    """Agent that picks new base solutions using a simulated-annealing schedule.

    The temperature is used as a distance budget: it is passed as
    ``max_distance`` when a new base solution is looked up in the solution
    tree, and it controls the probability with which a discarded solution is
    accepted anyway. It is multiplied by ``settings.sa_cooling_factor`` on
    every call to :meth:`find_new_base_solution`.
    """

    @override
    def __init__(self, store: Store) -> None:
        """Set up the action catalog, the initial temperature and the cooling factor.

        Args:
            store: Store providing the settings and the base evaluation.

        Raises:
            ValueError: If the cooling factor is ``"auto"`` but cannot be
                derived from the settings (see
                :meth:`_auto_estimate_cooling_factor`), or if a non-"auto"
                initial temperature cannot be converted to ``float``.
        """
        super().__init__(store)

        settings = store.settings
        self.catalog = (
            ACTION_CATALOG_LEGACY
            if settings.optimos_legacy_mode
            else ACTION_CATALOG_BATCHING_ONLY
            if settings.batching_only
            else ACTION_CATALOG
        )

        initial_temperature = settings.sa_initial_temperature
        if initial_temperature == "auto":
            # Guesstimate a good starting temperature, by basically allowing all
            # points in a DISTANCE_MULTIPLIER-times circle around the base evaluation.
            self.temperature = self._base_evaluation_radius() * DISTANCE_MULTIPLIER
            print_l1(f"Auto-estimated initial temperature: {self.temperature:_.2f}")
        else:
            # Normalise e.g. an int setting to float, so the float assertions
            # further down do not trip over a perfectly valid temperature.
            self.temperature = float(initial_temperature)

        if self.store.settings.sa_cooling_factor == "auto":
            self._auto_estimate_cooling_factor()

    def _base_evaluation_radius(self) -> float:
        """Return the Euclidean distance of the base evaluation from the origin."""
        base_evaluation = self.store.base_evaluation
        return math.hypot(base_evaluation.pareto_x, base_evaluation.pareto_y)

    def _auto_estimate_cooling_factor(self) -> None:
        """Derive ``settings.sa_cooling_factor`` and write it back to the settings.

        The factor is chosen so that the current temperature decays to
        ``error_radius_in_percent * base radius`` after
        ``number_of_iterations_to_cool`` iterations.

        Raises:
            ValueError: If ``error_radius_in_percent`` is unset, if the number
                of cooling iterations is not positive, or if the initial
                temperature is not positive.
        """
        settings = self.store.settings
        if settings.error_radius_in_percent is None:
            raise ValueError(
                "Settings.error_radius_in_percent is not set, "
                "so we can't auto calculate the cooling factor."
            )

        if settings.max_solutions:
            number_of_iterations_to_cool = settings.sa_cooling_iteration_percent * (
                settings.max_solutions
                // (
                    settings.max_number_of_actions_per_iteration
                    # Usually we don't actually create max_number_of_actions_per_iteration
                    # solutions per iteration, so we multiply by 0.75 to allow for some slack.
                    * 0.75
                )
            )
        else:
            number_of_iterations_to_cool = (
                settings.sa_cooling_iteration_percent * settings.max_iterations
            )

        # Both values end up in a denominator below; fail with a readable
        # message instead of a bare ZeroDivisionError (or a complex result
        # for a negative temperature).
        if number_of_iterations_to_cool <= 0:
            raise ValueError(
                "Can't auto calculate the cooling factor: the number of iterations "
                f"to cool must be positive, but is {number_of_iterations_to_cool}."
            )
        if self.temperature <= 0:
            raise ValueError(
                "Can't auto calculate the cooling factor: the initial temperature "
                f"must be positive, but is {self.temperature}."
            )

        temperature_to_reach = settings.error_radius_in_percent * self._base_evaluation_radius()
        settings.sa_cooling_factor = (temperature_to_reach / self.temperature) ** (
            1 / number_of_iterations_to_cool
        )
        print_l1(f"Auto-estimated cooling factor: {settings.sa_cooling_factor:.4f}")
        print_l2(
            f"Which will reach {temperature_to_reach:_.2f} after "
            f"{number_of_iterations_to_cool} iterations..."
        )
        max_iterations = settings.max_iterations
        assert isinstance(settings.sa_cooling_factor, float)
        temp_after_all_iterations = self.temperature * (settings.sa_cooling_factor**max_iterations)
        print_l2(f"Which means {temp_after_all_iterations:_.2f} after all {max_iterations} iterations.")

    @override
    def find_new_base_solution(self, proposed_solution_try: Optional[SolutionTry] = None) -> Solution:
        """Cool down the temperature and return the next base solution.

        Args:
            proposed_solution_try: Optional ``(status, solution)`` pair. If its
                status is ``FRONT_STATUS.DOMINATES`` and strict ordering is
                disabled, the solution may be accepted at random, with a
                probability depending on its distance to the current solution
                and the temperature.

        Returns:
            The randomly accepted proposed solution, or otherwise the solution
            selected (and removed) from the solution tree.

        Raises:
            NoNewBaseSolutionFoundError: If the solution tree yields no solution.
        """
        print_l2(f"Old temperature: {self.temperature:_.2f}")
        assert isinstance(self.temperature, float)
        cooling_factor = self.store.settings.sa_cooling_factor
        assert isinstance(cooling_factor, float)

        new_temp = self.temperature * cooling_factor
        error_radius = self.store.settings.error_radius_in_percent
        min_radius = None if error_radius is None else error_radius * self._base_evaluation_radius()
        if min_radius is not None and new_temp < min_radius:
            # Never cool below the configured error radius.
            print_l2(f"New temperature: {min_radius:_.2f} (min radius)")
            self.temperature = min_radius
        else:
            print_l2(f"New temperature: {new_temp:_.2f}")
            self.temperature = new_temp

        if proposed_solution_try is not None:
            status, solution = proposed_solution_try
            # Maybe accept bad solution
            # NOTE(review): presumably DOMINATES means the front dominates the
            # proposed solution, i.e. it was discarded - confirm in pareto_front.
            if status == FRONT_STATUS.DOMINATES:
                # TODO: Min Distance to front
                distance = solution.distance_to(self.store.solution)
                debug(f"Discarded solution {solution.id} distance: {distance}")
                if not self.store.settings.sa_strict_ordered and self._accept_worse_solution(
                    distance, self.temperature
                ):
                    debug(f"Randomly accepted discarded solution {solution.id}.")
                    return solution

        return self._select_new_base_evaluation(
            # If the proposed solution try is None, we were called from
            # maximum non improving iterations so we don't need to reinsert
            # the current solution (which is very unlikely, as the solution
            # will be changed after every iteration, but might happen at the end
            # of the optimization)
            reinsert_current_solution=proposed_solution_try is not None
        )

    def _select_new_base_evaluation(self, reinsert_current_solution: bool = False) -> Solution:
        """Pop a new base solution from the solution tree.

        Args:
            reinsert_current_solution: If true, the store's current solution is
                added back to the solution tree (with ``archive=True``) before
                the lookup.

        Returns:
            The nearest solution (strict-ordered mode) or a random solution
            near the Pareto front, within ``self.temperature`` distance. The
            returned solution is removed from the solution tree.

        Raises:
            NoNewBaseSolutionFoundError: If no solution was found.
        """
        if reinsert_current_solution:
            self.store.solution_tree.add_solution(self.store.solution, archive=True)

        assert isinstance(self.temperature, float)

        strict_ordered = self.store.settings.sa_strict_ordered
        pareto_front = self.store.current_pareto_front
        if strict_ordered:
            solution = self.store.solution_tree.get_nearest_solution(
                pareto_front,
                max_distance=self.temperature,
            )
        else:
            solution = self.store.solution_tree.get_random_solution_near_to_pareto_front(
                pareto_front,
                max_distance=self.temperature,
            )

        # Check for "nothing found" before any logging, so a missing solution
        # is not reported as "NOT in pareto front".
        if solution is None:
            raise NoNewBaseSolutionFoundError("No new baseline evaluation found.")

        if strict_ordered:
            if solution not in pareto_front.solutions:
                print_l3("Nearest solution is NOT in pareto front.")
            else:
                print_l3("Nearest solution is IN pareto front.")

        distance = pareto_front.avg_distance_to(solution)
        print_l2(
            f"Selected {'nearest' if strict_ordered else 'random'} "
            f"base solution {solution.id} with distance: {distance:_.2f}"
        )

        # We delete the newly found solution from the tree, so this is a "pop" action,
        # similarly to the TabuAgent.
        self.store.solution_tree.remove_solution(solution)
        return solution

    def _accept_worse_solution(self, distance: float, temperature: float) -> bool:
        """Decide at random whether to accept a worse solution.

        The acceptance probability is ``exp(-distance / temperature)``.

        Args:
            distance: Distance of the candidate to the current solution.
            temperature: Current annealing temperature.

        Returns:
            True if the solution should be accepted.
        """
        if temperature <= 0:
            # Fully cooled down: this is the limit of exp(-distance / T) for
            # T -> 0+, and it avoids dividing by zero.
            return distance <= 0
        exponent = -distance / temperature
        # exp() of a non-negative exponent is >= 1, i.e. always accepted;
        # skipping the call avoids an OverflowError for huge exponents.
        probability = 1.0 if exponent >= 0 else math.exp(exponent)
        return random.random() < probability

164 return random.random() < probability