Coverage for o2/models/timetable/resource_calendar.py: 95%
92 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1import operator
2from collections.abc import Iterator
3from dataclasses import dataclass, replace
4from functools import cached_property, reduce
5from itertools import groupby
6from typing import Optional
8from dataclass_wizard import JSONWizard
10from o2.models.days import DAY, is_day_in_range
11from o2.models.legacy_constraints import WorkMasks
12from o2.models.timetable.time_period import TimePeriod
13from o2.util.bit_mask_helper import any_has_overlap, find_mixed_ranges_in_bitmask
14from o2.util.custom_dumper import CustomDumper, CustomLoader
15from o2.util.helper import hash_int
18@dataclass(frozen=True)
19class ResourceCalendar(JSONWizard, CustomLoader, CustomDumper):
20 """Defines a calendar of available time periods for a resource."""
22 id: str
23 name: str
24 time_periods: list["TimePeriod"]
25 workload_ratio: Optional[list["TimePeriod"]] = None
27 def is_valid(self) -> bool:
28 """Check if the calendar is valid.
30 The calendar is valid if all time periods have a begin time before the end time.
31 And if the time periods are not overlapping.
32 """
33 # We do not check for valid time periods if the time periods have a probability
34 if any(tp.probability is not None for tp in self.time_periods):
35 return True
37 grouped_time_periods = self.split_group_by_day()
38 for _, time_periods_iter in grouped_time_periods:
39 time_periods = list(time_periods_iter)
40 for tp in time_periods:
41 if tp.begin_time >= tp.end_time:
42 return False
44 bitmasks = [tp.to_bitmask() for tp in time_periods]
45 if any_has_overlap(bitmasks):
46 return False
47 return True
49 def split_group_by_day(self) -> Iterator[tuple[DAY, Iterator["TimePeriod"]]]:
50 """Split the time periods by day."""
51 return groupby(self.split_time_periods_by_day(), key=lambda tp: tp.from_)
53 def split_time_periods_by_day(self) -> list["TimePeriod"]:
54 """Split the time periods by day and sort them."""
55 return sorted(
56 (tp for tp in self.time_periods for tp in tp.split_by_day),
57 key=lambda tp: tp.from_,
58 )
60 def get_periods_for_day(self, day: DAY) -> list["TimePeriod"]:
61 """Get the time periods for a specific day."""
62 return [tp for tp in self.split_time_periods_by_day() if tp.from_ == day]
64 def get_periods_containing_day(self, day: DAY) -> list["TimePeriod"]:
65 """Get the time periods that contain a specific day."""
66 return [tp for tp in self.time_periods if is_day_in_range(day, tp.from_, tp.to)]
68 @cached_property
69 def work_masks(self) -> WorkMasks:
70 """Convert the calendar to work masks."""
71 days = {
72 f"{day.name.lower()}": reduce(operator.or_, [tp.to_bitmask() for tp in time_periods])
73 for day, time_periods in self.split_group_by_day()
74 }
75 return WorkMasks(**days)
77 def get_period_index_by_id(self, period_id: str) -> Optional[int]:
78 """Get the index of a period by period id."""
79 for i, tp in enumerate(self.time_periods):
80 if tp.id == period_id:
81 return i
82 return None
84 @property
85 def total_hours(self) -> int:
86 """Get the total number of hours in the calendar."""
87 return sum((tp.end_time_hour - tp.begin_time_hour) for tp in self.split_time_periods_by_day())
89 @property
90 def max_consecutive_hours(self) -> int:
91 """Get the maximum number of continuous hours in the calendar."""
92 return max((tp.end_time_hour - tp.begin_time_hour) for tp in self.time_periods)
94 @property
95 def max_periods_per_day(self) -> int:
96 """Get the maximum number of periods in a day."""
97 return max(len(list(tp.split_by_day)) for tp in self.time_periods)
99 @property
100 def max_hours_per_day(self) -> int:
101 """Get the maximum number of hours in a day."""
102 return max(
103 sum(tp.end_time_hour - tp.begin_time_hour for tp in time_periods)
104 for _, time_periods in self.split_group_by_day()
105 )
107 @property
108 def total_periods(self) -> int:
109 """Get the total number of shifts in the calendar."""
110 return len(self.split_time_periods_by_day())
112 @cached_property
113 def uid(self) -> int:
114 """Get a unique identifier for the calendar."""
115 return hash_int(self.to_json())
117 def __hash__(self) -> int:
118 """Return a hash value for this calendar.
120 Uses the cached uid property for efficient hashing.
121 """
122 return self.uid
124 def replace_time_period(self, time_period_index: int, time_period: "TimePeriod") -> "ResourceCalendar":
125 """Replace a time period. Returns a new ResourceCalendar."""
126 old_time_period = self.time_periods[time_period_index]
128 if time_period.is_empty:
129 # The old period was only one day long, so we can just remove it
130 if old_time_period.from_ == old_time_period.to:
131 return replace(
132 self,
133 time_periods=self.time_periods[:time_period_index]
134 + self.time_periods[time_period_index + 1 :],
135 )
136 else:
137 # The old period was multiple days long, so we need to split it
138 # and only remove the correct day
139 new_time_periods = [
140 tp for tp in old_time_period.split_by_day if tp.from_ != time_period.from_
141 ]
142 return replace(
143 self,
144 time_periods=self.time_periods[:time_period_index]
145 + new_time_periods
146 + self.time_periods[time_period_index + 1 :],
147 )
148 if old_time_period.from_ != time_period.from_ or old_time_period.to != time_period.to:
149 # If the days are different, we need to split the time
150 # periods by day, and only replace the time period
151 # for the correct day.
152 new_time_periods = time_period.split_by_day
153 old_time_periods = old_time_period.split_by_day
155 combined_time_periods = new_time_periods + [
156 tp
157 for tp in old_time_periods
158 if not is_day_in_range(tp.from_, time_period.from_, time_period.to)
159 ]
161 return replace(self, time_periods=combined_time_periods)
163 return replace(
164 self,
165 time_periods=self.time_periods[:time_period_index]
166 + [time_period]
167 + self.time_periods[time_period_index + 1 :],
168 )
170 @cached_property
171 def bitmasks_by_day(self) -> list[tuple[DAY, int]]:
172 """Split the time periods by day and convert them to bitmasks.
174 NOTE: This does not join overlapping/adjacent time periods.
175 """
176 return [(tp.from_, tp.to_bitmask()) for tp in self.split_time_periods_by_day()]
178 def __str__(self) -> str:
179 """Get a string representation of the calendar."""
180 return f"ResourceCalendar(id={self.id},\n" + ",\t\n".join(map(str, self.time_periods)) + "\t\n)"
182 def get_time_periods_of_length_excl_idle(
183 self,
184 day: DAY,
185 length: int,
186 start_time: int,
187 last_start_time: int,
188 ) -> list["TimePeriod"]:
189 """Get all time periods of a specific length.
191 The time-periods will ignore any idle time.
192 The result will be sorted by length, with shortest first,
193 thereby sorting it by least idle time first.
194 """
195 bitmask = self.work_masks.get(day) or 0
197 # # TODO Think about this
198 # if last_start_time + length > 24:
199 # bitmask_tomorrow = bitmask_by_day.get(day.next_day()) or 0
200 # bitmask = bitmask << 24 | bitmask_tomorrow
202 if bitmask == 0:
203 return []
205 ranges = find_mixed_ranges_in_bitmask(bitmask, length, start_time, last_start_time)
207 ranges_sorted = sorted(ranges, key=lambda r: r[1] - r[0])
209 return [TimePeriod.from_start_end(start, end, day) for start, end in ranges_sorted]