Coverage for o2/agents/agent.py: 87%

137 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1from abc import ABC, abstractmethod 

2from collections import defaultdict 

3from typing import Optional 

4 

5from o2.actions.base_actions.add_size_rule_base_action import AddSizeRuleAction 

6from o2.actions.base_actions.base_action import BaseAction, RateSelfReturnType 

7from o2.actions.batching_actions.add_date_time_rule_by_availability_action import ( 

8 AddDateTimeRuleByAvailabilityAction, 

9) 

10from o2.actions.batching_actions.add_date_time_rule_by_enablement_action import ( 

11 AddDateTimeRuleByEnablementAction, 

12) 

13from o2.actions.batching_actions.add_date_time_rule_by_start_action import ( 

14 AddDateTimeRuleByStartAction, 

15) 

16from o2.actions.batching_actions.add_large_wt_rule_by_idle_action import ( 

17 AddLargeWTRuleByIdleAction, 

18) 

19from o2.actions.batching_actions.add_large_wt_rule_by_wt_action import AddLargeWTRuleByWTAction 

20from o2.actions.batching_actions.add_ready_wt_rule_by_wt_action import AddReadyWTRuleByWTAction 

21from o2.actions.batching_actions.modify_daily_hour_rule_action import ( 

22 ModifyDailyHourRuleAction, 

23) 

24from o2.actions.batching_actions.modify_size_rule_by_allocation_action import ( 

25 ModifySizeRuleByHighAllocationAction, 

26 ModifySizeRuleByLowAllocationAction, 

27) 

28from o2.actions.batching_actions.modify_size_rule_by_cost_action import ( 

29 ModifySizeRuleByCostAction, 

30) 

31from o2.actions.batching_actions.modify_size_rule_by_cost_fn_action import ( 

32 ModifyBatchSizeIfNoCostImprovementAction, 

33 ModifySizeRuleByCostFnHighCostsAction, 

34 ModifySizeRuleByCostFnLowCycleTimeImpactAction, 

35 ModifySizeRuleByCostFnLowProcessingTimeAction, 

36 ModifySizeRuleByCostFnRepetitiveTasksAction, 

37 ModifySizeRuleByManySimilarEnablementsAction, 

38) 

39from o2.actions.batching_actions.modify_size_rule_by_duration_fn_action import ( 

40 ModifyBatchSizeIfNoDurationImprovementAction, 

41 ModifySizeRuleByDurationFnCostImpactAction, 

42) 

43from o2.actions.batching_actions.modify_size_rule_by_utilization_action import ( 

44 ModifySizeRuleByHighUtilizationAction, 

45 ModifySizeRuleByLowUtilizationAction, 

46) 

47from o2.actions.batching_actions.modify_size_rule_by_wt_action import ( 

48 ModifySizeRuleByWTAction, 

49) 

50from o2.actions.batching_actions.random_action import RandomAction 

51from o2.actions.batching_actions.remove_rule_action import RemoveRuleAction 

52from o2.actions.legacy_optimos_actions.add_resource_action import AddResourceAction 

53from o2.actions.legacy_optimos_actions.modify_calendar_by_cost_action import ( 

54 ModifyCalendarByCostAction, 

55) 

56from o2.actions.legacy_optimos_actions.modify_calendar_by_it_action import ( 

57 ModifyCalendarByITAction, 

58) 

59from o2.actions.legacy_optimos_actions.modify_calendar_by_wt_action import ( 

60 ModifyCalendarByWTAction, 

61) 

62from o2.actions.legacy_optimos_actions.remove_resource_by_cost_action import ( 

63 RemoveResourceByCostAction, 

64) 

65from o2.actions.legacy_optimos_actions.remove_resource_by_utilization_action import ( 

66 RemoveResourceByUtilizationAction, 

67) 

68from o2.models.self_rating import RATING 

69from o2.models.solution import Solution 

70from o2.store import SolutionTry, Store 

71from o2.util.indented_printer import print_l1, print_l2, print_l3 

72 

# Catalog used when neither `optimos_legacy_mode` nor `batching_only` is set
# (see `Agent.__init__`). The order of the entries is the order in which the
# agent creates (and initially queues) the action generators.
ACTION_CATALOG: list[type[BaseAction]] = [
    AddResourceAction,
    ModifyCalendarByCostAction,
    ModifyCalendarByITAction,
    ModifyCalendarByWTAction,
    ModifyDailyHourRuleAction,
    RemoveResourceByCostAction,
    RemoveResourceByUtilizationAction,
    RemoveRuleAction,
]


# Catalog used when `settings.optimos_legacy_mode` is set: only the actions
# imported from `o2.actions.legacy_optimos_actions`.
ACTION_CATALOG_LEGACY: list[type[BaseAction]] = [
    AddResourceAction,
    ModifyCalendarByCostAction,
    ModifyCalendarByITAction,
    ModifyCalendarByWTAction,
    RemoveResourceByCostAction,
    RemoveResourceByUtilizationAction,
]

# Catalog used when `settings.batching_only` is set (and legacy mode is not).
ACTION_CATALOG_BATCHING_ONLY: list[type[BaseAction]] = [
    AddDateTimeRuleByAvailabilityAction,
    AddDateTimeRuleByEnablementAction,
    AddDateTimeRuleByStartAction,
    AddLargeWTRuleByIdleAction,
    AddLargeWTRuleByWTAction,
    AddReadyWTRuleByWTAction,
    ModifyBatchSizeIfNoCostImprovementAction,
    ModifyBatchSizeIfNoDurationImprovementAction,
    ModifySizeRuleByCostAction,
    ModifySizeRuleByCostFnHighCostsAction,
    ModifySizeRuleByCostFnLowCycleTimeImpactAction,
    ModifySizeRuleByCostFnLowProcessingTimeAction,
    ModifySizeRuleByCostFnRepetitiveTasksAction,
    ModifySizeRuleByDurationFnCostImpactAction,
    ModifySizeRuleByHighAllocationAction,
    ModifySizeRuleByHighUtilizationAction,
    ModifySizeRuleByLowAllocationAction,
    ModifySizeRuleByLowUtilizationAction,
    ModifySizeRuleByManySimilarEnablementsAction,
    ModifySizeRuleByWTAction,
    # Legacy Rules, that are fallbacks now
    AddSizeRuleAction,
    ModifyDailyHourRuleAction,
    RemoveRuleAction,
]


# Catalog containing only the random action. It is not selected by
# `Agent.__init__` in this file; presumably used by a subclass elsewhere.
ACTION_CATALOG_RANDOM: list[type[BaseAction]] = [RandomAction]

123 

124 

class NoNewBaseSolutionFoundError(Exception):
    """Exception raised when no new base solution is found."""

129 

130 

class NoActionsLeftError(Exception):
    """Exception raised when no actions are left to perform."""

135 

136 

class IterationsPerSolutionReachedError(Exception):
    """Exception raised when the iterations per solution limit is reached."""

141 

142 

class Agent(ABC):
    """Selects the best action to take next, based on the current state of the store."""

    def __init__(self, store: Store) -> None:
        """Initialize an agent with a reference to the store.

        Sets up the action catalog based on store settings and initializes
        tracking for action generators and tabu actions.
        """
        super().__init__()
        self.store = store

        # Legacy mode takes precedence over batching-only mode.
        if store.settings.optimos_legacy_mode:
            self.catalog = ACTION_CATALOG_LEGACY
        elif store.settings.batching_only:
            self.catalog = ACTION_CATALOG_BATCHING_ONLY
        else:
            self.catalog = ACTION_CATALOG

        # Remaining selection budget for the current base solution; a falsy
        # setting (None or 0) means "unlimited".
        self.iterations_per_solution: float = store.settings.iterations_per_solution or float("inf")
        # Queue of action generators, processed round-robin by get_valid_actions.
        self.action_generators: list[RateSelfReturnType[BaseAction]] = []
        # Ids of actions that were already handed out or rejected for the
        # current base solution.
        self.action_generator_tabu_ids: set[str] = set()
        # Number of actions accepted from each generator so far.
        self.action_generator_counter: dict[RateSelfReturnType[BaseAction], int] = defaultdict(int)
        self.set_action_generators(store.solution)

    def select_actions(self) -> Optional[list[BaseAction]]:
        """Select the best actions to take next.

        It will pick at most `settings.max_number_of_actions_per_iteration`
        actions (highest rating first), so parallel evaluation is possible.

        If the iteration budget or the possible options for the current base
        solution are exhausted, it will choose a new base solution (via
        `set_new_base_solution`) and try again.

        Returns:
            The selected actions, best rated first.

        Raises:
            Whatever `find_new_base_solution` raises when no further base
            solution is available.
        """
        while True:
            # The budget starts at N and is decremented once per successful
            # selection, so it is used up when it reaches 0. (The previous
            # `< 0` check allowed N + 1 selections and printed "N+1/N".)
            # NOTE(review): if find_new_base_solution keeps returning the
            # current solution, the budget is never reset and this loops
            # until it raises — confirm implementations guarantee progress.
            if self.iterations_per_solution <= 0:
                print_l1(f"Finished iterations for solution {self.store.solution.id}.")
                self.set_new_base_solution()
                continue

            if self.store.settings.iterations_per_solution is not None:
                done = self.store.settings.iterations_per_solution - self.iterations_per_solution + 1
                extra_info = f" - {done}/{self.store.settings.iterations_per_solution}"
            else:
                extra_info = ""
            print_l1(f"Choosing best action (based on {self.store.solution.id}{extra_info})...")

            # Get valid actions from the generators, even multiple per generator,
            # if we don't have enough valid actions yet
            possible_actions = self.get_valid_actions()
            # Remove None values
            possible_actions = [action for action in possible_actions if action is not None]

            if len(possible_actions) == 0:
                print_l1("No actions remaining, after removing Tabu & N/A actions.")
                self.set_new_base_solution(
                    # Setting proposed_solution_try to None, we make sure that
                    # the current solution is not reinserted into the solution tree
                    proposed_solution_try=None
                )
                continue

            sorted_actions = sorted(possible_actions, key=lambda x: x[0], reverse=True)

            number_of_actions_to_select = self.store.settings.max_number_of_actions_per_iteration
            selected_actions = sorted_actions[:number_of_actions_to_select]
            avg_rating = sum(rating for rating, _ in selected_actions) / len(selected_actions)

            print_l1(
                f"Chose {len(selected_actions)} actions with average rating {avg_rating:.1f} to evaluate."  # noqa: E501
            )

            if self.store.settings.print_chosen_actions:
                for rating, action in selected_actions:
                    print_l2(f"{action} with rating {rating}")

            self.iterations_per_solution -= 1
            return [action for _, action in selected_actions]

    def get_valid_actions(self) -> list[tuple[RATING, BaseAction]]:
        """Get up to settings.max_number_of_actions_per_iteration valid actions.

        The generators are processed as a queue: a generator is popped from
        the front, asked for its next acceptable action and then re-appended
        to the back, unless it is exhausted, reports "not applicable" or has
        reached `settings.MAX_YIELDS_PER_ACTION`.

        If an action has already been selected for this base solution, it is
        skipped. If the action is tabu or invalid, it is skipped and the next
        one from the same generator is tried. If the generator yields
        RATING.NOT_APPLICABLE (or no action), the generator is dropped.

        With `settings.only_allow_low_last`, actions rated RATING.LOW or lower
        are collected separately and only returned if no better rated action
        was found at all.

        NOTE: This function modifies `self.action_generators`, the tabu id set
        and the per-generator counters, so think about possible side effects.
        """
        actions: list[tuple[RATING, BaseAction]] = []
        low_actions: list[tuple[RATING, BaseAction]] = []

        # NOTE(review): the nesting below was reconstructed from a source
        # listing without indentation; the MAX_YIELDS check and the re-append
        # are assumed to apply to every accepted action — confirm.
        while len(self.action_generators) > 0:
            action_generator = self.action_generators.pop(0)
            if isinstance(action_generator, tuple):
                # A plain tuple cannot produce further actions; drop it.
                continue

            for rating, action in action_generator:
                if rating == RATING.NOT_APPLICABLE or action is None:
                    # Generator has nothing to offer: drop it from the queue.
                    break
                if action.id in self.action_generator_tabu_ids:
                    continue
                if self.store.is_tabu(action):
                    self.action_generator_tabu_ids.add(action.id)
                    continue
                if not self.store.settings.disable_action_validity_check and not action.check_if_valid(
                    self.store, mark_no_change_as_invalid=True
                ):
                    self.action_generator_tabu_ids.add(action.id)
                    continue

                # The action is accepted: remember it so it is not offered
                # twice, and count it against its generator.
                self.action_generator_tabu_ids.add(action.id)
                self.action_generator_counter[action_generator] += 1

                if self.store.settings.only_allow_low_last and rating <= RATING.LOW:
                    low_actions.append((rating, action))
                else:
                    actions.append((rating, action))
                    if len(actions) >= self.store.settings.max_number_of_actions_per_iteration:
                        # Enough actions collected; keep the generator queued
                        # for the next call.
                        self.action_generators.append(action_generator)
                        return actions

                # If the action generator has yielded more than the max,
                # do not re-add it, thereby forbidding it to yield more
                if (
                    self.store.settings.MAX_YIELDS_PER_ACTION is not None
                    and self.action_generator_counter[action_generator]
                    >= self.store.settings.MAX_YIELDS_PER_ACTION
                ):
                    break

                # One accepted action per turn: move on to the next generator.
                self.action_generators.append(action_generator)
                break

        if len(actions) == 0:
            return low_actions
        return actions

    def set_action_generators(self, solution: Solution) -> None:
        """Set the action generators for the given solution.

        Resets the tabu ids and the per-generator counters and creates one
        generator per catalog entry via `Action.rate_self(store, solution)`.

        NOTE: This function **must** be called when setting a new base solution.
        """
        rating_input = solution
        self.action_generator_tabu_ids = set()
        self.action_generator_counter = defaultdict(int)
        self.action_generators = [Action.rate_self(self.store, rating_input) for Action in self.catalog]

    def process_many_solutions(
        self, solutions: list[Solution]
    ) -> tuple[list[SolutionTry], list[SolutionTry]]:
        """Process a list of solutions.

        Passes `set_new_base_solution` as callback to the store, unless
        `settings.never_select_new_base_solution` is set.

        See Store.process_many_solutions for more information.

        Returns:
            A tuple `(chosen_tries, not_chosen_tries)` as returned by the store.
        """
        chosen_tries, not_chosen_tries = self.store.process_many_solutions(
            solutions,
            self.set_new_base_solution if not self.store.settings.never_select_new_base_solution else None,
        )

        print_l1("Actions processed:")
        print_l2(
            f"Out of the {len(solutions)} solutions, {len(chosen_tries)} in/better than the current front"
        )

        return chosen_tries, not_chosen_tries

    def set_new_base_solution(self, proposed_solution_try: Optional[SolutionTry] = None) -> None:
        """Set a new base solution.

        Asks `find_new_base_solution` for the next base solution. If it
        differs from the current one, the iteration budget and the action
        generators are reset; in any case `store.solution` is set to it.
        """
        print_l2(f"Selecting new base evaluation {'(reinserting)' if proposed_solution_try else ''}...")
        print_l3(f"Solutions explored so far: {self.store.solution_tree.discarded_solutions}")
        print_l3(f"Current solution: {self.store.solution.id}")
        solution = self.find_new_base_solution(proposed_solution_try)
        if solution != self.store.solution:
            self.iterations_per_solution = self.store.settings.iterations_per_solution or float("inf")
            self.set_action_generators(solution)
        else:
            print_l3("The new base solution is the same as the current solution.")
        self.store.solution = solution

    def try_solution(self, solution: Solution) -> SolutionTry:
        """Try a solution and return the result."""
        return self.store.try_solution(solution)

    @abstractmethod
    def find_new_base_solution(self, proposed_solution_try: Optional[SolutionTry] = None) -> Solution:
        """Select a new base solution.

        E.g. from the SolutionTree.

        If proposed_solution_try is None, then the current solution is not
        reinserted into the solution tree, as it's assumed that the function
        was called outside of the normal optimization loop. (E.g. when running out of
        actions)
        If it's not None, then this proposed_solution_try **may** be used as
        the new base solution, but this depends on the Agent implementation.

        NOTE: `set_new_base_solution` compares the returned solution with
        `store.solution` to decide whether to reset the action generators and
        then assigns it to `store.solution` itself.
        """