Coverage for o2/agents/simulated_annealing_agent.py: 94%
77 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1import math
2import random
3from typing import Optional
5from typing_extensions import override
7from o2.agents.agent import (
8 ACTION_CATALOG,
9 ACTION_CATALOG_BATCHING_ONLY,
10 ACTION_CATALOG_LEGACY,
11 Agent,
12 NoNewBaseSolutionFoundError,
13)
14from o2.models.solution import Solution
15from o2.pareto_front import FRONT_STATUS
16from o2.store import SolutionTry, Store
17from o2.util.indented_printer import print_l1, print_l2, print_l3
18from o2.util.logger import debug
# Factor applied to the base evaluation's distance from the origin when the
# initial temperature is auto-estimated (a "4x circle" around the base evaluation).
DISTANCE_MULTIPLIER = 4
class SimulatedAnnealingAgent(Agent):
    """Selects the best action to take next, based on the current state of the store.

    The agent keeps a ``temperature`` that is multiplied by
    ``settings.sa_cooling_factor`` every time a new base solution is requested.
    The temperature is used in two places: as the ``max_distance`` handed to the
    solution tree when the next base solution is picked, and as the divisor in
    the acceptance probability ``exp(-distance / temperature)`` applied to a
    proposed solution whose front status is ``FRONT_STATUS.DOMINATES`` (the
    "bad" / discarded case, per the inline comments).
    """

    @override
    def __init__(self, store: Store) -> None:
        """Set up the action catalog, the initial temperature and the cooling factor.

        When ``settings.sa_initial_temperature`` is ``"auto"`` the temperature is
        estimated from the base evaluation. When ``settings.sa_cooling_factor``
        is ``"auto"`` the factor is estimated and written back to
        ``store.settings.sa_cooling_factor``.

        Raises:
            ValueError: If the cooling factor must be auto-estimated but
                ``settings.error_radius_in_percent`` is unset, the number of
                cooling iterations is not positive, or the initial temperature
                is not positive.
        """
        super().__init__(store)

        self.catalog = (
            ACTION_CATALOG_LEGACY
            if store.settings.optimos_legacy_mode
            else ACTION_CATALOG_BATCHING_ONLY
            if store.settings.batching_only
            else ACTION_CATALOG
        )

        self.temperature = store.settings.sa_initial_temperature
        if self.temperature == "auto":
            # Guestimate a good starting temperature, by basically allowing all
            # points in a 4x circle around the base evaluation.
            self.temperature = self._origin_distance(store.base_evaluation) * DISTANCE_MULTIPLIER
            print_l1(f"Auto-estimated initial temperature: {self.temperature:_.2f}")

        # NOTE(review): reconstructed as a sibling of the temperature branch (not
        # nested in it), and the "after all iterations" report below as part of
        # this branch; the listing this was recovered from had lost its
        # indentation. Confirm against the original file.
        if self.store.settings.sa_cooling_factor == "auto":
            if self.store.settings.max_solutions:
                number_of_iterations_to_cool = self.store.settings.sa_cooling_iteration_percent * (
                    self.store.settings.max_solutions
                    // (
                        self.store.settings.max_number_of_actions_per_iteration
                        # Usually we don't actually create max_number_of_actions_per_iteration
                        # solutions per iteration, so we multiply by 0.75 to allow for some slack.
                        * 0.75
                    )
                )
            else:
                number_of_iterations_to_cool = (
                    self.store.settings.sa_cooling_iteration_percent * self.store.settings.max_iterations
                )

            if self.store.settings.error_radius_in_percent is None:
                raise ValueError(
                    "Settings.error_radius_in_percent is not set, "
                    "so we can't auto calculate the cooling factor."
                )
            # The floor division above yields 0 for small solution budgets; the
            # exponent 1 / iterations would then raise a bare ZeroDivisionError.
            if number_of_iterations_to_cool <= 0:
                raise ValueError(
                    "The number of iterations to cool is not positive, "
                    "so we can't auto calculate the cooling factor."
                )
            # A zero temperature (e.g. base evaluation at the origin) cannot be
            # cooled towards a target by a multiplicative factor.
            if self.temperature <= 0:
                raise ValueError(
                    "The initial temperature is not positive, "
                    "so we can't auto calculate the cooling factor."
                )

            temperature_to_reach = self.store.settings.error_radius_in_percent * self._origin_distance(
                store.base_evaluation
            )
            # Solve temperature * factor**iterations == temperature_to_reach for factor.
            self.store.settings.sa_cooling_factor = (temperature_to_reach / self.temperature) ** (
                1 / number_of_iterations_to_cool
            )
            print_l1(f"Auto-estimated cooling factor: {self.store.settings.sa_cooling_factor:.4f}")
            print_l2(
                f"Which will reach {temperature_to_reach:_.2f} after "
                f"{number_of_iterations_to_cool} iterations..."
            )
            max_iterations = self.store.settings.max_iterations
            assert isinstance(self.store.settings.sa_cooling_factor, float)
            temp_after_all_iterations = self.temperature * (
                self.store.settings.sa_cooling_factor**max_iterations
            )
            print_l2(f"Which means {temp_after_all_iterations:_.2f} after all {max_iterations} iterations.")

    @staticmethod
    def _origin_distance(evaluation) -> float:
        """Return the Euclidean norm of ``(evaluation.pareto_x, evaluation.pareto_y)``."""
        return math.hypot(evaluation.pareto_x, evaluation.pareto_y)

    @override
    def find_new_base_solution(self, proposed_solution_try: Optional[SolutionTry] = None) -> Solution:
        """Cool the temperature by one step and return the next base solution.

        The temperature is multiplied by ``settings.sa_cooling_factor``; when
        ``settings.error_radius_in_percent`` is set it is not allowed to drop
        below that fraction of the base evaluation's distance from the origin.

        If ``proposed_solution_try`` has status ``FRONT_STATUS.DOMINATES`` and
        ``settings.sa_strict_ordered`` is off, the proposed solution may be
        returned directly with probability ``exp(-distance / temperature)``.
        Otherwise a solution is selected from the solution tree.

        Raises:
            NoNewBaseSolutionFoundError: If the solution tree yields no solution.
        """
        print_l2(f"Old temperature: {self.temperature:_.2f}")
        assert isinstance(self.temperature, float)
        assert isinstance(self.store.settings.sa_cooling_factor, float)

        if self.store.settings.error_radius_in_percent is not None:
            min_radius = self.store.settings.error_radius_in_percent * self._origin_distance(
                self.store.base_evaluation
            )
            new_temp = self.temperature * self.store.settings.sa_cooling_factor
            if new_temp < min_radius:
                # Never cool below the configured error radius.
                print_l2(f"New temperature: {min_radius:_.2f} (min radius)")
                self.temperature = min_radius
            else:
                print_l2(f"New temperature: {new_temp:_.2f}")
                self.temperature = new_temp
        else:
            self.temperature *= self.store.settings.sa_cooling_factor
            print_l2(f"New temperature: {self.temperature:_.2f}")

        if proposed_solution_try is not None:
            status, solution = proposed_solution_try
            # Maybe accept bad solution
            if status == FRONT_STATUS.DOMINATES:
                # TODO: Min Distance to front
                distance = solution.distance_to(self.store.solution)
                debug(f"Discarded solution {solution.id} distance: {distance}")
                if not self.store.settings.sa_strict_ordered and self._accept_worse_solution(
                    distance, self.temperature
                ):
                    debug(f"Randomly accepted discarded solution {solution.id}.")
                    return solution

        return self._select_new_base_evaluation(
            # If the proposed solution try is None, we were called from
            # maximum non improving iterations so we don't need to reinsert
            # the current solution (which is very unlikely, as the solution
            # will be changed after every iteration, but might happen at the end
            # of the optimization)
            reinsert_current_solution=proposed_solution_try is not None
        )

    def _select_new_base_evaluation(self, reinsert_current_solution: bool = False) -> Solution:
        """Select a new base evaluation and remove it from the solution tree.

        Args:
            reinsert_current_solution: When true, ``store.solution`` is first
                added back to the solution tree (with ``archive=True``).

        Returns:
            The nearest solution to the current Pareto front when
            ``settings.sa_strict_ordered`` is set, otherwise a random solution
            near the front; both searches are bounded by the current temperature.

        Raises:
            NoNewBaseSolutionFoundError: If the tree returns no solution.
        """
        if reinsert_current_solution:
            self.store.solution_tree.add_solution(self.store.solution, archive=True)

        assert isinstance(self.temperature, float)

        strict_ordered = self.store.settings.sa_strict_ordered
        if strict_ordered:
            solution = self.store.solution_tree.get_nearest_solution(
                self.store.current_pareto_front,
                max_distance=self.temperature,
            )
            # Only report front membership when something was actually found.
            if solution is not None:
                if solution not in self.store.current_pareto_front.solutions:
                    print_l3("Nearest solution is NOT in pareto front.")
                else:
                    print_l3("Nearest solution is IN pareto front.")
        else:
            solution = self.store.solution_tree.get_random_solution_near_to_pareto_front(
                self.store.current_pareto_front,
                max_distance=self.temperature,
            )
        if solution is None:
            raise NoNewBaseSolutionFoundError("No new baseline evaluation found.")

        distance = self.store.current_pareto_front.avg_distance_to(solution)
        print_l2(
            f"Selected {'nearest' if strict_ordered else 'random'} "
            f"base solution {solution.id} with distance: {distance:_.2f}"
        )

        # We delete the newly found solution from the tree, so this is a "pop" action,
        # similarly to the TabuAgent.
        self.store.solution_tree.remove_solution(solution)
        return solution

    def _accept_worse_solution(self, distance: float, temperature: float) -> bool:
        """Determine whether to accept a worse solution.

        Accepts with probability ``exp(-distance / temperature)``. A non-positive
        temperature is treated as fully cooled: only a solution at non-positive
        distance is accepted, which matches the limit of the formula as the
        temperature approaches zero and avoids dividing by zero.
        """
        if temperature <= 0:
            return distance <= 0
        probability = math.exp(-distance / temperature)
        return random.random() < probability