Source code for multimin.fitting

##################################################################
#                                                                #
# MultiMin: Multivariate Gaussian fitting                        #
#                                                                #
# Authors: Jorge I. Zuluaga                                      #
#                                                                #
##################################################################
# License: GNU Affero General Public License v3 (AGPL-3.0)       #
##################################################################

"""
Fitting classes for MoG to data and functions.

Contains:
- FitMoG: Fit MoG to dataset
- FitFunctionMoG: Fit MoG to function
"""

import pickle
import os
from hashlib import md5
import numpy as np
from scipy.optimize import minimize
from scipy import signal
from scipy.ndimage import uniform_filter1d

# Import from package modules
from .base import MultiMinBase
from .util import Util, Stats
from .mog import MixtureOfGaussians
from .plotting import multimin_watermark, MultiPlot


[docs] class FitMoG(MultiMinBase): r""" MoG Fitting handler. Theory ------ The fitting procedure is based on the Maximum Likelihood Estimation (MLE) method. Given a dataset of :math:`S` points :math:`\tilde U_k` (e.g., unbound orbital elements), the likelihood :math:`\mathcal{L}` of the MoG parameters is given by: .. math:: \mathcal{L}(\{w_k\}_M, \{\mu_k\}_M, \{\Sigma_k\}_M | \{\tilde U_k\}_S) = \prod_{i=1}^{S} \mathcal{C}_M(\tilde U_i) To find the optimal parameters, we minimize the negative normalized log-likelihood: .. math:: -\frac{\log \mathcal{L}}{S} = -\frac{1}{S} \sum_{i=1}^{S} \log \mathcal{C}_M(\tilde U_i) This quantity represents the average negative log-probability density of the data under the model. Minimizing this value is equivalent to maximizing the likelihood that the observed data was generated by the MoG model. Attributes ---------- ngauss : int Number of fitting MND. nvars : int Number of variables in each MND. mog : MixtureOfGaussians Fitting object of the class MoG. This object will have the result of the fitting procedure. solution : scipy.optimize.OptimizeResult Once the fitting is completed the solution object is returned. Attributes include: - fun: value of the function in the minimum. - x: value of the minimization parameters in the minimum. - nit: number of iterations of the minimization algorithm. - nfev: number of evaluations of the function. - success: if True it implies that the minimization fullfills all the conditions. Ndim : int Number of mus. Ncorr : int Number of correlations. minparams : numpy.ndarray Array of minimization parameters at any stage in minimization process. scales : numpy.ndarray Array of scales to convert minparams to uparams (unbound) and viceversa. uparams : numpy.ndarray Array of unbound minimization parameters. Notes ----- Objects of this class must be always initialized with the number of gaussians and the number of random variables. Examples -------- Example: Fitting a synthetic dataset. >>> # 1. Generate synthetic data >>> np.random.seed(1) >>> weights = [[1.0]] >>> mus = [[1.0, 0.5, -0.5]] >>> sigmas = [[1, 1.2, 2.3]] >>> angles = [[10*Angle.Deg, 30*Angle.Deg, 20*Angle.Deg]] >>> Sigmas = Stats.calc_covariance_from_rotation(sigmas, angles) >>> MoG = MixtureOfGaussians(mus=mus, weights=weights, Sigmas=Sigmas) >>> data = MoG.rvs(10000) >>> # 2. Initialize the fitter >>> F = FitMoG(ngauss=MoG.ngauss, nvars=MoG.nvars) >>> # 3. Set bounds (optional but recommended) >>> bounds = F.set_bounds(bounds=(0.1, 0.9*F._sigmax)) >>> # 4. Run the fit >>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True), method=None, bounds=bounds) >>> # 5. Inspect results >>> print(F.mog) >>> G = F.plot_fit(figsize=3, hargs=dict(bins=30, cmap='YlGn'), sargs=dict(s=0.5, edgecolor='None', color='r')) >>> # 6. Save/Load results >>> F.save_fit("/tmp/fit.pkl", useprefix=False) >>> F._load_fit("/tmp/fit.pkl") """ # Constants # Maximum value of sigma _sigmax = 10 _ignoreWarnings = True
[docs] def update_params(self, weights=None, mus=None, sigmas=None, rhos=None): """ Update the parameters of the MoG. Parameters ---------- weights : array_like, optional Weights of the gaussians. mus : array_like, optional Means of the gaussians. sigmas : array_like, optional Standard deviations of the gaussians. rhos : array_like, optional Correlation coefficients of the gaussians. """ if weights is not None: self.weights = np.array(weights) if self.normalize_weights: self.weights /= np.sum(self.weights) if mus is not None: mus = np.array(mus) if mus.shape != self.mus.shape: raise ValueError( f"Shape of mus {mus.shape} does not match current shape {self.mus.shape}" ) self.mus = mus update_cov = False if sigmas is not None: sigmas = np.array(sigmas) if sigmas.shape != self.sigmas.shape: raise ValueError( f"Shape of sigmas {sigmas.shape} does not match current shape {self.sigmas.shape}" ) self.sigmas = sigmas update_cov = True if rhos is not None: rhos = np.array(rhos) if rhos.shape != self.rhos.shape: raise ValueError( f"Shape of rhos {rhos.shape} does not match current shape {self.rhos.shape}" ) self.rhos = rhos update_cov = True if update_cov: self.Sigmas = Stats.calc_covariance_from_correlations( self.sigmas, self.rhos ) self.params = self.flatten_params()
def __init__(self, data=None, ngauss=1, nvars=None, domain=None, objfile=None): """ Initialize FitMoG object. Parameters ---------- data : numpy.ndarray, optional Array with data (Nsam x nvars). If provided, nvars is inferred unless explicitly given. ngauss : int Number of Gaussian components. nvars : int, optional Number of variables. If None and data is provided, inferred from data. domain : list, optional Domain for each variable: None (unbounded) or [low, high] per variable. Example: [None, [0, 1], None] for variable 1 bounded to [0, 1]. objfile : str, optional Path to a pickled fit to load. """ if objfile is not None: self._load_fit(objfile) elif isinstance(data, str): self._load_fit(data) else: # Check data and infer nvars self.data = None if data is not None: self.data = np.asarray(data) if self.data.ndim != 2: raise ValueError( f"Data must be 2D array (Nsam x nvars), got shape {self.data.shape}" ) data_nvars = self.data.shape[1] if nvars is None: nvars = data_nvars elif nvars != data_nvars: raise ValueError( f"Provided nvars={nvars} does not match data dimension {data_nvars}" ) if nvars is None: # If no data and no nvars, default to 2 (backward compatibility or default) nvars = 2 # Basic attributes self.ngauss = ngauss self.nvars = nvars self.Ndim = ngauss * nvars self.Ncorr = int(nvars * (nvars - 1) / 2) self.domain = domain # Define the model mogs self.mog = MixtureOfGaussians(ngauss=ngauss, nvars=nvars, domain=domain) # Set parameters self.set_params() # Report print("Loading a FitMoG object.") print(f"Number of gaussians: {self.ngauss}") print(f"Number of variables: {self.nvars}") print(f"Number of dimensions: {self.Ndim}") if self.data is not None: print(f"Number of samples: {len(self.data)}") else: print("Number of samples: 0 (No data provided)") if self.domain is not None: print(f"Domain: {self.domain}") # Compute the log-likelihood of the data if self.data is not None: print(f"Log-likelihood per point (-log L/N): {self.norm_log_l()}") # Other self.fig = None self.prefix = ""
[docs] def set_params(self, mu=0.5, sigma=1.0, rho=0.5): """ Set the value of the basic params (minparams, scales, etc.). It updates minparams, scales and uprams. Ex. F.set_params(mu=0.5, sigma=1.0, rho=0.5) Parameters ---------- mu : float, optional Value of all initial mus (default 0.5). sigma : float, optional Value of all initial sigmas (default 1.0). rho : float, optional Value of all initial rhos (default 0.5). Returns ------- None """ # Define the initial parameters # mus sigmas correlations minparams = ( [mu] * self.Ndim + [sigma] * self.Ndim + [1 + rho] * self.ngauss * self.Ncorr ) scales = ( [0] * self.Ndim + [self._sigmax] * self.Ndim + [2] * self.ngauss * self.Ncorr ) if self.ngauss > 1: self.extrap = [] minparams = [1 / self.ngauss] * self.ngauss + minparams scales = [1] * self.ngauss + scales else: self.extrap = [1] self.minparams = np.array(minparams) # When domain is finite, spread initial mus inside each variable's domain bounds = getattr(self.mog, "_domain_bounds", None) if bounds is not None and self.ngauss > 1: for j in range(self.nvars): lo, hi = bounds[j][0], bounds[j][1] if np.isfinite(lo) or np.isfinite(hi): a = lo if np.isfinite(lo) else hi - 1.0 b = hi if np.isfinite(hi) else lo + 1.0 for i in range(self.ngauss): idx = self.ngauss + i * self.nvars + j self.minparams[idx] = a + (i + 1) / (self.ngauss + 1) * (b - a) self.scales = np.array(scales) self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._initial_params_set_by_user = False
[docs] def set_initial_params(self, mus=None, sigmas=None, rhos=None, from_mog=None): """ Set initial values for the minimization parameters (means, standard deviations, correlations). Only the arguments provided are updated; the rest keep their current values. Ex. F.set_initial_params(mus=[[0.2, 0.3], [0.8, 0.7]], sigmas=[[0.1, 0.2], [0.1, 0.2]]) Parameters ---------- mus : array-like, optional Initial means. Shape (ngauss, nvars) or (nvars,) to use the same means for all components. Example: [0.2, 0.3] or [[0.2, 0.3], [0.8, 0.7]]. sigmas : array-like, optional Initial standard deviations. Shape (ngauss, nvars) or (nvars,) to use the same for all components. rhos : array-like, optional Initial correlation coefficients (upper triangle). Shape (ngauss, Ncorr) or (Ncorr,) to use the same for all. Ncorr = nvars*(nvars-1)/2. from_mog : MixtureOfGaussians, optional If provided, initialize parameters from this MoG object. Must have same ngauss and nvars as check. Any other provided arguments (mus, sigmas, rhos) will override values from from_mog. Returns ------- None Examples -------- >>> F = FitMoG(ngauss=2, nvars=2) >>> F.set_initial_params(mus=[0.2, 0.3], sigmas=[0.1, 0.1]) # same for both components >>> F.set_initial_params(mus=[[0.2, 0.3], [0.8, 0.7]], sigmas=[[0.1, 0.2], [0.1, 0.2]]) >>> F.fit_data(data) """ if from_mog is not None: if from_mog.ngauss != self.ngauss or from_mog.nvars != self.nvars: raise ValueError( f"from_mog dimensions (ngauss={from_mog.ngauss}, nvars={from_mog.nvars}) " f"do not match FitMoG (ngauss={self.ngauss}, nvars={self.nvars})" ) # Use parameters from the provided mog as base # We want to use set_initial_params logic to properly broadcast/set minparams # So we extract lists/arrays from from_mog mus_mog = from_mog.mus sigmas_mog = from_mog.sigmas rhos_mog = from_mog.rhos # If mus/sigmas/rhos are NOT provided in arguments, use from_mog if mus is None: mus = mus_mog if sigmas is None: sigmas = sigmas_mog if rhos is None: rhos = rhos_mog mus = np.asarray(mus, dtype=float) if mus is not None else None sigmas = np.asarray(sigmas, dtype=float) if sigmas is not None else None rhos = np.asarray(rhos, dtype=float) if rhos is not None else None def _broadcast_2d(arr, shape_2d, name, dims_desc): """Broadcast to (ngauss, nvars) or (ngauss, Ncorr). If 1D, same row for all components.""" if arr is None: return None arr = np.atleast_1d(arr) if arr.ndim == 1: # Same values for all components if arr.shape[0] != shape_2d[1]: raise ValueError( "{} 1D must have length {} ({}), got {}".format( name, shape_2d[1], dims_desc, arr.shape[0] ) ) arr = np.tile(arr, (self.ngauss, 1)) else: arr = np.atleast_2d(arr) if arr.shape != shape_2d: raise ValueError( "{} must have shape {} or ({}), got {}".format( name, shape_2d, dims_desc, arr.shape ) ) return arr # minparams layout: [weights] + [mus] + [sigmas] + [1+rhos] if self.ngauss > 1: off_mu = self.ngauss off_sig = self.ngauss + self.Ndim off_rho = self.ngauss + 2 * self.Ndim else: off_mu = 0 off_sig = self.nvars off_rho = self.nvars * 2 # Also update weights if from_mog is used? # The user request only mentioned: "Cuando se pasa una mog como opción de la from_mog se fijan los parámetros iniciales del ajuste con los parámetros de esa mog." # Usually weights are also parameters. The weights are the first 'ngauss' params in minparams (if ngauss > 1). if from_mog is not None: if self.ngauss > 1: # Weights are at the beginning # Normalize just in case w = from_mog.weights if from_mog.normalize_weights: w = w / w.sum() self.minparams[0 : self.ngauss] = w if mus is not None: mus = _broadcast_2d(mus, (self.ngauss, self.nvars), "mus", "nvars") if self.ngauss > 1: self.minparams[off_mu : off_mu + self.Ndim] = mus.ravel() else: self.minparams[off_mu : off_mu + self.nvars] = mus.ravel() if sigmas is not None: sigmas = _broadcast_2d(sigmas, (self.ngauss, self.nvars), "sigmas", "nvars") if self.ngauss > 1: self.minparams[off_sig : off_sig + self.Ndim] = sigmas.ravel() else: self.minparams[off_sig : off_sig + self.nvars] = sigmas.ravel() if rhos is not None: rhos = _broadcast_2d(rhos, (self.ngauss, self.Ncorr), "rhos", "Ncorr") # Internally we store 1+rho so the unbound param is in a good range if self.ngauss > 1: self.minparams[off_rho : off_rho + self.ngauss * self.Ncorr] = ( 1.0 + rhos.ravel() ) else: self.minparams[off_rho : off_rho + self.Ncorr] = 1.0 + rhos.ravel() self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._initial_params_set_by_user = True params = self.pmap(self.minparams) self.mog.set_stdcorr(params, self.nvars)
def _stdcorr_to_minparams(self, stdcorr): """Inverse of pmap: stdcorr -> minparams (for use in normalized fit).""" minparams = np.array(stdcorr[len(self.extrap) :], dtype=float) if self.ngauss * self.Ncorr > 0: minparams[-self.ngauss * self.Ncorr :] += 1 return minparams def _stdcorr_from_weights_mus_sigmas_rhos(self, weights, mus, sigmas, rhos): """Build stdcorr from unpacked arrays (rhos as correlations, not 1+rho). Returns stdcorr without extrap (for use with mog.set_stdcorr). To get FitMoG's stdcorr format (with extrap), prepend self.extrap.""" return np.concatenate( [ np.asarray(weights).ravel(), np.asarray(mus).ravel(), np.asarray(sigmas).ravel(), np.asarray(rhos).ravel(), ] ) def _init_params_from_data_finite_domain(self, data): """When domain is finite, set initial mus from data percentiles and modest sigmas so the optimizer starts near the peaks.""" bounds = getattr(self.mog, "_domain_bounds", None) if bounds is None or self.ngauss < 2: return data = np.asarray(data) if data.size == 0: return # Flatten so we can index by variable if data.ndim == 1: data = np.reshape(data, (-1, 1)) n = data.shape[0] for j in range(self.nvars): lo, hi = bounds[j][0], bounds[j][1] if not (np.isfinite(lo) and np.isfinite(hi)): continue col = data[:, j] # Initial mus = percentiles of data along this dimension, sorted and clipped pct = np.linspace(5, 95, self.ngauss) mus_init = np.percentile(col, pct) mus_init = np.sort(mus_init) mus_init = np.clip(mus_init, lo, hi) for i in range(self.ngauss): idx = self.ngauss + i * self.nvars + j self.minparams[idx] = mus_init[i] # Initial sigmas: start with a fraction of domain width so we don't drift to flat width = hi - lo sigma_init = min(0.25 * width, max(0.05 * width, np.std(col) * 0.5)) for i in range(self.ngauss): idx = self.ngauss + self.Ndim + i * self.nvars + j if idx < len(self.minparams): self.minparams[idx] = sigma_init self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
[docs] def pmap(self, minparams): """ Mapping routine used in sample_mog_likelihood. Mapping may change depending on the complexity of the parameters to be minimized. Ex. stdcorr = F.pmap(minparams) Parameters ---------- minparams : numpy.ndarray Minimization parameters. Returns ------- stdcorr : numpy.ndarray Flatten parameters with correlations. """ stdcorr = np.array(self.extrap + list(minparams)) if self.ngauss * self.Ncorr > 0: stdcorr[-self.ngauss * self.Ncorr :] -= 1 return stdcorr
[docs] def log_l(self, data=None): """ Value of the -log(Likelihood). Ex. F.log_l(data) Parameters ---------- data : numpy.ndarray, optional Array with data (Nsam x nvars). If None, uses self.data. Returns ------- log_l : float Value of the -log(Likelihood). """ if data is None: data = self.data if data is None: raise ValueError("No data provided") log_l = self.mog.sample_mog_likelihood( self.uparams, data=data, pmap=self.pmap, tset="stdcorr", scales=self.scales ) return log_l
[docs] def norm_log_l(self, data=None): """ Calculate the normalized log-likelihood of the data. Parameters ---------- data : numpy.ndarray, optional Data for which to calculate the log-likelihood. If None, uses self.data. Returns ------- float Normalized log-likelihood per point (-log L/N). """ if data is None: data = self.data if data is None: raise ValueError("No data provided") data = np.asarray(data) log_likelihood = self.log_l(data) / len(data) return log_likelihood
@Util.timer def fit_data( self, data=None, verbose=0, advance=0, normalize=False, progress=False, **args ): """ Minimization procedure. It updates the solution attribute. Ex. F.fit_data(data, verbose=0, tol=1e-3) Parameters ---------- data : numpy.ndarray, optional Array with data (Nsam x nvars). If None, uses self.data. verbose : int, optional Verbosity level for the sample_mog_likelihood routine (default 0). advance : int, optional If larger than 0 show advance each "advance" iterations (default 0). normalize : bool, optional If True and domain is finite, fit in normalized space (each variable scaled to [0, 1] using the domain bounds). Improves conditioning when variables have very different scales (e.g. [0, 1.3], [0, 1], [0, 180]) and can yield more stable, reproducible minima. progress : str or bool, optional - "tqdm" or True: show a tqdm progress bar (requires tqdm). - "text": show text-based progress (equivalent to setting advance > 0). - False: no progress output (unless advance is set). **args : dict Options of the minimize routine (eg. tol=1e-6). A particularly interesting parameter is the minimization method. Available methods: * Slow but sure: Powell * Fast but unsure: CG, BFGS, COBYLA, SLSQP Returns ------- None Examples -------- >>> F = FitMoG(1, 3) >>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True)) >>> F.fit_data(data, normalize=True) # recommended when variable scales differ a lot """ if data is None: data = self.data if data is None: raise ValueError("No data provided") # Handle progress options use_tqdm = False if progress == "tqdm" or (isinstance(progress, bool) and progress): use_tqdm = True elif progress == "text": if not advance: advance = 1 pbar = None if use_tqdm: try: from tqdm.auto import tqdm maxiter = args.get("maxiter") or args.get("options", {}).get("maxiter") pbar = tqdm(total=maxiter, desc="Minimizing") except ImportError: print("tqdm not installed, progress bar disabled.") if advance or pbar is not None: advance = int(advance) if advance else 0 self.neval = 0 def _advance(X, show=False): if pbar is not None: pbar.update(1) if advance > 0: if self.neval == 0: print(f"Iterations:") if self.neval % advance == 0 or show: vars = np.array2string( X, separator=", ", precision=4, max_line_width=np.inf, formatter={"float_kind": lambda x: f"{x:.2g}"}, ) fun = self.mog.sample_mog_likelihood( X, data, self.pmap, "stdcorr", self.scales, verbose ) print( f"Iter {self.neval}:\n\tVars: {vars}\n\tLogL/N: {fun / len(data)}" ) self.neval += 1 else: _advance = None self.data = np.copy(data) data = np.asarray(data) if data.ndim == 1: data = data.reshape(-1, 1) self.mog._ignoreWarnings = self._ignoreWarnings self.minargs = dict(method="Powell") self.minargs.update(args) # When domain is finite, set bounds so mus stay inside the domain and sigmas stay > 0 bounds_orig = getattr(self.mog, "_domain_bounds", None) has_finite_domain = bounds_orig is not None and any( np.isfinite(bounds_orig[j][0]) or np.isfinite(bounds_orig[j][1]) for j in range(self.nvars) ) # Optional: fit in normalized space [0,1] per variable for better conditioning if normalize and has_finite_domain: scale = np.zeros(self.nvars) offset = np.zeros(self.nvars) for j in range(self.nvars): lo, hi = bounds_orig[j][0], bounds_orig[j][1] if np.isfinite(lo) and np.isfinite(hi): offset[j], scale[j] = lo, hi - lo else: offset[j] = np.nanmin(data[:, j]) scale[j] = max(np.nanmax(data[:, j]) - offset[j], 1e-10) data_fit = (data - offset) / scale domain_norm = tuple([0.0, 1.0] for _ in range(self.nvars)) self.mog._domain_bounds = [tuple(x) for x in domain_norm] # Transform current minparams to normalized space stdcorr = self.pmap(self.minparams) i = len(self.extrap) # For ngauss==1 there are no weights in stdcorr after extrap; next are mus w = np.array([1.0]) if self.ngauss == 1 else stdcorr[i : i + self.ngauss] mus = stdcorr[ i + (self.ngauss if self.ngauss > 1 else 0) : i + (self.ngauss if self.ngauss > 1 else 0) + self.Ndim ].reshape(self.ngauss, self.nvars) sigmas = stdcorr[ i + (self.ngauss if self.ngauss > 1 else 0) + self.Ndim : i + (self.ngauss if self.ngauss > 1 else 0) + 2 * self.Ndim ].reshape(self.ngauss, self.nvars) rhos = stdcorr[ i + (self.ngauss if self.ngauss > 1 else 0) + 2 * self.Ndim : i + (self.ngauss if self.ngauss > 1 else 0) + 2 * self.Ndim + self.ngauss * self.Ncorr ].reshape(self.ngauss, self.Ncorr) mus_norm = (mus - offset) / scale sigmas_norm = sigmas / scale stdcorr_norm_no_extrap = self._stdcorr_from_weights_mus_sigmas_rhos( w, mus_norm, sigmas_norm, rhos ) # For ngauss==1, minparams has no weight; for ngauss>1, use _stdcorr_to_minparams if self.ngauss == 1: # stdcorr_norm_no_extrap = [w, mus, sigmas, rhos] = [1, 3, 3, 3] = 10 elements # minparams = [mus, sigmas, 1+rhos] = [3, 3, 3] = 9 elements self.minparams = np.concatenate( [ stdcorr_norm_no_extrap[1 : 1 + self.Ndim], # mus stdcorr_norm_no_extrap[ 1 + self.Ndim : 1 + 2 * self.Ndim ], # sigmas stdcorr_norm_no_extrap[1 + 2 * self.Ndim :] + 1.0, # 1+rhos ] ) else: stdcorr_norm = np.concatenate([self.extrap, stdcorr_norm_no_extrap]) self.minparams = self._stdcorr_to_minparams(stdcorr_norm) self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) sigma_hi_norm = min(0.99 * self._sigmax, 2.0) bounds_tuple = self.set_bounds(bounds=(1e-6, sigma_hi_norm)) self.minargs["bounds"] = bounds_tuple if self.minargs.get("method") == "Powell": self.minargs["method"] = "L-BFGS-B" try: self.solution = minimize( self.mog.sample_mog_likelihood, self.uparams, callback=_advance, args=(data_fit, self.pmap, "stdcorr", self.scales, verbose), **self.minargs, ) finally: if pbar is not None: pbar.close() if advance: _advance(self.solution.x, show=True) self.minparams = Util.t_if(self.solution.x, self.scales, Util.u2f) stdcorr_norm = self.pmap(self.minparams) i = len(self.extrap) off = i + (self.ngauss if self.ngauss > 1 else 0) w = ( np.array([1.0]) if self.ngauss == 1 else stdcorr_norm[i : i + self.ngauss] ) mus_norm = stdcorr_norm[off : off + self.Ndim].reshape( self.ngauss, self.nvars ) sigmas_norm = stdcorr_norm[off + self.Ndim : off + 2 * self.Ndim].reshape( self.ngauss, self.nvars ) rhos = stdcorr_norm[ off + 2 * self.Ndim : off + 2 * self.Ndim + self.ngauss * self.Ncorr ].reshape(self.ngauss, self.Ncorr) mus_orig = mus_norm * scale + offset sigmas_orig = sigmas_norm * scale stdcorr_orig = self._stdcorr_from_weights_mus_sigmas_rhos( w, mus_orig, sigmas_orig, rhos ) self.mog._domain_bounds = bounds_orig self.mog.set_stdcorr(stdcorr_orig, self.nvars) # Convert mog's stdcorr to FitMoG's minparams (for ngauss=1, minparams has no weight) if self.ngauss == 1: mus_flat = mus_orig.ravel() sigmas_flat = sigmas_orig.ravel() rhos_flat = (rhos + 1.0).ravel() self.minparams = np.concatenate([mus_flat, sigmas_flat, rhos_flat]) else: stdcorr_orig_with_extrap = np.concatenate([self.extrap, stdcorr_orig]) self.minparams = self._stdcorr_to_minparams(stdcorr_orig_with_extrap) self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._update_prefix() return if has_finite_domain and "bounds" not in args: # bounds from domain; sigma: lower 1e-6 to avoid div by zero; upper limited so fit cannot go flat sigma_hi = 0.99 * self._sigmax domain_widths = [ self.mog._domain_bounds[j][1] - self.mog._domain_bounds[j][0] for j in range(self.nvars) if np.isfinite(self.mog._domain_bounds[j][0]) and np.isfinite(self.mog._domain_bounds[j][1]) ] if domain_widths: # Cap sigma at ~2x largest bounded dimension width to avoid flat (uniform) optimum sigma_hi = min(sigma_hi, 2.0 * max(domain_widths)) bounds_tuple = self.set_bounds(bounds=(1e-6, sigma_hi)) self.minargs["bounds"] = bounds_tuple if self.minargs.get("method") == "Powell": self.minargs["method"] = "L-BFGS-B" # Data-based init: set initial mus (and sigmas) from data unless user set them via set_initial_params if not getattr(self, "_initial_params_set_by_user", False): self._init_params_from_data_finite_domain(data) try: self.solution = minimize( self.mog.sample_mog_likelihood, self.uparams, callback=_advance, args=(data, self.pmap, "stdcorr", self.scales, verbose), **self.minargs, ) finally: if pbar is not None: pbar.close() if advance: _advance(self.solution.x, show=True) self.uparams = self.solution.x # Set the new params # Set the new params self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f) params = self.pmap(self.minparams) self.mog.set_stdcorr(params, self.nvars) self._update_prefix() def _load_fit(self, objfile): """ Load a fit object from file. """ F = pickle.load(open(objfile, "rb")) for k in F.__dict__.keys(): setattr(self, k, getattr(F, k)) self._update_prefix() def _gen_qsample(self, n_sim, resample=False): """ Generate or reuse the synthetic sample for Q-Q plots and quality checks. """ if resample or not hasattr(self, "mog_qsample") or self.mog_qsample is None: self.mog_qsample = self.mog.rvs(n_sim) # Check size consistency if we are reusing elif len(self.mog_qsample) != n_sim: # If the requested size is different, we must regenerate self.mog_qsample = self.mog.rvs(n_sim) return self.mog_qsample
[docs] def quality_of_fit( self, data=None, n_sim=None, resample=False, plot_qq=False, figsize=5, ax=None, plot_line=True, title="Q-Q Plot", verbose=False, ): r""" Calculate goodness-of-fit statistics using the Kolmogorov-Smirnov distance and the Coefficient of Determination (:math:`R^2`) relative to the identity line. Optionally generates a Q-Q plot to visually assess the quality of the fit. The **Kolmogorov-Smirnov Statistic** (:math:`D_{KS}`) measures the maximum absolute distance between the Cumulative Distribution Function (CDF) of the observed data and that of the model. It represents the "worst point" of local deviation. .. math:: D_{KS} = \sup_{x} | F_{obs}(x) - F_{model}(x) | The range of the statistic is :math:`D_{KS} \in [0, 1]`, where an acceptable value is :math:`D_{KS} \approx 0` (ideally < 0.05). The **Q-Q plot** is a non-parametric visual diagnostic tool used to assess whether a set of observed data follows a specific theoretical distribution. It compares the distribution of the **Negative Log-Likelihood** (:math:`S = -2 \ln \mathcal{L}`) of the real data against the model's prediction. If the fit is ideal, the points should align on the identity line :math:`y=x`. Deviations indicate: * **Curved ends**: tail deviations ("Heavy" or "Light" tails). * **Shift or slope change**: systematic errors in mean or variance. The **Coefficient of Determination on Identity** (:math:`R^2`) measures the variance explained by the model with respect to the ideal line :math:`y=x`. It penalizes both dispersion and bias. .. math:: R^2 = 1 - \frac{\sum (y_{obs} - x_{model})^2}{\sum (y_{obs} - \bar{y}_{obs})^2} The range is :math:`R^2 \in (-\infty, 1]`, where an acceptable value is :math:`R^2 \approx 1` (ideally > 0.9). Parameters ---------- data : numpy.ndarray, optional Observed data. If None, uses self.data. n_sim : int, optional Number of synthetic data points to generate. Default is 10 * len(data). resample : bool, optional If True, regenerate the synthetic sample even if one exists. plot_qq : bool, optional If True, generate the Q-Q plot. Default is False. figsize : int or tuple, optional Size of the figure (if creating a new one). ax : matplotlib.axes.Axes, optional Axes object to plot on. If None and plot_qq=True, creates a new figure. plot_line : bool, optional If True, plot the identity line y=x. title : str, optional Title of the plot. verbose : bool, optional If True, print the results. Returns ------- stats : dict Dictionary containing: - 'ks_dist': Kolmogorov-Smirnov distance between S_obs and S_sim (lower is better) - 'ks_pval': p-value from KS test (note: not strictly valid as parameters are estimated) - 'r2_identity': R-squared coefficient regarding identity line (closer to 1 is better) - 'ax': The axes object if plot_qq=True, else None. """ from scipy.stats import ks_2samp if data is None: data = self.data if data is None: raise ValueError("No data provided") data = np.asarray(data) n_samples = len(data) if n_sim is None: n_sim = 10 * n_samples if verbose: print( f"Calculating fit quality statistics for {n_samples} observed points..." ) # 1. Calculate S_obs = -2 * ln(L(x_obs)) pdf_vals_obs = self.mog.pdf(data) pdf_vals_obs = np.maximum(pdf_vals_obs, 1e-300) S_obs = -2 * np.log(pdf_vals_obs) S_obs.sort() # 2. Generate synthetic data and calculate S_sim if verbose: print(f"Generating {n_sim} synthetic points (resample={resample})...") data_sim = self._gen_qsample(n_sim, resample=resample) pdf_vals_sim = self.mog.pdf(data_sim) pdf_vals_sim = np.maximum(pdf_vals_sim, 1e-300) S_sim = -2 * np.log(pdf_vals_sim) S_sim.sort() # 3. Kolmogorov-Smirnov Statistic d_ks, p_val = ks_2samp(S_obs, S_sim) # 4. R2 relative to identity (y=x) # Interpolate S_sim to match quantiles of S_obs for pairwise comparison percentiles = np.linspace(0, 100, n_samples) S_sim_interp = np.percentile(S_sim, percentiles) # Residuals from identity line (y=x means S_obs = S_sim_interp) residuos = S_obs - S_sim_interp ss_res = np.sum(residuos**2) # Total sum of squares of S_obs around its mean ss_tot = np.sum((S_obs - np.mean(S_obs)) ** 2) # Avoid division by zero if ss_tot is 0 if ss_tot == 0: r2_identity = 0.0 # or nan else: r2_identity = 1 - (ss_res / ss_tot) if verbose: print(f"Fit Quality Results:") print(f" KS Distance (lower is better) : {d_ks:.4f}") print(f" R2 vs Identity (closer to 1 is better): {r2_identity:.4f}") # 5. Plot ret_ax = None if plot_qq: from matplotlib import pyplot as plt if ax is None: if isinstance(figsize, int): figsize = (figsize, figsize) fig, ax = plt.subplots(figsize=figsize) else: fig = ax.figure self.fig_qq = fig self.ax_qq = ax label = f"Data vs Model ($R^2$={r2_identity:.4f})" self.ax_qq.scatter(S_sim_interp, S_obs, alpha=0.5, label=label) if plot_line: # Identity line range min_val = min(S_obs.min(), S_sim_interp.min()) max_val = max(S_obs.max(), S_sim_interp.max()) self.ax_qq.plot( [min_val, max_val], [min_val, max_val], "r--", label="Perfect Fit (y=x)", ) self.ax_qq.set_xlabel("Theoretical Quantiles (-2 Log Likelihood)") self.ax_qq.set_ylabel("Observed Quantiles (-2 Log Likelihood)") self.ax_qq.set_title(title) self.ax_qq.legend(loc="upper left") self.ax_qq.grid(True, alpha=0.3) # Add KS statistic to the plot self.ax_qq.text( 0.95, 0.05, rf"$D_{{KS}} = {d_ks:.3f}$", transform=self.ax_qq.transAxes, ha="right", va="bottom", bbox=dict(boxstyle="round", facecolor="white", alpha=0.8), ) multimin_watermark(self.ax_qq) return {"ks_dist": d_ks, "r2_identity": r2_identity}
[docs] def plot_fit( self, N=10000, figsize=2, properties=None, ranges=None, hargs=None, sargs=None, pargs=True, cargs=None, marginals=False, ): """ Plot the result of the fitting procedure. Ex. F.plot_fit(figsize=3, hargs=dict(bins=30), sargs=dict(s=0.5, color='r')) Parameters ---------- N : int, optional number of points used to build a representation of the marginal distributions (default 10000). figsize : int, optional Size of each axis (default 2). properties : list or dict, optional Property names or MultiPlot-style properties. List: each element as axis label, range=None. Dict: same as MultiPlot (keys with 'label' and optional 'range'). ranges : list, optional Ranges per variable; used only when properties is a list or None. hargs : dict, optional Dictionary with options for the hist2d function (or 1D hist when nvars=1). If provided, the sample is shown as a histogram. sargs : dict, optional Dictionary with options for the scatter plot. If provided, the sample is shown as a scatter plot. pargs : dict, optional Dictionary with options for the PDF plot. If None (default), defaults are used (color='k', lw=2, label='PDF'). To suppress PDF, pass an empty dict is not sufficient, but pargs is default. cargs : dict, optional Dictionary with options for the contour plot (mog_contour). If None (default), contours are not plotted. marginals : bool, optional Include marginal distributions on diagonal panels (default False). Returns ------- G : matplotlib.figure.Figure or MultiPlot Graphic handle. Examples -------- >>> F = FitMoG(1, 3) >>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True)) >>> G = F.plot_fit(figsize=3, hargs=dict(bins=30, cmap='YlGn'), sargs=dict(s=0.5, edgecolor=None, color='r')) """ # Set defaults if pargs is True: pargs = dict(color="k", lw=2, label="PDF") elif isinstance(pargs, dict): pargs = dict(pargs) pargs.setdefault("label", "PDF") # If pargs is None (or False), it remains None/False and we skip plotting if pargs: # Default zorder for PDF (background) pargs.setdefault("zorder", -100) # hargs/sargs are None by default as per user request (no plot unless specified) # "Escoger desde el principio el rango del pcolormesh para que coincida con el de los datos" # If ranges is not provided, compute it from data to ensure plot covers all points if ranges is None and properties is not None: # Check if properties is dict and has ranges (skip) # But if properties is list, calculate ranges from data if ( isinstance(properties, list) and self.data is not None and len(self.data) > 0 ): ranges = [] for i in range(self.nvars): dmin, dmax = self.data[:, i].min(), self.data[:, i].max() width = dmax - dmin margin = 0.05 * width if width > 0 else 1.0 ranges.append((float(dmin - margin), float(dmax + margin))) properties = Util.props_to_properties(properties, self.nvars, ranges) from matplotlib import pyplot as plt if self.nvars == 1: # Univariate: show fitted PDF; show sample as histogram (if hargs) and/or scatter (if sargs) G = MultiPlot(properties, figsize=figsize, marginals=marginals) ax = G.axs[0][0] if hargs is not None: G.sample_hist(self.data, **hargs) if sargs is not None: G.sample_scatter(self.data, **sargs) # Overlay fitted PDF # Check if we have data to determine range? If fitting was done, self.data or bounds are known. # Use MultiPlot logic mostly, but here we plot manually. # Or we can delegate to G.mog_pdf(self.mog)? # But FitMoG contains self.mog. # And self.data is the training data. # We want to plot the fitted regular PDF. # G.mog_pdf(self.mog, **pargs) would work and be cleaner! # But let's keep existing logic to be safe, just updated with pargs. # Overlay fitted PDF if pargs: x_min, x_max = self.data[:, 0].min(), self.data[:, 0].max() margin = max(1e-6, 0.1 * (x_max - x_min)) x_curve = np.linspace(x_min - margin, x_max + margin, 300) pdf_vals = self.mog.pdf(x_curve.reshape(-1, 1)) ax.plot(x_curve, pdf_vals, **pargs) # If no data plotted, set basic labels/limits if not hargs and not sargs: ax.set_ylabel("PDF") ax.set_ylim(0, None) # Legend handles, labels = ax.get_legend_handles_labels() if getattr(G, "_ax_twin", None) is not None: h2, l2 = G._ax_twin.get_legend_handles_labels() handles, labels = handles + h2, labels + l2 if handles: ax.legend( handles, labels, loc="lower center", bbox_to_anchor=(0.5, 1.02), ncol=len(handles), frameon=False, ) G.fig.subplots_adjust(top=0.88) if not getattr(G, "_watermark_added", False): multimin_watermark(G.axs[0][0], frac=0.5) self.fig = G.fig return G if self.nvars >= 2: G = MultiPlot(properties, figsize=figsize, marginals=marginals) # 1. Plot Scatter of Data (if requested) if sargs is not None: G.sample_scatter(self.data, **sargs) # 3. Plot Histogram of RVS (if requested) if hargs is not None: Xfits = self.mog.rvs(N) G.sample_hist(Xfits, **hargs) # 4. Plot MoG Contours (if requested) if cargs is not None: G.mog_contour(self.mog, **cargs) # 2. Plot MoG PDF (default) # Use pargs defaults if provided (which they are by default in this method) if pargs: G.mog_pdf(self.mog, **pargs) # If no layers, maybe warn? But mog_pdf is default. # SET RANGES ACCORDING TO DATA if self.data is not None: for i, prop in enumerate(G.properties): dmin, dmax = self.data[:, i].min(), self.data[:, i].max() # Force range to data limits (overriding default 4-sigma extents of PDF) G.dproperties[prop]["range"] = [dmin, dmax] G.set_ranges() G.fig.tight_layout() if not getattr(G, "_watermark_added", False): multimin_watermark(G.axs[0][0]) G._watermark_added = True self.fig = G.fig return G
def _inv_params(self, stdcorr): """ Invert stdcorr to minparams. """ minparams = np.copy(stdcorr) minparams[-self.ngauss * self.Ncorr :] += 1 self.minparams = minparams[1:] if self.ngauss == 1 else minparams def _update_prefix(self, myprefix=None): """ Update prefix of fit. Prefix has two parts: the number of gaussians used and a hash computed from the object. Prefix change if: - Data change. - Initial minimization parameters change (e.g. if the fit is ran twice) - Minimization parameters are changed. - Bounds are changed. Alternative prefix: >>> self.hash = md5(pickle.dumps([self.ngauss, self.data])).hexdigest()[:5] >>> self.hash = md5(pickle.dumps(self.__dict__)).hexdigest()[:5] >>> self.hash = md5(pickle.dumps(self.minparams)).hexdigest()[:5] >>> self.hash = md5(pickle.dumps(self.mog)).hexdigest()[:5] """ self.hash = md5(pickle.dumps(self)).hexdigest()[:5] prefix_str = "" if myprefix is not None: prefix_str = f"_{myprefix}" self.prefix = f"{self.ngauss}mog{prefix_str}_{self.hash}"
[docs] def save_fit(self, objfile=None, useprefix=True, myprefix=None): """ Pickle the result of a fit. Ex. F.save_fit(objfile='fit.pkl') Parameters ---------- objfile : str, optional Name of the file where the fit will be stored. If None, the name is set by the routine as FitMoG.pkl. useprefix : bool, optional Use a prefix in the filename of the pickle file (default True). The prefix is normally {ngauss}mog_{hash}. myprefix : str, optional Custom prefix. Examples -------- If objfile="fit.pkl", the final filename will be fit-1mnd_asa33.pkl """ self.fig = None self._update_prefix(myprefix) if objfile is None: objfile = f"/tmp/FitMoG.pkl" if useprefix: parts = os.path.splitext(objfile) objfile = f"{parts[0]}-{self.prefix}{parts[1]}" # Avoid saving the temporary sample if hasattr(self, "mog_qsample"): del self.mog_qsample # Avoid saving the Q-Q plot figures if hasattr(self, "fig_qq"): del self.fig_qq if hasattr(self, "ax_qq"): del self.ax_qq pickle.dump(self, open(objfile, "wb"))
[docs] def set_bounds(self, boundw=None, bounds=None, boundr=None, boundsm=None): """ Set the minimization parameters. Ex. F.set_bounds(boundsm=((-2, 1), (-3, 0))) Parameters ---------- boundw : tuple, optional Bound of weights (default (-np.inf, np.inf)). bounds : tuple or list, optional Bounds of weights, mus, sigmas and rhos of each variable. boundr : tuple, optional Bound of rhos (default (-np.inf, np.inf)). boundsm : tuple of tuples, optional Bounds of averages (default (-np.inf, np.inf)). Normally the bounds on averages must be expressed in this way: boundsm = ((-min_1, max_1), (-min_2, max_2), ...) Example for nvars = 2: boundsm = ((-2, 1), (-3, 0)) Returns ------- bounds : tuple Formatted bounds for minimization. """ if boundsm is None: # Use domain as bounds for means when domain is finite if getattr(self.mog, "_domain_bounds", None) is not None: boundsm = tuple( (self.mog._domain_bounds[j][0], self.mog._domain_bounds[j][1]) for j in range(self.nvars) ) else: boundsm = ((-np.inf, np.inf),) * self.nvars # Regular bounds if boundw is None: boundw = (-np.inf, np.inf) else: boundw = tuple([Util.f2u(bw, 1) for bw in boundw]) if bounds is None: bounds = (-np.inf, np.inf) else: bounds = tuple([Util.f2u(bs, self._sigmax) for bs in bounds]) if boundr is None: boundr = (-np.inf, np.inf) else: boundr = tuple([Util.f2u(1 + br, 2) for br in boundr]) bounds = ( *((boundw,) * self.ngauss), *(boundsm * self.ngauss), *((bounds,) * self.nvars * self.ngauss), *((boundr,) * self.ngauss * int(self.nvars * (self.nvars - 1) / 2)), ) self.bounds = bounds if self.ngauss == 1: bounds = bounds[1:] self.mog._str_params() return bounds
[docs] class FitFunctionMoG(MultiMinBase): """ Fit a MoG to a function evaluated on a mesh (instead of to data points). The objective is to approximate the function by a composed multivariate normal distribution (MoG) by minimizing a loss between the function values and the MoG PDF on the same grid. Initialize with data=(Xmesh, Fmesh) and ngauss: - Univariate: Xmesh and Fmesh are 1D arrays (same length). - Multivariate: Xmesh is a tuple of ndarrays (e.g. from np.meshgrid), Fmesh is an array of the same shape as the grid. Attributes ---------- X : numpy.ndarray Grid points, shape (N, nvars). F : numpy.ndarray Function values at the grid points, shape (N,). ngauss, nvars, mog, minparams, scales, uparams Same as in FitMoG (inherited). normalization : float Scale factor fitted so that normalization * mog.pdf(X) approximates F. Set by fit_data(); None before fitting. See Also -------- quality_of_fit : Report R² and RMSE after fitting (call after fit_data). """ _sigmax = 10 _ignoreWarnings = True def __init__(self, data, ngauss=1): """ Initialize FitFunctionMoG from function-on-mesh data. The domain is always set from the data (min/max of Xmesh per variable). Parameters ---------- data : tuple (Xmesh, Fmesh) Xmesh: 1D array (univariate) or tuple of arrays (meshgrid, multivariate). Fmesh: function values at those points; same length or shape as the grid. ngauss : int Number of Gaussian components. Examples -------- >>> F = mn.FitFunctionMoG(data=(Xmesh, Fmesh), ngauss=3) """ if data is None: raise ValueError("data must be provided as (Xmesh, Fmesh)") if not isinstance(data, (tuple, list)): raise ValueError( "data must be a tuple or list of exactly 2 elements (Xmesh, Fmesh); " f"got {type(data).__name__}" ) if len(data) != 2: raise ValueError( "data must have exactly 2 elements (Xmesh, Fmesh); " f"got {len(data)} elements" ) Xmesh, Fmesh = data[0], data[1] self._X, self._F, nvars = self._parse_function_mesh_data(Xmesh, Fmesh) self.X = self._X self.F = self._F # Internal normalization for better numerical stability during optimization self._Fmax = float(np.max(np.abs(self.F))) if self._Fmax > 0: self._Fnorm = self.F / self._Fmax else: self._Fnorm = self.F self._Fmax = 1.0 self.ngauss = ngauss self.nvars = nvars self.Ndim = ngauss * nvars self.Ncorr = int(nvars * (nvars - 1) / 2) self.domain = [ [float(self.X[:, j].min()), float(self.X[:, j].max())] for j in range(nvars) ] self.mog = MixtureOfGaussians( ngauss=ngauss, nvars=nvars, domain=self.domain, normalize_weights=False ) self.normalization = None self._weight_scale = 100.0 widths = [self.domain[j][1] - self.domain[j][0] for j in range(nvars)] if widths: self._sigmax = max(self._sigmax, 2.0 * max(widths)) self.set_params() print("Loading a FitFunctionMoG object.") print(f"Number of gaussians: {self.ngauss}") print(f"Number of variables: {self.nvars}") print(f"Number of dimensions: {self.Ndim}") print(f"Number of grid points: {len(self.F)}") if self.domain is not None: print(f"Domain: {self.domain}")
[docs] def set_params(self, mu=0.5, sigma=1.0, rho=0.5): """ Set the value of the basic params (minparams, scales, uparams). Ex. F.set_params(mu=0.5, sigma=1.0, rho=0.5) """ minparams = ( [mu] * self.Ndim + [sigma] * self.Ndim + [1 + rho] * self.ngauss * self.Ncorr ) scales = ( [0] * self.Ndim + [self._sigmax] * self.Ndim + [2] * self.ngauss * self.Ncorr ) if self.ngauss > 1: self.extrap = [] minparams = [1 / self.ngauss] * self.ngauss + minparams w_scale = getattr(self, "_weight_scale", 1.0) scales = [w_scale] * self.ngauss + scales else: self.extrap = [1] self.minparams = np.array(minparams) bounds = getattr(self.mog, "_domain_bounds", None) if bounds is not None and self.ngauss > 1: for j in range(self.nvars): lo, hi = bounds[j][0], bounds[j][1] if np.isfinite(lo) or np.isfinite(hi): a = lo if np.isfinite(lo) else hi - 1.0 b = hi if np.isfinite(hi) else lo + 1.0 for i in range(self.ngauss): idx = self.ngauss + i * self.nvars + j self.minparams[idx] = a + (i + 1) / (self.ngauss + 1) * (b - a) self.scales = np.array(scales) self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._initial_params_set_by_user = False
[docs] def pmap(self, minparams): """Map minparams to stdcorr for mog.set_stdcorr.""" stdcorr = np.array(self.extrap + list(minparams)) if self.ngauss * self.Ncorr > 0: stdcorr[-self.ngauss * self.Ncorr :] -= 1 return stdcorr
[docs] def set_bounds(self, boundw=None, bounds=None, boundr=None, boundsm=None): """Set bounds for minimization. Returns formatted bounds tuple.""" if boundsm is None: if getattr(self.mog, "_domain_bounds", None) is not None: boundsm = tuple( (self.mog._domain_bounds[j][0], self.mog._domain_bounds[j][1]) for j in range(self.nvars) ) else: boundsm = ((-np.inf, np.inf),) * self.nvars if boundw is None: boundw = (-np.inf, np.inf) else: boundw = tuple([Util.f2u(bw, 1) for bw in boundw]) if bounds is None: bounds = (-np.inf, np.inf) else: bounds = tuple([Util.f2u(bs, self._sigmax) for bs in bounds]) if boundr is None: boundr = (-np.inf, np.inf) else: boundr = tuple([Util.f2u(1 + br, 2) for br in boundr]) bounds = ( *((boundw,) * self.ngauss), *(boundsm * self.ngauss), *((bounds,) * self.nvars * self.ngauss), *((boundr,) * self.ngauss * int(self.nvars * (self.nvars - 1) / 2)), ) self.bounds = bounds if self.ngauss == 1: bounds = bounds[1:] self.mog._str_params() return bounds
[docs] def set_initial_params(self, mus=None, sigmas=None, rhos=None, from_mog=None): """Set initial values for the minimization parameters.""" if from_mog is not None: if from_mog.ngauss != self.ngauss or from_mog.nvars != self.nvars: raise ValueError( f"from_mog dimensions (ngauss={from_mog.ngauss}, nvars={from_mog.nvars}) " f"do not match FitFunctionMoG (ngauss={self.ngauss}, nvars={self.nvars})" ) # Use parameters from the provided mog as base mus_mog = from_mog.mus sigmas_mog = from_mog.sigmas rhos_mog = from_mog.rhos # If mus/sigmas/rhos are NOT provided in arguments, use from_mog if mus is None: mus = mus_mog if sigmas is None: sigmas = sigmas_mog if rhos is None: rhos = rhos_mog # Update weights if applicable (when ngauss > 1) if self.ngauss > 1: w = from_mog.weights if from_mog.normalize_weights: w = w / w.sum() self.minparams[0 : self.ngauss] = w mus = np.asarray(mus, dtype=float) if mus is not None else None sigmas = np.asarray(sigmas, dtype=float) if sigmas is not None else None rhos = np.asarray(rhos, dtype=float) if rhos is not None else None def _broadcast_2d(arr, shape_2d, name, dims_desc): if arr is None: return None arr = np.atleast_1d(arr) if arr.ndim == 1: if arr.shape[0] != shape_2d[1]: raise ValueError( f"{name} 1D must have length {shape_2d[1]} ({dims_desc}), got {arr.shape[0]}" ) arr = np.tile(arr, (self.ngauss, 1)) else: arr = np.atleast_2d(arr) if arr.shape != shape_2d: raise ValueError( f"{name} must have shape {shape_2d} or ({dims_desc}), got {arr.shape}" ) return arr if self.ngauss > 1: off_mu = self.ngauss off_sig = self.ngauss + self.Ndim off_rho = self.ngauss + 2 * self.Ndim else: off_mu = 0 off_sig = self.nvars off_rho = self.nvars * 2 if mus is not None: mus = _broadcast_2d(mus, (self.ngauss, self.nvars), "mus", "nvars") if self.ngauss > 1: self.minparams[off_mu : off_mu + self.Ndim] = mus.ravel() else: self.minparams[off_mu : off_mu + self.nvars] = mus.ravel() if sigmas is not None: sigmas = _broadcast_2d(sigmas, (self.ngauss, self.nvars), "sigmas", "nvars") if self.ngauss > 1: self.minparams[off_sig : off_sig + self.Ndim] = sigmas.ravel() else: self.minparams[off_sig : off_sig + self.nvars] = sigmas.ravel() if rhos is not None: rhos = _broadcast_2d(rhos, (self.ngauss, self.Ncorr), "rhos", "Ncorr") if self.ngauss > 1: self.minparams[off_rho : off_rho + self.ngauss * self.Ncorr] = ( 1.0 + rhos.ravel() ) else: self.minparams[off_rho : off_rho + self.Ncorr] = 1.0 + rhos.ravel() self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._initial_params_set_by_user = True params = self.pmap(self.minparams) self.mog.set_stdcorr(params, self.nvars)
def _init_params_from_grid(self, use_function_support=True): """ Set initial mus/sigmas from grid (self.X) when domain is finite. If use_function_support and self.F is available, spread mus only over points where F > 0.01*max(F) so components start in the support of the function. """ bounds = getattr(self.mog, "_domain_bounds", None) if bounds is None or self.ngauss < 2: return X = np.asarray(self.X) if X.size == 0: return F = np.asarray(self.F).ravel() if use_function_support else None threshold = ( max(0.01 * np.nanmax(F), 1e-300) if F is not None and F.size == X.shape[0] else None ) for j in range(self.nvars): lo, hi = bounds[j][0], bounds[j][1] if not (np.isfinite(lo) and np.isfinite(hi)): continue col = X[:, j] if threshold is not None: mask = F >= threshold if np.sum(mask) >= 2: col = col[mask] pct = np.linspace(5, 95, self.ngauss) mus_init = np.percentile(col, pct) mus_init = np.sort(mus_init) mus_init = np.clip(mus_init, lo, hi) for i in range(self.ngauss): idx = self.ngauss + i * self.nvars + j self.minparams[idx] = mus_init[i] width = np.ptp(col) if len(col) >= 2 else (hi - lo) sigma_init = min( 0.25 * (hi - lo), 0.2 * (hi - lo) / max(1, self.ngauss), max(0.05 * width, np.std(col) * 0.5), ) for i in range(self.ngauss): idx = self.ngauss + self.Ndim + i * self.nvars + j if idx < len(self.minparams): self.minparams[idx] = sigma_init self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) if self.ngauss > 1: self._set_initial_weights_from_function() def _init_params_from_function_support(self, frac=0.01): """ Set initial mus/sigmas from the support of the function (where F > frac*max(F)). Used when domain is not set so the optimizer starts in the region where the function is non-zero. """ F = np.asarray(self.F).ravel() X = np.asarray(self.X) if X.size == 0 or F.size == 0: return threshold = max(frac * np.nanmax(F), 1e-300) mask = F >= threshold if not np.any(mask): mask = np.ones(F.size, dtype=bool) for j in range(self.nvars): col = X[mask, j] if len(col) < 2: col = X[:, j] pct = np.linspace(5, 95, self.ngauss) mus_init = np.percentile(col, pct) mus_init = np.sort(mus_init) for i in range(self.ngauss): idx = self.ngauss + i * self.nvars + j self.minparams[idx] = mus_init[i] width = np.ptp(col) sigma_init = min( self._sigmax * 0.5, 0.2 * width / max(1, self.ngauss), max(0.05 * width, np.std(col) * 0.5), ) if width <= 0: sigma_init = 1.0 for i in range(self.ngauss): idx = self.ngauss + self.Ndim + i * self.nvars + j if idx < len(self.minparams): self.minparams[idx] = sigma_init self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) if self.ngauss > 1: self._set_initial_weights_from_function() def _set_initial_weights_from_function(self, n_sigma_window=2.0): """ Set initial weights from the function F by integrating F in a window around each mu. So peaks with more area get larger initial weight; avoids starting with equal weights. """ if self.ngauss < 2: return X = np.asarray(self.X) F = np.asarray(self.F).ravel() if X.size == 0 or F.size != X.shape[0]: return mus = np.array( [self.minparams[self.ngauss + i * self.nvars] for i in range(self.ngauss)] ) sigmas = np.array( [ self.minparams[self.ngauss + self.Ndim + i * self.nvars] for i in range(self.ngauss) ] ) x = X[:, 0] if self.nvars == 1 else X if self.nvars == 1: x = np.asarray(x).ravel() masses = np.zeros(self.ngauss) for i in range(self.ngauss): if self.nvars == 1: half = n_sigma_window * max(sigmas[i], 1e-6) in_window = (x >= mus[i] - half) & (x <= mus[i] + half) else: dist = np.linalg.norm(X - mus[i], axis=1) half = n_sigma_window * max(np.mean(sigmas), 1e-6) in_window = dist <= half masses[i] = np.sum(F[in_window]) if np.sum(masses) <= 0: return weights = masses / np.sum(masses) weights = np.clip(weights, 1e-6, 1.0 - 1e-6) weights = weights / np.sum(weights) self.minparams[: self.ngauss] = weights self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) def _loss_function(self, uparams): """ Least-squares loss between F and (normalization * mog.pdf(X)). Normalization is computed analytically at each evaluation to minimize the residual sum of squares. Uses internally normalized F for better numerical stability. """ minparams = Util.t_if(uparams, self.scales, Util.u2f) params = self.pmap(minparams) self.mog.set_stdcorr(params, self.nvars) pdf = self.mog.pdf(self.X) pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300) c = np.dot(self._Fnorm, pdf) / np.dot(pdf, pdf) residual = self._Fnorm - c * pdf return np.sum(residual**2) @Util.timer def fit_data( self, verbose=0, advance=0, mode="lsq", atol=None, rtol=None, maxgauss=50, wtol=1e-2, **args, ): """ Fit the MoG to the function on the mesh by nonlinear least squares. Minimizes sum over grid points of (F - normalization * mog.pdf(X))^2. The normalization is obtained analytically at each step so that the residual sum of squares is minimized for the current MoG parameters. After fitting, self.normalization holds the scale so that normalization * mog.pdf(X) ≈ F. Ex. F.fit_data(tol=1e-6, options=dict(maxiter=500)) Parameters ---------- verbose : int, optional Verbosity level (default 0). advance : int, optional If > 0, print progress every advance iterations (default 0). mode : str, optional Mode of fitting. Options: "lsq" (default), "multimodal", "noisy", "noisy_multimodal", or "adaptive". - "lsq": Standard least-squares fit with all parameters free. - "multimodal": Detects all peaks, sets ngauss to number of peaks found, fixes means to peak positions, fits only sigmas and weights. - "noisy": Smooths data first (~3% of points), detects peaks, selects the ngauss most prominent peaks, uses them as initial values for means, but leaves all parameters FREE to optimize. Best for noisy data. - "noisy_multimodal": Smooths data first (~3% of points), detects ALL peaks, sets ngauss to number of peaks found, fixes means at those positions, fits only sigmas and weights. Like "multimodal" but with smoothing for noisy data. - "adaptive": Incrementally increases ngauss from 1 until convergence criteria are met (controlled by atol and rtol). Automatically finds optimal number of Gaussians. atol : float, optional Absolute tolerance for R² in adaptive mode. Stops when R² >= atol (e.g. 0.99). Only used when mode="adaptive". rtol : float, optional Relative tolerance for R² improvement in adaptive mode. Stops when the average improvement over the last 3 ngauss values is less than rtol (e.g. 0.01). Evaluates improvement trend to avoid stopping prematurely. Only used when mode="adaptive". maxgauss : int, optional Maximum number of Gaussians to try in adaptive mode (default 50). Only used when mode="adaptive". wtol : float, optional Weight tolerance for pruning spurious Gaussians in adaptive mode (default 1e-2). After finding optimal ngauss, removes Gaussians with weights < wtol and refits with remaining components. Helps eliminate spurious components. Only used when mode="adaptive". **args Passed to scipy.optimize.minimize (e.g. method, tol, options). """ from scipy.signal import find_peaks self.mog._ignoreWarnings = self._ignoreWarnings minargs = dict(method="Powell") minargs.update(args) # Handle adaptive mode if mode == "adaptive": if atol is None and rtol is None: raise ValueError( "adaptive mode requires at least one of 'atol' or 'rtol' to be specified" ) if verbose > 0: print(f"Adaptive mode: starting with ngauss=1, maxgauss={maxgauss}") if atol is not None: print(f" Stop criterion: R² >= {atol}") if rtol is not None: print(f" Stop criterion: avg(ΔR²) over last 3 < {rtol}") r2_history = [] # Keep history of last 3 R² values best_ngauss = 1 best_r2 = -np.inf for ng in range(1, maxgauss + 1): # Re-initialize with new ngauss self.ngauss = ng self.Ndim = self.ngauss * self.nvars self.Ncorr = int(self.nvars * (self.nvars - 1) / 2) self.mog = MixtureOfGaussians( ngauss=self.ngauss, nvars=self.nvars, domain=self.domain, normalize_weights=False, ) self.set_params() # Fit with standard lsq mode (reuse code by calling fit_data recursively with mode="lsq") # To avoid infinite recursion, we call the fitting code directly if verbose > 0: print(f"\n Trying ngauss={ng}...") # Initialize parameters if not set by user if not getattr(self, "_initial_params_set_by_user", False): bounds_orig = getattr(self.mog, "_domain_bounds", None) has_finite_domain = bounds_orig is not None and any( np.isfinite(bounds_orig[j][0]) or np.isfinite(bounds_orig[j][1]) for j in range(self.nvars) ) if has_finite_domain: self._init_params_from_grid() else: self._init_params_from_function_support() # Perform optimization self.neval = 0 def _advance_adaptive(x, show=False): if advance > 0 and (self.neval % advance == 0 or show): if self.neval == 0: print(f" Iterations:") loss = self._loss_function(x) print(f" Iter {self.neval}: loss = {loss:.6g}") self.neval += 1 callback = _advance_adaptive if advance else None self.solution = minimize( self._loss_function, self.uparams, callback=callback, **minargs, ) if advance: _advance_adaptive(self.solution.x, show=True) self.uparams = self.solution.x self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f) params = self.pmap(self.minparams) self.mog.set_stdcorr(params, self.nvars) pdf = self.mog.pdf(self.X) pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300) self.normalization = float(np.dot(self.F, pdf) / np.dot(pdf, pdf)) # Calculate R² out = self.quality_of_fit(verbose=False) r2_current = out["R2"] if verbose > 0: print(f" R² = {r2_current:.6f}") # Add to history r2_history.append(r2_current) # Update best if r2_current > best_r2: best_r2 = r2_current best_ngauss = ng # Check stopping criteria stop_atol = atol is not None and r2_current >= atol # Evaluate rtol based on average improvement over last 3 values stop_rtol = False if rtol is not None and len(r2_history) >= 3: # Get last 3 R² values last_3 = r2_history[-3:] # Calculate improvements between consecutive values improvements = [ last_3[i + 1] - last_3[i] for i in range(len(last_3) - 1) ] avg_improvement = np.mean(improvements) stop_rtol = avg_improvement < rtol if verbose > 1: print(f" Last 3 R²: {last_3}") print(f" Improvements: {improvements}") print(f" Avg improvement: {avg_improvement:.6f}") if stop_atol or stop_rtol: if verbose > 0: if stop_atol: print( f"\n✅ Stopped: R² = {r2_current:.6f} >= atol = {atol}" ) if stop_rtol: last_3 = r2_history[-3:] improvements = [ last_3[i + 1] - last_3[i] for i in range(len(last_3) - 1) ] avg_improvement = np.mean(improvements) print( f"\n✅ Stopped: avg(ΔR²) over last 3 = {avg_improvement:.6f} < rtol = {rtol}" ) print(f"Final ngauss = {ng}") # Prune spurious Gaussians with small weights self._prune_and_refit_adaptive(wtol, verbose, advance, minargs) return # Reached maxgauss if verbose > 0: print(f"\n⚠️ Reached maxgauss = {maxgauss}") print(f"Final R² = {best_r2:.6f} at ngauss = {best_ngauss}") # Prune spurious Gaussians with small weights self._prune_and_refit_adaptive(wtol, verbose, advance, minargs) return # Handle multimodal mode if mode == "multimodal": from scipy.signal import peak_widths, peak_prominences F_flat = np.asarray(self.F).ravel() # Detect peaks # Use a relative height threshold height_threshold = 0.05 * np.max(F_flat) if np.max(F_flat) > 0 else None peaks, properties = find_peaks(F_flat, height=height_threshold) if len(peaks) == 0: print( "Warning: No peaks detected in multimodal mode. Fallback to existing configuration." ) else: self.ngauss = len(peaks) # Calculate widths and prominences widths_samples, _, _, _ = peak_widths(F_flat, peaks, rel_height=0.5) prominences, _, _ = peak_prominences(F_flat, peaks) # Re-initialize MoG and parameters structure with new ngauss self.Ndim = self.ngauss * self.nvars self.Ncorr = int(self.nvars * (self.nvars - 1) / 2) self.mog = MixtureOfGaussians( ngauss=self.ngauss, nvars=self.nvars, domain=self.domain, normalize_weights=False, ) self.set_params() has_weights = self.ngauss > 1 off_mu = self.ngauss if has_weights else 0 off_sig = off_mu + self.Ndim # Set means to peak positions mus_peaks = self.X[peaks] # Set mus for i in range(self.ngauss): idx_base = off_mu + i * self.nvars self.minparams[idx_base : idx_base + self.nvars] = mus_peaks[i] # Set sigmas based on widths # widths_samples is FWHM in indices. # Convert to physical units. # Assuming approximate uniform grid for width estimation if len(self.X) > 1: dx = abs((self.X[-1, 0] - self.X[0, 0]) / (len(self.X) - 1)) else: dx = 1.0 sigma_conversion = 1.0 / 2.355 # FWHM to sigma (approx) for i in range(self.ngauss): width_physical = widths_samples[i] * dx # Use a fraction of FWHM as sigma estimate sigma_est = float(width_physical * sigma_conversion) sigma_est = float(np.clip(sigma_est, 1e-6, 0.99 * self._sigmax)) idx_base = off_sig + i * self.nvars self.minparams[idx_base : idx_base + self.nvars] = sigma_est # Set weights based on prominences (or peak heights) # minparams[0:ngauss] are weights (if ngauss > 1) if has_weights: weights = prominences / np.sum(prominences) self.minparams[: self.ngauss] = weights # Function to create tight bounds for fixed parameters def fix_val(val): return (val - 1e-9, val + 1e-9) # Construct bounds # Recalculate component bounds # Weights boundw = (-np.inf, np.inf) # Transformed weights # Sigmas # We want standard bounds for sigmas. # In multimodal mode: restrict sigma to 2 * domain_width bounds_orig = getattr(self.mog, "_domain_bounds", None) domain_widths = [ bounds_orig[j][1] - bounds_orig[j][0] for j in range(self.nvars) if np.isfinite(bounds_orig[j][0]) and np.isfinite(bounds_orig[j][1]) ] sigma_hi = 0.99 * self._sigmax if domain_widths: width_max = max(domain_widths) sigma_hi = min(sigma_hi, 2.0 * width_max) bound_sigma_trans = ( Util.f2u(1e-6, self._sigmax), Util.f2u(sigma_hi, self._sigmax), ) # Rhos bound_rho_trans = (-np.inf, np.inf) # Bounds list construction new_bounds = [] # Weights (if ngauss > 1) if has_weights: new_bounds.extend([boundw] * self.ngauss) # Means (Fixed) for i in range(self.ngauss): for j in range(self.nvars): mu_val = mus_peaks[i, j] new_bounds.append((mu_val - 1e-9, mu_val + 1e-9)) # Sigmas new_bounds.extend([bound_sigma_trans] * self.Ndim) # Rhos new_bounds.extend([bound_rho_trans] * (self.ngauss * self.Ncorr)) minargs["bounds"] = tuple(new_bounds) # Sync uparams self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._initial_params_set_by_user = True if minargs.get("method") == "Powell": minargs["method"] = "L-BFGS-B" # Handle noisy mode (free means - uses peak positions as initial values) if mode == "noisy": from scipy.signal import peak_widths, peak_prominences from scipy.ndimage import uniform_filter1d if self.nvars != 1: raise ValueError( "mode='noisy' is only supported for univariate functions (nvars=1)" ) F_flat = np.asarray(self.F).ravel() # Smooth the data using a window ~2-3% of points n_points = len(F_flat) window_size = max(7, int(n_points * 0.03)) if window_size % 2 == 0: window_size += 1 F_smooth = uniform_filter1d(F_flat, size=window_size, mode="nearest") # Detect peaks in smoothed data height_threshold = 0.05 * np.max(F_smooth) if np.max(F_smooth) > 0 else None prominence_threshold = ( 0.005 * np.max(F_smooth) if np.max(F_smooth) > 0 else None ) peaks, properties = find_peaks( F_smooth, height=height_threshold, prominence=prominence_threshold ) if len(peaks) == 0: print( "Warning: No peaks detected in noisy mode. Using default initialization." ) else: # Select the ngauss most prominent peaks widths_samples, _, _, _ = peak_widths(F_smooth, peaks, rel_height=0.5) prominences, _, _ = peak_prominences(F_smooth, peaks) if len(peaks) < self.ngauss: print( f"Warning: Only {len(peaks)} peaks detected but ngauss={self.ngauss}. " f"Using all {len(peaks)} detected peaks." ) num_peaks_to_use = len(peaks) selected_indices = np.arange(len(peaks)) else: num_peaks_to_use = self.ngauss # Sort by prominence and select top ngauss sorted_indices = np.argsort(prominences)[::-1] selected_indices = sorted_indices[: self.ngauss] selected_peaks = peaks[selected_indices] selected_widths = widths_samples[selected_indices] selected_prominences = prominences[selected_indices] # Sort selected peaks by position for consistency position_order = np.argsort(selected_peaks) selected_peaks = selected_peaks[position_order] selected_widths = selected_widths[position_order] selected_prominences = selected_prominences[position_order] # Set initial parameters based on detected peaks has_weights = self.ngauss > 1 off_mu = self.ngauss if has_weights else 0 off_sig = off_mu + self.Ndim # Set means to peak positions (as INITIAL VALUES, not fixed) mus_peaks = self.X[selected_peaks] for i in range(num_peaks_to_use): idx_base = off_mu + i * self.nvars self.minparams[idx_base : idx_base + self.nvars] = mus_peaks[i] # Set sigmas based on widths if len(self.X) > 1: dx = abs((self.X[-1, 0] - self.X[0, 0]) / (len(self.X) - 1)) else: dx = 1.0 sigma_conversion = 1.0 / 2.355 for i in range(num_peaks_to_use): width_physical = selected_widths[i] * dx sigma_est = float(width_physical * sigma_conversion) sigma_est = float(np.clip(sigma_est, 1e-6, 0.99 * self._sigmax)) idx_base = off_sig + i * self.nvars self.minparams[idx_base : idx_base + self.nvars] = sigma_est # Set weights based on prominences if has_weights: weights = selected_prominences / np.sum(selected_prominences) self.minparams[:num_peaks_to_use] = weights # Update uparams and mark as user-set self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._initial_params_set_by_user = True # Handle noisy_multimodal mode (fixed means at peak positions) elif mode == "noisy_multimodal": from scipy.signal import peak_widths, peak_prominences from scipy.ndimage import uniform_filter1d if self.nvars != 1: raise ValueError( "mode='noisy' is only supported for univariate functions (nvars=1)" ) F_flat = np.asarray(self.F).ravel() # Smooth the data using a window ~2-3% of points for effective noise reduction # while preserving multiple peaks n_points = len(F_flat) # Window size: approximately 3% of points (balance between smoothing and peak preservation) window_size = max(7, int(n_points * 0.03)) if window_size % 2 == 0: # Make it odd for symmetry window_size += 1 F_smooth = uniform_filter1d(F_flat, size=window_size, mode="nearest") # Detect peaks in smoothed data with balanced thresholds # Height threshold: 5% of max to catch significant peaks height_threshold = 0.05 * np.max(F_smooth) if np.max(F_smooth) > 0 else None # Prominence threshold: 7% of max (balanced between catching real peaks and avoiding noise) prominence_threshold = ( 0.07 * np.max(F_smooth) if np.max(F_smooth) > 0 else None ) peaks, properties = find_peaks( F_smooth, height=height_threshold, prominence=prominence_threshold ) if len(peaks) == 0: print( "Warning: No peaks detected in noisy_multimodal mode. Fallback to existing configuration." ) else: # Use ALL detected peaks (like multimodal mode) self.ngauss = len(peaks) # Re-initialize with new ngauss self.Ndim = self.ngauss * self.nvars self.Ncorr = int(self.nvars * (self.nvars - 1) / 2) self.mog = MixtureOfGaussians( ngauss=self.ngauss, nvars=self.nvars, domain=self.domain, normalize_weights=False, ) self.set_params() # Calculate widths and prominences widths_samples, _, _, _ = peak_widths(F_smooth, peaks, rel_height=0.5) prominences, _, _ = peak_prominences(F_smooth, peaks) has_weights = self.ngauss > 1 off_mu = self.ngauss if has_weights else 0 off_sig = off_mu + self.Ndim # Set means to peak positions mus_peaks = self.X[peaks] for i in range(self.ngauss): idx_base = off_mu + i * self.nvars self.minparams[idx_base : idx_base + self.nvars] = mus_peaks[i] # Set sigmas based on widths if len(self.X) > 1: dx = abs((self.X[-1, 0] - self.X[0, 0]) / (len(self.X) - 1)) else: dx = 1.0 sigma_conversion = 1.0 / 2.355 # FWHM to sigma for i in range(self.ngauss): width_physical = widths_samples[i] * dx sigma_est = float(width_physical * sigma_conversion) sigma_est = float(np.clip(sigma_est, 1e-6, 0.99 * self._sigmax)) idx_base = off_sig + i * self.nvars self.minparams[idx_base : idx_base + self.nvars] = sigma_est # Set weights based on prominences if has_weights: weights = prominences / np.sum(prominences) self.minparams[: self.ngauss] = weights # Construct bounds (means fixed, sigmas and weights free) boundw = (-np.inf, np.inf) bounds_orig = getattr(self.mog, "_domain_bounds", None) domain_widths = [ bounds_orig[j][1] - bounds_orig[j][0] for j in range(self.nvars) if np.isfinite(bounds_orig[j][0]) and np.isfinite(bounds_orig[j][1]) ] sigma_hi = 0.99 * self._sigmax if domain_widths: width_max = max(domain_widths) sigma_hi = min(sigma_hi, 2.0 * width_max) bound_sigma_trans = ( Util.f2u(1e-6, self._sigmax), Util.f2u(sigma_hi, self._sigmax), ) bound_rho_trans = (-np.inf, np.inf) new_bounds = [] if has_weights: new_bounds.extend([boundw] * self.ngauss) # Fix means for i in range(self.ngauss): for j in range(self.nvars): mu_val = mus_peaks[i, j] new_bounds.append((mu_val - 1e-9, mu_val + 1e-9)) # Free sigmas new_bounds.extend([bound_sigma_trans] * self.Ndim) # Free rhos new_bounds.extend([bound_rho_trans] * (self.ngauss * self.Ncorr)) minargs["bounds"] = tuple(new_bounds) self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u) self._initial_params_set_by_user = True if minargs.get("method") == "Powell": minargs["method"] = "L-BFGS-B" bounds_orig = getattr(self.mog, "_domain_bounds", None) has_finite_domain = bounds_orig is not None and any( np.isfinite(bounds_orig[j][0]) or np.isfinite(bounds_orig[j][1]) for j in range(self.nvars) ) if ( has_finite_domain and "bounds" not in args and mode not in ["multimodal", "noisy_multimodal"] ): domain_widths = [ bounds_orig[j][1] - bounds_orig[j][0] for j in range(self.nvars) if np.isfinite(bounds_orig[j][0]) and np.isfinite(bounds_orig[j][1]) ] sigma_hi = 0.99 * self._sigmax if domain_widths: width_max = max(domain_widths) sigma_hi = min(sigma_hi, 0.25 * width_max) bounds_tuple = self.set_bounds(bounds=(1e-6, sigma_hi)) minargs["bounds"] = bounds_tuple if minargs.get("method") == "Powell": minargs["method"] = "L-BFGS-B" if not getattr(self, "_initial_params_set_by_user", False): self._init_params_from_grid() else: if not getattr(self, "_initial_params_set_by_user", False): self._init_params_from_function_support() self.neval = 0 def _advance(x, show=False): if advance > 0 and (self.neval % advance == 0 or show): if self.neval == 0: print("Iterations:") loss = self._loss_function(x) print(f" Iter {self.neval}: loss = {loss:.6g}") self.neval += 1 callback = _advance if advance else None self.solution = minimize( self._loss_function, self.uparams, callback=callback, **minargs, ) if advance: _advance(self.solution.x, show=True) self.uparams = self.solution.x self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f) params = self.pmap(self.minparams) self.mog.set_stdcorr(params, self.nvars) pdf = self.mog.pdf(self.X) pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300) self.normalization = float(np.dot(self.F, pdf) / np.dot(pdf, pdf)) def _prune_and_refit_adaptive(self, wtol, verbose, advance, minargs): """ Prune Gaussians with small weights and refit with remaining components. Used in adaptive mode to eliminate spurious Gaussians after convergence. Parameters ---------- wtol : float Weight threshold. Gaussians with weight < wtol are removed. verbose : int Verbosity level. advance : int Print progress every advance iterations. minargs : dict Minimization arguments. """ # Get current weights weights = self.mog.weights.ravel() if verbose > 0: print(f"\n🔍 Pruning Gaussians with weights < {wtol}:") print(f" Current weights: {weights}") # Identify Gaussians to keep (weight >= wtol) keep_mask = weights >= wtol n_keep = np.sum(keep_mask) if n_keep == self.ngauss: if verbose > 0: print( f" ✅ All {self.ngauss} Gaussians have sufficient weight. No pruning needed." ) return if n_keep == 0: if verbose > 0: print(f" ⚠️ All Gaussians have weight < {wtol}. Keeping at least 1.") # Keep the Gaussian with largest weight keep_mask = np.zeros_like(weights, dtype=bool) keep_mask[np.argmax(weights)] = True n_keep = 1 # Extract parameters of Gaussians to keep mus_keep = self.mog.mus[keep_mask] sigmas_keep = self.mog.sigmas[keep_mask] weights_keep = weights[keep_mask] if verbose > 0: n_removed = self.ngauss - n_keep print(f" 🗑️ Removing {n_removed} Gaussian(s) with small weights") print(f" ✅ Keeping {n_keep} Gaussian(s)") print(f" Refitting with pruned components...") # Re-initialize with reduced ngauss self.ngauss = n_keep self.Ndim = self.ngauss * self.nvars self.Ncorr = int(self.nvars * (self.nvars - 1) / 2) self.mog = MixtureOfGaussians( ngauss=self.ngauss, nvars=self.nvars, domain=self.domain, normalize_weights=False, ) self.set_params() # Set initial parameters from pruned components self.set_initial_params(mus=mus_keep, sigmas=sigmas_keep) # Perform optimization with standard lsq self.neval = 0 def _advance_prune(x, show=False): if advance > 0 and (self.neval % advance == 0 or show): if self.neval == 0: print(f" Iterations:") loss = self._loss_function(x) print(f" Iter {self.neval}: loss = {loss:.6g}") self.neval += 1 callback = _advance_prune if advance else None self.solution = minimize( self._loss_function, self.uparams, callback=callback, **minargs, ) if advance: _advance_prune(self.solution.x, show=True) self.uparams = self.solution.x self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f) params = self.pmap(self.minparams) self.mog.set_stdcorr(params, self.nvars) pdf = self.mog.pdf(self.X) pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300) self.normalization = float(np.dot(self.F, pdf) / np.dot(pdf, pdf)) if verbose > 0: out = self.quality_of_fit(verbose=False) print(f" ✅ Refit complete: ngauss = {self.ngauss}, R² = {out['R2']:.6f}")
[docs] def quality_of_fit(self, verbose=True): """ Report quality-of-fit statistics after fitting. Computes the coefficient of determination (R²) and the root mean square error (RMSE) between the mesh function F and the fitted model (normalization * mog.pdf(X)). Call after fit_data(). R² (coefficient of determination) R² = 1 - SS_res / SS_tot, where SS_res = sum((F - F_pred)²), SS_tot = sum((F - mean(F))²). R² = 1 means a perfect fit; R² = 0 means the model explains no variance beyond the mean; R² < 0 is possible if the fit is worse than using the constant mean(F). RMSE (root mean square error) RMSE = sqrt(mean((F - F_pred)²)). Same units as F; lower is better. Parameters ---------- verbose : bool, optional If True (default), print a short report. Returns ------- dict Keys: 'R2', 'RMSE', 'SS_res', 'SS_tot', 'n_points'. Raises ------ ValueError If fit_data() has not been run (normalization is None). """ if self.normalization is None: raise ValueError( "quality_of_fit requires a fitted model; run fit_data() first" ) F_obs = np.asarray(self.F).ravel().astype(float) pdf = self.mog.pdf(self.X) pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300) F_pred = self.normalization * pdf F_pred = np.asarray(F_pred).ravel().astype(float) n = F_obs.size ss_res = float(np.sum((F_obs - F_pred) ** 2)) mean_f = float(np.mean(F_obs)) ss_tot = float(np.sum((F_obs - mean_f) ** 2)) if ss_tot > 0: r2 = 1.0 - ss_res / ss_tot else: r2 = float("nan") rmse = np.sqrt(np.mean((F_obs - F_pred) ** 2)) out = { "R2": r2, } if verbose: print("Quality of fit (after fit_data):") print(f" R² = {out['R2']:.6g} (1 = perfect, 0 = no better than mean)") return out
[docs] def plot_fit( self, figsize=(8, 5), xlabel=None, ylabel=None, n_curve=300, sargs=True, pargs=True, hargs=None, largs=None, dargs=None, **kwargs, ): """ Plot original function (data) and fitted MoG for univariate fits. Draws the mesh data (X, F) and the fitted model normalization * mog.pdf(X). Only implemented for nvars == 1. Ex. F.plot_fit(xlabel=r'$\\tau$', ylabel='ISR', sargs=dict(ms=2, color='C0')) Parameters ---------- figsize : tuple, optional Figure size (default (8, 5)). xlabel : str, optional Label for the x-axis. ylabel : str, optional Label for the y-axis. n_curve : int, optional Number of points for the smooth PDF curve (default 300). sargs : dict or bool or None, optional Options for the data (points) plot (passed to ax.plot or ax.scatter). - If True (default): plots data with defaults (marker='+', linestyle='-', label='data'). - If dict: plots data with provided options. - If None: data is not plotted. pargs : dict or bool or None, optional Options for the fit line (norm × MoG PDF). - If True (default): plots PDF with default options (color='r', lw=2, label='fit'). - If dict: plots PDF with provided options. - If None: PDF is not plotted. Passed to ax.plot. dargs : dict or bool or None, optional Options for the component lines. - If True or dict: plots individual components with defaults (color='k', ls='--', alpha=0.3). - If dict: updates defaults with provided options. - If None (default): components are not plotted. hargs : dict, optional Options for histogram (not currently used in plot_fit, but included for API consistency). largs : dict, optional Deprecated. Use pargs instead. Returns ------- fig : matplotlib.figure.Figure Figure handle. Examples -------- >>> F.plot_fit(xlabel=r'$\\tau$', ylabel='ISR') >>> F.plot_fit(sargs=dict(ms=2, color='C0'), pargs=dict(color='C1', lw=3)) """ if self.nvars != 1: raise NotImplementedError( "plot_fit is only implemented for univariate (nvars=1) fits" ) from matplotlib import pyplot as plt # Handle deprecation/alias if pargs is None and largs is not None: pargs = largs # Apply defaults for PDF (pargs) if pargs is True: pargs = dict(color="r", lw=2, label="fit (norm × MoG PDF)") elif isinstance(pargs, dict): pargs = dict(pargs) pargs.setdefault("label", "fit (norm × MoG PDF)") # If pargs is None (or False), it remains None/False and we skip plotting if pargs: # Default zorder for PDF (background) pargs.setdefault("zorder", -100) # Apply defaults for Data (sargs) if sargs is True: # User requested default: sargs=dict(marker='+',linestyle='-') sargs = dict(marker="+", ms=5, linestyle="-", label="data") elif isinstance(sargs, dict): sargs = dict(sargs) sargs.setdefault("label", "data") sargs.setdefault("marker", "+") sargs.setdefault("linestyle", "-") # If sargs is None (or False), it remains None/False and we skip plotting # Merge kwargs into pargs ?? Or sargs? # If user passes kwargs, they probably mean to affect the plot being shown (PDF). if kwargs and pargs: pargs.update(kwargs) fig, ax = plt.subplots(1, 1, figsize=figsize) x = np.asarray(self.X).ravel() f = np.asarray(self.F).ravel() # Only plot data if sargs is explicitly provided (or true default) if sargs: sargs_defaults = dict(ms=1.5, label="data", marker="+", linestyle="-") # We use update to overwrite defaults with user provided sargs (which might be the default dict from above) sargs_defaults.update(sargs) # Default zorder for data (foreground) sargs_defaults.setdefault("zorder", -100) ax.plot(x, f, **sargs_defaults) # Plot PDF (always plotted unless pargs somehow suppressed, but user wants it default) if self.normalization is not None: x_min, x_max = x.min(), x.max() margin = max(1e-6, 0.05 * (x_max - x_min)) x_curve = np.linspace(x_min - margin, x_max + margin, n_curve) # Plot components if requested if dargs is not None and dargs is not False: dargs_default = dict(color="k", ls="--", alpha=0.3) if isinstance(dargs, dict): dargs_default.update(dargs) dargs_default.setdefault("zorder", -10) # We need to construct single-component MoGs to use the robust pdf method # (which handles truncation and normalization correctly) for k in range(self.mog.ngauss): # Extract parameters for k-th component mu_k = self.mog.mus[k : k + 1] sigma_k = self.mog.Sigmas[k : k + 1] # Create temporary MoG with this single component # Note: we use weight=1 here and apply the actual weight manually # to ensure we get the component shape correctly scaled mog_k = MixtureOfGaussians( mus=mu_k, Sigmas=sigma_k, weights=[1.0], domain=self.domain, normalize_weights=False, ) pdf_k = mog_k.pdf(x_curve.reshape(-1, 1)) # Scale by normalization and the component's weight model_vals_k = ( self.normalization * self.mog.weights[k] * np.atleast_1d(pdf_k) ) # Determine styling for this component dargs_k = dargs_default.copy() # Color: cycle if not explicitly provided if "color" not in dargs and "c" not in dargs: # Use standard matplotlib cycle C0, C1, etc. dargs_k["color"] = f"C{k % 10}" # Label: construct default label with component details base_label = dargs_default.get("label", f"Comp {k + 1}") if base_label is None: # explicit None label = None else: mu_val = float(mu_k.ravel()[0]) sigma_val = np.sqrt(float(sigma_k.ravel()[0])) label = rf"{base_label} ($\mu$={mu_val:.2f}, $\sigma$={sigma_val:.2f})" dargs_k["label"] = label ax.plot(x_curve, model_vals_k, **dargs_k) if pargs: pdf_vals = self.mog.pdf(x_curve.reshape(-1, 1)) model_vals = self.normalization * np.atleast_1d(pdf_vals) ax.plot(x_curve, model_vals, **pargs) if xlabel is not None: ax.set_xlabel(xlabel) if ylabel is not None: ax.set_ylabel(ylabel) ax.legend(loc="best", frameon=True) ax.grid(True, alpha=0.3) ax.set_ylim(0, None) fig.tight_layout() if not getattr(self, "_watermark_added_plot_fit", False): multimin_watermark(ax, frac=0.5) self._watermark_added_plot_fit = True self.fig = fig return fig
def _parse_function_mesh_data(self, Xmesh, Fmesh): """ Convert (Xmesh, Fmesh) from mesh/function format to points X (N, nvars) and values F (N,). For univariate: Xmesh and Fmesh are 1D arrays of the same length. For multivariate: Xmesh is a tuple of ndarrays (e.g. from np.meshgrid), Fmesh is an array of the same shape as the grid. Parameters ---------- Xmesh : array-like or tuple of array-like Evaluation points. 1D array (n,) for univariate, or tuple of arrays for a grid. Fmesh : array-like Function values at those points. Shape (n,) for 1D or same shape as grid for mesh. Returns ------- X : numpy.ndarray Points, shape (N, nvars). F : numpy.ndarray Values, shape (N,). nvars : int Number of variables. """ Fmesh = np.asarray(Fmesh, dtype=float) if isinstance(Xmesh, (list, tuple)) and len(Xmesh) > 0: # Multivariate: meshgrid-style (x1, x2, ...) grids = [np.asarray(x, dtype=float) for x in Xmesh] nvars = len(grids) shape = grids[0].shape for g in grids: if g.shape != shape: raise ValueError( "Xmesh grid components must have the same shape; " f"got {[g.shape for g in grids]}" ) if Fmesh.shape != shape: raise ValueError( f"Fmesh shape {Fmesh.shape} must match grid shape {shape}" ) X = np.column_stack([g.ravel() for g in grids]) F = Fmesh.ravel() else: # Univariate: Xmesh and Fmesh are 1D Xmesh = np.asarray(Xmesh, dtype=float) if Xmesh.ndim != 1: raise ValueError( "For univariate, Xmesh must be 1D; for multivariate use a tuple of arrays" ) n = len(Xmesh) if Fmesh.size != n: raise ValueError( f"Xmesh length ({n}) and Fmesh size ({Fmesh.size}) must match" ) nvars = 1 X = Xmesh.reshape(-1, 1) F = np.asarray(Fmesh, dtype=float).ravel() return X, F, nvars