"""
Évaluateur partiel pour sous-réseaux
=====================================
Module pour évaluer formellement un sous-ensemble de couches d'un réseau.
Utile pour tester la soundness sur de grands modèles comme VGG16.
"""
import numpy as np
from typing import List, Dict, Tuple, Any, Optional
import onnxruntime as ort
from .affine_engine import AffineExpressionEngine, AffineExpression
from .onnx_parser import ONNXParser
from .bound_propagator import BoundPropagator
[docs]
class PartialNetworkEvaluator:
"""Évaluateur pour sous-réseaux - utilise BoundPropagator en interne"""
[docs]
def __init__(self, engine: AffineExpressionEngine = None):
"""
Initialiser l'évaluateur partiel
Args:
engine: Moteur d'expressions affines (optionnel, créé si absent)
"""
self.engine = engine or AffineExpressionEngine()
# Utiliser BoundPropagator au lieu de réécrire la logique
self.propagator = BoundPropagator(
relaxation_type='linear',
enable_reporting=False # Pas de rapport par défaut
)
[docs]
def propagate_through_layers(
self,
expressions: List[AffineExpression],
layers: List[Dict[str, Any]],
input_shape: Tuple[int, ...]
) -> Tuple[List[AffineExpression], List[Tuple[float, float]]]:
"""
Propager les expressions à travers un sous-ensemble de couches
Utilise BoundPropagator.propagate() en interne.
Args:
expressions: Expressions d'entrée
layers: Liste des couches à propager
input_shape: Forme de l'entrée (B, C, H, W) ou (C, H, W)
Returns:
Tuple (expressions_sortie, bornes_intermédiaires)
"""
print(f" Propagation à travers {len(layers)} couches...")
print(f" Forme d'entrée initiale : {input_shape}")
# Utiliser le propagateur existant
try:
output_expressions = self.propagator.propagate(
expressions,
layers,
input_shape=input_shape
)
# Extraire les bornes intermédiaires
all_bounds = []
for bound_info in self.propagator.intermediate_bounds:
all_bounds.extend(bound_info.get('bounds', []))
print(f" ✓ Propagation terminée : {len(output_expressions)} expressions")
return output_expressions, all_bounds
except Exception as e:
print(f" ✗ Erreur lors de la propagation : {e}")
import traceback
traceback.print_exc()
raise
[docs]
class ONNXPartialEvaluator:
"""Évaluateur utilisant ONNX Runtime pour les activations intermédiaires"""
[docs]
def __init__(self, model_path: str):
"""
Initialiser l'évaluateur ONNX
Args:
model_path: Chemin vers le modèle ONNX
"""
self.model_path = model_path
self.session = ort.InferenceSession(model_path)
self.parser = ONNXParser(model_path)
[docs]
def monte_carlo_sampling_at_layer(
self,
image: np.ndarray,
noise_level: float,
num_layers: int,
num_samples: int = 100
) -> Tuple[np.ndarray, np.ndarray]:
"""
Échantillonnage Monte Carlo à une couche spécifique
Utilise ONNX Runtime pour l'inférence complète, puis compare
avec les résultats formels.
Args:
image: Image d'entrée (C, H, W) ou (B, C, H, W)
noise_level: Niveau de bruit
num_layers: Nombre de couches à traverser (ignoré pour full network)
num_samples: Nombre d'échantillons
Returns:
Tuple (bornes_min, bornes_max) des sorties du réseau
"""
print(f"\n Monte Carlo : échantillonnage sur le réseau complet...")
try:
# Stratégie 1: Essayer PyTorch pour VGG16
if 'vgg16' in self.model_path.lower():
return self._monte_carlo_vgg16(image, noise_level, num_layers, num_samples)
# Stratégie 2: Utiliser ONNX Runtime pour les autres modèles
return self._monte_carlo_onnx_runtime(image, noise_level, num_samples)
except Exception as e:
print(f" ✗ Erreur lors de l'extraction des activations : {e}")
raise
def _monte_carlo_vgg16(
self,
image: np.ndarray,
noise_level: float,
num_layers: int,
num_samples: int
) -> Tuple[np.ndarray, np.ndarray]:
"""Monte Carlo sampling spécifique pour VGG16 avec PyTorch"""
import torch
import torchvision.models as models
model = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
model.eval()
# Extraire sous-réseau VGG16
all_layers = self.parser.parse()
target_layers = all_layers[:num_layers]
feature_index = self._map_vgg16_to_feature_index(target_layers)
print(f" → Extraction jusqu'à features[{feature_index}] (VGG16)")
sub_model = torch.nn.Sequential(*list(model.features[:feature_index+1]))
sub_model.eval()
samples = []
for _ in range(num_samples):
# Générer bruit aléatoire
noise = np.random.uniform(-noise_level, noise_level, image.shape).astype(np.float32)
noisy_image = (image + noise).astype(np.float32)
# Ajouter dimension batch si nécessaire
if len(noisy_image.shape) == 3:
noisy_image = np.expand_dims(noisy_image, axis=0)
# Inférence à travers le sous-modèle
with torch.no_grad():
input_tensor = torch.from_numpy(noisy_image)
intermediate_output = sub_model(input_tensor)
samples.append(intermediate_output.numpy().flatten())
samples_array = np.array(samples)
return samples_array.min(axis=0), samples_array.max(axis=0)
def _monte_carlo_onnx_runtime(
self,
image: np.ndarray,
noise_level: float,
num_samples: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Monte Carlo sampling générique utilisant ONNX Runtime
Fonctionne avec n'importe quel modèle ONNX en faisant
l'inférence complète du réseau.
"""
print(f" → Échantillonnage via ONNX Runtime (réseau complet)")
# Obtenir le nom de l'entrée depuis la session ONNX
input_name = self.session.get_inputs()[0].name
samples = []
for i in range(num_samples):
# Générer bruit aléatoire
noise = np.random.uniform(-noise_level, noise_level, image.shape).astype(np.float32)
noisy_image = (image + noise).astype(np.float32)
# Ajouter dimension batch si nécessaire
if len(noisy_image.shape) == 3:
noisy_image = np.expand_dims(noisy_image, axis=0)
# Inférence via ONNX Runtime
try:
outputs = self.session.run(None, {input_name: noisy_image})
# Prendre la première sortie (logits généralement)
output = outputs[0].flatten()
samples.append(output)
except Exception as e:
print(f" ✗ Erreur d'inférence ONNX à l'échantillon {i}: {e}")
# Continuer avec les autres échantillons
continue
# Afficher progression
if (i + 1) % 20 == 0:
print(f" Progression: {i+1}/{num_samples} échantillons")
if not samples:
raise RuntimeError("Aucun échantillon Monte Carlo n'a réussi")
samples_array = np.array(samples)
print(f" ✓ {len(samples)} échantillons collectés")
return samples_array.min(axis=0), samples_array.max(axis=0)
def _map_vgg16_to_feature_index(self, layers: List[Dict[str, Any]]) -> int:
"""
Mapper les couches parsées vers l'index dans vgg16.features
Args:
layers: Liste des couches parsées
Returns:
Index dans vgg16.features
"""
feature_idx = -1
for layer in layers:
layer_type = layer['type']
if layer_type == 'Conv':
feature_idx += 1
elif layer_type == 'Relu':
feature_idx += 1
elif layer_type == 'MaxPool':
feature_idx += 1
elif layer_type in ['Shape', 'Concat', 'Reshape']:
continue
return feature_idx
[docs]
def monte_carlo_sampling(self, image, noise_level, num_samples=100):
"""Version originale - garde pour compatibilité"""
return self.monte_carlo_sampling_at_layer(image, noise_level, 999, num_samples)
[docs]
def verify_partial_soundness(
model_path: str,
image: np.ndarray,
noise_level: float,
num_layers: int = 5,
num_mc_samples: int = 100
) -> Dict[str, Any]:
"""
Vérifier la soundness sur un sous-réseau
Args:
model_path: Chemin vers le modèle ONNX
image: Image d'entrée (C, H, W) ou (B, C, H, W)
noise_level: Niveau de bruit
num_layers: Nombre de couches à tester
num_mc_samples: Nombre d'échantillons Monte Carlo
Returns:
Dictionnaire avec les résultats de vérification
"""
print(f"\n{'='*70}")
print(f"Vérification de soundness partielle")
print(f"{'='*70}")
# Parser le modèle
parser = ONNXParser(model_path)
all_layers = parser.parse()
layers = all_layers[:num_layers]
print(f"\nModèle : {model_path}")
print(f"Couches à évaluer : {len(layers)}")
print(f"Niveau de bruit : {noise_level}")
print(f"Échantillons MC : {num_mc_samples}")
# Créer l'évaluateur formel (utilise BoundPropagator en interne)
evaluator = PartialNetworkEvaluator()
# Propagation formelle
print(f"\n[1/3] Propagation formelle...")
input_shape = image.shape
try:
input_expressions = evaluator.create_input_expressions(image, noise_level)
print(f" ✓ {len(input_expressions)} expressions créées")
output_expressions, all_bounds = evaluator.propagate_through_layers(
input_expressions,
layers,
input_shape
)
# Extraire les bornes finales (limité pour perf)
sample_size = min(100, len(output_expressions))
formal_bounds = [expr.get_bounds() for expr in output_expressions[:sample_size]]
print(f" ✓ Propagation formelle terminée")
print(f" ✓ {len(formal_bounds)} bornes calculées")
except Exception as e:
print(f" ✗ Erreur lors de la propagation formelle : {e}")
return {
'success': False,
'error': str(e),
'error_type': type(e).__name__
}
# Monte Carlo sampling À LA MÊME COUCHE
print(f"\n[2/3] Échantillonnage Monte Carlo (même couche)...")
onnx_evaluator = ONNXPartialEvaluator(model_path)
try:
mc_min, mc_max = onnx_evaluator.monte_carlo_sampling_at_layer(
image,
noise_level,
num_layers,
num_mc_samples
)
# Limiter aux mêmes indices que formel
mc_min = mc_min[:sample_size]
mc_max = mc_max[:sample_size]
print(f" ✓ {num_mc_samples} échantillons Monte Carlo collectés")
print(f" ✓ Activations extraites à la couche {num_layers}")
except Exception as e:
print(f" ✗ Erreur lors du Monte Carlo : {e}")
import traceback
traceback.print_exc()
return {
'success': False,
'error': str(e),
'error_type': type(e).__name__
}
# Vérification de soundness
print(f"\n[3/3] Vérification de soundness...")
soundness_report = {
'is_sound': True,
'violations': [],
'total_outputs': len(formal_bounds),
'violation_count': 0,
'max_violation_lower': 0.0,
'max_violation_upper': 0.0,
'safety_margins_lower': [],
'safety_margins_upper': [],
'coverage_ratio': 0.0
}
for i, (formal_lower, formal_upper) in enumerate(formal_bounds):
observed_min = float(mc_min[i])
observed_max = float(mc_max[i])
# Vérifier soundness avec tolérance numérique
tolerance = 1e-6
lower_sound = observed_min >= formal_lower - tolerance
upper_sound = observed_max <= formal_upper + tolerance
is_sound = lower_sound and upper_sound
if not is_sound:
soundness_report['is_sound'] = False
soundness_report['violation_count'] += 1
violation = {
'index': i,
'observed_min': observed_min,
'formal_min': float(formal_lower),
'observed_max': observed_max,
'formal_max': float(formal_upper),
'violation_lower': max(0, formal_lower - observed_min),
'violation_upper': max(0, observed_max - formal_upper)
}
soundness_report['violations'].append(violation)
soundness_report['max_violation_lower'] = max(
soundness_report['max_violation_lower'],
violation['violation_lower']
)
soundness_report['max_violation_upper'] = max(
soundness_report['max_violation_upper'],
violation['violation_upper']
)
# Calculer marges de sécurité
margin_lower = observed_min - formal_lower
margin_upper = formal_upper - observed_max
soundness_report['safety_margins_lower'].append(float(margin_lower))
soundness_report['safety_margins_upper'].append(float(margin_upper))
# Calculer le ratio de couverture
soundness_report['coverage_ratio'] = (
1.0 - (soundness_report['violation_count'] / soundness_report['total_outputs'])
)
# Afficher résumé
print(f"\n{'─'*70}")
if soundness_report['is_sound']:
print(f"✅ SOUND : Toutes les valeurs observées sont dans les bornes formelles")
else:
print(f"❌ NOT SOUND : {soundness_report['violation_count']} violations détectées")
print(f" Taux de couverture : {soundness_report['coverage_ratio']*100:.2f}%")
avg_margin_lower = np.mean(soundness_report['safety_margins_lower'])
avg_margin_upper = np.mean(soundness_report['safety_margins_upper'])
print(f"\n📏 Marges de sécurité moyennes :")
print(f" Borne inférieure : {avg_margin_lower:.6f}")
print(f" Borne supérieure : {avg_margin_upper:.6f}")
print(f"\n💡 Comparaison correcte :")
print(f" Formel : Bornes après {num_layers} couches")
print(f" Monte Carlo : Activations après {num_layers} couches")
print(f" ✅ Même niveau de réseau comparé")
return {
'success': True,
'soundness_report': soundness_report,
'formal_bounds': formal_bounds,
'monte_carlo_bounds': (mc_min.tolist(), mc_max.tolist()),
'num_layers': len(layers),
'num_outputs': len(formal_bounds),
'noise_level': noise_level,
'num_mc_samples': num_mc_samples
}
[docs]
def quick_soundness_check(
model_path: str,
test_image: np.ndarray,
epsilon: float = 0.01,
num_layers: int = 5,
verbose: bool = True
) -> bool:
"""
Vérification rapide de soundness
Args:
model_path: Chemin vers le modèle ONNX
test_image: Image de test (C, H, W)
epsilon: Niveau de bruit
num_layers: Nombre de couches à tester
verbose: Afficher les détails
Returns:
True si sound, False sinon
"""
result = verify_partial_soundness(
model_path,
test_image,
epsilon,
num_layers=num_layers,
num_mc_samples=50
)
if not result['success']:
if verbose:
print(f"Erreur : {result['error']}")
return False
is_sound = result['soundness_report']['is_sound']
if verbose and not is_sound:
violations = result['soundness_report']['violation_count']
total = result['soundness_report']['total_outputs']
print(f"Violations : {violations}/{total}")
return is_sound