Coverage for o2/models/timetable/resource_calendar.py: 95%

92 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1import operator 

2from collections.abc import Iterator 

3from dataclasses import dataclass, replace 

4from functools import cached_property, reduce 

5from itertools import groupby 

6from typing import Optional 

7 

8from dataclass_wizard import JSONWizard 

9 

10from o2.models.days import DAY, is_day_in_range 

11from o2.models.legacy_constraints import WorkMasks 

12from o2.models.timetable.time_period import TimePeriod 

13from o2.util.bit_mask_helper import any_has_overlap, find_mixed_ranges_in_bitmask 

14from o2.util.custom_dumper import CustomDumper, CustomLoader 

15from o2.util.helper import hash_int 

16 

17 

18@dataclass(frozen=True) 

19class ResourceCalendar(JSONWizard, CustomLoader, CustomDumper): 

20 """Defines a calendar of available time periods for a resource.""" 

21 

22 id: str 

23 name: str 

24 time_periods: list["TimePeriod"] 

25 workload_ratio: Optional[list["TimePeriod"]] = None 

26 

27 def is_valid(self) -> bool: 

28 """Check if the calendar is valid. 

29 

30 The calendar is valid if all time periods have a begin time before the end time. 

31 And if the time periods are not overlapping. 

32 """ 

33 # We do not check for valid time periods if the time periods have a probability 

34 if any(tp.probability is not None for tp in self.time_periods): 

35 return True 

36 

37 grouped_time_periods = self.split_group_by_day() 

38 for _, time_periods_iter in grouped_time_periods: 

39 time_periods = list(time_periods_iter) 

40 for tp in time_periods: 

41 if tp.begin_time >= tp.end_time: 

42 return False 

43 

44 bitmasks = [tp.to_bitmask() for tp in time_periods] 

45 if any_has_overlap(bitmasks): 

46 return False 

47 return True 

48 

49 def split_group_by_day(self) -> Iterator[tuple[DAY, Iterator["TimePeriod"]]]: 

50 """Split the time periods by day.""" 

51 return groupby(self.split_time_periods_by_day(), key=lambda tp: tp.from_) 

52 

53 def split_time_periods_by_day(self) -> list["TimePeriod"]: 

54 """Split the time periods by day and sort them.""" 

55 return sorted( 

56 (tp for tp in self.time_periods for tp in tp.split_by_day), 

57 key=lambda tp: tp.from_, 

58 ) 

59 

60 def get_periods_for_day(self, day: DAY) -> list["TimePeriod"]: 

61 """Get the time periods for a specific day.""" 

62 return [tp for tp in self.split_time_periods_by_day() if tp.from_ == day] 

63 

64 def get_periods_containing_day(self, day: DAY) -> list["TimePeriod"]: 

65 """Get the time periods that contain a specific day.""" 

66 return [tp for tp in self.time_periods if is_day_in_range(day, tp.from_, tp.to)] 

67 

68 @cached_property 

69 def work_masks(self) -> WorkMasks: 

70 """Convert the calendar to work masks.""" 

71 days = { 

72 f"{day.name.lower()}": reduce(operator.or_, [tp.to_bitmask() for tp in time_periods]) 

73 for day, time_periods in self.split_group_by_day() 

74 } 

75 return WorkMasks(**days) 

76 

77 def get_period_index_by_id(self, period_id: str) -> Optional[int]: 

78 """Get the index of a period by period id.""" 

79 for i, tp in enumerate(self.time_periods): 

80 if tp.id == period_id: 

81 return i 

82 return None 

83 

84 @property 

85 def total_hours(self) -> int: 

86 """Get the total number of hours in the calendar.""" 

87 return sum((tp.end_time_hour - tp.begin_time_hour) for tp in self.split_time_periods_by_day()) 

88 

89 @property 

90 def max_consecutive_hours(self) -> int: 

91 """Get the maximum number of continuous hours in the calendar.""" 

92 return max((tp.end_time_hour - tp.begin_time_hour) for tp in self.time_periods) 

93 

94 @property 

95 def max_periods_per_day(self) -> int: 

96 """Get the maximum number of periods in a day.""" 

97 return max(len(list(tp.split_by_day)) for tp in self.time_periods) 

98 

99 @property 

100 def max_hours_per_day(self) -> int: 

101 """Get the maximum number of hours in a day.""" 

102 return max( 

103 sum(tp.end_time_hour - tp.begin_time_hour for tp in time_periods) 

104 for _, time_periods in self.split_group_by_day() 

105 ) 

106 

107 @property 

108 def total_periods(self) -> int: 

109 """Get the total number of shifts in the calendar.""" 

110 return len(self.split_time_periods_by_day()) 

111 

112 @cached_property 

113 def uid(self) -> int: 

114 """Get a unique identifier for the calendar.""" 

115 return hash_int(self.to_json()) 

116 

117 def __hash__(self) -> int: 

118 """Return a hash value for this calendar. 

119 

120 Uses the cached uid property for efficient hashing. 

121 """ 

122 return self.uid 

123 

124 def replace_time_period(self, time_period_index: int, time_period: "TimePeriod") -> "ResourceCalendar": 

125 """Replace a time period. Returns a new ResourceCalendar.""" 

126 old_time_period = self.time_periods[time_period_index] 

127 

128 if time_period.is_empty: 

129 # The old period was only one day long, so we can just remove it 

130 if old_time_period.from_ == old_time_period.to: 

131 return replace( 

132 self, 

133 time_periods=self.time_periods[:time_period_index] 

134 + self.time_periods[time_period_index + 1 :], 

135 ) 

136 else: 

137 # The old period was multiple days long, so we need to split it 

138 # and only remove the correct day 

139 new_time_periods = [ 

140 tp for tp in old_time_period.split_by_day if tp.from_ != time_period.from_ 

141 ] 

142 return replace( 

143 self, 

144 time_periods=self.time_periods[:time_period_index] 

145 + new_time_periods 

146 + self.time_periods[time_period_index + 1 :], 

147 ) 

148 if old_time_period.from_ != time_period.from_ or old_time_period.to != time_period.to: 

149 # If the days are different, we need to split the time 

150 # periods by day, and only replace the time period 

151 # for the correct day. 

152 new_time_periods = time_period.split_by_day 

153 old_time_periods = old_time_period.split_by_day 

154 

155 combined_time_periods = new_time_periods + [ 

156 tp 

157 for tp in old_time_periods 

158 if not is_day_in_range(tp.from_, time_period.from_, time_period.to) 

159 ] 

160 

161 return replace(self, time_periods=combined_time_periods) 

162 

163 return replace( 

164 self, 

165 time_periods=self.time_periods[:time_period_index] 

166 + [time_period] 

167 + self.time_periods[time_period_index + 1 :], 

168 ) 

169 

170 @cached_property 

171 def bitmasks_by_day(self) -> list[tuple[DAY, int]]: 

172 """Split the time periods by day and convert them to bitmasks. 

173 

174 NOTE: This does not join overlapping/adjacent time periods. 

175 """ 

176 return [(tp.from_, tp.to_bitmask()) for tp in self.split_time_periods_by_day()] 

177 

178 def __str__(self) -> str: 

179 """Get a string representation of the calendar.""" 

180 return f"ResourceCalendar(id={self.id},\n" + ",\t\n".join(map(str, self.time_periods)) + "\t\n)" 

181 

182 def get_time_periods_of_length_excl_idle( 

183 self, 

184 day: DAY, 

185 length: int, 

186 start_time: int, 

187 last_start_time: int, 

188 ) -> list["TimePeriod"]: 

189 """Get all time periods of a specific length. 

190 

191 The time-periods will ignore any idle time. 

192 The result will be sorted by length, with shortest first, 

193 thereby sorting it by least idle time first. 

194 """ 

195 bitmask = self.work_masks.get(day) or 0 

196 

197 # # TODO Think about this 

198 # if last_start_time + length > 24: 

199 # bitmask_tomorrow = bitmask_by_day.get(day.next_day()) or 0 

200 # bitmask = bitmask << 24 | bitmask_tomorrow 

201 

202 if bitmask == 0: 

203 return [] 

204 

205 ranges = find_mixed_ranges_in_bitmask(bitmask, length, start_time, last_start_time) 

206 

207 ranges_sorted = sorted(ranges, key=lambda r: r[1] - r[0]) 

208 

209 return [TimePeriod.from_start_end(start, end, day) for start, end in ranges_sorted]