"""
Moteur d'expressions affines pour la propagation formelle
"""
import numpy as np
from typing import Dict, Tuple, List
[docs]
class AffineExpression:
"""
Représente une expression affine : y = a0 + sum(aj * xj)
avec xj ∈ [lj, uj]
"""
[docs]
def __init__(self, constant: float, coefficients: Dict[int, float],
bounds: Dict[int, Tuple[float, float]]):
self.constant = constant
self.coefficients = coefficients # {index: coefficient}
self.bounds = bounds # {index: (lower, upper)}
[docs]
def get_bounds(self) -> Tuple[float, float]:
"""Calcule les bornes [min, max] de l'expression affine"""
lower = self.constant
upper = self.constant
for idx, coeff in self.coefficients.items():
l_bound, u_bound = self.bounds[idx]
if coeff >= 0:
lower += coeff * l_bound
upper += coeff * u_bound
else:
lower += coeff * u_bound
upper += coeff * l_bound
return (lower, upper)
[docs]
def __add__(self, other):
"""Addition de deux expressions affines"""
if isinstance(other, (int, float)):
return AffineExpression(
self.constant + other,
self.coefficients.copy(),
self.bounds.copy()
)
new_coeffs = self.coefficients.copy()
for idx, coeff in other.coefficients.items():
new_coeffs[idx] = new_coeffs.get(idx, 0.0) + coeff
return AffineExpression(
self.constant + other.constant,
new_coeffs,
{**self.bounds, **other.bounds}
)
[docs]
def __mul__(self, scalar):
"""Multiplication par un scalaire"""
return AffineExpression(
self.constant * scalar,
{idx: coeff * scalar for idx, coeff in self.coefficients.items()},
self.bounds.copy()
)
def __rmul__(self, scalar):
return self.__mul__(scalar)
[docs]
class AffineExpressionEngine:
"""Moteur de manipulation d'expressions affines"""
[docs]
@staticmethod
def linear_layer(expressions: List[AffineExpression],
weight: np.ndarray,
bias: np.ndarray = None) -> List[AffineExpression]:
"""Propage à travers une couche linéaire (MatMul + bias)"""
n_outputs = weight.shape[0]
result = []
for i in range(n_outputs):
# y_i = sum(w_ij * x_j) + b_i
constant = 0.0
coefficients = {}
bounds = {}
for j, expr in enumerate(expressions):
w = weight[i, j]
constant += w * expr.constant
for idx, coeff in expr.coefficients.items():
coefficients[idx] = coefficients.get(idx, 0.0) + w * coeff
if idx in expr.bounds:
bounds[idx] = expr.bounds[idx]
if bias is not None:
constant += bias[i]
result.append(AffineExpression(constant, coefficients, bounds))
return result
[docs]
@staticmethod
def conv2d_layer(expressions: List[AffineExpression],
weight: np.ndarray,
bias: np.ndarray = None,
input_shape: Tuple[int, int, int, int] = None,
stride: Tuple[int, int] = (1, 1),
padding: Tuple[int, int] = (0, 0),
dilation: Tuple[int, int] = (1, 1)) -> Tuple[List[AffineExpression], Tuple[int, int, int, int]]:
"""
Propage à travers une couche convolutionnelle 2D
Args:
expressions: Liste d'expressions affines (input flattened)
weight: Poids de convolution de forme (out_channels, in_channels, kH, kW)
bias: Biais de forme (out_channels,)
input_shape: (batch, in_channels, height, width)
stride: (stride_h, stride_w)
padding: (pad_h, pad_w)
dilation: (dilation_h, dilation_w)
Returns:
(expressions de sortie, output_shape)
"""
if input_shape is None:
# Essaie de déduire la forme depuis les expressions
raise ValueError("input_shape doit être spécifié pour Conv2D")
batch, in_channels, in_h, in_w = input_shape
out_channels, _, kernel_h, kernel_w = weight.shape
stride_h, stride_w = stride
pad_h, pad_w = padding
dil_h, dil_w = dilation
# Calcule les dimensions de sortie
out_h = (in_h + 2 * pad_h - dil_h * (kernel_h - 1) - 1) // stride_h + 1
out_w = (in_w + 2 * pad_w - dil_w * (kernel_w - 1) - 1) // stride_w + 1
# Reshape les expressions en tenseur 4D
input_tensor = np.array(expressions).reshape(batch, in_channels, in_h, in_w)
result = []
# Pour chaque position de sortie
for b in range(batch):
for oc in range(out_channels):
for oh in range(out_h):
for ow in range(out_w):
# Calcule la position de départ dans l'entrée
h_start = oh * stride_h - pad_h
w_start = ow * stride_w - pad_w
# Initialise l'expression de sortie
constant = 0.0
coefficients = {}
bounds = {}
# Applique le noyau de convolution
for ic in range(in_channels):
for kh in range(kernel_h):
for kw in range(kernel_w):
h_pos = h_start + kh * dil_h
w_pos = w_start + kw * dil_w
# Vérifie les limites (padding implicite = 0)
if 0 <= h_pos < in_h and 0 <= w_pos < in_w:
input_idx = b * (in_channels * in_h * in_w) + \
ic * (in_h * in_w) + \
h_pos * in_w + w_pos
expr = expressions[input_idx]
w_val = weight[oc, ic, kh, kw]
constant += w_val * expr.constant
for idx, coeff in expr.coefficients.items():
coefficients[idx] = coefficients.get(idx, 0.0) + w_val * coeff
if idx in expr.bounds:
bounds[idx] = expr.bounds[idx]
# Ajoute le biais
if bias is not None:
constant += bias[oc]
result.append(AffineExpression(constant, coefficients, bounds))
output_shape = (batch, out_channels, out_h, out_w)
return result, output_shape
[docs]
@staticmethod
def maxpool2d_layer(expressions: List[AffineExpression],
input_shape: Tuple[int, int, int, int],
kernel_size: Tuple[int, int],
stride: Tuple[int, int] = None,
padding: Tuple[int, int] = (0, 0)) -> Tuple[List[AffineExpression], Tuple[int, int, int, int]]:
"""
Propage à travers une couche MaxPool2D
Pour MaxPool, on utilise une relaxation: on prend l'expression avec la borne supérieure maximale
(approximation conservative)
Args:
expressions: Liste d'expressions affines
input_shape: (batch, channels, height, width)
kernel_size: (pool_h, pool_w)
stride: (stride_h, stride_w), par défaut = kernel_size
padding: (pad_h, pad_w)
Returns:
(expressions de sortie, output_shape)
"""
if stride is None:
stride = kernel_size
batch, channels, in_h, in_w = input_shape
pool_h, pool_w = kernel_size
stride_h, stride_w = stride
pad_h, pad_w = padding
# Calcule les dimensions de sortie
out_h = (in_h + 2 * pad_h - pool_h) // stride_h + 1
out_w = (in_w + 2 * pad_w - pool_w) // stride_w + 1
result = []
# Pour chaque position de sortie
for b in range(batch):
for c in range(channels):
for oh in range(out_h):
for ow in range(out_w):
h_start = oh * stride_h - pad_h
w_start = ow * stride_w - pad_w
# Collecte toutes les expressions dans la fenêtre de pooling
pool_expressions = []
for ph in range(pool_h):
for pw in range(pool_w):
h_pos = h_start + ph
w_pos = w_start + pw
# Vérifie les limites
if 0 <= h_pos < in_h and 0 <= w_pos < in_w:
input_idx = b * (channels * in_h * in_w) + \
c * (in_h * in_w) + \
h_pos * in_w + w_pos
pool_expressions.append(expressions[input_idx])
# Relaxation MaxPool: prend l'union des bornes
# Pour chaque variable de bruit, on prend le max des bornes supérieures
if pool_expressions:
result_expr = AffineExpressionEngine._maxpool_relaxation(pool_expressions)
result.append(result_expr)
output_shape = (batch, channels, out_h, out_w)
return result, output_shape
@staticmethod
def _maxpool_relaxation(expressions: List[AffineExpression]) -> AffineExpression:
"""
Relaxation pour MaxPool: calcule une sur-approximation conservative
Pour max(y1, y2, ..., yn), on utilise:
- Borne inférieure: max des bornes inférieures
- Borne supérieure: max des bornes supérieures
Pour l'expression affine, on utilise une approximation linéaire basée sur
les bornes.
"""
if len(expressions) == 1:
return expressions[0]
# Calcule les bornes de chaque expression
bounds_list = [expr.get_bounds() for expr in expressions]
# Trouve l'expression avec la borne supérieure maximale
max_upper_idx = max(range(len(bounds_list)), key=lambda i: bounds_list[i][1])
# Utilise cette expression comme base (approximation conservative)
base_expr = expressions[max_upper_idx]
# Ajuste les bornes pour être conservatives
all_bounds = {}
for expr in expressions:
for idx, bound in expr.bounds.items():
if idx not in all_bounds:
all_bounds[idx] = bound
else:
# Union des bornes
old_l, old_u = all_bounds[idx]
new_l, new_u = bound
all_bounds[idx] = (min(old_l, new_l), max(old_u, new_u))
# Crée une nouvelle expression avec les bornes élargies
return AffineExpression(
base_expr.constant,
base_expr.coefficients.copy(),
all_bounds
)
[docs]
@staticmethod
def avgpool2d_layer(expressions: List[AffineExpression],
input_shape: Tuple[int, int, int, int],
kernel_size: Tuple[int, int],
stride: Tuple[int, int] = None,
padding: Tuple[int, int] = (0, 0)) -> Tuple[List[AffineExpression], Tuple[int, int, int, int]]:
"""
Propage à travers une couche AvgPool2D (opération linéaire)
Args:
expressions: Liste d'expressions affines
input_shape: (batch, channels, height, width)
kernel_size: (pool_h, pool_w)
stride: (stride_h, stride_w)
padding: (pad_h, pad_w)
Returns:
(expressions de sortie, output_shape)
"""
if stride is None:
stride = kernel_size
batch, channels, in_h, in_w = input_shape
pool_h, pool_w = kernel_size
stride_h, stride_w = stride
pad_h, pad_w = padding
out_h = (in_h + 2 * pad_h - pool_h) // stride_h + 1
out_w = (in_w + 2 * pad_w - pool_w) // stride_w + 1
result = []
for b in range(batch):
for c in range(channels):
for oh in range(out_h):
for ow in range(out_w):
h_start = oh * stride_h - pad_h
w_start = ow * stride_w - pad_w
# Initialise l'expression moyenne
constant = 0.0
coefficients = {}
bounds = {}
count = 0
for ph in range(pool_h):
for pw in range(pool_w):
h_pos = h_start + ph
w_pos = w_start + pw
if 0 <= h_pos < in_h and 0 <= w_pos < in_w:
input_idx = b * (channels * in_h * in_w) + \
c * (in_h * in_w) + \
h_pos * in_w + w_pos
expr = expressions[input_idx]
constant += expr.constant
for idx, coeff in expr.coefficients.items():
coefficients[idx] = coefficients.get(idx, 0.0) + coeff
if idx in expr.bounds:
bounds[idx] = expr.bounds[idx]
count += 1
# Moyenne
if count > 0:
constant /= count
coefficients = {idx: coeff / count for idx, coeff in coefficients.items()}
result.append(AffineExpression(constant, coefficients, bounds))
output_shape = (batch, channels, out_h, out_w)
return result, output_shape