Coverage for o2/models/evaluation.py: 95%
359 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1import math
2from collections import Counter
3from dataclasses import dataclass
4from functools import cached_property, reduce
5from typing import TYPE_CHECKING, Callable, cast
7import pandas as pd
8from prosimos.execution_info import TaskEvent, Trace
9from prosimos.simulation_stats_calculator import (
10 KPIMap,
11 ResourceKPI,
12)
14from o2.models.days import DAY
15from o2.models.settings import CostType, Settings
16from o2.simulation_runner import RunSimulationResult
17from o2.util.waiting_time_helper import (
18 BatchInfo,
19 BatchInfoKey,
20 SimpleBatchInfo,
21 get_batches_from_event_log,
22)
24if TYPE_CHECKING:
25 pass
# Type alias: hourly rate per resource, keyed by resource id.
# (The report's gutter line number was fused onto this statement; it is removed
# here so the line is valid Python again.)
HourlyRates = dict[str, int]
@dataclass(frozen=True)
class Evaluation:
    """An evaluation of a simulation run.

    It's a wrapper for the result classes of a PROSIMOS simulation run,
    with a lot of useful getters and methods to analyze the results.

    Instances are frozen; derived values are computed lazily through
    ``cached_property`` / ``property``. Create instances with
    ``Evaluation.from_run_simulation_result`` or ``Evaluation.empty``.
    """

    hourly_rates: HourlyRates
    """Hourly rate of each resource, keyed by resource id."""

    task_kpis: dict[str, KPIMap]
    """KPIs of each task, keyed by task id."""
    resource_kpis: dict[str, ResourceKPI]
    """KPIs of each resource, keyed by resource id."""

    avg_cycle_time_by_case: float
    """The mean cycle time of the simulation."""

    avg_waiting_time_by_case: float
    """The average waiting time of the simulation."""

    avg_batching_waiting_time_by_case: float
    """The average batching waiting time per case."""

    avg_batching_waiting_time_per_task: dict[str, float]
    """The average batching waiting time per task."""
    total_batching_waiting_time_per_task: dict[str, float]
    """The total batching waiting time per task (all cases)."""

    total_duration: float
    """The total duration (processing + idle) of the simulation."""

    sum_of_durations: float
    """The sum of all task durations of the simulation."""

    sum_of_cycle_times: float
    """The sum of all task cycle times of the simulation."""

    total_batching_waiting_time: float
    """The total batching waiting time of the simulation (all cases)."""

    total_batching_waiting_time_per_resource: dict[str, float]
    """The total batching waiting time of the simulation per resource."""

    total_cycle_time: float
    """The total cycle time of the simulation."""

    total_processing_time: float
    """The total processing time of the simulation."""

    total_waiting_time: float
    """The total waiting time of the simulation."""

    task_execution_count_by_resource: dict[str, dict[str, int]]
    """The number of times each task was executed by a given resource.

    E.g. task_execution_count_by_resource["resource_id"]["task_id"]
    """

    is_empty: bool
    """Is this evaluation based on an empty simulation run?"""

    task_execution_count_with_wt_or_it: dict[str, int]
    """How often each task was executed with a waiting or idle time."""

    task_execution_counts: dict[str, int]
    """How often each task was executed."""

    task_enablement_weekdays: dict[str, dict[DAY, dict[int, int]]]
    """The weekdays & hours on which a task was enabled."""

    task_started_weekdays: dict[str, dict[DAY, dict[int, int]]]
    """The weekdays & hours on which a task was started."""

    resource_allocation_ratio_task: dict[str, float]
    """The allocation ratio of each task."""

    total_fixed_cost_by_task: dict[str, float]
    """The total fixed cost of each task."""
    avg_fixed_cost_per_case: float
    """The average fixed cost per case."""

    batches_by_activity_with_idle: dict[str, list[SimpleBatchInfo]]
    """The batches grouped by activity, only including those with idle time."""

    avg_batch_size_for_batch_enabled_tasks: float
    """The average batch size over all batches."""

    avg_batch_size_per_task: dict[str, float]
    """The average batch size per task."""

    avg_idle_wt_per_task_instance: float
    """The average idle + waiting time per task instance."""

    avg_batch_processing_time_per_task_instance: float
    """The average batch processing time per task instance.

    Pseudo-Code:
    sum(batch.processing_time for batch in batches) / sum(batch.size for batch in batches)
    """

    resource_started_weekdays: dict[str, dict[DAY, dict[int, int]]]
    """The weekdays & hours on which a resource started any task."""

    tasks_by_number_of_duplicate_enablement_dates: dict[str, int]
    """Per task, the number of enablements that share a weekday/hour slot."""

    @cached_property
    def total_processing_cost_for_tasks(self) -> float:
        """Return the summed total cost of all tasks."""
        return sum(task_kpi.cost.total for task_kpi in self.task_kpis.values())

    @cached_property
    def total_cost_for_worked_time(self) -> float:
        """Return the resource cost for the time the resources actually worked.

        Each resource's worked time is converted from seconds to hours and
        multiplied by its hourly rate. Aka the cost you would have if the
        resource calendars exactly matched the worked time.
        """
        return sum(
            (resource_kpi.worked_time / (60 * 60)) * self.hourly_rates[resource_id]
            for resource_id, resource_kpi in self.resource_kpis.items()
        )

    @cached_property
    def total_cost_for_available_time(self) -> float:
        """Return the resource cost for the time the resources were available.

        Each resource's available time is converted from seconds to hours and
        multiplied by its hourly rate, i.e. the cost of keeping the resources
        around for their whole availability, whether they worked or not.
        """
        return sum(
            (resource_kpi.available_time / (60 * 60)) * self.hourly_rates[resource_id]
            for resource_id, resource_kpi in self.resource_kpis.items()
        )

    @cached_property
    def avg_cost_by_case(self) -> float:
        """Return the sum of the average cost of all tasks."""
        return sum(task_kpi.cost.avg for task_kpi in self.task_kpis.values())

    @cached_property
    def avg_resource_utilization_by_case(self) -> float:
        """Return the mean utilization over all resources (0.0 without resources)."""
        utilizations = self.resource_utilizations
        # An empty evaluation has no resources; avoid dividing by zero.
        if not utilizations:
            return 0.0
        return sum(utilizations.values()) / len(utilizations)

    @cached_property
    def resource_worked_times(self) -> dict[str, float]:
        """Return the worked time of every resource, keyed by resource id."""
        return {resource_id: kpi.worked_time for resource_id, kpi in self.resource_kpis.items()}

    @cached_property
    def resource_available_times(self) -> dict[str, float]:
        """Return the available time of every resource, keyed by resource id."""
        return {resource_id: kpi.available_time for resource_id, kpi in self.resource_kpis.items()}

    @cached_property
    def resource_utilizations(self) -> dict[str, float]:
        """Return the utilization of every resource, keyed by resource id."""
        return {resource_id: kpi.utilization for resource_id, kpi in self.resource_kpis.items()}

    @cached_property
    def total_fixed_cost(self) -> float:
        """Return the total fixed cost of the simulation."""
        return sum(self.total_fixed_cost_by_task.values())

    @cached_property
    def total_cost(self) -> float:
        """Return the total cost (worked-time resource cost + fixed cost)."""
        return self.total_cost_for_worked_time + self.total_fixed_cost

    @cached_property
    def total_resource_idle_time(self) -> float:
        """Return the total resource idle time of the simulation.

        This is the sum, over all resources, of the available time that was
        not spent working (available time - worked time).
        """
        return sum(
            resource_kpi.available_time - resource_kpi.worked_time
            for resource_kpi in self.resource_kpis.values()
        )

    @cached_property
    def total_task_idle_time(self) -> float:
        """Return the summed total idle time of all tasks."""
        return sum(task_kpi.idle_time.total for task_kpi in self.task_kpis.values())

    @property
    def pareto_x(self) -> float:
        """Return the cost used for positioning the evaluation in the pareto front.

        NOTE: This depends on the global, static setting `Settings.COST_TYPE`.

        Raises:
            ValueError: If `Settings.COST_TYPE` is not one of the handled cost types.
        """
        cost_type = Settings.COST_TYPE
        if cost_type == CostType.FIXED_COST:
            return float(self.total_fixed_cost)
        if cost_type == CostType.RESOURCE_COST:
            return float(self.total_cost_for_available_time)
        if cost_type == CostType.TOTAL_COST:
            return float(self.total_cost_for_worked_time)
        if cost_type == CostType.WAITING_TIME_AND_PROCESSING_TIME:
            return float(self.total_processing_time)
        if cost_type == CostType.AVG_WT_AND_PT_PER_TASK_INSTANCE:
            return float(self.avg_batch_processing_time_per_task_instance)
        raise ValueError(f"Unknown cost type: {Settings.COST_TYPE}")

    @property
    def pareto_y(self) -> float:
        """Return the duration used for positioning the evaluation in the pareto front.

        Like `pareto_x`, the value depends on `Settings.COST_TYPE`; every cost
        type without a dedicated time measure falls back to `total_duration`.
        """
        cost_type = Settings.COST_TYPE
        if cost_type == CostType.WAITING_TIME_AND_PROCESSING_TIME:
            return float(self.total_waiting_time + self.total_task_idle_time)
        if cost_type == CostType.AVG_WT_AND_PT_PER_TASK_INSTANCE:
            return float(self.avg_idle_wt_per_task_instance)
        return float(self.total_duration)

    def get_avg_waiting_time_of_task_id(self, task_id: str) -> float:
        """Return the average waiting time of a task (KeyError for unknown ids)."""
        return self.task_kpis[task_id].waiting_time.avg

    def get_total_waiting_time_of_task_id(self, task_id: str) -> float:
        """Return the total waiting time of a task (KeyError for unknown ids)."""
        return self.task_kpis[task_id].waiting_time.total

    def get_max_waiting_time_of_task_id(self, task_id: str) -> float:
        """Return the maximum waiting time of a task (KeyError for unknown ids)."""
        return self.task_kpis[task_id].waiting_time.max

    def get_task_names_sorted_by_waiting_time_desc(self) -> list[str]:
        """Return the task names sorted by average waiting time, highest first."""
        avg_waiting_times = {name: kpi.waiting_time.avg for name, kpi in self.task_kpis.items()}
        return sorted(avg_waiting_times, key=avg_waiting_times.__getitem__, reverse=True)

    def get_task_names_sorted_by_idle_time_desc(self) -> list[str]:
        """Return the task names sorted by average idle time, highest first."""
        avg_idle_times = {name: kpi.idle_time.avg for name, kpi in self.task_kpis.items()}
        return sorted(avg_idle_times, key=avg_idle_times.__getitem__, reverse=True)

    def get_most_frequent_enablement_weekdays(self, task_name: str) -> list[DAY]:
        """Return the weekdays on which the task was enabled.

        The list is sorted by the most common weekday first (a weekday's weight
        is the sum of its hourly enablement counts).
        """
        counts_by_day = self.task_enablement_weekdays[task_name]
        return sorted(
            counts_by_day,
            key=lambda day: sum(counts_by_day[day].values()),
            reverse=True,
        )

    def get_most_frequent_resources(self, task_name: str) -> list[str]:
        """Return the resources that executed the task, most frequent first."""
        return self.get_resources_sorted_by_task_execution_count(task_name)

    def get_least_utilized_resources(self) -> list[str]:
        """Return all resource ids sorted by utilization, least utilized first."""
        utilizations = self.resource_utilizations
        return sorted(utilizations, key=utilizations.__getitem__)

    def get_tasks_sorted_by_occurrences_of_wt_and_it(self) -> list[str]:
        """Return the task names sorted by wt & it instances.

        In clear words: Orders the tasks descending by the number of times
        they were executed (number of events) and had either a waiting or idle time.
        """
        occurrences = self.task_execution_count_with_wt_or_it
        return sorted(occurrences, key=occurrences.__getitem__, reverse=True)

    def get_task_execution_count_by_resource(self, resource_id: str) -> dict[str, int]:
        """Return how often the resource executed each task ({} for unknown resources)."""
        return self.task_execution_count_by_resource.get(resource_id, {})

    def get_avg_processing_cost_per_task(self) -> dict[str, float]:
        """Return the average processing cost per task."""
        return {task_id: task_kpi.cost.avg for task_id, task_kpi in self.task_kpis.items()}

    def get_avg_cost_per_task(self) -> dict[str, float]:
        """Return the average total (fixed + processing) cost per task.

        The task's total fixed cost is spread over `cost.count`; tasks with a
        count of zero contribute no fixed cost instead of dividing by zero.
        """
        avg_costs: dict[str, float] = {}
        for task_id, task_kpi in self.task_kpis.items():
            cost_count = task_kpi.cost.count
            fixed_cost = self.total_fixed_cost_by_task.get(task_id, 0)
            avg_fixed_cost = fixed_cost / cost_count if cost_count else 0
            avg_costs[task_id] = task_kpi.cost.avg + avg_fixed_cost
        return avg_costs

    def get_total_cost_per_task(self) -> dict[str, float]:
        """Return the total (fixed + processing) cost per task."""
        return {
            task_id: task_kpi.cost.total + self.total_fixed_cost_by_task.get(task_id, 0)
            for task_id, task_kpi in self.task_kpis.items()
        }

    def get_resources_sorted_by_task_execution_count(self, task_id: str) -> list[str]:
        """Return the resource ids that executed the given task.

        The list is sorted by the most common resource first.
        """
        executions_per_resource = Counter(
            {
                resource_id: task_counts[task_id]
                for resource_id, task_counts in self.task_execution_count_by_resource.items()
                if task_id in task_counts
            }
        )
        return [resource_id for resource_id, _ in executions_per_resource.most_common()]

    def get_total_processing_time_per_task(self) -> dict[str, float]:
        """Return the total processing time per task (excl. idle times)."""
        return {task_id: kpi.processing_time.total for task_id, kpi in self.task_kpis.items()}

    def get_average_processing_time_per_task(self) -> dict[str, float]:
        """Return the average processing time per task (excl. idle times)."""
        return {task_id: kpi.processing_time.avg for task_id, kpi in self.task_kpis.items()}

    def get_total_duration_time_per_task(self) -> dict[str, float]:
        """Return the total duration time per task (incl. idle times)."""
        return {task_id: kpi.idle_processing_time.total for task_id, kpi in self.task_kpis.items()}

    def get_avg_duration_time_per_task(self) -> dict[str, float]:
        """Return the average duration time per task (incl. idle times & wt)."""
        return {
            task_id: kpi.idle_processing_time.avg + kpi.waiting_time.avg
            for task_id, kpi in self.task_kpis.items()
        }

    def get_total_idle_time_of_task_id(self, task_id: str) -> float:
        """Return the total idle time of a task (KeyError for unknown ids)."""
        return self.task_kpis[task_id].idle_time.total

    def get_total_cycle_time_of_task_id(self, task_id: str) -> float:
        """Return the total cycle time of a task (KeyError for unknown ids)."""
        return self.task_kpis[task_id].idle_cycle_time.total

    def to_tuple(self) -> tuple[float, float]:
        """Return the pareto coordinates of this evaluation as (pareto_x, pareto_y)."""
        return (self.pareto_x, self.pareto_y)

    def distance_to(self, other: "Evaluation") -> float:
        """Return the euclidean distance between the pareto points of two evaluations."""
        return math.hypot(self.pareto_x - other.pareto_x, self.pareto_y - other.pareto_y)

    def is_dominated_by(self, other: "Evaluation") -> bool:
        """Check if this evaluation is dominated by another evaluation.

        Only the pareto coordinates (`pareto_x`, `pareto_y`) are compared.
        With `Settings.EQUAL_DOMINATION_ALLOWED` disabled, `other` dominates as
        soon as it is not worse in both dimensions (ties count as dominated);
        with it enabled, `other` must be strictly better in both dimensions.
        NOTE(review): presumably "allowed" means equal points may coexist in
        the front -- confirm against the documentation of `Settings`.
        """
        if not Settings.EQUAL_DOMINATION_ALLOWED:
            return other.pareto_x <= self.pareto_x and other.pareto_y <= self.pareto_y
        return other.pareto_x < self.pareto_x and other.pareto_y < self.pareto_y

    def __str__(self) -> str:
        """Return a string representation of the evaluation."""
        return (
            f"{Settings.get_pareto_x_label()}: {self.pareto_x:.1f}, "
            f"{Settings.get_pareto_y_label()}: {self.pareto_y:.1f}"
        )

    @staticmethod
    def _count_task_events_by_weekday_and_hour(
        cases: list[Trace],
        datetime_attribute: str,
    ) -> dict[str, dict[DAY, dict[int, int]]]:
        """Count events per task, weekday and hour of the given event datetime attribute.

        Every task that occurs in `cases` gets an entry, even when all of its
        events have `None` for the requested datetime.
        """
        counts: dict[str, dict[DAY, dict[int, int]]] = {}
        for case in cases:
            for event in case.event_list:
                counts_by_day = counts.setdefault(event.task_id, {})
                timestamp = getattr(event, datetime_attribute)
                if timestamp is None:
                    continue
                counts_by_hour = counts_by_day.setdefault(DAY.from_date(timestamp), {})
                counts_by_hour[timestamp.hour] = counts_by_hour.get(timestamp.hour, 0) + 1
        return counts

    @staticmethod
    def get_task_enablement_weekdays(
        cases: list[Trace],
    ) -> dict[str, dict[DAY, dict[int, int]]]:
        """Get the weekdays & time of day on which a task was enabled."""
        return Evaluation._count_task_events_by_weekday_and_hour(cases, "enabled_datetime")

    @staticmethod
    def get_task_started_at_weekdays(
        cases: list[Trace],
    ) -> dict[str, dict[DAY, dict[int, int]]]:
        """Get the weekdays & time of day on which a task was started."""
        return Evaluation._count_task_events_by_weekday_and_hour(cases, "started_datetime")

    @staticmethod
    def _get_events_for_task(cases: list[Trace], task_name: str) -> list[TaskEvent]:
        """Get all events of the given task over all cases."""
        return [event for case in cases for event in case.event_list if event.task_id == task_name]

    @staticmethod
    def get_resource_started_weekdays(
        cases: list[Trace],
    ) -> dict[str, dict[DAY, dict[int, int]]]:
        """Get the weekdays & time of day on which a resource started a(/any) task.

        The per-task counts of `get_resource_task_started_weekdays` are summed
        up, so a slot in which a resource started several different tasks
        reports the combined number of starts.
        """
        started_per_resource: dict[str, dict[DAY, dict[int, int]]] = {}
        per_resource_and_task = Evaluation.get_resource_task_started_weekdays(cases)
        for resource_id, counts_by_task in per_resource_and_task.items():
            counts_by_day = started_per_resource.setdefault(resource_id, {})
            for task_counts_by_day in counts_by_task.values():
                for weekday, task_counts_by_hour in task_counts_by_day.items():
                    counts_by_hour = counts_by_day.setdefault(DAY(weekday), {})
                    for hour, count in task_counts_by_hour.items():
                        counts_by_hour[hour] = counts_by_hour.get(hour, 0) + count
        return started_per_resource

    @staticmethod
    def get_task_execution_counts(cases: list[Trace]) -> dict[str, int]:
        """Get how often each task was executed (number of events per task)."""
        occurrences: dict[str, int] = {}
        for case in cases:
            for event in case.event_list:
                occurrences[event.task_id] = occurrences.get(event.task_id, 0) + 1
        return occurrences

    @staticmethod
    def get_task_execution_count_with_wt_or_it(cases: list[Trace]) -> dict[str, int]:
        """Get how often each task was executed with a waiting or idle time.

        Every event is counted at most once, even if it has both a waiting
        and an idle time. Tasks without any such event are not included.
        """
        occurrences: dict[str, int] = {}
        for case in cases:
            for event in case.event_list:
                has_waiting_time = event.waiting_time is not None and event.waiting_time > 0
                has_idle_time = event.idle_time is not None and event.idle_time > 0
                if has_waiting_time or has_idle_time:
                    occurrences[event.task_id] = occurrences.get(event.task_id, 0) + 1
        return occurrences

    @staticmethod
    def get_task_execution_count_by_resources(
        cases: list[Trace],
    ) -> dict[str, dict[str, int]]:
        """Get the number of times each task was executed by a given resource.

        The result is keyed by resource id first, then by task id.
        """
        occurrences: dict[str, dict[str, int]] = {}
        for case in cases:
            for event in case.event_list:
                counts_by_task = occurrences.setdefault(event.resource_id, {})
                counts_by_task[event.task_id] = counts_by_task.get(event.task_id, 0) + 1
        return occurrences

    @staticmethod
    def get_resource_task_started_weekdays(
        cases: list[Trace],
    ) -> dict[str, dict[str, dict[DAY, dict[int, int]]]]:
        """Get the weekdays & time of day on which a task was started by a resource.

        The result is keyed by resource id, task id, weekday and hour. Every
        (resource, task) pair that occurs gets an entry, even when none of its
        events has a start datetime.
        """
        weekdays: dict[str, dict[str, dict[DAY, dict[int, int]]]] = {}
        for case in cases:
            for event in case.event_list:
                counts_by_day = weekdays.setdefault(event.resource_id, {}).setdefault(event.task_id, {})
                started_at = event.started_datetime
                if started_at is None:
                    continue
                counts_by_hour = counts_by_day.setdefault(DAY.from_date(started_at), {})
                counts_by_hour[started_at.hour] = counts_by_hour.get(started_at.hour, 0) + 1
        return weekdays

    @staticmethod
    def get_resource_allocation_ratio(
        cases: list[Trace],
    ) -> dict[str, float]:
        """Get the allocation ratio of each task.

        The allocation ratio is calculated =
        (number of unique resources that executed the task) / (total number of resources)

        "Total number of resources" counts the resources that appear in at
        least one event of `cases`.
        """
        all_resources: set[str] = set()
        resources_per_task: dict[str, set[str]] = {}
        for case in cases:
            for event in case.event_list:
                resources_per_task.setdefault(event.task_id, set()).add(event.resource_id)
                all_resources.add(event.resource_id)

        resource_count = len(all_resources)
        return {
            task_id: len(task_resources) / resource_count
            for task_id, task_resources in resources_per_task.items()
        }

    @staticmethod
    def get_tasks_by_number_of_duplicate_enablement_dates(
        cases: list[Trace],
    ) -> dict[str, int]:
        """Get, per task, the number of enablements sharing a slot with another one.

        Meaning the more often the same task is enabled at the same time, the higher the value.
        Enablements are bucketed by weekday and hour of `enabled_datetime`.
        NOTE: slots with only one enablement are not counted.
        """
        enablements = Evaluation._count_task_events_by_weekday_and_hour(cases, "enabled_datetime")
        return {
            task_id: sum(
                count
                for counts_by_hour in counts_by_day.values()
                for count in counts_by_hour.values()
                if count > 1
            )
            for task_id, counts_by_day in enablements.items()
        }

    @staticmethod
    def get_batches_by_activity_with_idle(
        batches: dict[BatchInfoKey, BatchInfo],
    ) -> dict[str, list[SimpleBatchInfo]]:
        """Get batches grouped by activity, only including those with idle time.

        Returns a dict where:
        - key: activity name
        - value: list of dicts containing only the essential batch info:
            {
                'accumulation_begin': ...,
                'start': ...,
                'ideal_proc': ...,
                'idle_time': ...
            }
        """
        kept_keys = ("accumulation_begin", "start", "ideal_proc", "idle_time")
        result: dict[str, list[SimpleBatchInfo]] = {}
        for batch in batches.values():
            if batch["idle_time"] == 0:
                continue
            result.setdefault(batch["activity"], []).append({key: batch[key] for key in kept_keys})
        return result

    @staticmethod
    def get_average_batch_size_per_task(
        batches: dict[BatchInfoKey, BatchInfo],
    ) -> dict[str, float]:
        """Get the average batch size per task (keyed by the batch's activity)."""
        sizes_by_task: dict[str, list[int]] = {}
        for batch in batches.values():
            sizes_by_task.setdefault(batch["activity"], []).append(batch["size"])
        return {task_id: sum(sizes) / len(sizes) for task_id, sizes in sizes_by_task.items()}

    @staticmethod
    def get_avg_batch_size_for_batch_enabled_tasks(
        batches: dict[BatchInfoKey, BatchInfo],
    ) -> float:
        """Get the average batch size over all batches (0 if there are none)."""
        if not batches:
            return 0
        return sum(batch["size"] for batch in batches.values()) / len(batches)

    @staticmethod
    def empty() -> "Evaluation":
        """Create an empty evaluation (all numbers 0, all mappings empty, `is_empty` set)."""
        return Evaluation(
            hourly_rates={},
            total_duration=0,
            total_cycle_time=0,
            avg_cycle_time_by_case=0,
            is_empty=True,
            task_kpis={},
            resource_kpis={},
            task_execution_count_with_wt_or_it={},
            task_execution_count_by_resource={},
            task_execution_counts={},
            task_enablement_weekdays={},
            task_started_weekdays={},
            avg_batching_waiting_time_per_task={},
            total_batching_waiting_time_per_task={},
            total_batching_waiting_time_per_resource={},
            avg_batching_waiting_time_by_case=0,
            total_batching_waiting_time=0,
            avg_waiting_time_by_case=0,
            total_waiting_time=0,
            total_processing_time=0,
            sum_of_durations=0,
            sum_of_cycle_times=0,
            total_fixed_cost_by_task={},
            avg_fixed_cost_per_case=0,
            resource_allocation_ratio_task={},
            batches_by_activity_with_idle={},
            avg_batch_size_per_task={},
            avg_batch_size_for_batch_enabled_tasks=0,
            resource_started_weekdays={},
            tasks_by_number_of_duplicate_enablement_dates={},
            avg_idle_wt_per_task_instance=0,
            avg_batch_processing_time_per_task_instance=0,
        )

    @staticmethod
    def _sum_of_totals(task_kpis: dict[str, KPIMap], kpi_name: str) -> float:
        """Sum the `.total` of the named KPI over all tasks, skipping `None` totals."""
        totals = [getattr(kpi, kpi_name).total for kpi in task_kpis.values()]
        return sum(total for total in totals if total is not None)

    @staticmethod
    def from_run_simulation_result(
        hourly_rates: HourlyRates,
        fixed_cost_fns: dict[str, Callable[[float], float]],
        batching_rules_exist: bool,
        result: RunSimulationResult,
    ) -> "Evaluation":
        """Create an evaluation from a simulation result.

        Args:
            hourly_rates: Hourly rate per resource id; stored on the evaluation.
            fixed_cost_fns: Forwarded to `get_batches_from_event_log`.
            batching_rules_exist: Forwarded to `get_batches_from_event_log`.
            result: The simulation result, unpacked as
                (global KPIs, task KPIs, resource KPIs, log info).

        Returns:
            The evaluation of the run, or `Evaluation.empty()` if there is no
            log info, no case at all, or at least one case without events.
        """
        global_kpis, task_kpis, resource_kpis, log_info = result
        if log_info is None:
            return Evaluation.empty()

        cases: list[Trace] = log_info.trace_list
        # Without any case there are no batches to aggregate, and a single
        # case without events marks the whole run as unusable.
        if not cases or any(len(trace.event_list) == 0 for trace in cases):
            return Evaluation.empty()

        batches = get_batches_from_event_log(log_info, fixed_cost_fns, batching_rules_exist)
        batches_greater_than_one = {
            batch_key: batch for batch_key, batch in batches.items() if batch["size"] > 1
        }

        batch_pd = pd.DataFrame(
            [
                {
                    "batch_id": batch["batch_id"],
                    "activity": batch["activity"],
                    "case": batch["case"],
                    "resource": batch["resource"],
                    "batch_size": batch["size"],
                    "batch_waiting_time_seconds": batch["wt_batching"],
                    "fixed_cost": batch["fixed_cost"],
                    "processing_time": batch["ideal_proc"],
                }
                for batch in batches.values()
            ]
        )
        waiting_time_column = "batch_waiting_time_seconds"
        batches_by_activity = batch_pd.groupby("activity")
        batches_by_id = batch_pd.groupby("batch_id")

        total_fixed_cost_by_task = batches_by_activity["fixed_cost"].sum().fillna(0).to_dict()
        avg_fixed_cost_per_case = float(batch_pd.groupby("case")["fixed_cost"].sum().mean())

        # Events may lack a datetime (the weekday helpers guard against that
        # as well), so only compare the ones that are actually set.
        events = [event for trace in cases for event in trace.event_list]
        first_enablement = min(
            [event.enabled_datetime for event in events if event.enabled_datetime is not None],
            default=log_info.started_at,
        )
        last_completion = max(
            [event.completed_datetime for event in events if event.completed_datetime is not None],
            default=log_info.ended_at,
        )
        total_cycle_time = (last_completion - first_enablement).total_seconds()

        total_idle_time = Evaluation._sum_of_totals(task_kpis, "idle_time")
        total_processing_time = Evaluation._sum_of_totals(task_kpis, "processing_time")
        total_duration = total_idle_time + total_processing_time

        # Calculate the average processing time per task_instance.
        # This means that we take the sum of processing times per unique batch
        # and divide by the number of task_instances,
        # which in this case can be achieved by taking the sum of batch sizes.
        task_instance_count = batches_by_id["batch_size"].first().sum()
        if task_instance_count > 0:
            avg_batch_processing_time_per_task_instance = float(
                batches_by_id["processing_time"].first().sum() / task_instance_count
            )
            avg_idle_wt_per_task_instance = float(
                sum(
                    (kpi.idle_time.total or 0) + (kpi.waiting_time.total or 0)
                    for kpi in task_kpis.values()
                )
                / task_instance_count
            )
        else:
            avg_batch_processing_time_per_task_instance = 0.0
            avg_idle_wt_per_task_instance = 0.0

        sum_of_durations = Evaluation._sum_of_totals(task_kpis, "idle_processing_time")
        sum_of_cycle_times = Evaluation._sum_of_totals(task_kpis, "idle_cycle_time")

        return Evaluation(
            hourly_rates=hourly_rates,
            total_duration=total_duration,
            total_cycle_time=total_cycle_time,
            total_waiting_time=global_kpis.waiting_time.total,
            avg_cycle_time_by_case=global_kpis.cycle_time.avg,
            avg_waiting_time_by_case=global_kpis.waiting_time.avg,
            avg_idle_wt_per_task_instance=avg_idle_wt_per_task_instance,
            avg_batch_processing_time_per_task_instance=avg_batch_processing_time_per_task_instance,
            total_processing_time=total_processing_time,
            sum_of_durations=sum_of_durations,
            sum_of_cycle_times=sum_of_cycle_times,
            # Runs without cases returned `Evaluation.empty()` above.
            is_empty=False,
            task_kpis=task_kpis,
            resource_kpis=resource_kpis,
            task_execution_count_with_wt_or_it=Evaluation.get_task_execution_count_with_wt_or_it(cases),
            task_execution_count_by_resource=Evaluation.get_task_execution_count_by_resources(cases),
            task_execution_counts=Evaluation.get_task_execution_counts(cases),
            task_enablement_weekdays=Evaluation.get_task_enablement_weekdays(cases),
            task_started_weekdays=Evaluation.get_task_started_at_weekdays(cases),
            resource_allocation_ratio_task=Evaluation.get_resource_allocation_ratio(cases),
            avg_batching_waiting_time_per_task=(
                batches_by_activity[waiting_time_column].mean().fillna(0).to_dict()
            ),
            total_batching_waiting_time_per_task=(
                batches_by_activity[waiting_time_column].sum().fillna(0).to_dict()
            ),
            total_batching_waiting_time_per_resource=(
                batch_pd.groupby("resource")[waiting_time_column].sum().fillna(0).to_dict()
            ),
            avg_batching_waiting_time_by_case=float(batch_pd[waiting_time_column].mean()),
            total_batching_waiting_time=float(batch_pd[waiting_time_column].sum()),
            total_fixed_cost_by_task=total_fixed_cost_by_task,
            avg_fixed_cost_per_case=avg_fixed_cost_per_case,
            batches_by_activity_with_idle=Evaluation.get_batches_by_activity_with_idle(
                batches_greater_than_one
            ),
            avg_batch_size_per_task=Evaluation.get_average_batch_size_per_task(batches_greater_than_one),
            avg_batch_size_for_batch_enabled_tasks=Evaluation.get_avg_batch_size_for_batch_enabled_tasks(
                batches_greater_than_one
            ),
            resource_started_weekdays=Evaluation.get_resource_started_weekdays(cases),
            tasks_by_number_of_duplicate_enablement_dates=(
                Evaluation.get_tasks_by_number_of_duplicate_enablement_dates(cases)
            ),
        )

    @property
    def achieved_cycle_time(self) -> float:
        """Return the achieved cycle time (alias for `total_cycle_time`)."""
        return self.total_cycle_time