Coverage for o2/models/timetable/time_period.py: 96%

129 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1import functools 

2from json import dumps, loads 

3from typing import Any, ClassVar, Optional, Union 

4 

5from pydantic import BaseModel, Field, model_validator 

6 

7from o2.models.days import DAY, day_range 

8from o2.util.bit_mask_helper import get_ranges_from_bitmask 

9from o2.util.helper import hash_string 

10 

11 

12class TimePeriod(BaseModel): 

13 """A Time Period in a resource calendar. 

14 

15 Because `from` is a reserved keyword in Python, we use `from_` instead, 

16 this also means that we need to use pydantic directly instead of JSONWizard. 

17 """ 

18 

19 from_: "DAY" = Field(...) 

20 """The start of the time period (day, uppercase, e.g. MONDAY)""" 

21 

22 to: "DAY" = Field(...) 

23 """The end of the time period (day, uppercase, e.g. FRIDAY)""" 

24 

25 begin_time: str = Field(...) 

26 """The start time of the time period (24h format, e.g. 08:00)""" 

27 end_time: str = Field(...) 

28 """The end time of the time period (24h format, e.g. 17:00)""" 

29 

30 probability: Optional[float] = Field(default=None) 

31 """The probability of the time period.""" 

32 

33 class Config: # noqa: D106 

34 frozen = True 

35 

36 # Override model_dump for custom dictionary serialization 

37 def model_dump(self, **kwargs): # noqa: ANN201, D102, ANN003 

38 # Get the default dictionary 

39 data = super().model_dump(**kwargs) 

40 # Replace Python-safe names with JSON-friendly aliases 

41 data["from"] = data.pop("from_") 

42 data["beginTime"] = data.pop("begin_time") 

43 data["endTime"] = data.pop("end_time") 

44 return data 

45 

46 # Override model_dump_json for custom JSON serialization 

47 def model_dump_json(self, **kwargs: Any): # noqa: ANN201, D102, ANN401 

48 # Serialize using model_dump and convert to JSON string 

49 return dumps(self.model_dump(**kwargs)) 

50 

51 # Custom deserialization from dictionary 

52 @classmethod 

53 def model_validate( # noqa: ANN206, D102 

54 cls, # noqa: ANN102 

55 obj: Any, # noqa: ANN401 

56 *, 

57 strict: Union[bool, None] = None, 

58 from_attributes: Union[bool, None] = None, 

59 context: Union[Any, None] = None, # noqa: ANN401 

60 ): 

61 # Convert JSON keys back to Python attribute names 

62 if "from" in obj: 

63 obj["from_"] = obj.pop("from") 

64 if "beginTime" in obj: 

65 obj["begin_time"] = obj.pop("beginTime") 

66 if "endTime" in obj: 

67 obj["end_time"] = obj.pop("endTime") 

68 return super().model_validate(obj) 

69 

70 # Custom deserialization from JSON 

71 @classmethod 

72 def model_validate_json(cls, json_data, **kwargs): # noqa: ANN206, D102, ANN001, ANN102, ANN003 

73 # Use model_validate after parsing the JSON data 

74 data = loads(json_data) 

75 return cls.model_validate(data, **kwargs) 

76 

77 @model_validator(mode="before") 

78 def handle_aliases(cls, values: dict[str, Any]) -> dict[str, Any]: # noqa: N805 

79 """Handle field aliases for compatibility with different naming conventions. 

80 

81 Maps alternative field names to their standardized counterparts. 

82 """ 

83 # Handle aliasing for 'from', 'beginTime', and 'endTime' 

84 if "from" in values: 

85 values["from_"] = values.pop("from") 

86 if "beginTime" in values: 

87 values["begin_time"] = values.pop("beginTime") 

88 if "endTime" in values: 

89 values["end_time"] = values.pop("endTime") 

90 return values 

91 

92 ALL_DAY_BITMASK: ClassVar[int] = 0b111111111111111111111111 

93 

94 @property 

95 def begin_time_hour(self) -> int: 

96 """Get the start time hour.""" 

97 return int(self.begin_time.split(":")[0]) 

98 

99 @property 

100 def end_time_hour(self) -> int: 

101 """Get the end time hour.""" 

102 return int(self.end_time.split(":")[0]) 

103 

104 @property 

105 def begin_time_minute(self) -> int: 

106 """Get the start time minute.""" 

107 return int(self.begin_time.split(":")[1]) 

108 

109 @property 

110 def begin_time_second(self) -> int: 

111 """Get the start time second.""" 

112 spited = self.begin_time.split(":") 

113 if len(spited) == 3: 

114 return int(spited[2]) 

115 return 0 

116 

117 @property 

118 def end_time_second(self) -> int: 

119 """Get the end time second.""" 

120 spited = self.end_time.split(":") 

121 if len(spited) == 3: 

122 return int(spited[2]) 

123 return 0 

124 

125 @property 

126 def end_time_minute(self) -> int: 

127 """Get the end time minute.""" 

128 return int(self.end_time.split(":")[1]) 

129 

130 @property 

131 def duration(self) -> int: 

132 """Get the duration of the time period in hours.""" 

133 return self.end_time_hour - self.begin_time_hour 

134 

135 @property 

136 def is_empty(self) -> bool: 

137 """Check if the time period is empty.""" 

138 return self.begin_time == self.end_time 

139 

140 def add_hours_before(self, hours: int) -> Optional["TimePeriod"]: 

141 """Get new TimePeriod with hours added before.""" 

142 return self._modify(add_start=hours) 

143 

144 def add_hours_after(self, hours: int) -> Optional["TimePeriod"]: 

145 """Get new TimePeriod with hours added after.""" 

146 return self._modify(add_end=hours) 

147 

148 def shift_hours(self, hours: int) -> Optional["TimePeriod"]: 

149 """Get new TimePeriod with hours shifted. 

150 

151 If hours is positive, the period is shifted forward. 

152 (Begins later and ends later) 

153 """ 

154 return self._modify(add_start=-hours, add_end=hours) 

155 

156 def _modify(self, add_start: int = 0, add_end: int = 0) -> Optional["TimePeriod"]: 

157 new_begin = self.begin_time_hour - add_start 

158 new_end = self.end_time_hour + add_end 

159 if new_begin < 0 or new_begin >= 24 or new_end < 0 or new_end >= 24: 

160 return None 

161 

162 new_begin_time = f"{new_begin:02}:{self.begin_time_minute:02}:{self.begin_time_second:02}" 

163 new_end_time = f"{new_end:02}:{self.end_time_minute:02}:{self.end_time_second:02}" 

164 

165 new_period = TimePeriod( 

166 from_=self.from_, 

167 to=self.to, 

168 begin_time=new_begin_time, 

169 end_time=new_end_time, 

170 probability=self.probability, 

171 ) 

172 return new_period 

173 

174 @functools.cached_property 

175 def split_by_day(self) -> list["TimePeriod"]: 

176 """Split the time period by day. 

177 

178 Return a list of time periods, one for each day in the range. 

179 """ 

180 if self.is_empty: 

181 return [] 

182 if self.from_ == self.to: 

183 return [self] 

184 return [ 

185 TimePeriod( 

186 from_=day, 

187 to=day, 

188 begin_time=self.begin_time, 

189 end_time=self.end_time, 

190 probability=self.probability, 

191 ) 

192 for day in day_range(self.from_, self.to) 

193 ] 

194 

195 def to_bitmask(self) -> int: 

196 """Get a bitmask for the time period. 

197 

198 Each bit represents an hour in the day. 

199 The left most bit represents the first hour of the day. 

200 The right most bit represents the last hour of the day. 

201 Of course this only includes one day. 

202 """ 

203 bitarray = [0] * 24 

204 end = self.end_time_hour 

205 if self.end_time_minute > 0 or self.end_time_second > 0: 

206 end += 1 

207 for i in range(self.begin_time_hour, end): 

208 bitarray[i] = 1 

209 return int("".join(map(str, bitarray)), 2) 

210 

211 def __repr__(self) -> str: 

212 """Create a string representation of the TimePeriod. 

213 

214 Returns a formatted string showing the day and time information. 

215 """ 

216 return f"TimePeriod({self.from_},{self.begin_time} -> {self.to},{self.end_time})" 

217 

218 @staticmethod 

219 def from_bitmask(bitmask: int, day: "DAY") -> list["TimePeriod"]: 

220 """Create a time period from a bitmask.""" 

221 hour_ranges = get_ranges_from_bitmask(bitmask) 

222 return [ 

223 TimePeriod( 

224 from_=day, 

225 to=day, 

226 begin_time=f"{start:02}:00", 

227 end_time=f"{end:02}:00", 

228 ) 

229 for start, end in hour_ranges 

230 ] 

231 

232 @staticmethod 

233 def from_start_end(start: int, end: int, day: "DAY" = DAY.MONDAY) -> "TimePeriod": 

234 """Create a time period from a start and end time.""" 

235 end_time = f"{end:02}:00:00" 

236 if end > 23 or end == 0: 

237 end_time = "23:59:59" 

238 return TimePeriod( 

239 from_=day, 

240 to=day, 

241 begin_time=f"{start:02}:00:00", 

242 end_time=end_time, 

243 ) 

244 

245 @staticmethod 

246 def empty(day: "DAY" = DAY.MONDAY) -> "TimePeriod": 

247 """Create an empty TimePeriod for the specified day. 

248 

249 Returns a TimePeriod with default values for the given day. 

250 """ 

251 return TimePeriod( 

252 from_=day, 

253 to=day, 

254 begin_time="00:00:00", 

255 end_time="00:00:00", 

256 ) 

257 

258 @functools.cached_property 

259 def id(self) -> str: 

260 """A unique identifier for the time period.""" 

261 return hash_string(self.model_dump_json())