Coverage for o2/util/waiting_time_helper.py: 99%

70 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1from collections import defaultdict 

2from datetime import datetime 

3from typing import Callable, TypedDict 

4 

5from prosimos.simulation_stats_calculator import ( 

6 LogInfo, 

7 TaskEvent, 

8) 

9 

BatchInfoKey = tuple[str, str, datetime]
"""Key identifying a batch: (activity, resource, start time of the batch)."""

12 

13 

class BatchInfo(TypedDict):
    """Statistics describing a single batch execution.

    All ``wt_*`` fields are waiting times expressed in seconds.
    """

    batch_id: str
    """Identifier of the batch"""
    activity: str
    """Activity (task) of the first event in the batch"""
    resource: str
    """Resource of the first event in the batch"""
    case: int
    """Case of the first event in the batch"""
    start: datetime
    """Starting time of the batch execution"""
    end: datetime
    """Ending time of the batch execution"""
    accumulation_begin: datetime
    """Time when the first task in the batch was enabled"""
    accumulation_end: datetime
    """Time when the last task in the batch was enabled"""
    wt_first: float
    """Time difference between the enablement of the first task in the batch and the
    starting time of the batch."""
    wt_last: float
    """Time difference between the enablement of the last task in the batch and the
    starting time of the batch."""
    wt_total: float
    """Sum of the difference between enablement of each task and the starting time of
    the batch."""
    wt_batching: float
    """Sum of the difference between enablement of each task and the last enablement of
    the batch."""
    idle_time: float
    """Idle time of the batch"""
    real_proc: float
    """Real processing time of the batch, i.e., processing + idle time"""
    ideal_proc: float
    """Ideal processing time without considering idle time, i.e., removing the idle
    time from the real processing"""
    size: int
    """Number of tasks in the batch"""
    fixed_cost: float
    """Fixed cost of the batch"""

52 

53 

class SimpleBatchInfo(TypedDict):
    """Reduced variant of the batch information.

    Keeps only a handful of fields in order to save memory.
    """

    accumulation_begin: datetime
    """Time when the first task in the batch was enabled"""
    start: datetime
    """Starting time of the batch execution"""
    ideal_proc: float
    """Processing time of the batch with the idle time removed"""
    idle_time: float
    """Idle time of the batch"""

64 

65 

def get_batches_from_event_log(
    log: LogInfo,
    fixed_cost_fns: dict[str, Callable[[float], float]],
    batching_rules_exist: bool = True,
) -> dict[BatchInfoKey, BatchInfo]:
    """Identify batches with their key statistics.

    Events that have both a resource and a batch id are grouped by that batch id.
    Every other event is grouped under a synthetic id built from its task, resource
    and start time.

    All waiting times are expressed in seconds:

    - wt_first: Time difference between enablement time of the first task in the batch
      to the starting time of the batch.
    - wt_last: Time difference between enablement time of the last task in the batch
      to the starting time of the batch.
    - wt_total: Sum over all tasks of the time between enablement and start.
    - wt_batching: Sum over all tasks of the time between their enablement and the
      last enablement in the batch.
    - real_proc: real processing time of the batch, taken from the first event.
    - ideal_proc: ideal processing time without considering idle time,
      i.e., removing the idle time from the real processing time.

    Additionally the activity, resource and start / end time are kept for each batch.

    Args:
        log: Event log; the events are read from ``trace.event_list`` of every
            trace in ``log.trace_list``.
        fixed_cost_fns: Maps an activity to a function returning the fixed cost of a
            batch given its size. Activities without an entry have a fixed cost of 0.
        batching_rules_exist: Not used by this function; kept so existing callers
            keep working.

    Returns:
        The batches keyed by (activity, resource, execution start). When several
        groups map to the same key, only the first one encountered is kept.
    """
    # Group events by batch id (or by a synthetic id for unbatched events)
    events_by_batch: dict[str, list[TaskEvent]] = defaultdict(list)
    for trace in log.trace_list:
        for event in trace.event_list:
            if event.resource_id is not None and event.batch_id is not None:
                group_id = event.batch_id
            else:
                group_id = f"{event.task_id}_{event.resource_id}_{event.started_datetime}"
            events_by_batch[group_id].append(event)

    result: dict[BatchInfoKey, BatchInfo] = {}

    for batch_id, batch in events_by_batch.items():
        first_event = batch[0]
        activity = first_event.task_id
        resource = first_event.resource_id
        execution_start: datetime = min(event.started_datetime for event in batch)

        key: BatchInfoKey = (activity, resource, execution_start)
        if key in result:
            # First batch seen for a key wins; skip before doing any further work.
            continue

        execution_end: datetime = max(event.completed_datetime for event in batch)

        enabled_times: list[datetime] = [
            event.enabled_datetime for event in batch if event.enabled_datetime is not None
        ]
        # Without any known enablement time, fall back to the execution start so the
        # batch is reported with zero waiting time instead of raising on min([]).
        accumulation_begin: datetime = min(enabled_times, default=execution_start)
        accumulation_end: datetime = max(enabled_times, default=execution_start)

        batch_idle_time = first_event.idle_time or 0
        processing_time = first_event.idle_processing_time or 0
        ideal_processing_time = processing_time - batch_idle_time

        # Events without an enablement time contribute no waiting time.
        wt_total = sum(
            (event.started_datetime - event.enabled_datetime).total_seconds()
            for event in batch
            if event.enabled_datetime is not None
        )
        wt_batching = sum(
            (accumulation_end - enabled).total_seconds() for enabled in enabled_times
        )

        batch_size = len(batch)
        fixed_cost_fn = fixed_cost_fns.get(activity)
        fixed_cost = fixed_cost_fn(batch_size) if fixed_cost_fn is not None else 0

        result[key] = {
            "batch_id": batch_id,
            "case": first_event.p_case,
            "activity": activity,
            "resource": resource,
            "start": execution_start,
            "end": execution_end,
            "accumulation_begin": accumulation_begin,
            "accumulation_end": accumulation_end,
            # total_seconds() keeps the days component; .seconds would wrap at 24h.
            "wt_first": (execution_start - accumulation_begin).total_seconds(),
            "wt_last": (execution_start - accumulation_end).total_seconds(),
            "wt_total": wt_total,
            "wt_batching": wt_batching,
            "idle_time": batch_idle_time,
            "real_proc": processing_time,
            "ideal_proc": ideal_processing_time,
            "size": batch_size,
            "fixed_cost": fixed_cost,
        }

    return result