Coverage for o2/models/timetable/timetable_type.py: 93%
214 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1import operator
2from dataclasses import dataclass, field, replace
3from typing import TYPE_CHECKING, Callable, Literal, Optional, Union
5from dataclass_wizard import JSONWizard
7from o2.models.days import DAY
8from o2.models.timetable.batch_type import BATCH_TYPE
9from o2.models.timetable.batching_rule import BatchingRule
10from o2.models.timetable.firing_rule import FiringRule
11from o2.models.timetable.gateway_branching_probability import GatewayBranchingProbability
12from o2.models.timetable.granule_size import GranuleSize
13from o2.models.timetable.multitask import Multitask
14from o2.models.timetable.resource import Resource
15from o2.models.timetable.resource_calendar import ResourceCalendar
16from o2.models.timetable.resource_pool import ResourcePool
17from o2.models.timetable.rule_type import RULE_TYPE
18from o2.models.timetable.task_resource_distribution import (
19 ArrivalTimeDistribution,
20 TaskResourceDistributions,
21)
22from o2.models.timetable.time_period import TimePeriod
23from o2.util.custom_dumper import CustomDumper, CustomLoader
24from o2.util.helper import (
25 cached_lambdify,
26 hash_int,
27 name_is_clone_of,
28)
29from o2.util.logger import info
31if TYPE_CHECKING:
32 from o2.models.constraints import ConstraintsType
33 from o2.models.rule_selector import RuleSelector
34 from o2.models.state import State
@dataclass(frozen=True, eq=True)
class TimetableType(JSONWizard, CustomLoader, CustomDumper):
    """The main class representing the timetable.

    The dataclass is frozen, so none of the methods below modify ``self``.
    Methods that "change" the timetable return a new ``TimetableType`` built
    with ``dataclasses.replace``; when there is nothing to change they return
    ``self`` unchanged.
    """

    resource_profiles: list[ResourcePool]
    arrival_time_distribution: ArrivalTimeDistribution
    arrival_time_calendar: list["TimePeriod"]
    gateway_branching_probabilities: list[GatewayBranchingProbability]
    task_resource_distribution: list[TaskResourceDistributions]
    resource_calendars: list[ResourceCalendar]
    batch_processing: list[BatchingRule] = field(default_factory=list)
    start_time: str = "2000-01-01T00:00:00Z"
    total_cases: int = 1000
    multitask: Optional[Multitask] = None
    model_type: Optional[Literal["FUZZY", "CRISP"]] = None
    granule_size: Optional[GranuleSize] = None
    event_distribution: Optional[Union[list[dict], dict]] = None
    global_attributes: Optional[list[dict]] = None
    case_attributes: Optional[list[dict]] = None
    event_attributes: Optional[list[dict]] = None
    branch_rules: Optional[list[dict]] = None

    class _(JSONWizard.Meta):  # noqa: N801
        key_transform_with_dump = "SNAKE"
        skip_defaults = True

    def init_fixed_cost_fns(self, constraints: "ConstraintsType") -> "TimetableType":
        """Initialize the fixed cost fn for all resource profiles.

        Returns a new timetable in which every resource profile carries the
        fixed cost fn that ``constraints`` reports for the profile's id.
        """
        updated_profiles = [
            replace(
                resource_profile,
                fixed_cost_fn=constraints.get_fixed_cost_fn_for_task(resource_profile.id),
            )
            for resource_profile in self.resource_profiles
        ]
        return replace(self, resource_profiles=updated_profiles)

    def get_batching_rule(
        self, rule_selector: "RuleSelector"
    ) -> Union[tuple[int, BatchingRule], tuple[None, None]]:
        """Get a batching rule by rule selector.

        Returns ``(index, rule)`` for the first batching rule whose task id
        equals ``rule_selector.batching_rule_task_id``, or ``(None, None)``
        if there is no such rule.
        """
        wanted_task_id = rule_selector.batching_rule_task_id
        for index, rule in enumerate(self.batch_processing):
            if rule.task_id == wanted_task_id:
                return index, rule
        return None, None

    def get_batching_rules_for_task(
        self, task_id: str, batch_type: Optional["BATCH_TYPE"] = None
    ) -> list[BatchingRule]:
        """Get all batching rules for a task.

        If ``batch_type`` is given, only rules of that type are returned.
        """
        return self.get_batching_rules_for_tasks([task_id], batch_type)

    def get_batching_rules_for_tasks(
        self, task_ids: list[str], batch_type: Optional["BATCH_TYPE"] = None
    ) -> list[BatchingRule]:
        """Get all batching rules for a list of tasks.

        If ``batch_type`` is given, only rules of that type are returned.
        """
        # Build the lookup once so membership is O(1) for every rule.
        wanted_task_ids = frozenset(task_ids)
        return [
            rule
            for rule in self.batch_processing
            if rule.task_id in wanted_task_ids and (batch_type is None or rule.type == batch_type)
        ]

    def get_firing_rules_for_task(
        self,
        task_id: str,
        batch_type: Optional["BATCH_TYPE"] = None,
        rule_type: Optional[RULE_TYPE] = None,
    ) -> list[FiringRule]:
        """Get all firing rules for a task.

        The nested firing rules of every matching batching rule are
        flattened into one list. If ``rule_type`` is given, only firing
        rules whose ``attribute`` equals it are returned.
        """
        return self.get_firing_rules_for_tasks([task_id], batch_type, rule_type)

    def get_longest_time_period_for_daily_hour_firing_rules(
        self, task_id: str, day: DAY
    ) -> Optional[tuple[Optional["RuleSelector"], "RuleSelector", "RuleSelector"]]:
        """Get the longest time period for daily hour firing rules.

        Returns the Rule Selector of the day, lower bound, and upper bound,
        or ``None`` if no period with a positive length exists for ``day``.
        """
        best_selector = None
        best_length = 0

        for batching_rule in self.get_batching_rules_for_task(task_id=task_id):
            periods = batching_rule.get_time_period_for_daily_hour_firing_rules()
            for selectors, (period_day, lower_bound, upper_bound) in periods.items():
                if period_day != day:
                    continue
                # A period with a missing bound counts as length 0, which can
                # never beat ``best_length`` (it starts at 0), so skip it.
                if lower_bound is None or upper_bound is None:
                    continue
                length = upper_bound - lower_bound
                if length > best_length:
                    day_selector, lower_bound_selector, upper_bound_selector = selectors
                    best_selector = (
                        day_selector,
                        lower_bound_selector,
                        upper_bound_selector,
                    )
                    best_length = length
        return best_selector

    def get_firing_rule_selectors_for_task(
        self,
        task_id: str,
        batch_type: Optional["BATCH_TYPE"] = None,
        rule_type: Optional[RULE_TYPE] = None,
    ) -> list["RuleSelector"]:
        """Get all firing rule selectors for a task."""
        return self.get_firing_rule_selectors_for_tasks([task_id], batch_type, rule_type)

    def get_firing_rule_selectors_for_tasks(
        self,
        task_ids: list[str],
        batch_type: Optional["BATCH_TYPE"] = None,
        rule_type: Optional[RULE_TYPE] = None,
    ) -> list["RuleSelector"]:
        """Get all firing rule selectors for a list of tasks."""
        return [
            rule_selector
            for batching_rule in self.get_batching_rules_for_tasks(task_ids, batch_type)
            for rule_selector in batching_rule.get_firing_rule_selectors(rule_type)
        ]

    def get_firing_rules_for_tasks(
        self,
        task_ids: list[str],
        batch_type: Optional["BATCH_TYPE"] = None,
        rule_type: Optional[RULE_TYPE] = None,
    ) -> list[FiringRule]:
        """Get all firing rules for a list of tasks.

        The nested firing rules of every matching batching rule are
        flattened into one list. If ``rule_type`` is given, only firing
        rules whose ``attribute`` equals it are returned.
        """
        return [
            firing_rule
            for batching_rule in self.get_batching_rules_for_tasks(task_ids, batch_type)
            for firing_rules in batching_rule.firing_rules
            for firing_rule in firing_rules
            if rule_type is None or firing_rule.attribute == rule_type
        ]

    def get_resource(self, resource_name: str) -> Optional[Resource]:
        """Get resource (from resource_profiles) with the given name.

        Looks through all resource profiles and returns the first resource
        that matches the given name, or ``None`` if there is no match.
        """
        for resource_profile in self.resource_profiles:
            for resource in resource_profile.resource_list:
                # For compatibility with legacy Optimos, we also check for the
                # resource name with "timetable" appended.
                if (
                    resource.name == resource_name
                    or resource.id == (resource_name + "timetable")
                    or (resource.id + "timetable") == resource_name
                ):
                    return resource
        return None

    def get_tasks(self, resource_id: str) -> list[str]:
        """Get all tasks assigned to a resource (empty if it is unknown)."""
        resource = self.get_resource(resource_id)
        return [] if resource is None else resource.assigned_tasks

    def get_task_resource_distribution(self, task_id: str) -> Optional[TaskResourceDistributions]:
        """Get task resource distribution by task id, or ``None``."""
        return next(
            (
                distribution
                for distribution in self.task_resource_distribution
                if distribution.task_id == task_id
            ),
            None,
        )

    def get_resources_assigned_to_task(self, task_id: str) -> list[str]:
        """Get the ids of all resources assigned to a task."""
        task_resource_distribution = self.get_task_resource_distribution(task_id)
        if task_resource_distribution is None:
            return []
        return [resource.resource_id for resource in task_resource_distribution.resources]

    def get_task_ids_assigned_to_resource(self, resource_id: str) -> list[str]:
        """Get the ids of all tasks whose distribution lists the resource."""
        return [
            task_resource_distribution.task_id
            for task_resource_distribution in self.task_resource_distribution
            if resource_id in task_resource_distribution.resource_ids
        ]

    def get_resource_profiles_containing_resource(self, resource_id: str) -> list[ResourcePool]:
        """Get the resource profiles containing a resource."""
        return [
            resource_profile
            for resource_profile in self.resource_profiles
            if any(resource.id == resource_id for resource in resource_profile.resource_list)
        ]

    def get_resource_profile(self, profile_id: str) -> Optional[ResourcePool]:
        """Get a resource profile by profile id.

        Legacy Optimos considers the profile id to be a task id.
        """
        for resource_profile in self.resource_profiles:
            if resource_profile.id == profile_id:
                return resource_profile
        return None

    def get_resource_calendar_id(self, resource_id: str) -> Optional[str]:
        """Get the resource calendar id for a resource."""
        resource = self.get_resource(resource_id)
        return None if resource is None else resource.calendar

    def get_hourly_rates(self) -> dict[str, int]:
        """Get the cost per hour for each resource, keyed by resource id."""
        return {resource.id: resource.cost_per_hour for resource in self.get_all_resources()}

    def get_fixed_cost_fns(self) -> dict[str, Callable[[float], float]]:
        """Get the fixed cost function for each resource pool (task)."""
        return {
            resource_profile.id: cached_lambdify(resource_profile.fixed_cost_fn)
            for resource_profile in self.resource_profiles
        }

    def get_calendar_for_resource(self, resource_name: str) -> Optional[ResourceCalendar]:
        """Get a resource calendar by resource name."""
        calendar_id = self.get_resource_calendar_id(resource_name)
        if calendar_id is None:
            return None
        return self.get_calendar(calendar_id)

    def get_calendar_for_base_resource(self, resource_id: str) -> Optional[ResourceCalendar]:
        """Get a resource calendar by resource clone/original name.

        If the resource is a clone, get the calendar of the base resource.
        """
        for resource_profile in self.resource_profiles:
            for resource in resource_profile.resource_list:
                if resource.id == resource_id or name_is_clone_of(resource_id, resource.id):
                    return self.get_calendar(resource.calendar)
        return None

    def get_calendars_for_resource_clones(self, resource_name: str) -> list[ResourceCalendar]:
        """Get all resource calendars of clones of a resource."""
        return [
            resource_calendar
            for resource_calendar in self.resource_calendars
            if name_is_clone_of(resource_calendar.name, resource_name)
        ]

    def get_calendar(self, calendar_id: str) -> Optional[ResourceCalendar]:
        """Get a resource calendar by calendar id, or ``None``."""
        return next(
            (
                resource_calendar
                for resource_calendar in self.resource_calendars
                if resource_calendar.id == calendar_id
            ),
            None,
        )

    def get_all_resources(self) -> list[Resource]:
        """Get all resources, one entry per resource id."""
        # Keying by id de-duplicates resources that appear in several
        # profiles; for a repeated id the last occurrence is kept.
        resources_by_id = {
            resource.id: resource
            for resource_profile in self.resource_profiles
            for resource in resource_profile.resource_list
        }
        return list(resources_by_id.values())

    def get_deleted_resources(self, base_state: "State") -> list[Resource]:
        """Get all resources that have been deleted.

        A resource counts as deleted when it is listed in the resource
        profiles of ``base_state.timetable`` but ``self.get_resource`` finds
        nothing for its id.
        """
        return [
            resource
            for resource_profile in base_state.timetable.resource_profiles
            for resource in resource_profile.resource_list
            if self.get_resource(resource.id) is None
        ]

    def get_resources_with_cost(self) -> list[tuple[Resource, int]]:
        """Get all resources with cost. Sorted desc."""
        resources_with_cost = [
            (resource, resource.get_total_cost(self)) for resource in self.get_all_resources()
        ]
        resources_with_cost.sort(key=operator.itemgetter(1), reverse=True)
        return resources_with_cost

    def replace_batching_rule(
        self, rule_selector: "RuleSelector", new_batching_rule: BatchingRule
    ) -> "TimetableType":
        """Replace a batching rule.

        Every batching rule whose task id equals
        ``rule_selector.batching_rule_task_id`` is swapped for
        ``new_batching_rule``.
        """
        target_task_id = rule_selector.batching_rule_task_id
        return replace(
            self,
            batch_processing=[
                new_batching_rule if rule.task_id == target_task_id else rule
                for rule in self.batch_processing
            ],
        )

    def replace_firing_rule(
        self, rule_selector: "RuleSelector", new_firing_rule: FiringRule, duration_fn: Optional[str] = None
    ) -> "TimetableType":
        """Replace a firing rule.

        Returns ``self`` unchanged if no batching rule matches the selector.
        """
        _, batching_rule = self.get_batching_rule(rule_selector)
        if batching_rule is None:
            return self
        new_batching_rule = batching_rule.replace_firing_rule(
            rule_selector, new_firing_rule, duration_fn=duration_fn
        )
        return self.replace_batching_rule(rule_selector, new_batching_rule)

    def add_firing_rule(
        self,
        rule_selector: "RuleSelector",
        new_firing_rule: FiringRule,
        duration_fn: Optional[str] = None,
    ) -> "TimetableType":
        """Add a firing rule.

        If the selected task has no batching rule yet, a new one is created
        via ``BatchingRule.from_task_id`` (the only path that uses
        ``duration_fn``) and appended. Otherwise the firing rule is added to
        the existing batching rule.
        """
        _, old_batching_rule = self.get_batching_rule(rule_selector)
        if old_batching_rule is None:
            batching_rule = BatchingRule.from_task_id(
                rule_selector.batching_rule_task_id,
                firing_rules=[new_firing_rule],
                duration_fn=duration_fn,
            )
            return replace(self, batch_processing=self.batch_processing + [batching_rule])

        batching_rule = old_batching_rule.add_firing_rule(new_firing_rule)
        return self.replace_batching_rule(rule_selector, batching_rule)

    def replace_resource_calendar(self, new_calendar: ResourceCalendar) -> "TimetableType":
        """Replace a resource calendar. Returns a new TimetableType.

        The calendar with the same id as ``new_calendar`` is swapped out;
        if no calendar has that id, the list of calendars is left as it is.
        """
        resource_calendars = [
            new_calendar if rc.id == new_calendar.id else rc for rc in self.resource_calendars
        ]
        return replace(self, resource_calendars=resource_calendars)

    def remove_resource(self, resource_id: str) -> "TimetableType":
        """Get a new timetable with a resource removed.

        The resource is removed from every resource profile and every task
        resource distribution. Its calendar is removed as well, unless a
        remaining resource still references that calendar.

        Returns ``self`` unchanged if the resource does not exist.
        """
        resource = self.get_resource(resource_id)
        if resource is None:
            return self

        new_resource_profiles = [
            resource_profile.remove_resource(resource_id) for resource_profile in self.resource_profiles
        ]

        new_task_resource_distribution = [
            task_resource_distribution.remove_resource(resource_id)
            for task_resource_distribution in self.task_resource_distribution
        ]

        # Dropping a calendar that another resource still points to would
        # leave that resource without a calendar, so keep it in that case.
        calendars_still_in_use = {
            remaining_resource.calendar
            for resource_profile in new_resource_profiles
            for remaining_resource in resource_profile.resource_list
        }
        new_resource_calendars = [
            resource_calendar
            for resource_calendar in self.resource_calendars
            if resource_calendar.id != resource.calendar or resource_calendar.id in calendars_still_in_use
        ]

        return replace(
            self,
            resource_profiles=new_resource_profiles,
            task_resource_distribution=new_task_resource_distribution,
            resource_calendars=new_resource_calendars,
        )

    def clone_resource(self, resource_id: str, assigned_tasks: Optional[list[str]]) -> "TimetableType":
        """Get a new timetable with a resource duplicated.

        The new resource will only have the assigned tasks given,
        but copy all other properties from the original resource.
        If ``assigned_tasks`` is ``None``, the original's tasks are used.

        The Clone will be added in three places:
        1. in the resource calendars
        2. in the resource pools of the assigned_tasks
        3. in the task_resource_distribution of the assigned_tasks

        The Resource Constraints will not be cloned, because the original
        constraints will automatically be "assigned" based on the name.

        The Naming of the resource will also reflect clones of clones,
        meaning a clone of a clone will have the same name to a first level clone

        Returns ``self`` unchanged if the resource does not exist.
        """
        original_resource = self.get_resource(resource_id)
        if original_resource is None:
            return self
        if assigned_tasks is None:
            assigned_tasks = original_resource.assigned_tasks
        resource_clone = original_resource.clone(assigned_tasks)

        cloned_resource_calendars = self._clone_resource_calendars(
            original_resource, resource_clone, assigned_tasks
        )
        cloned_resource_profiles = self._clone_resource_profiles(
            original_resource, resource_clone, assigned_tasks
        )
        cloned_resource_distribution = self._clone_task_distributions(
            original_resource, resource_clone, assigned_tasks
        )
        return replace(
            self,
            resource_profiles=cloned_resource_profiles,
            task_resource_distribution=cloned_resource_distribution,
            resource_calendars=cloned_resource_calendars,
        )

    def remove_task_from_resource(self, resource_id: str, task_id: str) -> "TimetableType":
        """Get a new timetable with a task removed from a resource.

        The task will be removed from the resource's assigned tasks.
        The resource will be removed from the task's resource distribution.

        Returns ``self`` unchanged if the resource does not exist.
        """
        resource = self.get_resource(resource_id)
        if resource is None:
            return self

        updated_resource = resource.remove_task(task_id)

        # The profile whose id equals the task id loses the resource
        # entirely; all other profiles get the updated resource instead.
        new_resource_profiles = [
            resource_profile.remove_resource(resource_id)
            if resource_profile.id == task_id
            else resource_profile.update_resource(updated_resource)
            for resource_profile in self.resource_profiles
        ]

        new_task_resource_distribution = [
            task_resource_distribution.remove_resource(resource_id)
            if task_resource_distribution.task_id == task_id
            else task_resource_distribution
            for task_resource_distribution in self.task_resource_distribution
        ]

        return replace(
            self,
            resource_profiles=new_resource_profiles,
            task_resource_distribution=new_task_resource_distribution,
        )

    def get_task_ids(self) -> list[str]:
        """Get all task ids (one per task resource distribution)."""
        return [task.task_id for task in self.task_resource_distribution]

    def get_highest_availability_time_period(self, task_id: str, min_hours: int) -> Optional[TimePeriod]:
        """Get the highest availability time period for the task.

        The highest availability time period is the time period with the highest
        frequency of availability. Returns ``None`` if the task has no
        resource distribution.
        """
        task_distribution = self.get_task_resource_distribution(task_id)
        if task_distribution is None:
            return None
        return task_distribution.get_highest_availability_time_period(self, min_hours)

    @property
    def max_total_hours_per_resource(self) -> int:
        """Get the maximum total hours per resource (0 without calendars)."""
        return max(
            (resource_calendar.total_hours for resource_calendar in self.resource_calendars),
            default=0,
        )

    @property
    def max_consecutive_hours_per_resource(self) -> int:
        """Get the maximum shift size per resource (0 without calendars)."""
        return max(
            (resource_calendar.max_consecutive_hours for resource_calendar in self.resource_calendars),
            default=0,
        )

    @property
    def max_periods_per_day_per_resource(self) -> int:
        """Get the maximum shifts per day per resource (0 without calendars)."""
        return max(
            (resource_calendar.max_periods_per_day for resource_calendar in self.resource_calendars),
            default=0,
        )

    @property
    def batching_rules_exist(self) -> bool:
        """Check if any batching rules exist."""
        return len(self.batch_processing) > 0

    def _clone_resource_calendars(
        self, original: Resource, clone: Resource, _: list[str]
    ) -> list[ResourceCalendar]:
        """Get a Clone of the Resource Calendars, with the new resource added.

        The original's calendar is copied under the clone's calendar id
        (used as both id and name). If the original has no calendar, the
        existing calendars are returned as they are.
        """
        original_resource_calendar = self.get_calendar(original.calendar)
        if original_resource_calendar is None:
            return self.resource_calendars

        clone_calendar = replace(
            original_resource_calendar,
            id=clone.calendar,
            name=clone.calendar,
        )
        return self.resource_calendars + [clone_calendar]

    def _clone_task_distributions(
        self,
        original: Resource,
        clone: Resource,
        assigned_tasks: list[str],
    ) -> list[TaskResourceDistributions]:
        """Get a Clone of the Task Distributions, with the new resource added.

        Distributions of the assigned tasks get the clone added based on the
        original's entry. They are moved to the end of the returned list,
        after all untouched distributions.
        """
        original_task_distributions = [
            self.get_task_resource_distribution(task_id) for task_id in assigned_tasks
        ]

        untouched_task_distributions = []
        new_task_distributions = []
        for task_distribution in self.task_resource_distribution:
            if task_distribution in original_task_distributions:
                new_task_distributions.append(
                    task_distribution.add_resource_based_on_original(original.id, clone.id)
                )
            else:
                untouched_task_distributions.append(task_distribution)

        return untouched_task_distributions + new_task_distributions

    def _clone_resource_profiles(
        self, _: Resource, clone: Resource, assigned_tasks: list[str]
    ) -> list[ResourcePool]:
        """Get a Clone of the Resource Profiles, with the new resource added.

        Profiles are looked up by using each assigned task as profile id.
        The matching profiles get the clone appended to their resource list
        and are moved to the end of the returned list.
        """
        original_resource_profiles = [self.get_resource_profile(task) for task in assigned_tasks]

        new_resource_profiles = [
            replace(
                resource_profile,
                resource_list=resource_profile.resource_list + [clone],
            )
            for resource_profile in original_resource_profiles
            if resource_profile is not None
        ]

        untouched_resource_profiles = [
            resource_profile
            for resource_profile in self.resource_profiles
            if resource_profile not in original_resource_profiles
        ]
        return untouched_resource_profiles + new_resource_profiles

    def batching_rules_debug_str(self) -> str:
        """Get the batching rules as a string.

        Per batching rule there is one "Task" line followed by its firing
        rules: rules within a group are joined by AND, groups by OR.
        """
        lines: list[str] = []
        for batching_rule in self.batch_processing:
            lines.append(f"\tTask: {batching_rule.task_id}")
            or_blocks = [
                "\t\tAND\n".join(
                    f"\t\t{rule.attribute} {rule.comparison} {rule.value}\n" for rule in and_rules
                )
                for and_rules in batching_rule.firing_rules
            ]
            lines.append("\tOR\n".join(or_blocks))
        return "\n".join(lines)

    def print_batching_rules(self) -> None:
        """Print the batching rules (logged via ``info``)."""
        info(self.batching_rules_debug_str())

    def is_valid(self) -> bool:
        """Check if the timetable is valid.

        The timetable is valid if all resource calendars and all batching
        rules are valid.
        TODO: Add more checks.
        """
        return all(calendar.is_valid() for calendar in self.resource_calendars) and all(
            rule.is_valid() for rule in self.batch_processing
        )

    def __hash__(self) -> int:
        """Hash the timetable.

        NOTE: We cache the hash in a __dict__ field, because we need to
        make sure that the hash is only computed once. functools don't
        work for __hash__.
        """
        # Writing to __dict__ directly bypasses the frozen dataclass'
        # __setattr__, which would otherwise reject the assignment.
        if "_hash" not in self.__dict__:
            self.__dict__["_hash"] = hash_int(self.to_json())
        return self.__dict__["_hash"]