Coverage for o2/util/waiting_time_helper.py: 99%
70 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1from collections import defaultdict
2from datetime import datetime
3from typing import Callable, TypedDict
5from prosimos.simulation_stats_calculator import (
6 LogInfo,
7 TaskEvent,
8)
BatchInfoKey = tuple[str, str, datetime]
"""Key identifying a batch: (activity, resource, start time of the batch execution)."""
class BatchInfo(TypedDict):
    """Statistics describing a single executed batch."""

    # --- Identification -------------------------------------------------
    batch_id: str
    """Identifier under which the events of the batch were grouped."""
    activity: str
    """Activity (task) the batch belongs to."""
    resource: str
    """Resource that executed the batch."""
    case: int
    """Case of the first event in the batch."""

    # --- Execution and accumulation window ------------------------------
    start: datetime
    """Point in time at which the batch execution started."""
    end: datetime
    """Point in time at which the batch execution ended."""
    accumulation_begin: datetime
    """Enablement time of the first task in the batch."""
    accumulation_end: datetime
    """Enablement time of the last task in the batch."""

    # --- Waiting times --------------------------------------------------
    wt_first: float
    """Time difference between the enablement of the first task in the batch and the
    starting time of the batch."""
    wt_last: float
    """Time difference between the enablement of the last task in the batch and the
    starting time of the batch."""
    wt_total: float
    """Sum, over all tasks in the batch, of the difference between the task's
    enablement and the starting time of the batch."""
    wt_batching: float
    """Sum, over all tasks in the batch, of the difference between the task's
    enablement and the last enablement in the batch."""

    # --- Processing times -----------------------------------------------
    idle_time: float
    """Idle time of the batch."""
    real_proc: float
    """Real processing time of the batch, i.e., processing + idle time."""
    ideal_proc: float
    """Ideal processing time, i.e., the real processing time with the idle time
    removed."""

    # --- Size and cost --------------------------------------------------
    size: int
    """Number of tasks in the batch."""
    fixed_cost: float
    """Fixed cost of the batch."""
class SimpleBatchInfo(TypedDict):
    """Reduced variant of the batch information.

    Carries only a handful of fields in order to preserve memory.
    """

    accumulation_begin: datetime
    """Enablement time of the first task in the batch."""
    start: datetime
    """Starting time of the batch execution."""
    ideal_proc: float
    """Ideal processing time, i.e., processing time without the idle time."""
    idle_time: float
    """Idle time of the batch."""
def get_batches_from_event_log(
    log: LogInfo,
    fixed_cost_fns: dict[str, Callable[[float], float]],
    batching_rules_exist: bool = True,
) -> dict[BatchInfoKey, BatchInfo]:
    """Identify batches with their key statistics.

    Events are grouped by their ``batch_id``. Events that have no batch id (or no
    resource) are grouped under a synthetic id built from task, resource and start
    time. For every group the following statistics are computed:

    - wt_first: Time difference (in seconds) between enablement time of the first
      task in the batch and the starting time of the batch.
    - wt_last: Time difference (in seconds) between enablement time of the last
      task in the batch and the starting time of the batch.
    - wt_total: Sum of the differences between the enablement of each task and its
      start.
    - wt_batching: Sum of the differences between the enablement of each task and
      the last enablement of the batch.
    - real_proc: real processing time of the batch, i.e., processing + idle time.
    - ideal_proc: ideal processing time without considering idle time,
      i.e., removing the idle time from the real processing time.

    Additionally the activity, resource and start / end time are kept for each batch.

    Args:
        log: Event log; every event of every trace in ``log.trace_list`` is used.
        fixed_cost_fns: Maps an activity to a function computing the fixed cost
            of a batch from its size. Activities without an entry cost ``0``.
        batching_rules_exist: Currently not read by this function; kept so that
            existing callers continue to work.

    Returns:
        Batch statistics keyed by ``(activity, resource, execution start)``. If
        several groups share the same key, only the first one encountered is kept.

    Raises:
        ValueError: If none of the events of a batch has an enablement time.
    """
    result: dict[BatchInfoKey, BatchInfo] = {}

    for batch_id, batch in _group_events_by_batch(log).items():
        # Activity, resource and case are taken from the first event of the group.
        first_event = batch[0]
        activity = first_event.task_id
        resource = first_event.resource_id
        execution_start: datetime = min(event.started_datetime for event in batch)

        key = (activity, resource, execution_start)
        if key in result:
            # Checked before the remaining statistics are computed, since they
            # would be thrown away anyway.
            continue

        result[key] = _build_batch_info(
            batch_id, batch, execution_start, fixed_cost_fns.get(activity, lambda _: 0)
        )

    return result


def _group_events_by_batch(log: LogInfo) -> dict[str, list[TaskEvent]]:
    """Group all events of the log by batch id (or a synthetic per-event id)."""
    grouped: dict[str, list[TaskEvent]] = defaultdict(list)
    for trace in log.trace_list:
        for event in trace.event_list:
            if event.resource_id is not None and event.batch_id is not None:
                group_id = event.batch_id
            else:
                # No batch id available: events only end up together when task,
                # resource and start time all coincide.
                group_id = f"{event.task_id}_{event.resource_id}_{event.started_datetime}"
            grouped[group_id].append(event)
    return grouped


def _build_batch_info(
    batch_id: str,
    batch: list[TaskEvent],
    execution_start: datetime,
    fixed_cost_fn: Callable[[float], float],
) -> BatchInfo:
    """Compute the statistics of one batch from its (non-empty) list of events."""
    first_event = batch[0]
    size = len(batch)
    execution_end: datetime = max(event.completed_datetime for event in batch)

    # Events without an enablement time cannot contribute to any waiting time, so
    # they are consistently left out of all enablement-based statistics.
    enabled_events = [event for event in batch if event.enabled_datetime is not None]
    accumulation_begin: datetime = min(event.enabled_datetime for event in enabled_events)
    accumulation_end: datetime = max(event.enabled_datetime for event in enabled_events)

    batch_idle_time = first_event.idle_time or 0
    processing_time = first_event.idle_processing_time or 0

    wt_total = sum(
        (event.started_datetime - event.enabled_datetime).total_seconds()
        for event in enabled_events
    )
    wt_batching = sum(
        (accumulation_end - event.enabled_datetime).total_seconds() for event in enabled_events
    )

    return {
        "batch_id": batch_id,
        "case": first_event.p_case,
        "activity": first_event.task_id,
        "resource": first_event.resource_id,
        "start": execution_start,
        "end": execution_end,
        "accumulation_begin": accumulation_begin,
        "accumulation_end": accumulation_end,
        # total_seconds() rather than .seconds: the latter silently drops whole
        # days, so any wait longer than 24h would be under-reported.
        "wt_first": (execution_start - accumulation_begin).total_seconds(),
        "wt_last": (execution_start - accumulation_end).total_seconds(),
        "wt_total": wt_total,
        "wt_batching": wt_batching,
        "idle_time": batch_idle_time,
        "real_proc": processing_time,
        "ideal_proc": processing_time - batch_idle_time,
        "size": size,
        "fixed_cost": fixed_cost_fn(size),
    }