Coverage for o2/models/evaluation.py: 95%

359 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1import math 

2from collections import Counter 

3from dataclasses import dataclass 

4from functools import cached_property, reduce 

5from typing import TYPE_CHECKING, Callable, cast 

6 

7import pandas as pd 

8from prosimos.execution_info import TaskEvent, Trace 

9from prosimos.simulation_stats_calculator import ( 

10 KPIMap, 

11 ResourceKPI, 

12) 

13 

14from o2.models.days import DAY 

15from o2.models.settings import CostType, Settings 

16from o2.simulation_runner import RunSimulationResult 

17from o2.util.waiting_time_helper import ( 

18 BatchInfo, 

19 BatchInfoKey, 

20 SimpleBatchInfo, 

21 get_batches_from_event_log, 

22) 

23 

24if TYPE_CHECKING: 

25 pass 

26 

27 

28HourlyRates = dict[str, int] 

29 

30 

31@dataclass(frozen=True) 

32class Evaluation: 

33 """An evaluation of a simulation run. 

34 

35 It's a wrapper for the result classes of a PROSIMOS simulation run, 

36 with a lot of useful getters and methods to analyze the results. 

37 """ 

38 

39 hourly_rates: HourlyRates 

40 

41 task_kpis: dict[str, KPIMap] 

42 resource_kpis: dict[str, ResourceKPI] 

43 

44 avg_cycle_time_by_case: float 

45 """Get the mean cycle time of the simulation.""" 

46 

47 avg_waiting_time_by_case: float 

48 """Get the average waiting time of the simulation.""" 

49 

50 avg_batching_waiting_time_by_case: float 

51 """Get the average batching waiting time per case.""" 

52 

53 avg_batching_waiting_time_per_task: dict[str, float] 

54 """Get the average batching waiting time per task.""" 

55 total_batching_waiting_time_per_task: dict[str, float] 

56 """Get the total batching waiting time per task (all cases).""" 

57 

58 total_duration: float 

59 """Get the total duration (processing + idle) of the simulation.""" 

60 

61 sum_of_durations: float 

62 """Get the sum of all task durations of the simulation.""" 

63 

64 sum_of_cycle_times: float 

65 """Get the sum of all task cycle times of the simulation.""" 

66 

67 total_batching_waiting_time: float 

68 """Get the total batching waiting time of the simulation (all cases).""" 

69 

70 total_batching_waiting_time_per_resource: dict[str, float] 

71 """Get the total batching waiting time of the simulation per resource.""" 

72 

73 total_cycle_time: float 

74 """Get the total cycle time of the simulation.""" 

75 

76 total_processing_time: float 

77 """Get the total processing time of the simulation.""" 

78 

79 total_waiting_time: float 

80 """Get the total waiting time of the simulation.""" 

81 

82 task_execution_count_by_resource: dict[str, dict[str, int]] 

83 """Get the number of times each task was executed by a given resource. 

84 

85 E.g. task_execution_count_by_resource["resource_id"]["task_id"] 

86 """ 

87 

88 is_empty: bool 

89 """Is this evaluation based on an empty simulation run?""" 

90 

91 task_execution_count_with_wt_or_it: dict[str, int] 

92 """Get the count each task was executed with a waiting or idle time.""" 

93 

94 task_execution_counts: dict[str, int] 

95 """Get the count each task was executed""" 

96 

97 task_enablement_weekdays: dict[str, dict[DAY, dict[int, int]]] 

98 """Get the weekdays & hours on which a task was enabled.""" 

99 

100 task_started_weekdays: dict[str, dict[DAY, dict[int, int]]] 

101 """Get the weekdays & hours on which a task was started.""" 

102 

103 resource_allocation_ratio_task: dict[str, float] 

104 """Get the allocation ratio of each task.""" 

105 

106 total_fixed_cost_by_task: dict[str, float] 

107 """Get the total fixed cost of each task.""" 

108 avg_fixed_cost_per_case: float 

109 """Get the average fixed cost per case.""" 

110 

111 batches_by_activity_with_idle: dict[str, list[SimpleBatchInfo]] 

112 """Get the batches grouped by activity, only including those with idle time.""" 

113 

114 avg_batch_size_for_batch_enabled_tasks: float 

115 """Get the average batch size over all batches.""" 

116 

117 avg_batch_size_per_task: dict[str, float] 

118 """Get the average batch size per task.""" 

119 

120 avg_idle_wt_per_task_instance: float 

121 """Get the average idle waiting time per task instance.""" 

122 

123 avg_batch_processing_time_per_task_instance: float 

124 """Get the average batch processing time per task instance. 

125 

126 Pseudo-Code: 

127 sum(batch.processing_time for batch in batches) / sum(batch.size for batch in batches) 

128 """ 

129 

130 resource_started_weekdays: dict[str, dict[DAY, dict[int, int]]] 

131 """Get the weekdays & hours on which a resource started any task.""" 

132 

133 tasks_by_number_of_duplicate_enablement_dates: dict[str, int] 

134 """Get the tasks sorted by the number of duplicate enablement dates.""" 

135 

136 @cached_property 

137 def total_processing_cost_for_tasks(self) -> float: 

138 """Get the total cost of all tasks.""" 

139 return sum( 

140 map( 

141 lambda task_kpi: task_kpi.cost.total, 

142 self.task_kpis.values(), 

143 ) 

144 ) 

145 

146 @cached_property 

147 def total_cost_for_worked_time(self) -> float: 

148 """Get the total flexible cost of the simulation. 

149 

150 This takes the worked time and the resource cost per hour into account. 

151 It will therefore give you a "realistic" of hiring the resources for the 

152 duration of the simulation. 

153 """ 

154 return sum( 

155 (resource_kpi.worked_time / (60 * 60)) * self.hourly_rates[resource_id] 

156 for resource_id, resource_kpi in self.resource_kpis.items() 

157 ) 

158 

159 @cached_property 

160 def total_cost_for_available_time(self) -> float: 

161 """Get the cost of the resources for the worked time. 

162 

163 Aka the cost you had if the resource calender would exactly match the 

164 worked time. 

165 """ 

166 return sum( 

167 (resource_kpi.available_time / (60 * 60)) * self.hourly_rates[resource_id] 

168 for resource_id, resource_kpi in self.resource_kpis.items() 

169 ) 

170 

171 @cached_property 

172 def avg_cost_by_case(self) -> float: 

173 """Get the average cost sum of all tasks.""" 

174 return sum( 

175 map( 

176 lambda task_kpi: task_kpi.cost.avg, 

177 self.task_kpis.values(), 

178 ) 

179 ) 

180 

181 @cached_property 

182 def avg_resource_utilization_by_case(self) -> float: 

183 """Get the average resource utilization of the simulation.""" 

184 return reduce(lambda x, y: x + y, self.resource_utilizations.values()) / len( 

185 self.resource_utilizations 

186 ) 

187 

188 @cached_property 

189 def resource_worked_times(self) -> dict[str, float]: 

190 """Get the worked time of all resources.""" 

191 return { 

192 resource_id: resource_kpi.worked_time for resource_id, resource_kpi in self.resource_kpis.items() 

193 } 

194 

195 @cached_property 

196 def resource_available_times(self) -> dict[str, float]: 

197 """Get the availability of all resources.""" 

198 return { 

199 resource_id: resource_kpi.available_time 

200 for resource_id, resource_kpi in self.resource_kpis.items() 

201 } 

202 

203 @cached_property 

204 def resource_utilizations(self) -> dict[str, float]: 

205 """Get the utilization of all resources.""" 

206 return { 

207 resource_id: resource_kpi.utilization for resource_id, resource_kpi in self.resource_kpis.items() 

208 } 

209 

210 @cached_property 

211 def total_fixed_cost(self) -> float: 

212 """Get the total fixed cost of the simulation.""" 

213 return sum(self.total_fixed_cost_by_task.values()) 

214 

215 @cached_property 

216 def total_cost(self) -> float: 

217 """Get the total cost of the simulation.""" 

218 return self.total_cost_for_worked_time + self.total_fixed_cost 

219 

220 @cached_property 

221 def total_resource_idle_time(self) -> float: 

222 """Get the total resource idle time of the simulation. 

223 

224 This is calculated by summing up the difference worked time and available time 

225 for all resources. 

226 """ 

227 return sum( 

228 resource_kpi.worked_time - resource_kpi.available_time 

229 for resource_kpi in self.resource_kpis.values() 

230 ) 

231 

232 @cached_property 

233 def total_task_idle_time(self) -> float: 

234 """Get the total task idle time of the simulation.""" 

235 return sum(task_kpi.idle_time.total for task_kpi in self.task_kpis.values()) 

236 

237 @property 

238 def pareto_x(self) -> float: 

239 """Get the cost used for positioning the evaluation in the pareto front. 

240 

241 NOTE: This is depended on the global, static setting found in `Settings.COST_TYPE` 

242 """ 

243 if Settings.COST_TYPE == CostType.FIXED_COST: 

244 return float(self.total_fixed_cost) 

245 elif Settings.COST_TYPE == CostType.RESOURCE_COST: 

246 return float(self.total_cost_for_available_time) 

247 elif Settings.COST_TYPE == CostType.TOTAL_COST: 

248 return float(self.total_cost_for_worked_time) 

249 elif Settings.COST_TYPE == CostType.WAITING_TIME_AND_PROCESSING_TIME: 

250 return float(self.total_processing_time) 

251 elif Settings.COST_TYPE == CostType.AVG_WT_AND_PT_PER_TASK_INSTANCE: 

252 return float(self.avg_batch_processing_time_per_task_instance) 

253 raise ValueError(f"Unknown cost type: {Settings.COST_TYPE}") 

254 

255 @property 

256 def pareto_y(self) -> float: 

257 """Get the duration used for positioning the evaluation in the pareto front.""" 

258 if Settings.COST_TYPE == CostType.WAITING_TIME_AND_PROCESSING_TIME: 

259 return float(self.total_waiting_time + self.total_task_idle_time) 

260 elif Settings.COST_TYPE == CostType.AVG_WT_AND_PT_PER_TASK_INSTANCE: 

261 return float(self.avg_idle_wt_per_task_instance) 

262 return float(self.total_duration) 

263 

264 def get_avg_waiting_time_of_task_id(self, task_id: str) -> float: 

265 """Get the average waiting time of a task.""" 

266 return self.task_kpis[task_id].waiting_time.avg 

267 

268 def get_total_waiting_time_of_task_id(self, task_id: str) -> float: 

269 """Get the total waiting time of a task.""" 

270 return self.task_kpis[task_id].waiting_time.total 

271 

272 def get_max_waiting_time_of_task_id(self, task_id: str) -> float: 

273 """Get the maximum waiting time of a task.""" 

274 return self.task_kpis[task_id].waiting_time.max 

275 

276 def get_task_names_sorted_by_waiting_time_desc(self) -> list[str]: 

277 """Get a list of task names sorted by the average waiting time in desc order.""" 

278 task_waiting_time = [ 

279 (task_name, task_kpi.waiting_time.avg) for task_name, task_kpi in self.task_kpis.items() 

280 ] 

281 return [task_name for task_name, _ in sorted(task_waiting_time, key=lambda x: x[1], reverse=True)] 

282 

283 def get_task_names_sorted_by_idle_time_desc(self) -> list[str]: 

284 """Get a list of task names sorted by the average idle time in desc order.""" 

285 task_idle_time = [ 

286 (task_name, task_kpi.idle_time.avg) for task_name, task_kpi in self.task_kpis.items() 

287 ] 

288 return [task_name for task_name, _ in sorted(task_idle_time, key=lambda x: x[1], reverse=True)] 

289 

290 def get_most_frequent_enablement_weekdays(self, task_name: str) -> list[DAY]: 

291 """Get a list of weekdays, on which the task was enabled. 

292 

293 The list is sorted by the most common weekday first. 

294 """ 

295 return [ 

296 day 

297 for day, _ in sorted( 

298 self.task_enablement_weekdays[task_name].items(), 

299 key=lambda x: sum(x[1].values()), 

300 reverse=True, 

301 ) 

302 ] 

303 

304 def get_most_frequent_resources(self, task_name: str) -> list[str]: 

305 """Get a list of resources that executed the task the most.""" 

306 return self.get_resources_sorted_by_task_execution_count(task_name) 

307 

308 def get_least_utilized_resources(self) -> list[str]: 

309 """Get a list of resources that have the least utilization.""" 

310 return [ 

311 resource_name 

312 for resource_name, _ in sorted( 

313 self.resource_utilizations.items(), key=lambda x: x[1], reverse=False 

314 ) 

315 ] 

316 

317 def get_tasks_sorted_by_occurrences_of_wt_and_it(self) -> list[str]: 

318 """Get a list of task names sorted by wt & it instances. 

319 

320 In clear words: Orders descending the tasks by the number of times 

321 they were executed(number of events) and had either a waiting or idle time. 

322 """ 

323 occurrences = self.task_execution_count_with_wt_or_it 

324 return [task_name for task_name, _ in sorted(occurrences.items(), key=lambda x: x[1], reverse=True)] 

325 

326 def get_task_execution_count_by_resource(self, resource_id: str) -> dict[str, int]: 

327 """Get the number of times each task was executed by a given resource.""" 

328 return self.task_execution_count_by_resource.get(resource_id, {}) 

329 

330 def get_avg_processing_cost_per_task(self) -> dict[str, float]: 

331 """Get the average processing cost per task.""" 

332 return {task_id: task_kpi.cost.avg for task_id, task_kpi in self.task_kpis.items()} 

333 

334 def get_avg_cost_per_task(self) -> dict[str, float]: 

335 """Get the average total (fixed + processing) cost per task.""" 

336 return { 

337 task_id: task_kpi.cost.avg + (self.total_fixed_cost_by_task.get(task_id, 0) / task_kpi.cost.count) 

338 for task_id, task_kpi in self.task_kpis.items() 

339 } 

340 

341 def get_total_cost_per_task(self) -> dict[str, float]: 

342 """Get the total (fixed + processing) cost per task.""" 

343 return { 

344 task_id: task_kpi.cost.total + self.total_fixed_cost_by_task.get(task_id, 0) 

345 for task_id, task_kpi in self.task_kpis.items() 

346 } 

347 

348 def get_resources_sorted_by_task_execution_count(self, task_id: str) -> list[str]: 

349 """Get a list of resource_ids, that executed the given task. 

350 

351 The list is sorted by the most common resource first. 

352 """ 

353 resource_counts = Counter( 

354 { 

355 resource_id: count[task_id] 

356 for resource_id, count in self.task_execution_count_by_resource.items() 

357 if task_id in count 

358 } 

359 ) 

360 

361 # Return the resource_ids sorted by most common first 

362 return [resource_id for resource_id, _ in resource_counts.most_common()] 

363 

364 def get_total_processing_time_per_task(self) -> dict[str, float]: 

365 """Get the total processing time per task (excl. idle times).""" 

366 return {task_id: task_kpi.processing_time.total for task_id, task_kpi in self.task_kpis.items()} 

367 

368 def get_average_processing_time_per_task(self) -> dict[str, float]: 

369 """Get the average processing time per task (excl. idle times).""" 

370 return {task_id: task_kpi.processing_time.avg for task_id, task_kpi in self.task_kpis.items()} 

371 

372 def get_total_duration_time_per_task(self) -> dict[str, float]: 

373 """Get the total duration time per task (incl. idle times).""" 

374 return {task_id: task_kpi.idle_processing_time.total for task_id, task_kpi in self.task_kpis.items()} 

375 

376 def get_avg_duration_time_per_task(self) -> dict[str, float]: 

377 """Get the average duration time per task (incl. idle times & wt).""" 

378 return { 

379 task_id: task_kpi.idle_processing_time.avg + task_kpi.waiting_time.avg 

380 for task_id, task_kpi in self.task_kpis.items() 

381 } 

382 

383 def get_total_idle_time_of_task_id(self, task_id: str) -> float: 

384 """Get the total idle time of a task.""" 

385 return self.task_kpis[task_id].idle_time.total 

386 

387 def get_total_cycle_time_of_task_id(self, task_id: str) -> float: 

388 """Get the total cycle time of a task.""" 

389 return self.task_kpis[task_id].idle_cycle_time.total 

390 

391 def to_tuple(self) -> tuple[float, float]: 

392 """Convert self to a tuple of cost for available time and total cycle time.""" 

393 return (self.pareto_x, self.pareto_y) 

394 

395 def distance_to(self, other: "Evaluation") -> float: 

396 """Calculate the euclidean distance between two evaluations.""" 

397 return math.sqrt((self.pareto_x - other.pareto_x) ** 2 + (self.pareto_y - other.pareto_y) ** 2) 

398 

399 # Is this evaluation dominated by another evaluation? 

400 # (Taking only the total cost & total cycle time into account) 

401 def is_dominated_by(self, other: "Evaluation") -> bool: 

402 """Check if this evaluation is dominated by another evaluation.""" 

403 if not Settings.EQUAL_DOMINATION_ALLOWED: 

404 return other.pareto_x <= self.pareto_x and other.pareto_y <= self.pareto_y 

405 return other.pareto_x < self.pareto_x and other.pareto_y < self.pareto_y 

406 

407 def __str__(self) -> str: 

408 """Return a string representation of the evaluation.""" 

409 return f"{Settings.get_pareto_x_label()}: {self.pareto_x:.1f}, {Settings.get_pareto_y_label()}: {self.pareto_y:.1f}" # noqa: E501 

410 

411 @staticmethod 

412 def get_task_enablement_weekdays( 

413 cases: list[Trace], 

414 ) -> dict[str, dict[DAY, dict[int, int]]]: 

415 """Get the weekdays & time of day on which a task was enabled.""" 

416 weekdays: dict[str, dict[DAY, dict[int, int]]] = {} 

417 for case in cases: 

418 event_list: list[TaskEvent] = case.event_list 

419 for event in event_list: 

420 if event.task_id not in weekdays: 

421 weekdays[event.task_id] = {} 

422 if event.enabled_datetime is None: 

423 continue 

424 day = DAY.from_date(event.enabled_datetime) 

425 hour = event.enabled_datetime.hour 

426 if day not in weekdays[event.task_id]: 

427 weekdays[event.task_id][day] = {} 

428 if hour not in weekdays[event.task_id][day]: 

429 weekdays[event.task_id][day][hour] = 0 

430 weekdays[event.task_id][day][hour] += 1 

431 return weekdays 

432 

433 @staticmethod 

434 def get_task_started_at_weekdays( 

435 cases: list[Trace], 

436 ) -> dict[str, dict[DAY, dict[int, int]]]: 

437 """Get the weekdays & time of day on which a task was started.""" 

438 weekdays: dict[str, dict[DAY, dict[int, int]]] = {} 

439 for case in cases: 

440 event_list: list[TaskEvent] = case.event_list 

441 for event in event_list: 

442 if event.task_id not in weekdays: 

443 weekdays[event.task_id] = {} 

444 if event.started_datetime is None: 

445 continue 

446 day = DAY.from_date(event.started_datetime) 

447 hour = event.started_datetime.hour 

448 if day not in weekdays[event.task_id]: 

449 weekdays[event.task_id][day] = {} 

450 if hour not in weekdays[event.task_id][day]: 

451 weekdays[event.task_id][day][hour] = 0 

452 weekdays[event.task_id][day][hour] += 1 

453 return weekdays 

454 

455 @staticmethod 

456 def _get_events_for_task(cases: list[Trace], task_name: str) -> list[TaskEvent]: 

457 """Get all events for a task.""" 

458 return [event for case in cases for event in case.event_list if event.task_id == task_name] 

459 

460 @staticmethod 

461 def get_resource_started_weekdays( 

462 cases: list[Trace], 

463 ) -> dict[str, dict[DAY, dict[int, int]]]: 

464 """Get the weekdays & time of day on which a resource started a(/any) task.""" 

465 return { 

466 resource_id: { 

467 DAY(weekday): {hour: count for hour, count in task_start_times.items()} 

468 for _, resource_start_times in task_start_times_by_day.items() 

469 for weekday, task_start_times in resource_start_times.items() 

470 } 

471 for resource_id, task_start_times_by_day in Evaluation.get_resource_task_started_weekdays( 

472 cases 

473 ).items() 

474 } 

475 

476 @staticmethod 

477 def get_task_execution_counts(cases: list[Trace]) -> dict[str, int]: 

478 """Get the count each task was executed.""" 

479 occurrences: dict[str, int] = {} 

480 for case in cases: 

481 for event in cast(list[TaskEvent], case.event_list): 

482 occurrences[event.task_id] = occurrences.get(event.task_id, 0) + 1 

483 return occurrences 

484 

485 @staticmethod 

486 def get_task_execution_count_with_wt_or_it(cases: list[Trace]) -> dict[str, int]: 

487 """Get the count each task was executed with a waiting or idle time.""" 

488 occurrences: dict[str, int] = {} 

489 for case in cases: 

490 for event in cast(list[TaskEvent], case.event_list): 

491 if event.waiting_time is not None and event.waiting_time > 0: 

492 occurrences[event.task_id] = occurrences.get(event.task_id, 0) + 1 

493 if event.idle_time is not None and event.idle_time > 0: 

494 occurrences[event.task_id] = occurrences.get(event.task_id, 0) + 1 

495 return occurrences 

496 

497 @staticmethod 

498 def get_task_execution_count_by_resources( 

499 cases: list[Trace], 

500 ) -> dict[str, dict[str, int]]: 

501 """Get the number of times each task was executed by a given resource.""" 

502 occurrences: dict[str, dict[str, int]] = {} 

503 for case in cases: 

504 event_list: list[TaskEvent] = case.event_list 

505 for event in cast(list[TaskEvent], event_list): 

506 occurrences[event.resource_id] = occurrences.get(event.resource_id, {}) 

507 occurrences[event.resource_id][event.task_id] = ( 

508 occurrences[event.resource_id].get(event.task_id, 0) + 1 

509 ) 

510 return occurrences 

511 

512 @staticmethod 

513 def get_resource_task_started_weekdays( 

514 cases: list[Trace], 

515 ) -> dict[str, dict[str, dict[DAY, dict[int, int]]]]: 

516 """Get the weekdays & time of day on which a task was started by a resource.""" 

517 weekdays: dict[str, dict[str, dict[DAY, dict[int, int]]]] = {} 

518 for case in cases: 

519 event_list: list[TaskEvent] = case.event_list 

520 for event in event_list: 

521 if event.resource_id not in weekdays: 

522 weekdays[event.resource_id] = {} 

523 if event.task_id not in weekdays[event.resource_id]: 

524 weekdays[event.resource_id][event.task_id] = {} 

525 if event.started_datetime is None: 

526 continue 

527 day = DAY.from_date(event.started_datetime) 

528 hour = event.started_datetime.hour 

529 if day not in weekdays[event.resource_id][event.task_id]: 

530 weekdays[event.resource_id][event.task_id][day] = {} 

531 if hour not in weekdays[event.resource_id][event.task_id][day]: 

532 weekdays[event.resource_id][event.task_id][day][hour] = 0 

533 weekdays[event.resource_id][event.task_id][day][hour] += 1 

534 return weekdays 

535 

536 @staticmethod 

537 def get_resource_allocation_ratio( 

538 cases: list[Trace], 

539 ) -> dict[str, float]: 

540 """Get the allocation ratio of each task. 

541 

542 The allocation ratio is calculated = 

543 (number of unique resources that executed the task) / (total number of resources) 

544 """ 

545 resources_total = set() 

546 resources_per_task: dict[str, set[str]] = {} 

547 for case in cases: 

548 event_list: list[TaskEvent] = case.event_list 

549 for event in event_list: 

550 resources_per_task[event.task_id] = resources_per_task.get(event.task_id, set()) 

551 resources_per_task[event.task_id].add(event.resource_id) 

552 resources_total.add(event.resource_id) 

553 

554 return { 

555 task_id: len(resources) / len(resources_total) 

556 for task_id, resources in resources_per_task.items() 

557 } 

558 

559 @staticmethod 

560 def get_tasks_by_number_of_duplicate_enablement_dates( 

561 cases: list[Trace], 

562 ) -> dict[str, int]: 

563 """Get the tasks sorted by the number of duplicate enablement dates. 

564 

565 Meaning the more often the same task is enabled at the same time, the higher the rank. 

566 The granularity is one hour. 

567 NOTE: tasks with only one enablement at a given time are not considered. 

568 """ 

569 tasks_with_enablement_dates: dict[str, dict[DAY, dict[int, int]]] = dict() 

570 for case in cases: 

571 event_list: list[TaskEvent] = case.event_list 

572 for event in event_list: 

573 if event.task_id not in tasks_with_enablement_dates: 

574 tasks_with_enablement_dates[event.task_id] = {} 

575 if event.started_datetime is None: 

576 continue 

577 day = DAY.from_date(event.started_datetime) 

578 hour = event.started_datetime.hour 

579 if day not in tasks_with_enablement_dates[event.task_id]: 

580 tasks_with_enablement_dates[event.task_id][day] = {} 

581 if hour not in tasks_with_enablement_dates[event.task_id][day]: 

582 tasks_with_enablement_dates[event.task_id][day][hour] = 0 

583 tasks_with_enablement_dates[event.task_id][day][hour] += 1 

584 

585 return { 

586 task_id: sum( 

587 sum(count for count in day_counts.values() if count > 1) 

588 for day_counts in tasks_with_enablement_dates[task_id].values() 

589 ) 

590 for task_id in tasks_with_enablement_dates 

591 } 

592 

593 @staticmethod 

594 def get_batches_by_activity_with_idle( 

595 batches: dict[BatchInfoKey, BatchInfo], 

596 ) -> dict[str, list[SimpleBatchInfo]]: 

597 """Get batches grouped by activity, only including those with idle time. 

598 

599 Returns a dict where: 

600 - key: activity name 

601 - value: list of dicts containing only the essential batch info: 

602 { 

603 'accumulation_begin': datetime, 

604 'start': datetime, 

605 'ideal_proc': float, 

606 'idle_time': float 

607 } 

608 """ 

609 result: dict[str, list[SimpleBatchInfo]] = {} 

610 for batch in batches.values(): 

611 if batch["idle_time"] == 0: 

612 continue 

613 

614 activity = batch["activity"] 

615 if activity not in result: 

616 result[activity] = [] 

617 

618 result[activity].append( 

619 { 

620 "accumulation_begin": batch["accumulation_begin"], 

621 "start": batch["start"], 

622 "ideal_proc": batch["ideal_proc"], 

623 "idle_time": batch["idle_time"], 

624 } 

625 ) 

626 

627 return result 

628 

629 @staticmethod 

630 def get_average_batch_size_per_task( 

631 batches: dict[BatchInfoKey, BatchInfo], 

632 ) -> dict[str, float]: 

633 """Get the average batch size per task.""" 

634 batches_by_task = {} 

635 for batch in batches.values(): 

636 task_id = batch["activity"] 

637 if task_id not in batches_by_task: 

638 batches_by_task[task_id] = [] 

639 batches_by_task[task_id].append(batch) 

640 

641 return { 

642 task_id: sum(batch["size"] for batch in batches) / len(batches) 

643 for task_id, batches in batches_by_task.items() 

644 } 

645 

646 @staticmethod 

647 def get_avg_batch_size_for_batch_enabled_tasks( 

648 batches: dict[BatchInfoKey, BatchInfo], 

649 ) -> float: 

650 """Get the average batch size over all batches.""" 

651 if not batches: 

652 return 0 

653 return sum(batch["size"] for batch in batches.values()) / len(batches) 

654 

655 @staticmethod 

656 def empty() -> "Evaluation": 

657 """Create an empty evaluation.""" 

658 return Evaluation( 

659 hourly_rates={}, 

660 total_duration=0, 

661 total_cycle_time=0, 

662 avg_cycle_time_by_case=0, 

663 is_empty=True, 

664 task_kpis={}, 

665 resource_kpis={}, 

666 task_execution_count_with_wt_or_it={}, 

667 task_execution_count_by_resource={}, 

668 task_execution_counts={}, 

669 task_enablement_weekdays={}, 

670 task_started_weekdays={}, 

671 avg_batching_waiting_time_per_task={}, 

672 total_batching_waiting_time_per_task={}, 

673 total_batching_waiting_time_per_resource={}, 

674 avg_batching_waiting_time_by_case=0, 

675 total_batching_waiting_time=0, 

676 avg_waiting_time_by_case=0, 

677 total_waiting_time=0, 

678 total_processing_time=0, 

679 sum_of_durations=0, 

680 sum_of_cycle_times=0, 

681 total_fixed_cost_by_task={}, 

682 avg_fixed_cost_per_case=0, 

683 resource_allocation_ratio_task={}, 

684 batches_by_activity_with_idle={}, 

685 avg_batch_size_per_task={}, 

686 avg_batch_size_for_batch_enabled_tasks=0, 

687 resource_started_weekdays={}, 

688 tasks_by_number_of_duplicate_enablement_dates={}, 

689 avg_idle_wt_per_task_instance=0, 

690 avg_batch_processing_time_per_task_instance=0, 

691 ) 

692 

693 @staticmethod 

694 def from_run_simulation_result( 

695 hourly_rates: HourlyRates, 

696 fixed_cost_fns: dict[str, Callable[[float], float]], 

697 batching_rules_exist: bool, 

698 result: RunSimulationResult, 

699 ) -> "Evaluation": 

700 """Create an evaluation from a simulation result.""" 

701 global_kpis, task_kpis, resource_kpis, log_info = result 

702 cases: list[Trace] = [] if log_info is None else log_info.trace_list 

703 

704 all_cases_are_non_empty = all([len(trace.event_list) > 0 for trace in log_info.trace_list]) 

705 if not all_cases_are_non_empty: 

706 return Evaluation.empty() 

707 

708 batches = get_batches_from_event_log(log_info, fixed_cost_fns, batching_rules_exist) 

709 

710 batches_greater_than_one = { 

711 batch_key: batch for batch_key, batch in batches.items() if batch["size"] > 1 

712 } 

713 

714 batch_pd = pd.DataFrame( 

715 { 

716 "batch_id": batch["batch_id"], 

717 "activity": batch["activity"], 

718 "case": batch["case"], 

719 "resource": batch["resource"], 

720 "batch_size": batch["size"], 

721 "batch_waiting_time_seconds": batch["wt_batching"], 

722 "fixed_cost": batch["fixed_cost"], 

723 "processing_time": batch["ideal_proc"], 

724 } 

725 for batch in batches.values() 

726 ) 

727 

728 total_fixed_cost_by_task = batch_pd.groupby("activity")["fixed_cost"].sum().fillna(0).to_dict() 

729 

730 avg_fixed_cost_per_case = float(batch_pd.groupby("case")["fixed_cost"].sum().mean()) 

731 

732 first_enablement = min( 

733 [event.enabled_datetime for trace in log_info.trace_list for event in trace.event_list], 

734 default=log_info.started_at, 

735 ) 

736 last_completion = max( 

737 [event.completed_datetime for trace in log_info.trace_list for event in trace.event_list], 

738 default=log_info.ended_at, 

739 ) 

740 total_cycle_time = (last_completion - first_enablement).total_seconds() 

741 total_idle_time = sum( 

742 [kpi.idle_time.total for kpi in task_kpis.values() if kpi.idle_time.total is not None] 

743 ) 

744 total_processing_time = sum( 

745 [kpi.processing_time.total for kpi in task_kpis.values() if kpi.processing_time.total is not None] 

746 ) 

747 

748 total_duration = total_idle_time + total_processing_time 

749 

750 # Calculate the average processing time per task_instance 

751 # This means that we take sum of processing times per unique batch 

752 # and divide by the number of task_instances 

753 # Which in this case can be achieved by taking the sum of batch sizes 

754 task_instance_count = batch_pd.groupby("batch_id")["batch_size"].first().sum() 

755 

756 if task_instance_count > 0: 

757 avg_batch_processing_time_per_task_instance = ( 

758 batch_pd.groupby("batch_id")["processing_time"].first().sum() / task_instance_count 

759 ) 

760 avg_idle_wt_per_task_instance = ( 

761 sum(kpi.idle_time.total + kpi.waiting_time.total for kpi in task_kpis.values()) 

762 / task_instance_count 

763 ) 

764 

765 else: 

766 avg_batch_processing_time_per_task_instance = 0 

767 avg_idle_wt_per_task_instance = 0 

768 

769 sum_of_durations = sum( 

770 [ 

771 kpi.idle_processing_time.total 

772 for kpi in task_kpis.values() 

773 if kpi.idle_processing_time.total is not None 

774 ] 

775 ) 

776 

777 sum_of_cycle_times = sum( 

778 [kpi.idle_cycle_time.total for kpi in task_kpis.values() if kpi.idle_cycle_time.total is not None] 

779 ) 

780 

781 # print("\n".join([ 

782 # f"{event.started_datetime.isoformat()} -> {event.completed_datetime.isoformat()} " 

783 # f"(enabled: {event.enabled_datetime.isoformat()}) " 

784 # f"(I:{event.idle_time / 3600}, P:{event.processing_time/3600}, " 

785 # f"WT:{event.waiting_time/3600}, C:{event.cycle_time / 3600})" 

786 # for trace in log_info.trace_list for event in trace.event_list 

787 # ])) 

788 return Evaluation( 

789 hourly_rates=hourly_rates, 

790 total_duration=total_duration, 

791 total_cycle_time=total_cycle_time, 

792 total_waiting_time=global_kpis.waiting_time.total, 

793 avg_cycle_time_by_case=global_kpis.cycle_time.avg, 

794 avg_waiting_time_by_case=global_kpis.waiting_time.avg, 

795 avg_idle_wt_per_task_instance=avg_idle_wt_per_task_instance, 

796 avg_batch_processing_time_per_task_instance=avg_batch_processing_time_per_task_instance, 

797 total_processing_time=total_processing_time, 

798 sum_of_durations=sum_of_durations, 

799 sum_of_cycle_times=sum_of_cycle_times, 

800 is_empty=not cases, 

801 task_kpis=task_kpis, 

802 resource_kpis=resource_kpis, 

803 task_execution_count_with_wt_or_it=Evaluation.get_task_execution_count_with_wt_or_it(cases), 

804 task_execution_count_by_resource=Evaluation.get_task_execution_count_by_resources(cases), 

805 task_execution_counts=Evaluation.get_task_execution_counts(cases), 

806 task_enablement_weekdays=Evaluation.get_task_enablement_weekdays(cases), 

807 task_started_weekdays=Evaluation.get_task_started_at_weekdays(cases), 

808 resource_allocation_ratio_task=Evaluation.get_resource_allocation_ratio(cases), 

809 avg_batching_waiting_time_per_task=( 

810 batch_pd.groupby("activity")["batch_waiting_time_seconds"].mean().fillna(0).to_dict() 

811 ), 

812 total_batching_waiting_time_per_task=( 

813 batch_pd.groupby("activity")["batch_waiting_time_seconds"].sum().fillna(0).to_dict() 

814 ), 

815 total_batching_waiting_time_per_resource=( 

816 batch_pd.groupby("resource")["batch_waiting_time_seconds"].sum().fillna(0).to_dict() 

817 ), 

818 avg_batching_waiting_time_by_case=float(batch_pd["batch_waiting_time_seconds"].mean()), 

819 total_batching_waiting_time=(batch_pd["batch_waiting_time_seconds"].sum()), 

820 total_fixed_cost_by_task=total_fixed_cost_by_task, 

821 avg_fixed_cost_per_case=avg_fixed_cost_per_case, 

822 batches_by_activity_with_idle=Evaluation.get_batches_by_activity_with_idle( 

823 batches_greater_than_one 

824 ), 

825 avg_batch_size_per_task=Evaluation.get_average_batch_size_per_task(batches_greater_than_one), 

826 avg_batch_size_for_batch_enabled_tasks=Evaluation.get_avg_batch_size_for_batch_enabled_tasks( 

827 batches_greater_than_one 

828 ), 

829 resource_started_weekdays=Evaluation.get_resource_started_weekdays(cases), 

830 tasks_by_number_of_duplicate_enablement_dates=Evaluation.get_tasks_by_number_of_duplicate_enablement_dates( 

831 cases 

832 ), 

833 ) 

834 

835 @property 

836 def achieved_cycle_time(self) -> float: 

837 """Return the achieved cycle time.""" 

838 return self.total_cycle_time