Coverage for o2/util/stat_calculation_helper.py: 97%
68 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-05-16 11:18 +0000
1import math
3import numpy as np
5from o2.models.solution import Solution
def distance(point1: tuple[float, float], point2: tuple[float, float]) -> float:
    """Return the Euclidean (L2) distance between ``point1`` and ``point2``.

    The result is always converted to a built-in ``float`` so callers never
    receive a NumPy scalar.
    """
    offset = np.array(point1) - np.array(point2)
    return float(np.linalg.norm(offset))
def calculate_hyperarea(pareto_solutions: list[Solution], reference_point: tuple[float, float]) -> float:
    """Compute the hyperarea of a set of solutions relative to a reference point.

    The hyperarea is the 2D hypervolume of the region dominated by the
    solutions and bounded above by ``reference_point`` (smaller coordinates
    are better, so the reference point is the "worst" corner):

        HV(Y; r) = LebesgueMeasure( ∪_{y ∈ Y} [y, r] )

    The area is accumulated as vertical strips, sweeping from the reference
    corner towards smaller x. The height of each strip is determined by the
    lowest y among all solutions whose x reaches that strip, so dominated
    solutions do not distort the result. Solutions lying beyond the reference
    point in either coordinate contribute only the part of their dominated
    region that falls inside the reference box (possibly nothing).

    Args:
        pareto_solutions: Solutions exposing ``pareto_x`` and ``point``
            (an ``(x, y)`` pair).
        reference_point: ``(r_x, r_y)`` corner bounding the measured region.

    Returns:
        The dominated area, or ``0.0`` when no solutions are given.
    """
    # Quick exit if there are no solutions
    if not pareto_solutions:
        return 0.0

    ref_x, ref_y = reference_point

    # Sweep order: largest x first (right to left)
    sorted_solutions = sorted(pareto_solutions, key=lambda s: s.pareto_x, reverse=True)
    points = [sol.point for sol in sorted_solutions]

    # lowest_y_from[i] is the best (smallest) y among points[i:], i.e. among
    # every solution whose x is at or left of points[i]. For a valid Pareto
    # front this is simply the y of points[i] itself.
    lowest_y_from = [0.0] * len(points)
    running_min = float("inf")
    for idx in range(len(points) - 1, -1, -1):
        running_min = min(running_min, points[idx][1])
        lowest_y_from[idx] = running_min

    area = 0.0
    # Start from the reference corner
    last_x = ref_x
    for (x, _), best_y in zip(points, lowest_y_from):
        # Never let the strip extend to the right of the reference point
        x = min(x, last_x)
        height = ref_y - best_y
        # A strip whose best y is worse than the reference adds no area
        if height > 0:
            area += (last_x - x) * height
        # Move the x-boundary left
        last_x = x

    return area
def generational_distance_p2(a: list[Solution], b: list[Solution]) -> float:
    """Compute the generational distance GD₂(A, B) in the direction A -> B.

    Every solution in ``a`` is matched with its nearest solution in ``b``
    (Euclidean distance); the result is the root mean square of those
    nearest-neighbour distances.

    Returns ``0.0`` when ``a`` is empty. When ``b`` is empty, every nearest
    distance stays at infinity, so the result is infinite.
    """
    if not a:
        return 0.0

    infinity = float("inf")
    squared_total = 0.0
    for origin in a:
        origin_pt = origin.point
        # Seeding the candidates with infinity keeps the "no neighbour"
        # result defined when ``b`` is empty.
        nearest = min([infinity, *(distance(origin_pt, target.point) for target in b)])
        squared_total += nearest**2

    return math.sqrt(squared_total / len(a))
def calculate_averaged_hausdorff_distance(
    pareto_front: list[Solution], reference_set: list[Solution]
) -> float:
    """Calculate the p=2 averaged Hausdorff distance between two solution sets.

    As implemented here, the value is the arithmetic mean of the two
    directed generational distances:

        Δ₂(A, B) = ( GD₂(A, B) + GD₂(B, A) ) / 2,

    with

        GD₂(A, B) = sqrt( (1/|A|) * Σ_{a ∈ A} ( min_{b ∈ B} d(a, b) )² )

    and d(a, b) the Euclidean distance between the points a and b.

    Returns ``0.0`` when ``pareto_front`` is empty.
    """
    if pareto_front:
        # Direction front -> reference, then reference -> front
        forward = generational_distance_p2(pareto_front, reference_set)
        backward = generational_distance_p2(reference_set, pareto_front)
        return (forward + backward) / 2

    return 0.0
def calculate_delta_metric(pareto_front: list, reference_set: list) -> float:
    """Calculate the Delta (spread) metric for the diversity of a Pareto front.

    Formula:
        Δ = (d_f + d_l + Σ|d_i - d̄|) / (d_f + d_l + (N - 1) * d̄)
    where:
        - d_f = distance between the front solution with the smallest
          ``pareto_x`` and the reference solution with the smallest ``pareto_x``
        - d_l = distance between the front solution with the smallest
          ``pareto_y`` and the reference solution with the smallest ``pareto_y``
        - d_i = distance between consecutive front solutions, ordered by
          ``pareto_x``
        - d̄   = mean of all d_i
        - N   = number of solutions in the Pareto front

    Returns ``0.0`` for an empty or single-solution front and ``inf`` when the
    denominator is zero. A non-empty front with an empty ``reference_set``
    raises ``ValueError`` (from ``min`` on an empty sequence).
    """
    if not pareto_front:
        return 0.0

    # Extreme solutions of the reference set, one per objective
    ref_min_x = min(reference_set, key=lambda s: s.pareto_x)
    ref_min_y = min(reference_set, key=lambda s: s.pareto_y)

    # The front ordered along each objective
    by_x = sorted(pareto_front, key=lambda s: s.pareto_x)
    by_y = sorted(pareto_front, key=lambda s: s.pareto_y)

    # A single solution has no spread to measure
    if len(by_x) == 1:
        return 0.0

    # Gaps between the ends of the front and the reference extremes
    d_first = distance(by_x[0].point, ref_min_x.point)
    d_last = distance(by_y[0].point, ref_min_y.point)

    # Gaps between neighbours; walking the x-ordering once is sufficient
    # because the Euclidean distance is symmetric.
    gaps = [distance(left.point, right.point) for left, right in zip(by_x, by_x[1:])]

    mean_gap = sum(gaps) / len(gaps)
    deviation_total = sum(abs(gap - mean_gap) for gap in gaps)

    numerator = d_first + d_last + deviation_total
    # len(gaps) equals N - 1
    denominator = d_first + d_last + (len(gaps) * mean_gap)

    if denominator == 0.0:
        return float("inf")

    return numerator / denominator
def calculate_purity(pareto_front: list[Solution], reference_set: list[Solution]) -> float:
    """Calculate the Purity metric for the Pareto front.

    Purity is computed as the number of distinct reference points that also
    occur in ``pareto_front``, divided by the number of distinct reference
    points. Points are compared by exact equality of their coordinates.

    Args:
        pareto_front: Solutions of the front being evaluated.
        reference_set: Solutions forming the reference front.

    Returns:
        The purity ratio in ``[0, 1]``. ``0.0`` is returned when either
        input is empty, so an empty reference set cannot cause a division
        by zero.
    """
    if not pareto_front or not reference_set:
        return 0.0

    front_points = {tuple(sol.point) for sol in pareto_front}
    reference_points = {tuple(sol.point) for sol in reference_set}
    return len(front_points & reference_points) / len(reference_points)