Coverage for o2/util/stat_calculation_helper.py: 97%

68 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-05-16 11:18 +0000

1import math 

2 

3import numpy as np 

4 

5from o2.models.solution import Solution 

6 

7 

def distance(point1: tuple[float, float], point2: tuple[float, float]) -> float:
    """Return the Euclidean (L2) distance between ``point1`` and ``point2``.

    Both points are converted to NumPy arrays, subtracted component-wise, and
    the norm of the difference is returned as a plain Python ``float``.
    """
    offset = np.array(point1) - np.array(point2)
    return float(np.linalg.norm(offset))

11 

12 

def calculate_hyperarea(pareto_solutions: list[Solution], reference_point: tuple[float, float]) -> float:
    """Compute the hyperarea (2D hypervolume) of a set of Pareto solutions.

    The hyperarea is the area of the region dominated by the solutions and
    bounded above by ``reference_point``:

        HV(Y; r) = LebesgueMeasure( ∪_{y ∈ Y} [y, r] )

    where each y ∈ Y is a 2D point and r = (r_x, r_y) is "worse" than all y.

    NOTE: This is a simplified sweep that is only correct for valid Pareto
    fronts (i.e. no solution dominates another). Each solution contributes
    one rectangle and no dominance filtering is performed.

    Args:
        pareto_solutions: Solutions exposing ``pareto_x`` (used as the sort key)
            and ``point`` (unpacked as ``(x, y)``). Presumably
            ``point == (pareto_x, pareto_y)`` -- confirm against ``Solution``.
        reference_point: The ``(x, y)`` corner that bounds the area.

    Returns:
        The accumulated area, or ``0.0`` when there are no solutions.
    """
    if not pareto_solutions:
        return 0.0

    # Sweep from the largest x towards the smallest x.
    right_to_left = sorted(pareto_solutions, key=lambda s: s.pareto_x, reverse=True)

    ref_x, ref_y = reference_point

    total = 0.0
    right_edge = ref_x  # the first slab starts at the reference corner
    for solution in right_to_left:
        x, y = solution.point
        # Slab between this solution and the previous boundary, reaching up to ref_y.
        slab_width = right_edge - x
        slab_height = ref_y - y
        total += slab_width * slab_height
        right_edge = x

    return total

53 

54 

def generational_distance_p2(a: list[Solution], b: list[Solution]) -> float:
    """Compute the generational distance GD₂(A, B).

    For every solution in ``a`` the smallest Euclidean distance to any solution
    in ``b`` is determined. The result is the root mean square of those
    nearest-neighbour distances.

    Args:
        a: Solutions whose distance to ``b`` is measured (each needs ``point``).
        b: Solutions searched for the nearest neighbour (each needs ``point``).

    Returns:
        ``0.0`` when ``a`` is empty; otherwise the RMS of the per-solution
        minimum distances. With an empty ``b`` every minimum stays at infinity,
        so the result is ``inf``.
    """
    if not a:
        return 0.0

    squared_total = 0.0
    for candidate in a:
        origin = candidate.point
        # Seeding the search with infinity means an empty ``b`` yields ``inf``
        # and only strictly smaller distances replace the running minimum.
        nearest = min([float("inf"), *(distance(origin, other.point) for other in b)])
        squared_total += nearest**2

    return math.sqrt(squared_total / len(a))

78 

79 

def calculate_averaged_hausdorff_distance(
    pareto_front: list[Solution], reference_set: list[Solution]
) -> float:
    """Calculate the p=2 averaged Hausdorff distance between two solution sets.

    As implemented here the value is the arithmetic mean of the two directed
    generational distances:

        Δ₂(A, B) = ( GD₂(A, B) + GD₂(B, A) ) / 2

    with

        GD₂(A, B) = sqrt( (1/|A|) * Σ ( min_{b ∈ B} d(a,b) )² ),  for a ∈ A

    and d(a, b) the Euclidean distance between points a and b.

    NOTE(review): some definitions of the averaged Hausdorff distance take the
    maximum of the two directions rather than their mean -- confirm the mean
    is intended.

    Args:
        pareto_front: The front under evaluation (A).
        reference_set: The set it is compared against (B).

    Returns:
        ``0.0`` when ``pareto_front`` is empty; otherwise the mean of
        GD₂(A, B) and GD₂(B, A).
    """
    if not pareto_front:
        return 0.0

    # Directed distances: front -> reference and reference -> front.
    front_to_reference = generational_distance_p2(pareto_front, reference_set)
    reference_to_front = generational_distance_p2(reference_set, pareto_front)

    return (front_to_reference + reference_to_front) / 2

105 

106 

def calculate_delta_metric(pareto_front: list, reference_set: list) -> float:
    """Calculate the Delta (spread) metric for the diversity of a Pareto front.

    Formula:
        Δ = (d_f + d_l + sum(|d_i - d̄|)) / (d_f + d_l + (N - 1)*d̄)
    where:
        - d_f = distance between the front's minimum-x solution and the
          reference set's minimum-x solution
        - d_l = distance between the front's minimum-y solution and the
          reference set's minimum-y solution
        - d_i = distance between consecutive solutions of the x-sorted front
        - d̄  = average of all d_i
        - N   = number of solutions in the Pareto front

    Args:
        pareto_front: Solutions exposing ``pareto_x``, ``pareto_y`` and ``point``.
        reference_set: Solutions exposing the same attributes, used only to
            pick the two extreme reference solutions.

    Returns:
        ``0.0`` for an empty or single-solution front, ``inf`` when the
        denominator is exactly zero, otherwise the Delta value.

    Raises:
        ValueError: If ``pareto_front`` is non-empty but ``reference_set`` is
            empty (raised by ``min`` on an empty sequence).
    """
    if not pareto_front:
        return 0.0

    # Extreme reference solutions: smallest x and smallest y in the reference set.
    reference_min_x = min(reference_set, key=lambda s: s.pareto_x)
    reference_min_y = min(reference_set, key=lambda s: s.pareto_y)

    # The front ordered along each objective.
    front_by_x = sorted(pareto_front, key=lambda s: s.pareto_x)
    front_by_y = sorted(pareto_front, key=lambda s: s.pareto_y)

    # A single solution has no spread.
    if len(front_by_x) == 1:
        return 0.0

    # Boundary terms: how far the front's two ends are from the reference extremes.
    d_f = distance(front_by_x[0].point, reference_min_x.point)
    d_l = distance(front_by_y[0].point, reference_min_y.point)

    # Gaps between neighbours; only the x-ordering is walked because the
    # distance between two points does not depend on their order.
    gaps = [
        distance(left.point, right.point)
        for left, right in zip(front_by_x, front_by_x[1:])
    ]

    mean_gap = sum(gaps) / len(gaps)
    total_abs_deviation = sum(abs(gap - mean_gap) for gap in gaps)

    numerator = d_f + d_l + total_abs_deviation
    denominator = d_f + d_l + (len(gaps) * mean_gap)

    # All distances are zero: the ratio is undefined, reported as infinity.
    if denominator == 0.0:
        return float("inf")

    return numerator / denominator

164 

165 

def calculate_purity(pareto_front: list[Solution], reference_set: list[Solution]) -> float:
    """Calculate the Purity metric for the Pareto front.

    Purity is computed as the number of distinct points shared by the Pareto
    front and the reference set, divided by the number of distinct points in
    the reference set.

    Args:
        pareto_front: Solutions of the front under evaluation (each needs ``point``).
        reference_set: Solutions of the reference set (each needs ``point``).

    Returns:
        The shared fraction in ``[0.0, 1.0]``. Returns ``0.0`` when either the
        front or the reference set is empty, so an empty reference set no
        longer triggers a ``ZeroDivisionError``.
    """
    if not pareto_front:
        return 0.0

    reference_points = {tuple(sol.point) for sol in reference_set}
    # Nothing to compare against: no point can be "pure", and dividing by the
    # empty set's size would raise.
    if not reference_points:
        return 0.0

    front_points = {tuple(sol.point) for sol in pareto_front}
    shared_points = front_points & reference_points
    return len(shared_points) / len(reference_points)