Coverage for o2/models/timetable/time_period.py: 96%
129 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1import functools
2from json import dumps, loads
3from typing import Any, ClassVar, Optional, Union
5from pydantic import BaseModel, Field, model_validator
7from o2.models.days import DAY, day_range
8from o2.util.bit_mask_helper import get_ranges_from_bitmask
9from o2.util.helper import hash_string
12class TimePeriod(BaseModel):
13 """A Time Period in a resource calendar.
15 Because `from` is a reserved keyword in Python, we use `from_` instead,
16 this also means that we need to use pydantic directly instead of JSONWizard.
17 """
19 from_: "DAY" = Field(...)
20 """The start of the time period (day, uppercase, e.g. MONDAY)"""
22 to: "DAY" = Field(...)
23 """The end of the time period (day, uppercase, e.g. FRIDAY)"""
25 begin_time: str = Field(...)
26 """The start time of the time period (24h format, e.g. 08:00)"""
27 end_time: str = Field(...)
28 """The end time of the time period (24h format, e.g. 17:00)"""
30 probability: Optional[float] = Field(default=None)
31 """The probability of the time period."""
33 class Config: # noqa: D106
34 frozen = True
36 # Override model_dump for custom dictionary serialization
37 def model_dump(self, **kwargs): # noqa: ANN201, D102, ANN003
38 # Get the default dictionary
39 data = super().model_dump(**kwargs)
40 # Replace Python-safe names with JSON-friendly aliases
41 data["from"] = data.pop("from_")
42 data["beginTime"] = data.pop("begin_time")
43 data["endTime"] = data.pop("end_time")
44 return data
46 # Override model_dump_json for custom JSON serialization
47 def model_dump_json(self, **kwargs: Any): # noqa: ANN201, D102, ANN401
48 # Serialize using model_dump and convert to JSON string
49 return dumps(self.model_dump(**kwargs))
51 # Custom deserialization from dictionary
52 @classmethod
53 def model_validate( # noqa: ANN206, D102
54 cls, # noqa: ANN102
55 obj: Any, # noqa: ANN401
56 *,
57 strict: Union[bool, None] = None,
58 from_attributes: Union[bool, None] = None,
59 context: Union[Any, None] = None, # noqa: ANN401
60 ):
61 # Convert JSON keys back to Python attribute names
62 if "from" in obj:
63 obj["from_"] = obj.pop("from")
64 if "beginTime" in obj:
65 obj["begin_time"] = obj.pop("beginTime")
66 if "endTime" in obj:
67 obj["end_time"] = obj.pop("endTime")
68 return super().model_validate(obj)
70 # Custom deserialization from JSON
71 @classmethod
72 def model_validate_json(cls, json_data, **kwargs): # noqa: ANN206, D102, ANN001, ANN102, ANN003
73 # Use model_validate after parsing the JSON data
74 data = loads(json_data)
75 return cls.model_validate(data, **kwargs)
77 @model_validator(mode="before")
78 def handle_aliases(cls, values: dict[str, Any]) -> dict[str, Any]: # noqa: N805
79 """Handle field aliases for compatibility with different naming conventions.
81 Maps alternative field names to their standardized counterparts.
82 """
83 # Handle aliasing for 'from', 'beginTime', and 'endTime'
84 if "from" in values:
85 values["from_"] = values.pop("from")
86 if "beginTime" in values:
87 values["begin_time"] = values.pop("beginTime")
88 if "endTime" in values:
89 values["end_time"] = values.pop("endTime")
90 return values
92 ALL_DAY_BITMASK: ClassVar[int] = 0b111111111111111111111111
94 @property
95 def begin_time_hour(self) -> int:
96 """Get the start time hour."""
97 return int(self.begin_time.split(":")[0])
99 @property
100 def end_time_hour(self) -> int:
101 """Get the end time hour."""
102 return int(self.end_time.split(":")[0])
104 @property
105 def begin_time_minute(self) -> int:
106 """Get the start time minute."""
107 return int(self.begin_time.split(":")[1])
109 @property
110 def begin_time_second(self) -> int:
111 """Get the start time second."""
112 spited = self.begin_time.split(":")
113 if len(spited) == 3:
114 return int(spited[2])
115 return 0
117 @property
118 def end_time_second(self) -> int:
119 """Get the end time second."""
120 spited = self.end_time.split(":")
121 if len(spited) == 3:
122 return int(spited[2])
123 return 0
125 @property
126 def end_time_minute(self) -> int:
127 """Get the end time minute."""
128 return int(self.end_time.split(":")[1])
130 @property
131 def duration(self) -> int:
132 """Get the duration of the time period in hours."""
133 return self.end_time_hour - self.begin_time_hour
135 @property
136 def is_empty(self) -> bool:
137 """Check if the time period is empty."""
138 return self.begin_time == self.end_time
140 def add_hours_before(self, hours: int) -> Optional["TimePeriod"]:
141 """Get new TimePeriod with hours added before."""
142 return self._modify(add_start=hours)
144 def add_hours_after(self, hours: int) -> Optional["TimePeriod"]:
145 """Get new TimePeriod with hours added after."""
146 return self._modify(add_end=hours)
148 def shift_hours(self, hours: int) -> Optional["TimePeriod"]:
149 """Get new TimePeriod with hours shifted.
151 If hours is positive, the period is shifted forward.
152 (Begins later and ends later)
153 """
154 return self._modify(add_start=-hours, add_end=hours)
156 def _modify(self, add_start: int = 0, add_end: int = 0) -> Optional["TimePeriod"]:
157 new_begin = self.begin_time_hour - add_start
158 new_end = self.end_time_hour + add_end
159 if new_begin < 0 or new_begin >= 24 or new_end < 0 or new_end >= 24:
160 return None
162 new_begin_time = f"{new_begin:02}:{self.begin_time_minute:02}:{self.begin_time_second:02}"
163 new_end_time = f"{new_end:02}:{self.end_time_minute:02}:{self.end_time_second:02}"
165 new_period = TimePeriod(
166 from_=self.from_,
167 to=self.to,
168 begin_time=new_begin_time,
169 end_time=new_end_time,
170 probability=self.probability,
171 )
172 return new_period
174 @functools.cached_property
175 def split_by_day(self) -> list["TimePeriod"]:
176 """Split the time period by day.
178 Return a list of time periods, one for each day in the range.
179 """
180 if self.is_empty:
181 return []
182 if self.from_ == self.to:
183 return [self]
184 return [
185 TimePeriod(
186 from_=day,
187 to=day,
188 begin_time=self.begin_time,
189 end_time=self.end_time,
190 probability=self.probability,
191 )
192 for day in day_range(self.from_, self.to)
193 ]
195 def to_bitmask(self) -> int:
196 """Get a bitmask for the time period.
198 Each bit represents an hour in the day.
199 The left most bit represents the first hour of the day.
200 The right most bit represents the last hour of the day.
201 Of course this only includes one day.
202 """
203 bitarray = [0] * 24
204 end = self.end_time_hour
205 if self.end_time_minute > 0 or self.end_time_second > 0:
206 end += 1
207 for i in range(self.begin_time_hour, end):
208 bitarray[i] = 1
209 return int("".join(map(str, bitarray)), 2)
211 def __repr__(self) -> str:
212 """Create a string representation of the TimePeriod.
214 Returns a formatted string showing the day and time information.
215 """
216 return f"TimePeriod({self.from_},{self.begin_time} -> {self.to},{self.end_time})"
218 @staticmethod
219 def from_bitmask(bitmask: int, day: "DAY") -> list["TimePeriod"]:
220 """Create a time period from a bitmask."""
221 hour_ranges = get_ranges_from_bitmask(bitmask)
222 return [
223 TimePeriod(
224 from_=day,
225 to=day,
226 begin_time=f"{start:02}:00",
227 end_time=f"{end:02}:00",
228 )
229 for start, end in hour_ranges
230 ]
232 @staticmethod
233 def from_start_end(start: int, end: int, day: "DAY" = DAY.MONDAY) -> "TimePeriod":
234 """Create a time period from a start and end time."""
235 end_time = f"{end:02}:00:00"
236 if end > 23 or end == 0:
237 end_time = "23:59:59"
238 return TimePeriod(
239 from_=day,
240 to=day,
241 begin_time=f"{start:02}:00:00",
242 end_time=end_time,
243 )
245 @staticmethod
246 def empty(day: "DAY" = DAY.MONDAY) -> "TimePeriod":
247 """Create an empty TimePeriod for the specified day.
249 Returns a TimePeriod with default values for the given day.
250 """
251 return TimePeriod(
252 from_=day,
253 to=day,
254 begin_time="00:00:00",
255 end_time="00:00:00",
256 )
258 @functools.cached_property
259 def id(self) -> str:
260 """A unique identifier for the time period."""
261 return hash_string(self.model_dump_json())