##################################################################
# #
# MultiMin: Multivariate Gaussian fitting #
# #
# Authors: Jorge I. Zuluaga #
# #
##################################################################
# License: GNU Affero General Public License v3 (AGPL-3.0) #
##################################################################
"""
Fitting classes for MoG to data and functions.
Contains:
- FitMoG: Fit MoG to dataset
- FitFunctionMoG: Fit MoG to function
"""
import pickle
import os
from hashlib import md5
import numpy as np
from scipy.optimize import minimize
from scipy import signal
from scipy.ndimage import uniform_filter1d
# Import from package modules
from .base import MultiMinBase
from .util import Util, Stats
from .mog import MixtureOfGaussians
from .plotting import multimin_watermark, MultiPlot
[docs]
class FitMoG(MultiMinBase):
r"""
MoG Fitting handler.
Theory
------
The fitting procedure is based on the Maximum Likelihood Estimation (MLE) method.
Given a dataset of :math:`S` points :math:`\tilde U_k` (e.g., unbound orbital elements), the likelihood
:math:`\mathcal{L}` of the MoG parameters is given by:
.. math::
\mathcal{L}(\{w_k\}_M, \{\mu_k\}_M, \{\Sigma_k\}_M | \{\tilde U_k\}_S) = \prod_{i=1}^{S} \mathcal{C}_M(\tilde U_i)
To find the optimal parameters, we minimize the negative normalized log-likelihood:
.. math::
-\frac{\log \mathcal{L}}{S} = -\frac{1}{S} \sum_{i=1}^{S} \log \mathcal{C}_M(\tilde U_i)
This quantity represents the average negative log-probability density of the data under the model.
Minimizing this value is equivalent to maximizing the likelihood that the observed data was
generated by the MoG model.
Attributes
----------
ngauss : int
Number of fitting MND.
nvars : int
Number of variables in each MND.
mog : MixtureOfGaussians
Fitting object of the class MoG. This object will have the result of the fitting procedure.
solution : scipy.optimize.OptimizeResult
Once the fitting is completed the solution object is returned. Attributes include:
- fun: value of the function in the minimum.
- x: value of the minimization parameters in the minimum.
- nit: number of iterations of the minimization algorithm.
- nfev: number of evaluations of the function.
- success: if True it implies that the minimization fullfills all the conditions.
Ndim : int
Number of mus.
Ncorr : int
Number of correlations.
minparams : numpy.ndarray
Array of minimization parameters at any stage in minimization process.
scales : numpy.ndarray
Array of scales to convert minparams to uparams (unbound) and viceversa.
uparams : numpy.ndarray
Array of unbound minimization parameters.
Notes
-----
Objects of this class must be always initialized with the number of gaussians and the number
of random variables.
Examples
--------
Example: Fitting a synthetic dataset.
>>> # 1. Generate synthetic data
>>> np.random.seed(1)
>>> weights = [[1.0]]
>>> mus = [[1.0, 0.5, -0.5]]
>>> sigmas = [[1, 1.2, 2.3]]
>>> angles = [[10*Angle.Deg, 30*Angle.Deg, 20*Angle.Deg]]
>>> Sigmas = Stats.calc_covariance_from_rotation(sigmas, angles)
>>> MoG = MixtureOfGaussians(mus=mus, weights=weights, Sigmas=Sigmas)
>>> data = MoG.rvs(10000)
>>> # 2. Initialize the fitter
>>> F = FitMoG(ngauss=MoG.ngauss, nvars=MoG.nvars)
>>> # 3. Set bounds (optional but recommended)
>>> bounds = F.set_bounds(bounds=(0.1, 0.9*F._sigmax))
>>> # 4. Run the fit
>>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True), method=None, bounds=bounds)
>>> # 5. Inspect results
>>> print(F.mog)
>>> G = F.plot_fit(figsize=3, hargs=dict(bins=30, cmap='YlGn'), sargs=dict(s=0.5, edgecolor='None', color='r'))
>>> # 6. Save/Load results
>>> F.save_fit("/tmp/fit.pkl", useprefix=False)
>>> F._load_fit("/tmp/fit.pkl")
"""
# Constants
# Maximum value of sigma
_sigmax = 10
_ignoreWarnings = True
[docs]
def update_params(self, weights=None, mus=None, sigmas=None, rhos=None):
"""
Update the parameters of the MoG.
Parameters
----------
weights : array_like, optional
Weights of the gaussians.
mus : array_like, optional
Means of the gaussians.
sigmas : array_like, optional
Standard deviations of the gaussians.
rhos : array_like, optional
Correlation coefficients of the gaussians.
"""
if weights is not None:
self.weights = np.array(weights)
if self.normalize_weights:
self.weights /= np.sum(self.weights)
if mus is not None:
mus = np.array(mus)
if mus.shape != self.mus.shape:
raise ValueError(
f"Shape of mus {mus.shape} does not match current shape {self.mus.shape}"
)
self.mus = mus
update_cov = False
if sigmas is not None:
sigmas = np.array(sigmas)
if sigmas.shape != self.sigmas.shape:
raise ValueError(
f"Shape of sigmas {sigmas.shape} does not match current shape {self.sigmas.shape}"
)
self.sigmas = sigmas
update_cov = True
if rhos is not None:
rhos = np.array(rhos)
if rhos.shape != self.rhos.shape:
raise ValueError(
f"Shape of rhos {rhos.shape} does not match current shape {self.rhos.shape}"
)
self.rhos = rhos
update_cov = True
if update_cov:
self.Sigmas = Stats.calc_covariance_from_correlations(
self.sigmas, self.rhos
)
self.params = self.flatten_params()
def __init__(self, data=None, ngauss=1, nvars=None, domain=None, objfile=None):
"""
Initialize FitMoG object.
Parameters
----------
data : numpy.ndarray, optional
Array with data (Nsam x nvars). If provided, nvars is inferred unless explicitly given.
ngauss : int
Number of Gaussian components.
nvars : int, optional
Number of variables. If None and data is provided, inferred from data.
domain : list, optional
Domain for each variable: None (unbounded) or [low, high] per variable.
Example: [None, [0, 1], None] for variable 1 bounded to [0, 1].
objfile : str, optional
Path to a pickled fit to load.
"""
if objfile is not None:
self._load_fit(objfile)
elif isinstance(data, str):
self._load_fit(data)
else:
# Check data and infer nvars
self.data = None
if data is not None:
self.data = np.asarray(data)
if self.data.ndim != 2:
raise ValueError(
f"Data must be 2D array (Nsam x nvars), got shape {self.data.shape}"
)
data_nvars = self.data.shape[1]
if nvars is None:
nvars = data_nvars
elif nvars != data_nvars:
raise ValueError(
f"Provided nvars={nvars} does not match data dimension {data_nvars}"
)
if nvars is None:
# If no data and no nvars, default to 2 (backward compatibility or default)
nvars = 2
# Basic attributes
self.ngauss = ngauss
self.nvars = nvars
self.Ndim = ngauss * nvars
self.Ncorr = int(nvars * (nvars - 1) / 2)
self.domain = domain
# Define the model mogs
self.mog = MixtureOfGaussians(ngauss=ngauss, nvars=nvars, domain=domain)
# Set parameters
self.set_params()
# Report
print("Loading a FitMoG object.")
print(f"Number of gaussians: {self.ngauss}")
print(f"Number of variables: {self.nvars}")
print(f"Number of dimensions: {self.Ndim}")
if self.data is not None:
print(f"Number of samples: {len(self.data)}")
else:
print("Number of samples: 0 (No data provided)")
if self.domain is not None:
print(f"Domain: {self.domain}")
# Compute the log-likelihood of the data
if self.data is not None:
print(f"Log-likelihood per point (-log L/N): {self.norm_log_l()}")
# Other
self.fig = None
self.prefix = ""
[docs]
def set_params(self, mu=0.5, sigma=1.0, rho=0.5):
"""
Set the value of the basic params (minparams, scales, etc.). It updates minparams, scales
and uprams.
Ex. F.set_params(mu=0.5, sigma=1.0, rho=0.5)
Parameters
----------
mu : float, optional
Value of all initial mus (default 0.5).
sigma : float, optional
Value of all initial sigmas (default 1.0).
rho : float, optional
Value of all initial rhos (default 0.5).
Returns
-------
None
"""
# Define the initial parameters
# mus sigmas correlations
minparams = (
[mu] * self.Ndim
+ [sigma] * self.Ndim
+ [1 + rho] * self.ngauss * self.Ncorr
)
scales = (
[0] * self.Ndim
+ [self._sigmax] * self.Ndim
+ [2] * self.ngauss * self.Ncorr
)
if self.ngauss > 1:
self.extrap = []
minparams = [1 / self.ngauss] * self.ngauss + minparams
scales = [1] * self.ngauss + scales
else:
self.extrap = [1]
self.minparams = np.array(minparams)
# When domain is finite, spread initial mus inside each variable's domain
bounds = getattr(self.mog, "_domain_bounds", None)
if bounds is not None and self.ngauss > 1:
for j in range(self.nvars):
lo, hi = bounds[j][0], bounds[j][1]
if np.isfinite(lo) or np.isfinite(hi):
a = lo if np.isfinite(lo) else hi - 1.0
b = hi if np.isfinite(hi) else lo + 1.0
for i in range(self.ngauss):
idx = self.ngauss + i * self.nvars + j
self.minparams[idx] = a + (i + 1) / (self.ngauss + 1) * (b - a)
self.scales = np.array(scales)
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._initial_params_set_by_user = False
[docs]
def set_initial_params(self, mus=None, sigmas=None, rhos=None, from_mog=None):
"""
Set initial values for the minimization parameters (means, standard deviations, correlations).
Only the arguments provided are updated; the rest keep their current values.
Ex. F.set_initial_params(mus=[[0.2, 0.3], [0.8, 0.7]], sigmas=[[0.1, 0.2], [0.1, 0.2]])
Parameters
----------
mus : array-like, optional
Initial means. Shape (ngauss, nvars) or (nvars,) to use the same means for all components.
Example: [0.2, 0.3] or [[0.2, 0.3], [0.8, 0.7]].
sigmas : array-like, optional
Initial standard deviations. Shape (ngauss, nvars) or (nvars,) to use the same for all components.
rhos : array-like, optional
Initial correlation coefficients (upper triangle). Shape (ngauss, Ncorr) or (Ncorr,) to use the same for all.
Ncorr = nvars*(nvars-1)/2.
from_mog : MixtureOfGaussians, optional
If provided, initialize parameters from this MoG object.
Must have same ngauss and nvars as check.
Any other provided arguments (mus, sigmas, rhos) will override values from from_mog.
Returns
-------
None
Examples
--------
>>> F = FitMoG(ngauss=2, nvars=2)
>>> F.set_initial_params(mus=[0.2, 0.3], sigmas=[0.1, 0.1]) # same for both components
>>> F.set_initial_params(mus=[[0.2, 0.3], [0.8, 0.7]], sigmas=[[0.1, 0.2], [0.1, 0.2]])
>>> F.fit_data(data)
"""
if from_mog is not None:
if from_mog.ngauss != self.ngauss or from_mog.nvars != self.nvars:
raise ValueError(
f"from_mog dimensions (ngauss={from_mog.ngauss}, nvars={from_mog.nvars}) "
f"do not match FitMoG (ngauss={self.ngauss}, nvars={self.nvars})"
)
# Use parameters from the provided mog as base
# We want to use set_initial_params logic to properly broadcast/set minparams
# So we extract lists/arrays from from_mog
mus_mog = from_mog.mus
sigmas_mog = from_mog.sigmas
rhos_mog = from_mog.rhos
# If mus/sigmas/rhos are NOT provided in arguments, use from_mog
if mus is None:
mus = mus_mog
if sigmas is None:
sigmas = sigmas_mog
if rhos is None:
rhos = rhos_mog
mus = np.asarray(mus, dtype=float) if mus is not None else None
sigmas = np.asarray(sigmas, dtype=float) if sigmas is not None else None
rhos = np.asarray(rhos, dtype=float) if rhos is not None else None
def _broadcast_2d(arr, shape_2d, name, dims_desc):
"""Broadcast to (ngauss, nvars) or (ngauss, Ncorr). If 1D, same row for all components."""
if arr is None:
return None
arr = np.atleast_1d(arr)
if arr.ndim == 1:
# Same values for all components
if arr.shape[0] != shape_2d[1]:
raise ValueError(
"{} 1D must have length {} ({}), got {}".format(
name, shape_2d[1], dims_desc, arr.shape[0]
)
)
arr = np.tile(arr, (self.ngauss, 1))
else:
arr = np.atleast_2d(arr)
if arr.shape != shape_2d:
raise ValueError(
"{} must have shape {} or ({}), got {}".format(
name, shape_2d, dims_desc, arr.shape
)
)
return arr
# minparams layout: [weights] + [mus] + [sigmas] + [1+rhos]
if self.ngauss > 1:
off_mu = self.ngauss
off_sig = self.ngauss + self.Ndim
off_rho = self.ngauss + 2 * self.Ndim
else:
off_mu = 0
off_sig = self.nvars
off_rho = self.nvars * 2
# Also update weights if from_mog is used?
# The user request only mentioned: "Cuando se pasa una mog como opción de la from_mog se fijan los parámetros iniciales del ajuste con los parámetros de esa mog."
# Usually weights are also parameters. The weights are the first 'ngauss' params in minparams (if ngauss > 1).
if from_mog is not None:
if self.ngauss > 1:
# Weights are at the beginning
# Normalize just in case
w = from_mog.weights
if from_mog.normalize_weights:
w = w / w.sum()
self.minparams[0 : self.ngauss] = w
if mus is not None:
mus = _broadcast_2d(mus, (self.ngauss, self.nvars), "mus", "nvars")
if self.ngauss > 1:
self.minparams[off_mu : off_mu + self.Ndim] = mus.ravel()
else:
self.minparams[off_mu : off_mu + self.nvars] = mus.ravel()
if sigmas is not None:
sigmas = _broadcast_2d(sigmas, (self.ngauss, self.nvars), "sigmas", "nvars")
if self.ngauss > 1:
self.minparams[off_sig : off_sig + self.Ndim] = sigmas.ravel()
else:
self.minparams[off_sig : off_sig + self.nvars] = sigmas.ravel()
if rhos is not None:
rhos = _broadcast_2d(rhos, (self.ngauss, self.Ncorr), "rhos", "Ncorr")
# Internally we store 1+rho so the unbound param is in a good range
if self.ngauss > 1:
self.minparams[off_rho : off_rho + self.ngauss * self.Ncorr] = (
1.0 + rhos.ravel()
)
else:
self.minparams[off_rho : off_rho + self.Ncorr] = 1.0 + rhos.ravel()
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._initial_params_set_by_user = True
params = self.pmap(self.minparams)
self.mog.set_stdcorr(params, self.nvars)
def _stdcorr_to_minparams(self, stdcorr):
"""Inverse of pmap: stdcorr -> minparams (for use in normalized fit)."""
minparams = np.array(stdcorr[len(self.extrap) :], dtype=float)
if self.ngauss * self.Ncorr > 0:
minparams[-self.ngauss * self.Ncorr :] += 1
return minparams
def _stdcorr_from_weights_mus_sigmas_rhos(self, weights, mus, sigmas, rhos):
"""Build stdcorr from unpacked arrays (rhos as correlations, not 1+rho).
Returns stdcorr without extrap (for use with mog.set_stdcorr).
To get FitMoG's stdcorr format (with extrap), prepend self.extrap."""
return np.concatenate(
[
np.asarray(weights).ravel(),
np.asarray(mus).ravel(),
np.asarray(sigmas).ravel(),
np.asarray(rhos).ravel(),
]
)
def _init_params_from_data_finite_domain(self, data):
"""When domain is finite, set initial mus from data percentiles and modest sigmas so the optimizer starts near the peaks."""
bounds = getattr(self.mog, "_domain_bounds", None)
if bounds is None or self.ngauss < 2:
return
data = np.asarray(data)
if data.size == 0:
return
# Flatten so we can index by variable
if data.ndim == 1:
data = np.reshape(data, (-1, 1))
n = data.shape[0]
for j in range(self.nvars):
lo, hi = bounds[j][0], bounds[j][1]
if not (np.isfinite(lo) and np.isfinite(hi)):
continue
col = data[:, j]
# Initial mus = percentiles of data along this dimension, sorted and clipped
pct = np.linspace(5, 95, self.ngauss)
mus_init = np.percentile(col, pct)
mus_init = np.sort(mus_init)
mus_init = np.clip(mus_init, lo, hi)
for i in range(self.ngauss):
idx = self.ngauss + i * self.nvars + j
self.minparams[idx] = mus_init[i]
# Initial sigmas: start with a fraction of domain width so we don't drift to flat
width = hi - lo
sigma_init = min(0.25 * width, max(0.05 * width, np.std(col) * 0.5))
for i in range(self.ngauss):
idx = self.ngauss + self.Ndim + i * self.nvars + j
if idx < len(self.minparams):
self.minparams[idx] = sigma_init
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
[docs]
def pmap(self, minparams):
"""
Mapping routine used in sample_mog_likelihood. Mapping may change depending on the
complexity of the parameters to be minimized.
Ex. stdcorr = F.pmap(minparams)
Parameters
----------
minparams : numpy.ndarray
Minimization parameters.
Returns
-------
stdcorr : numpy.ndarray
Flatten parameters with correlations.
"""
stdcorr = np.array(self.extrap + list(minparams))
if self.ngauss * self.Ncorr > 0:
stdcorr[-self.ngauss * self.Ncorr :] -= 1
return stdcorr
[docs]
def log_l(self, data=None):
"""
Value of the -log(Likelihood).
Ex. F.log_l(data)
Parameters
----------
data : numpy.ndarray, optional
Array with data (Nsam x nvars). If None, uses self.data.
Returns
-------
log_l : float
Value of the -log(Likelihood).
"""
if data is None:
data = self.data
if data is None:
raise ValueError("No data provided")
log_l = self.mog.sample_mog_likelihood(
self.uparams, data=data, pmap=self.pmap, tset="stdcorr", scales=self.scales
)
return log_l
[docs]
def norm_log_l(self, data=None):
"""
Calculate the normalized log-likelihood of the data.
Parameters
----------
data : numpy.ndarray, optional
Data for which to calculate the log-likelihood. If None, uses self.data.
Returns
-------
float
Normalized log-likelihood per point (-log L/N).
"""
if data is None:
data = self.data
if data is None:
raise ValueError("No data provided")
data = np.asarray(data)
log_likelihood = self.log_l(data) / len(data)
return log_likelihood
@Util.timer
def fit_data(
self, data=None, verbose=0, advance=0, normalize=False, progress=False, **args
):
"""
Minimization procedure. It updates the solution attribute.
Ex. F.fit_data(data, verbose=0, tol=1e-3)
Parameters
----------
data : numpy.ndarray, optional
Array with data (Nsam x nvars). If None, uses self.data.
verbose : int, optional
Verbosity level for the sample_mog_likelihood routine (default 0).
advance : int, optional
If larger than 0 show advance each "advance" iterations (default 0).
normalize : bool, optional
If True and domain is finite, fit in normalized space (each variable scaled to [0, 1]
using the domain bounds). Improves conditioning when variables have very different
scales (e.g. [0, 1.3], [0, 1], [0, 180]) and can yield more stable, reproducible minima.
progress : str or bool, optional
- "tqdm" or True: show a tqdm progress bar (requires tqdm).
- "text": show text-based progress (equivalent to setting advance > 0).
- False: no progress output (unless advance is set).
**args : dict
Options of the minimize routine (eg. tol=1e-6).
A particularly interesting parameter is the minimization method.
Available methods:
* Slow but sure: Powell
* Fast but unsure: CG, BFGS, COBYLA, SLSQP
Returns
-------
None
Examples
--------
>>> F = FitMoG(1, 3)
>>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True))
>>> F.fit_data(data, normalize=True) # recommended when variable scales differ a lot
"""
if data is None:
data = self.data
if data is None:
raise ValueError("No data provided")
# Handle progress options
use_tqdm = False
if progress == "tqdm" or (isinstance(progress, bool) and progress):
use_tqdm = True
elif progress == "text":
if not advance:
advance = 1
pbar = None
if use_tqdm:
try:
from tqdm.auto import tqdm
maxiter = args.get("maxiter") or args.get("options", {}).get("maxiter")
pbar = tqdm(total=maxiter, desc="Minimizing")
except ImportError:
print("tqdm not installed, progress bar disabled.")
if advance or pbar is not None:
advance = int(advance) if advance else 0
self.neval = 0
def _advance(X, show=False):
if pbar is not None:
pbar.update(1)
if advance > 0:
if self.neval == 0:
print(f"Iterations:")
if self.neval % advance == 0 or show:
vars = np.array2string(
X,
separator=", ",
precision=4,
max_line_width=np.inf,
formatter={"float_kind": lambda x: f"{x:.2g}"},
)
fun = self.mog.sample_mog_likelihood(
X, data, self.pmap, "stdcorr", self.scales, verbose
)
print(
f"Iter {self.neval}:\n\tVars: {vars}\n\tLogL/N: {fun / len(data)}"
)
self.neval += 1
else:
_advance = None
self.data = np.copy(data)
data = np.asarray(data)
if data.ndim == 1:
data = data.reshape(-1, 1)
self.mog._ignoreWarnings = self._ignoreWarnings
self.minargs = dict(method="Powell")
self.minargs.update(args)
# When domain is finite, set bounds so mus stay inside the domain and sigmas stay > 0
bounds_orig = getattr(self.mog, "_domain_bounds", None)
has_finite_domain = bounds_orig is not None and any(
np.isfinite(bounds_orig[j][0]) or np.isfinite(bounds_orig[j][1])
for j in range(self.nvars)
)
# Optional: fit in normalized space [0,1] per variable for better conditioning
if normalize and has_finite_domain:
scale = np.zeros(self.nvars)
offset = np.zeros(self.nvars)
for j in range(self.nvars):
lo, hi = bounds_orig[j][0], bounds_orig[j][1]
if np.isfinite(lo) and np.isfinite(hi):
offset[j], scale[j] = lo, hi - lo
else:
offset[j] = np.nanmin(data[:, j])
scale[j] = max(np.nanmax(data[:, j]) - offset[j], 1e-10)
data_fit = (data - offset) / scale
domain_norm = tuple([0.0, 1.0] for _ in range(self.nvars))
self.mog._domain_bounds = [tuple(x) for x in domain_norm]
# Transform current minparams to normalized space
stdcorr = self.pmap(self.minparams)
i = len(self.extrap)
# For ngauss==1 there are no weights in stdcorr after extrap; next are mus
w = np.array([1.0]) if self.ngauss == 1 else stdcorr[i : i + self.ngauss]
mus = stdcorr[
i + (self.ngauss if self.ngauss > 1 else 0) : i
+ (self.ngauss if self.ngauss > 1 else 0)
+ self.Ndim
].reshape(self.ngauss, self.nvars)
sigmas = stdcorr[
i + (self.ngauss if self.ngauss > 1 else 0) + self.Ndim : i
+ (self.ngauss if self.ngauss > 1 else 0)
+ 2 * self.Ndim
].reshape(self.ngauss, self.nvars)
rhos = stdcorr[
i + (self.ngauss if self.ngauss > 1 else 0) + 2 * self.Ndim : i
+ (self.ngauss if self.ngauss > 1 else 0)
+ 2 * self.Ndim
+ self.ngauss * self.Ncorr
].reshape(self.ngauss, self.Ncorr)
mus_norm = (mus - offset) / scale
sigmas_norm = sigmas / scale
stdcorr_norm_no_extrap = self._stdcorr_from_weights_mus_sigmas_rhos(
w, mus_norm, sigmas_norm, rhos
)
# For ngauss==1, minparams has no weight; for ngauss>1, use _stdcorr_to_minparams
if self.ngauss == 1:
# stdcorr_norm_no_extrap = [w, mus, sigmas, rhos] = [1, 3, 3, 3] = 10 elements
# minparams = [mus, sigmas, 1+rhos] = [3, 3, 3] = 9 elements
self.minparams = np.concatenate(
[
stdcorr_norm_no_extrap[1 : 1 + self.Ndim], # mus
stdcorr_norm_no_extrap[
1 + self.Ndim : 1 + 2 * self.Ndim
], # sigmas
stdcorr_norm_no_extrap[1 + 2 * self.Ndim :] + 1.0, # 1+rhos
]
)
else:
stdcorr_norm = np.concatenate([self.extrap, stdcorr_norm_no_extrap])
self.minparams = self._stdcorr_to_minparams(stdcorr_norm)
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
sigma_hi_norm = min(0.99 * self._sigmax, 2.0)
bounds_tuple = self.set_bounds(bounds=(1e-6, sigma_hi_norm))
self.minargs["bounds"] = bounds_tuple
if self.minargs.get("method") == "Powell":
self.minargs["method"] = "L-BFGS-B"
try:
self.solution = minimize(
self.mog.sample_mog_likelihood,
self.uparams,
callback=_advance,
args=(data_fit, self.pmap, "stdcorr", self.scales, verbose),
**self.minargs,
)
finally:
if pbar is not None:
pbar.close()
if advance:
_advance(self.solution.x, show=True)
self.minparams = Util.t_if(self.solution.x, self.scales, Util.u2f)
stdcorr_norm = self.pmap(self.minparams)
i = len(self.extrap)
off = i + (self.ngauss if self.ngauss > 1 else 0)
w = (
np.array([1.0])
if self.ngauss == 1
else stdcorr_norm[i : i + self.ngauss]
)
mus_norm = stdcorr_norm[off : off + self.Ndim].reshape(
self.ngauss, self.nvars
)
sigmas_norm = stdcorr_norm[off + self.Ndim : off + 2 * self.Ndim].reshape(
self.ngauss, self.nvars
)
rhos = stdcorr_norm[
off + 2 * self.Ndim : off + 2 * self.Ndim + self.ngauss * self.Ncorr
].reshape(self.ngauss, self.Ncorr)
mus_orig = mus_norm * scale + offset
sigmas_orig = sigmas_norm * scale
stdcorr_orig = self._stdcorr_from_weights_mus_sigmas_rhos(
w, mus_orig, sigmas_orig, rhos
)
self.mog._domain_bounds = bounds_orig
self.mog.set_stdcorr(stdcorr_orig, self.nvars)
# Convert mog's stdcorr to FitMoG's minparams (for ngauss=1, minparams has no weight)
if self.ngauss == 1:
mus_flat = mus_orig.ravel()
sigmas_flat = sigmas_orig.ravel()
rhos_flat = (rhos + 1.0).ravel()
self.minparams = np.concatenate([mus_flat, sigmas_flat, rhos_flat])
else:
stdcorr_orig_with_extrap = np.concatenate([self.extrap, stdcorr_orig])
self.minparams = self._stdcorr_to_minparams(stdcorr_orig_with_extrap)
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._update_prefix()
return
if has_finite_domain and "bounds" not in args:
# bounds from domain; sigma: lower 1e-6 to avoid div by zero; upper limited so fit cannot go flat
sigma_hi = 0.99 * self._sigmax
domain_widths = [
self.mog._domain_bounds[j][1] - self.mog._domain_bounds[j][0]
for j in range(self.nvars)
if np.isfinite(self.mog._domain_bounds[j][0])
and np.isfinite(self.mog._domain_bounds[j][1])
]
if domain_widths:
# Cap sigma at ~2x largest bounded dimension width to avoid flat (uniform) optimum
sigma_hi = min(sigma_hi, 2.0 * max(domain_widths))
bounds_tuple = self.set_bounds(bounds=(1e-6, sigma_hi))
self.minargs["bounds"] = bounds_tuple
if self.minargs.get("method") == "Powell":
self.minargs["method"] = "L-BFGS-B"
# Data-based init: set initial mus (and sigmas) from data unless user set them via set_initial_params
if not getattr(self, "_initial_params_set_by_user", False):
self._init_params_from_data_finite_domain(data)
try:
self.solution = minimize(
self.mog.sample_mog_likelihood,
self.uparams,
callback=_advance,
args=(data, self.pmap, "stdcorr", self.scales, verbose),
**self.minargs,
)
finally:
if pbar is not None:
pbar.close()
if advance:
_advance(self.solution.x, show=True)
self.uparams = self.solution.x
# Set the new params
# Set the new params
self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f)
params = self.pmap(self.minparams)
self.mog.set_stdcorr(params, self.nvars)
self._update_prefix()
def _load_fit(self, objfile):
"""
Load a fit object from file.
"""
F = pickle.load(open(objfile, "rb"))
for k in F.__dict__.keys():
setattr(self, k, getattr(F, k))
self._update_prefix()
def _gen_qsample(self, n_sim, resample=False):
"""
Generate or reuse the synthetic sample for Q-Q plots and quality checks.
"""
if resample or not hasattr(self, "mog_qsample") or self.mog_qsample is None:
self.mog_qsample = self.mog.rvs(n_sim)
# Check size consistency if we are reusing
elif len(self.mog_qsample) != n_sim:
# If the requested size is different, we must regenerate
self.mog_qsample = self.mog.rvs(n_sim)
return self.mog_qsample
[docs]
def quality_of_fit(
self,
data=None,
n_sim=None,
resample=False,
plot_qq=False,
figsize=5,
ax=None,
plot_line=True,
title="Q-Q Plot",
verbose=False,
):
r"""
Calculate goodness-of-fit statistics using the Kolmogorov-Smirnov distance and the Coefficient of Determination (:math:`R^2`) relative to the identity line.
Optionally generates a Q-Q plot to visually assess the quality of the fit.
The **Kolmogorov-Smirnov Statistic** (:math:`D_{KS}`) measures the maximum absolute distance between the Cumulative Distribution Function (CDF) of the observed data and that of the model. It represents the "worst point" of local deviation.
.. math::
D_{KS} = \sup_{x} | F_{obs}(x) - F_{model}(x) |
The range of the statistic is :math:`D_{KS} \in [0, 1]`, where an acceptable value is :math:`D_{KS} \approx 0` (ideally < 0.05).
The **Q-Q plot** is a non-parametric visual diagnostic tool used to assess whether a set of observed data follows a specific theoretical distribution. It compares the distribution of the **Negative Log-Likelihood** (:math:`S = -2 \ln \mathcal{L}`) of the real data against the model's prediction.
If the fit is ideal, the points should align on the identity line :math:`y=x`. Deviations indicate:
* **Curved ends**: tail deviations ("Heavy" or "Light" tails).
* **Shift or slope change**: systematic errors in mean or variance.
The **Coefficient of Determination on Identity** (:math:`R^2`) measures the variance explained by the model with respect to the ideal line :math:`y=x`. It penalizes both dispersion and bias.
.. math::
R^2 = 1 - \frac{\sum (y_{obs} - x_{model})^2}{\sum (y_{obs} - \bar{y}_{obs})^2}
The range is :math:`R^2 \in (-\infty, 1]`, where an acceptable value is :math:`R^2 \approx 1` (ideally > 0.9).
Parameters
----------
data : numpy.ndarray, optional
Observed data. If None, uses self.data.
n_sim : int, optional
Number of synthetic data points to generate. Default is 10 * len(data).
resample : bool, optional
If True, regenerate the synthetic sample even if one exists.
plot_qq : bool, optional
If True, generate the Q-Q plot. Default is False.
figsize : int or tuple, optional
Size of the figure (if creating a new one).
ax : matplotlib.axes.Axes, optional
Axes object to plot on. If None and plot_qq=True, creates a new figure.
plot_line : bool, optional
If True, plot the identity line y=x.
title : str, optional
Title of the plot.
verbose : bool, optional
If True, print the results.
Returns
-------
stats : dict
Dictionary containing:
- 'ks_dist': Kolmogorov-Smirnov distance between S_obs and S_sim (lower is better)
- 'ks_pval': p-value from KS test (note: not strictly valid as parameters are estimated)
- 'r2_identity': R-squared coefficient regarding identity line (closer to 1 is better)
- 'ax': The axes object if plot_qq=True, else None.
"""
from scipy.stats import ks_2samp
if data is None:
data = self.data
if data is None:
raise ValueError("No data provided")
data = np.asarray(data)
n_samples = len(data)
if n_sim is None:
n_sim = 10 * n_samples
if verbose:
print(
f"Calculating fit quality statistics for {n_samples} observed points..."
)
# 1. Calculate S_obs = -2 * ln(L(x_obs))
pdf_vals_obs = self.mog.pdf(data)
pdf_vals_obs = np.maximum(pdf_vals_obs, 1e-300)
S_obs = -2 * np.log(pdf_vals_obs)
S_obs.sort()
# 2. Generate synthetic data and calculate S_sim
if verbose:
print(f"Generating {n_sim} synthetic points (resample={resample})...")
data_sim = self._gen_qsample(n_sim, resample=resample)
pdf_vals_sim = self.mog.pdf(data_sim)
pdf_vals_sim = np.maximum(pdf_vals_sim, 1e-300)
S_sim = -2 * np.log(pdf_vals_sim)
S_sim.sort()
# 3. Kolmogorov-Smirnov Statistic
d_ks, p_val = ks_2samp(S_obs, S_sim)
# 4. R2 relative to identity (y=x)
# Interpolate S_sim to match quantiles of S_obs for pairwise comparison
percentiles = np.linspace(0, 100, n_samples)
S_sim_interp = np.percentile(S_sim, percentiles)
# Residuals from identity line (y=x means S_obs = S_sim_interp)
residuos = S_obs - S_sim_interp
ss_res = np.sum(residuos**2)
# Total sum of squares of S_obs around its mean
ss_tot = np.sum((S_obs - np.mean(S_obs)) ** 2)
# Avoid division by zero if ss_tot is 0
if ss_tot == 0:
r2_identity = 0.0 # or nan
else:
r2_identity = 1 - (ss_res / ss_tot)
if verbose:
print(f"Fit Quality Results:")
print(f" KS Distance (lower is better) : {d_ks:.4f}")
print(f" R2 vs Identity (closer to 1 is better): {r2_identity:.4f}")
# 5. Plot
ret_ax = None
if plot_qq:
from matplotlib import pyplot as plt
if ax is None:
if isinstance(figsize, int):
figsize = (figsize, figsize)
fig, ax = plt.subplots(figsize=figsize)
else:
fig = ax.figure
self.fig_qq = fig
self.ax_qq = ax
label = f"Data vs Model ($R^2$={r2_identity:.4f})"
self.ax_qq.scatter(S_sim_interp, S_obs, alpha=0.5, label=label)
if plot_line:
# Identity line range
min_val = min(S_obs.min(), S_sim_interp.min())
max_val = max(S_obs.max(), S_sim_interp.max())
self.ax_qq.plot(
[min_val, max_val],
[min_val, max_val],
"r--",
label="Perfect Fit (y=x)",
)
self.ax_qq.set_xlabel("Theoretical Quantiles (-2 Log Likelihood)")
self.ax_qq.set_ylabel("Observed Quantiles (-2 Log Likelihood)")
self.ax_qq.set_title(title)
self.ax_qq.legend(loc="upper left")
self.ax_qq.grid(True, alpha=0.3)
# Add KS statistic to the plot
self.ax_qq.text(
0.95,
0.05,
rf"$D_{{KS}} = {d_ks:.3f}$",
transform=self.ax_qq.transAxes,
ha="right",
va="bottom",
bbox=dict(boxstyle="round", facecolor="white", alpha=0.8),
)
multimin_watermark(self.ax_qq)
return {"ks_dist": d_ks, "r2_identity": r2_identity}
[docs]
def plot_fit(
self,
N=10000,
figsize=2,
properties=None,
ranges=None,
hargs=None,
sargs=None,
pargs=True,
cargs=None,
marginals=False,
):
"""
Plot the result of the fitting procedure.
Ex. F.plot_fit(figsize=3, hargs=dict(bins=30), sargs=dict(s=0.5, color='r'))
Parameters
----------
N : int, optional
number of points used to build a representation of the marginal distributions (default 10000).
figsize : int, optional
Size of each axis (default 2).
properties : list or dict, optional
Property names or MultiPlot-style properties. List: each element as axis label,
range=None. Dict: same as MultiPlot (keys with 'label' and optional 'range').
ranges : list, optional
Ranges per variable; used only when properties is a list or None.
hargs : dict, optional
Dictionary with options for the hist2d function (or 1D hist when nvars=1).
If provided, the sample is shown as a histogram.
sargs : dict, optional
Dictionary with options for the scatter plot.
If provided, the sample is shown as a scatter plot.
pargs : dict, optional
Dictionary with options for the PDF plot.
If None (default), defaults are used (color='k', lw=2, label='PDF').
To suppress PDF, pass an empty dict is not sufficient, but pargs is default.
cargs : dict, optional
Dictionary with options for the contour plot (mog_contour).
If None (default), contours are not plotted.
marginals : bool, optional
Include marginal distributions on diagonal panels (default False).
Returns
-------
G : matplotlib.figure.Figure or MultiPlot
Graphic handle.
Examples
--------
>>> F = FitMoG(1, 3)
>>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True))
>>> G = F.plot_fit(figsize=3, hargs=dict(bins=30, cmap='YlGn'), sargs=dict(s=0.5, edgecolor=None, color='r'))
"""
# Set defaults
if pargs is True:
pargs = dict(color="k", lw=2, label="PDF")
elif isinstance(pargs, dict):
pargs = dict(pargs)
pargs.setdefault("label", "PDF")
# If pargs is None (or False), it remains None/False and we skip plotting
if pargs:
# Default zorder for PDF (background)
pargs.setdefault("zorder", -100)
# hargs/sargs are None by default as per user request (no plot unless specified)
# "Escoger desde el principio el rango del pcolormesh para que coincida con el de los datos"
# If ranges is not provided, compute it from data to ensure plot covers all points
if ranges is None and properties is not None:
# Check if properties is dict and has ranges (skip)
# But if properties is list, calculate ranges from data
if (
isinstance(properties, list)
and self.data is not None
and len(self.data) > 0
):
ranges = []
for i in range(self.nvars):
dmin, dmax = self.data[:, i].min(), self.data[:, i].max()
width = dmax - dmin
margin = 0.05 * width if width > 0 else 1.0
ranges.append((float(dmin - margin), float(dmax + margin)))
properties = Util.props_to_properties(properties, self.nvars, ranges)
from matplotlib import pyplot as plt
if self.nvars == 1:
# Univariate: show fitted PDF; show sample as histogram (if hargs) and/or scatter (if sargs)
G = MultiPlot(properties, figsize=figsize, marginals=marginals)
ax = G.axs[0][0]
if hargs is not None:
G.sample_hist(self.data, **hargs)
if sargs is not None:
G.sample_scatter(self.data, **sargs)
# Overlay fitted PDF
# Check if we have data to determine range? If fitting was done, self.data or bounds are known.
# Use MultiPlot logic mostly, but here we plot manually.
# Or we can delegate to G.mog_pdf(self.mog)?
# But FitMoG contains self.mog.
# And self.data is the training data.
# We want to plot the fitted regular PDF.
# G.mog_pdf(self.mog, **pargs) would work and be cleaner!
# But let's keep existing logic to be safe, just updated with pargs.
# Overlay fitted PDF
if pargs:
x_min, x_max = self.data[:, 0].min(), self.data[:, 0].max()
margin = max(1e-6, 0.1 * (x_max - x_min))
x_curve = np.linspace(x_min - margin, x_max + margin, 300)
pdf_vals = self.mog.pdf(x_curve.reshape(-1, 1))
ax.plot(x_curve, pdf_vals, **pargs)
# If no data plotted, set basic labels/limits
if not hargs and not sargs:
ax.set_ylabel("PDF")
ax.set_ylim(0, None)
# Legend
handles, labels = ax.get_legend_handles_labels()
if getattr(G, "_ax_twin", None) is not None:
h2, l2 = G._ax_twin.get_legend_handles_labels()
handles, labels = handles + h2, labels + l2
if handles:
ax.legend(
handles,
labels,
loc="lower center",
bbox_to_anchor=(0.5, 1.02),
ncol=len(handles),
frameon=False,
)
G.fig.subplots_adjust(top=0.88)
if not getattr(G, "_watermark_added", False):
multimin_watermark(G.axs[0][0], frac=0.5)
self.fig = G.fig
return G
if self.nvars >= 2:
G = MultiPlot(properties, figsize=figsize, marginals=marginals)
# 1. Plot Scatter of Data (if requested)
if sargs is not None:
G.sample_scatter(self.data, **sargs)
# 3. Plot Histogram of RVS (if requested)
if hargs is not None:
Xfits = self.mog.rvs(N)
G.sample_hist(Xfits, **hargs)
# 4. Plot MoG Contours (if requested)
if cargs is not None:
G.mog_contour(self.mog, **cargs)
# 2. Plot MoG PDF (default)
# Use pargs defaults if provided (which they are by default in this method)
if pargs:
G.mog_pdf(self.mog, **pargs)
# If no layers, maybe warn? But mog_pdf is default.
# SET RANGES ACCORDING TO DATA
if self.data is not None:
for i, prop in enumerate(G.properties):
dmin, dmax = self.data[:, i].min(), self.data[:, i].max()
# Force range to data limits (overriding default 4-sigma extents of PDF)
G.dproperties[prop]["range"] = [dmin, dmax]
G.set_ranges()
G.fig.tight_layout()
if not getattr(G, "_watermark_added", False):
multimin_watermark(G.axs[0][0])
G._watermark_added = True
self.fig = G.fig
return G
def _inv_params(self, stdcorr):
"""
Invert stdcorr to minparams.
"""
minparams = np.copy(stdcorr)
minparams[-self.ngauss * self.Ncorr :] += 1
self.minparams = minparams[1:] if self.ngauss == 1 else minparams
def _update_prefix(self, myprefix=None):
"""
Update prefix of fit.
Prefix has two parts: the number of gaussians used and a hash computed from the object.
Prefix change if:
- Data change.
- Initial minimization parameters change (e.g. if the fit is ran twice)
- Minimization parameters are changed.
- Bounds are changed.
Alternative prefix:
>>> self.hash = md5(pickle.dumps([self.ngauss, self.data])).hexdigest()[:5]
>>> self.hash = md5(pickle.dumps(self.__dict__)).hexdigest()[:5]
>>> self.hash = md5(pickle.dumps(self.minparams)).hexdigest()[:5]
>>> self.hash = md5(pickle.dumps(self.mog)).hexdigest()[:5]
"""
self.hash = md5(pickle.dumps(self)).hexdigest()[:5]
prefix_str = ""
if myprefix is not None:
prefix_str = f"_{myprefix}"
self.prefix = f"{self.ngauss}mog{prefix_str}_{self.hash}"
[docs]
def save_fit(self, objfile=None, useprefix=True, myprefix=None):
"""
Pickle the result of a fit.
Ex. F.save_fit(objfile='fit.pkl')
Parameters
----------
objfile : str, optional
Name of the file where the fit will be stored. If None, the name is set
by the routine as FitMoG.pkl.
useprefix : bool, optional
Use a prefix in the filename of the pickle file (default True).
The prefix is normally {ngauss}mog_{hash}.
myprefix : str, optional
Custom prefix.
Examples
--------
If objfile="fit.pkl", the final filename will be fit-1mnd_asa33.pkl
"""
self.fig = None
self._update_prefix(myprefix)
if objfile is None:
objfile = f"/tmp/FitMoG.pkl"
if useprefix:
parts = os.path.splitext(objfile)
objfile = f"{parts[0]}-{self.prefix}{parts[1]}"
# Avoid saving the temporary sample
if hasattr(self, "mog_qsample"):
del self.mog_qsample
# Avoid saving the Q-Q plot figures
if hasattr(self, "fig_qq"):
del self.fig_qq
if hasattr(self, "ax_qq"):
del self.ax_qq
pickle.dump(self, open(objfile, "wb"))
[docs]
def set_bounds(self, boundw=None, bounds=None, boundr=None, boundsm=None):
"""
Set the minimization parameters.
Ex. F.set_bounds(boundsm=((-2, 1), (-3, 0)))
Parameters
----------
boundw : tuple, optional
Bound of weights (default (-np.inf, np.inf)).
bounds : tuple or list, optional
Bounds of weights, mus, sigmas and rhos of each variable.
boundr : tuple, optional
Bound of rhos (default (-np.inf, np.inf)).
boundsm : tuple of tuples, optional
Bounds of averages (default (-np.inf, np.inf)).
Normally the bounds on averages must be expressed in this way:
boundsm = ((-min_1, max_1), (-min_2, max_2), ...)
Example for nvars = 2:
boundsm = ((-2, 1), (-3, 0))
Returns
-------
bounds : tuple
Formatted bounds for minimization.
"""
if boundsm is None:
# Use domain as bounds for means when domain is finite
if getattr(self.mog, "_domain_bounds", None) is not None:
boundsm = tuple(
(self.mog._domain_bounds[j][0], self.mog._domain_bounds[j][1])
for j in range(self.nvars)
)
else:
boundsm = ((-np.inf, np.inf),) * self.nvars
# Regular bounds
if boundw is None:
boundw = (-np.inf, np.inf)
else:
boundw = tuple([Util.f2u(bw, 1) for bw in boundw])
if bounds is None:
bounds = (-np.inf, np.inf)
else:
bounds = tuple([Util.f2u(bs, self._sigmax) for bs in bounds])
if boundr is None:
boundr = (-np.inf, np.inf)
else:
boundr = tuple([Util.f2u(1 + br, 2) for br in boundr])
bounds = (
*((boundw,) * self.ngauss),
*(boundsm * self.ngauss),
*((bounds,) * self.nvars * self.ngauss),
*((boundr,) * self.ngauss * int(self.nvars * (self.nvars - 1) / 2)),
)
self.bounds = bounds
if self.ngauss == 1:
bounds = bounds[1:]
self.mog._str_params()
return bounds
[docs]
class FitFunctionMoG(MultiMinBase):
"""
Fit a MoG to a function evaluated on a mesh (instead of to data points).
The objective is to approximate the function by a composed multivariate normal
distribution (MoG) by minimizing a loss between the function values and the
MoG PDF on the same grid.
Initialize with data=(Xmesh, Fmesh) and ngauss:
- Univariate: Xmesh and Fmesh are 1D arrays (same length).
- Multivariate: Xmesh is a tuple of ndarrays (e.g. from np.meshgrid),
Fmesh is an array of the same shape as the grid.
Attributes
----------
X : numpy.ndarray
Grid points, shape (N, nvars).
F : numpy.ndarray
Function values at the grid points, shape (N,).
ngauss, nvars, mog, minparams, scales, uparams
Same as in FitMoG (inherited).
normalization : float
Scale factor fitted so that normalization * mog.pdf(X) approximates F.
Set by fit_data(); None before fitting.
See Also
--------
quality_of_fit : Report R² and RMSE after fitting (call after fit_data).
"""
_sigmax = 10
_ignoreWarnings = True
def __init__(self, data, ngauss=1):
"""
Initialize FitFunctionMoG from function-on-mesh data.
The domain is always set from the data (min/max of Xmesh per variable).
Parameters
----------
data : tuple (Xmesh, Fmesh)
Xmesh: 1D array (univariate) or tuple of arrays (meshgrid, multivariate).
Fmesh: function values at those points; same length or shape as the grid.
ngauss : int
Number of Gaussian components.
Examples
--------
>>> F = mn.FitFunctionMoG(data=(Xmesh, Fmesh), ngauss=3)
"""
if data is None:
raise ValueError("data must be provided as (Xmesh, Fmesh)")
if not isinstance(data, (tuple, list)):
raise ValueError(
"data must be a tuple or list of exactly 2 elements (Xmesh, Fmesh); "
f"got {type(data).__name__}"
)
if len(data) != 2:
raise ValueError(
"data must have exactly 2 elements (Xmesh, Fmesh); "
f"got {len(data)} elements"
)
Xmesh, Fmesh = data[0], data[1]
self._X, self._F, nvars = self._parse_function_mesh_data(Xmesh, Fmesh)
self.X = self._X
self.F = self._F
# Internal normalization for better numerical stability during optimization
self._Fmax = float(np.max(np.abs(self.F)))
if self._Fmax > 0:
self._Fnorm = self.F / self._Fmax
else:
self._Fnorm = self.F
self._Fmax = 1.0
self.ngauss = ngauss
self.nvars = nvars
self.Ndim = ngauss * nvars
self.Ncorr = int(nvars * (nvars - 1) / 2)
self.domain = [
[float(self.X[:, j].min()), float(self.X[:, j].max())] for j in range(nvars)
]
self.mog = MixtureOfGaussians(
ngauss=ngauss, nvars=nvars, domain=self.domain, normalize_weights=False
)
self.normalization = None
self._weight_scale = 100.0
widths = [self.domain[j][1] - self.domain[j][0] for j in range(nvars)]
if widths:
self._sigmax = max(self._sigmax, 2.0 * max(widths))
self.set_params()
print("Loading a FitFunctionMoG object.")
print(f"Number of gaussians: {self.ngauss}")
print(f"Number of variables: {self.nvars}")
print(f"Number of dimensions: {self.Ndim}")
print(f"Number of grid points: {len(self.F)}")
if self.domain is not None:
print(f"Domain: {self.domain}")
[docs]
def set_params(self, mu=0.5, sigma=1.0, rho=0.5):
"""
Set the value of the basic params (minparams, scales, uparams).
Ex. F.set_params(mu=0.5, sigma=1.0, rho=0.5)
"""
minparams = (
[mu] * self.Ndim
+ [sigma] * self.Ndim
+ [1 + rho] * self.ngauss * self.Ncorr
)
scales = (
[0] * self.Ndim
+ [self._sigmax] * self.Ndim
+ [2] * self.ngauss * self.Ncorr
)
if self.ngauss > 1:
self.extrap = []
minparams = [1 / self.ngauss] * self.ngauss + minparams
w_scale = getattr(self, "_weight_scale", 1.0)
scales = [w_scale] * self.ngauss + scales
else:
self.extrap = [1]
self.minparams = np.array(minparams)
bounds = getattr(self.mog, "_domain_bounds", None)
if bounds is not None and self.ngauss > 1:
for j in range(self.nvars):
lo, hi = bounds[j][0], bounds[j][1]
if np.isfinite(lo) or np.isfinite(hi):
a = lo if np.isfinite(lo) else hi - 1.0
b = hi if np.isfinite(hi) else lo + 1.0
for i in range(self.ngauss):
idx = self.ngauss + i * self.nvars + j
self.minparams[idx] = a + (i + 1) / (self.ngauss + 1) * (b - a)
self.scales = np.array(scales)
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._initial_params_set_by_user = False
[docs]
def pmap(self, minparams):
"""Map minparams to stdcorr for mog.set_stdcorr."""
stdcorr = np.array(self.extrap + list(minparams))
if self.ngauss * self.Ncorr > 0:
stdcorr[-self.ngauss * self.Ncorr :] -= 1
return stdcorr
[docs]
def set_bounds(self, boundw=None, bounds=None, boundr=None, boundsm=None):
"""Set bounds for minimization. Returns formatted bounds tuple."""
if boundsm is None:
if getattr(self.mog, "_domain_bounds", None) is not None:
boundsm = tuple(
(self.mog._domain_bounds[j][0], self.mog._domain_bounds[j][1])
for j in range(self.nvars)
)
else:
boundsm = ((-np.inf, np.inf),) * self.nvars
if boundw is None:
boundw = (-np.inf, np.inf)
else:
boundw = tuple([Util.f2u(bw, 1) for bw in boundw])
if bounds is None:
bounds = (-np.inf, np.inf)
else:
bounds = tuple([Util.f2u(bs, self._sigmax) for bs in bounds])
if boundr is None:
boundr = (-np.inf, np.inf)
else:
boundr = tuple([Util.f2u(1 + br, 2) for br in boundr])
bounds = (
*((boundw,) * self.ngauss),
*(boundsm * self.ngauss),
*((bounds,) * self.nvars * self.ngauss),
*((boundr,) * self.ngauss * int(self.nvars * (self.nvars - 1) / 2)),
)
self.bounds = bounds
if self.ngauss == 1:
bounds = bounds[1:]
self.mog._str_params()
return bounds
[docs]
def set_initial_params(self, mus=None, sigmas=None, rhos=None, from_mog=None):
"""Set initial values for the minimization parameters."""
if from_mog is not None:
if from_mog.ngauss != self.ngauss or from_mog.nvars != self.nvars:
raise ValueError(
f"from_mog dimensions (ngauss={from_mog.ngauss}, nvars={from_mog.nvars}) "
f"do not match FitFunctionMoG (ngauss={self.ngauss}, nvars={self.nvars})"
)
# Use parameters from the provided mog as base
mus_mog = from_mog.mus
sigmas_mog = from_mog.sigmas
rhos_mog = from_mog.rhos
# If mus/sigmas/rhos are NOT provided in arguments, use from_mog
if mus is None:
mus = mus_mog
if sigmas is None:
sigmas = sigmas_mog
if rhos is None:
rhos = rhos_mog
# Update weights if applicable (when ngauss > 1)
if self.ngauss > 1:
w = from_mog.weights
if from_mog.normalize_weights:
w = w / w.sum()
self.minparams[0 : self.ngauss] = w
mus = np.asarray(mus, dtype=float) if mus is not None else None
sigmas = np.asarray(sigmas, dtype=float) if sigmas is not None else None
rhos = np.asarray(rhos, dtype=float) if rhos is not None else None
def _broadcast_2d(arr, shape_2d, name, dims_desc):
if arr is None:
return None
arr = np.atleast_1d(arr)
if arr.ndim == 1:
if arr.shape[0] != shape_2d[1]:
raise ValueError(
f"{name} 1D must have length {shape_2d[1]} ({dims_desc}), got {arr.shape[0]}"
)
arr = np.tile(arr, (self.ngauss, 1))
else:
arr = np.atleast_2d(arr)
if arr.shape != shape_2d:
raise ValueError(
f"{name} must have shape {shape_2d} or ({dims_desc}), got {arr.shape}"
)
return arr
if self.ngauss > 1:
off_mu = self.ngauss
off_sig = self.ngauss + self.Ndim
off_rho = self.ngauss + 2 * self.Ndim
else:
off_mu = 0
off_sig = self.nvars
off_rho = self.nvars * 2
if mus is not None:
mus = _broadcast_2d(mus, (self.ngauss, self.nvars), "mus", "nvars")
if self.ngauss > 1:
self.minparams[off_mu : off_mu + self.Ndim] = mus.ravel()
else:
self.minparams[off_mu : off_mu + self.nvars] = mus.ravel()
if sigmas is not None:
sigmas = _broadcast_2d(sigmas, (self.ngauss, self.nvars), "sigmas", "nvars")
if self.ngauss > 1:
self.minparams[off_sig : off_sig + self.Ndim] = sigmas.ravel()
else:
self.minparams[off_sig : off_sig + self.nvars] = sigmas.ravel()
if rhos is not None:
rhos = _broadcast_2d(rhos, (self.ngauss, self.Ncorr), "rhos", "Ncorr")
if self.ngauss > 1:
self.minparams[off_rho : off_rho + self.ngauss * self.Ncorr] = (
1.0 + rhos.ravel()
)
else:
self.minparams[off_rho : off_rho + self.Ncorr] = 1.0 + rhos.ravel()
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._initial_params_set_by_user = True
params = self.pmap(self.minparams)
self.mog.set_stdcorr(params, self.nvars)
def _init_params_from_grid(self, use_function_support=True):
"""
Set initial mus/sigmas from grid (self.X) when domain is finite.
If use_function_support and self.F is available, spread mus only over
points where F > 0.01*max(F) so components start in the support of the function.
"""
bounds = getattr(self.mog, "_domain_bounds", None)
if bounds is None or self.ngauss < 2:
return
X = np.asarray(self.X)
if X.size == 0:
return
F = np.asarray(self.F).ravel() if use_function_support else None
threshold = (
max(0.01 * np.nanmax(F), 1e-300)
if F is not None and F.size == X.shape[0]
else None
)
for j in range(self.nvars):
lo, hi = bounds[j][0], bounds[j][1]
if not (np.isfinite(lo) and np.isfinite(hi)):
continue
col = X[:, j]
if threshold is not None:
mask = F >= threshold
if np.sum(mask) >= 2:
col = col[mask]
pct = np.linspace(5, 95, self.ngauss)
mus_init = np.percentile(col, pct)
mus_init = np.sort(mus_init)
mus_init = np.clip(mus_init, lo, hi)
for i in range(self.ngauss):
idx = self.ngauss + i * self.nvars + j
self.minparams[idx] = mus_init[i]
width = np.ptp(col) if len(col) >= 2 else (hi - lo)
sigma_init = min(
0.25 * (hi - lo),
0.2 * (hi - lo) / max(1, self.ngauss),
max(0.05 * width, np.std(col) * 0.5),
)
for i in range(self.ngauss):
idx = self.ngauss + self.Ndim + i * self.nvars + j
if idx < len(self.minparams):
self.minparams[idx] = sigma_init
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
if self.ngauss > 1:
self._set_initial_weights_from_function()
def _init_params_from_function_support(self, frac=0.01):
"""
Set initial mus/sigmas from the support of the function (where F > frac*max(F)).
Used when domain is not set so the optimizer starts in the region where the function is non-zero.
"""
F = np.asarray(self.F).ravel()
X = np.asarray(self.X)
if X.size == 0 or F.size == 0:
return
threshold = max(frac * np.nanmax(F), 1e-300)
mask = F >= threshold
if not np.any(mask):
mask = np.ones(F.size, dtype=bool)
for j in range(self.nvars):
col = X[mask, j]
if len(col) < 2:
col = X[:, j]
pct = np.linspace(5, 95, self.ngauss)
mus_init = np.percentile(col, pct)
mus_init = np.sort(mus_init)
for i in range(self.ngauss):
idx = self.ngauss + i * self.nvars + j
self.minparams[idx] = mus_init[i]
width = np.ptp(col)
sigma_init = min(
self._sigmax * 0.5,
0.2 * width / max(1, self.ngauss),
max(0.05 * width, np.std(col) * 0.5),
)
if width <= 0:
sigma_init = 1.0
for i in range(self.ngauss):
idx = self.ngauss + self.Ndim + i * self.nvars + j
if idx < len(self.minparams):
self.minparams[idx] = sigma_init
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
if self.ngauss > 1:
self._set_initial_weights_from_function()
def _set_initial_weights_from_function(self, n_sigma_window=2.0):
"""
Set initial weights from the function F by integrating F in a window around each mu.
So peaks with more area get larger initial weight; avoids starting with equal weights.
"""
if self.ngauss < 2:
return
X = np.asarray(self.X)
F = np.asarray(self.F).ravel()
if X.size == 0 or F.size != X.shape[0]:
return
mus = np.array(
[self.minparams[self.ngauss + i * self.nvars] for i in range(self.ngauss)]
)
sigmas = np.array(
[
self.minparams[self.ngauss + self.Ndim + i * self.nvars]
for i in range(self.ngauss)
]
)
x = X[:, 0] if self.nvars == 1 else X
if self.nvars == 1:
x = np.asarray(x).ravel()
masses = np.zeros(self.ngauss)
for i in range(self.ngauss):
if self.nvars == 1:
half = n_sigma_window * max(sigmas[i], 1e-6)
in_window = (x >= mus[i] - half) & (x <= mus[i] + half)
else:
dist = np.linalg.norm(X - mus[i], axis=1)
half = n_sigma_window * max(np.mean(sigmas), 1e-6)
in_window = dist <= half
masses[i] = np.sum(F[in_window])
if np.sum(masses) <= 0:
return
weights = masses / np.sum(masses)
weights = np.clip(weights, 1e-6, 1.0 - 1e-6)
weights = weights / np.sum(weights)
self.minparams[: self.ngauss] = weights
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
def _loss_function(self, uparams):
"""
Least-squares loss between F and (normalization * mog.pdf(X)).
Normalization is computed analytically at each evaluation to minimize the residual sum of squares.
Uses internally normalized F for better numerical stability.
"""
minparams = Util.t_if(uparams, self.scales, Util.u2f)
params = self.pmap(minparams)
self.mog.set_stdcorr(params, self.nvars)
pdf = self.mog.pdf(self.X)
pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300)
c = np.dot(self._Fnorm, pdf) / np.dot(pdf, pdf)
residual = self._Fnorm - c * pdf
return np.sum(residual**2)
@Util.timer
def fit_data(
self,
verbose=0,
advance=0,
mode="lsq",
atol=None,
rtol=None,
maxgauss=50,
wtol=1e-2,
**args,
):
"""
Fit the MoG to the function on the mesh by nonlinear least squares.
Minimizes sum over grid points of (F - normalization * mog.pdf(X))^2.
The normalization is obtained analytically at each step so that the residual
sum of squares is minimized for the current MoG parameters. After fitting,
self.normalization holds the scale so that normalization * mog.pdf(X) ≈ F.
Ex. F.fit_data(tol=1e-6, options=dict(maxiter=500))
Parameters
----------
verbose : int, optional
Verbosity level (default 0).
advance : int, optional
If > 0, print progress every advance iterations (default 0).
mode : str, optional
Mode of fitting. Options: "lsq" (default), "multimodal", "noisy", "noisy_multimodal", or "adaptive".
- "lsq": Standard least-squares fit with all parameters free.
- "multimodal": Detects all peaks, sets ngauss to number of peaks found,
fixes means to peak positions, fits only sigmas and weights.
- "noisy": Smooths data first (~3% of points), detects peaks, selects
the ngauss most prominent peaks, uses them as initial values for means,
but leaves all parameters FREE to optimize. Best for noisy data.
- "noisy_multimodal": Smooths data first (~3% of points), detects ALL peaks,
sets ngauss to number of peaks found, fixes means at those positions,
fits only sigmas and weights. Like "multimodal" but with smoothing for noisy data.
- "adaptive": Incrementally increases ngauss from 1 until convergence criteria
are met (controlled by atol and rtol). Automatically finds optimal number
of Gaussians.
atol : float, optional
Absolute tolerance for R² in adaptive mode. Stops when R² >= atol (e.g. 0.99).
Only used when mode="adaptive".
rtol : float, optional
Relative tolerance for R² improvement in adaptive mode. Stops when
the average improvement over the last 3 ngauss values is less than rtol
(e.g. 0.01). Evaluates improvement trend to avoid stopping prematurely.
Only used when mode="adaptive".
maxgauss : int, optional
Maximum number of Gaussians to try in adaptive mode (default 50).
Only used when mode="adaptive".
wtol : float, optional
Weight tolerance for pruning spurious Gaussians in adaptive mode (default 1e-2).
After finding optimal ngauss, removes Gaussians with weights < wtol and
refits with remaining components. Helps eliminate spurious components.
Only used when mode="adaptive".
**args
Passed to scipy.optimize.minimize (e.g. method, tol, options).
"""
from scipy.signal import find_peaks
self.mog._ignoreWarnings = self._ignoreWarnings
minargs = dict(method="Powell")
minargs.update(args)
# Handle adaptive mode
if mode == "adaptive":
if atol is None and rtol is None:
raise ValueError(
"adaptive mode requires at least one of 'atol' or 'rtol' to be specified"
)
if verbose > 0:
print(f"Adaptive mode: starting with ngauss=1, maxgauss={maxgauss}")
if atol is not None:
print(f" Stop criterion: R² >= {atol}")
if rtol is not None:
print(f" Stop criterion: avg(ΔR²) over last 3 < {rtol}")
r2_history = [] # Keep history of last 3 R² values
best_ngauss = 1
best_r2 = -np.inf
for ng in range(1, maxgauss + 1):
# Re-initialize with new ngauss
self.ngauss = ng
self.Ndim = self.ngauss * self.nvars
self.Ncorr = int(self.nvars * (self.nvars - 1) / 2)
self.mog = MixtureOfGaussians(
ngauss=self.ngauss,
nvars=self.nvars,
domain=self.domain,
normalize_weights=False,
)
self.set_params()
# Fit with standard lsq mode (reuse code by calling fit_data recursively with mode="lsq")
# To avoid infinite recursion, we call the fitting code directly
if verbose > 0:
print(f"\n Trying ngauss={ng}...")
# Initialize parameters if not set by user
if not getattr(self, "_initial_params_set_by_user", False):
bounds_orig = getattr(self.mog, "_domain_bounds", None)
has_finite_domain = bounds_orig is not None and any(
np.isfinite(bounds_orig[j][0]) or np.isfinite(bounds_orig[j][1])
for j in range(self.nvars)
)
if has_finite_domain:
self._init_params_from_grid()
else:
self._init_params_from_function_support()
# Perform optimization
self.neval = 0
def _advance_adaptive(x, show=False):
if advance > 0 and (self.neval % advance == 0 or show):
if self.neval == 0:
print(f" Iterations:")
loss = self._loss_function(x)
print(f" Iter {self.neval}: loss = {loss:.6g}")
self.neval += 1
callback = _advance_adaptive if advance else None
self.solution = minimize(
self._loss_function,
self.uparams,
callback=callback,
**minargs,
)
if advance:
_advance_adaptive(self.solution.x, show=True)
self.uparams = self.solution.x
self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f)
params = self.pmap(self.minparams)
self.mog.set_stdcorr(params, self.nvars)
pdf = self.mog.pdf(self.X)
pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300)
self.normalization = float(np.dot(self.F, pdf) / np.dot(pdf, pdf))
# Calculate R²
out = self.quality_of_fit(verbose=False)
r2_current = out["R2"]
if verbose > 0:
print(f" R² = {r2_current:.6f}")
# Add to history
r2_history.append(r2_current)
# Update best
if r2_current > best_r2:
best_r2 = r2_current
best_ngauss = ng
# Check stopping criteria
stop_atol = atol is not None and r2_current >= atol
# Evaluate rtol based on average improvement over last 3 values
stop_rtol = False
if rtol is not None and len(r2_history) >= 3:
# Get last 3 R² values
last_3 = r2_history[-3:]
# Calculate improvements between consecutive values
improvements = [
last_3[i + 1] - last_3[i] for i in range(len(last_3) - 1)
]
avg_improvement = np.mean(improvements)
stop_rtol = avg_improvement < rtol
if verbose > 1:
print(f" Last 3 R²: {last_3}")
print(f" Improvements: {improvements}")
print(f" Avg improvement: {avg_improvement:.6f}")
if stop_atol or stop_rtol:
if verbose > 0:
if stop_atol:
print(
f"\n✅ Stopped: R² = {r2_current:.6f} >= atol = {atol}"
)
if stop_rtol:
last_3 = r2_history[-3:]
improvements = [
last_3[i + 1] - last_3[i]
for i in range(len(last_3) - 1)
]
avg_improvement = np.mean(improvements)
print(
f"\n✅ Stopped: avg(ΔR²) over last 3 = {avg_improvement:.6f} < rtol = {rtol}"
)
print(f"Final ngauss = {ng}")
# Prune spurious Gaussians with small weights
self._prune_and_refit_adaptive(wtol, verbose, advance, minargs)
return
# Reached maxgauss
if verbose > 0:
print(f"\n⚠️ Reached maxgauss = {maxgauss}")
print(f"Final R² = {best_r2:.6f} at ngauss = {best_ngauss}")
# Prune spurious Gaussians with small weights
self._prune_and_refit_adaptive(wtol, verbose, advance, minargs)
return
# Handle multimodal mode
if mode == "multimodal":
from scipy.signal import peak_widths, peak_prominences
F_flat = np.asarray(self.F).ravel()
# Detect peaks
# Use a relative height threshold
height_threshold = 0.05 * np.max(F_flat) if np.max(F_flat) > 0 else None
peaks, properties = find_peaks(F_flat, height=height_threshold)
if len(peaks) == 0:
print(
"Warning: No peaks detected in multimodal mode. Fallback to existing configuration."
)
else:
self.ngauss = len(peaks)
# Calculate widths and prominences
widths_samples, _, _, _ = peak_widths(F_flat, peaks, rel_height=0.5)
prominences, _, _ = peak_prominences(F_flat, peaks)
# Re-initialize MoG and parameters structure with new ngauss
self.Ndim = self.ngauss * self.nvars
self.Ncorr = int(self.nvars * (self.nvars - 1) / 2)
self.mog = MixtureOfGaussians(
ngauss=self.ngauss,
nvars=self.nvars,
domain=self.domain,
normalize_weights=False,
)
self.set_params()
has_weights = self.ngauss > 1
off_mu = self.ngauss if has_weights else 0
off_sig = off_mu + self.Ndim
# Set means to peak positions
mus_peaks = self.X[peaks]
# Set mus
for i in range(self.ngauss):
idx_base = off_mu + i * self.nvars
self.minparams[idx_base : idx_base + self.nvars] = mus_peaks[i]
# Set sigmas based on widths
# widths_samples is FWHM in indices.
# Convert to physical units.
# Assuming approximate uniform grid for width estimation
if len(self.X) > 1:
dx = abs((self.X[-1, 0] - self.X[0, 0]) / (len(self.X) - 1))
else:
dx = 1.0
sigma_conversion = 1.0 / 2.355 # FWHM to sigma (approx)
for i in range(self.ngauss):
width_physical = widths_samples[i] * dx
# Use a fraction of FWHM as sigma estimate
sigma_est = float(width_physical * sigma_conversion)
sigma_est = float(np.clip(sigma_est, 1e-6, 0.99 * self._sigmax))
idx_base = off_sig + i * self.nvars
self.minparams[idx_base : idx_base + self.nvars] = sigma_est
# Set weights based on prominences (or peak heights)
# minparams[0:ngauss] are weights (if ngauss > 1)
if has_weights:
weights = prominences / np.sum(prominences)
self.minparams[: self.ngauss] = weights
# Function to create tight bounds for fixed parameters
def fix_val(val):
return (val - 1e-9, val + 1e-9)
# Construct bounds
# Recalculate component bounds
# Weights
boundw = (-np.inf, np.inf) # Transformed weights
# Sigmas
# We want standard bounds for sigmas.
# In multimodal mode: restrict sigma to 2 * domain_width
bounds_orig = getattr(self.mog, "_domain_bounds", None)
domain_widths = [
bounds_orig[j][1] - bounds_orig[j][0]
for j in range(self.nvars)
if np.isfinite(bounds_orig[j][0]) and np.isfinite(bounds_orig[j][1])
]
sigma_hi = 0.99 * self._sigmax
if domain_widths:
width_max = max(domain_widths)
sigma_hi = min(sigma_hi, 2.0 * width_max)
bound_sigma_trans = (
Util.f2u(1e-6, self._sigmax),
Util.f2u(sigma_hi, self._sigmax),
)
# Rhos
bound_rho_trans = (-np.inf, np.inf)
# Bounds list construction
new_bounds = []
# Weights (if ngauss > 1)
if has_weights:
new_bounds.extend([boundw] * self.ngauss)
# Means (Fixed)
for i in range(self.ngauss):
for j in range(self.nvars):
mu_val = mus_peaks[i, j]
new_bounds.append((mu_val - 1e-9, mu_val + 1e-9))
# Sigmas
new_bounds.extend([bound_sigma_trans] * self.Ndim)
# Rhos
new_bounds.extend([bound_rho_trans] * (self.ngauss * self.Ncorr))
minargs["bounds"] = tuple(new_bounds)
# Sync uparams
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._initial_params_set_by_user = True
if minargs.get("method") == "Powell":
minargs["method"] = "L-BFGS-B"
# Handle noisy mode (free means - uses peak positions as initial values)
if mode == "noisy":
from scipy.signal import peak_widths, peak_prominences
from scipy.ndimage import uniform_filter1d
if self.nvars != 1:
raise ValueError(
"mode='noisy' is only supported for univariate functions (nvars=1)"
)
F_flat = np.asarray(self.F).ravel()
# Smooth the data using a window ~2-3% of points
n_points = len(F_flat)
window_size = max(7, int(n_points * 0.03))
if window_size % 2 == 0:
window_size += 1
F_smooth = uniform_filter1d(F_flat, size=window_size, mode="nearest")
# Detect peaks in smoothed data
height_threshold = 0.05 * np.max(F_smooth) if np.max(F_smooth) > 0 else None
prominence_threshold = (
0.005 * np.max(F_smooth) if np.max(F_smooth) > 0 else None
)
peaks, properties = find_peaks(
F_smooth, height=height_threshold, prominence=prominence_threshold
)
if len(peaks) == 0:
print(
"Warning: No peaks detected in noisy mode. Using default initialization."
)
else:
# Select the ngauss most prominent peaks
widths_samples, _, _, _ = peak_widths(F_smooth, peaks, rel_height=0.5)
prominences, _, _ = peak_prominences(F_smooth, peaks)
if len(peaks) < self.ngauss:
print(
f"Warning: Only {len(peaks)} peaks detected but ngauss={self.ngauss}. "
f"Using all {len(peaks)} detected peaks."
)
num_peaks_to_use = len(peaks)
selected_indices = np.arange(len(peaks))
else:
num_peaks_to_use = self.ngauss
# Sort by prominence and select top ngauss
sorted_indices = np.argsort(prominences)[::-1]
selected_indices = sorted_indices[: self.ngauss]
selected_peaks = peaks[selected_indices]
selected_widths = widths_samples[selected_indices]
selected_prominences = prominences[selected_indices]
# Sort selected peaks by position for consistency
position_order = np.argsort(selected_peaks)
selected_peaks = selected_peaks[position_order]
selected_widths = selected_widths[position_order]
selected_prominences = selected_prominences[position_order]
# Set initial parameters based on detected peaks
has_weights = self.ngauss > 1
off_mu = self.ngauss if has_weights else 0
off_sig = off_mu + self.Ndim
# Set means to peak positions (as INITIAL VALUES, not fixed)
mus_peaks = self.X[selected_peaks]
for i in range(num_peaks_to_use):
idx_base = off_mu + i * self.nvars
self.minparams[idx_base : idx_base + self.nvars] = mus_peaks[i]
# Set sigmas based on widths
if len(self.X) > 1:
dx = abs((self.X[-1, 0] - self.X[0, 0]) / (len(self.X) - 1))
else:
dx = 1.0
sigma_conversion = 1.0 / 2.355
for i in range(num_peaks_to_use):
width_physical = selected_widths[i] * dx
sigma_est = float(width_physical * sigma_conversion)
sigma_est = float(np.clip(sigma_est, 1e-6, 0.99 * self._sigmax))
idx_base = off_sig + i * self.nvars
self.minparams[idx_base : idx_base + self.nvars] = sigma_est
# Set weights based on prominences
if has_weights:
weights = selected_prominences / np.sum(selected_prominences)
self.minparams[:num_peaks_to_use] = weights
# Update uparams and mark as user-set
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._initial_params_set_by_user = True
# Handle noisy_multimodal mode (fixed means at peak positions)
elif mode == "noisy_multimodal":
from scipy.signal import peak_widths, peak_prominences
from scipy.ndimage import uniform_filter1d
if self.nvars != 1:
raise ValueError(
"mode='noisy' is only supported for univariate functions (nvars=1)"
)
F_flat = np.asarray(self.F).ravel()
# Smooth the data using a window ~2-3% of points for effective noise reduction
# while preserving multiple peaks
n_points = len(F_flat)
# Window size: approximately 3% of points (balance between smoothing and peak preservation)
window_size = max(7, int(n_points * 0.03))
if window_size % 2 == 0: # Make it odd for symmetry
window_size += 1
F_smooth = uniform_filter1d(F_flat, size=window_size, mode="nearest")
# Detect peaks in smoothed data with balanced thresholds
# Height threshold: 5% of max to catch significant peaks
height_threshold = 0.05 * np.max(F_smooth) if np.max(F_smooth) > 0 else None
# Prominence threshold: 7% of max (balanced between catching real peaks and avoiding noise)
prominence_threshold = (
0.07 * np.max(F_smooth) if np.max(F_smooth) > 0 else None
)
peaks, properties = find_peaks(
F_smooth, height=height_threshold, prominence=prominence_threshold
)
if len(peaks) == 0:
print(
"Warning: No peaks detected in noisy_multimodal mode. Fallback to existing configuration."
)
else:
# Use ALL detected peaks (like multimodal mode)
self.ngauss = len(peaks)
# Re-initialize with new ngauss
self.Ndim = self.ngauss * self.nvars
self.Ncorr = int(self.nvars * (self.nvars - 1) / 2)
self.mog = MixtureOfGaussians(
ngauss=self.ngauss,
nvars=self.nvars,
domain=self.domain,
normalize_weights=False,
)
self.set_params()
# Calculate widths and prominences
widths_samples, _, _, _ = peak_widths(F_smooth, peaks, rel_height=0.5)
prominences, _, _ = peak_prominences(F_smooth, peaks)
has_weights = self.ngauss > 1
off_mu = self.ngauss if has_weights else 0
off_sig = off_mu + self.Ndim
# Set means to peak positions
mus_peaks = self.X[peaks]
for i in range(self.ngauss):
idx_base = off_mu + i * self.nvars
self.minparams[idx_base : idx_base + self.nvars] = mus_peaks[i]
# Set sigmas based on widths
if len(self.X) > 1:
dx = abs((self.X[-1, 0] - self.X[0, 0]) / (len(self.X) - 1))
else:
dx = 1.0
sigma_conversion = 1.0 / 2.355 # FWHM to sigma
for i in range(self.ngauss):
width_physical = widths_samples[i] * dx
sigma_est = float(width_physical * sigma_conversion)
sigma_est = float(np.clip(sigma_est, 1e-6, 0.99 * self._sigmax))
idx_base = off_sig + i * self.nvars
self.minparams[idx_base : idx_base + self.nvars] = sigma_est
# Set weights based on prominences
if has_weights:
weights = prominences / np.sum(prominences)
self.minparams[: self.ngauss] = weights
# Construct bounds (means fixed, sigmas and weights free)
boundw = (-np.inf, np.inf)
bounds_orig = getattr(self.mog, "_domain_bounds", None)
domain_widths = [
bounds_orig[j][1] - bounds_orig[j][0]
for j in range(self.nvars)
if np.isfinite(bounds_orig[j][0]) and np.isfinite(bounds_orig[j][1])
]
sigma_hi = 0.99 * self._sigmax
if domain_widths:
width_max = max(domain_widths)
sigma_hi = min(sigma_hi, 2.0 * width_max)
bound_sigma_trans = (
Util.f2u(1e-6, self._sigmax),
Util.f2u(sigma_hi, self._sigmax),
)
bound_rho_trans = (-np.inf, np.inf)
new_bounds = []
if has_weights:
new_bounds.extend([boundw] * self.ngauss)
# Fix means
for i in range(self.ngauss):
for j in range(self.nvars):
mu_val = mus_peaks[i, j]
new_bounds.append((mu_val - 1e-9, mu_val + 1e-9))
# Free sigmas
new_bounds.extend([bound_sigma_trans] * self.Ndim)
# Free rhos
new_bounds.extend([bound_rho_trans] * (self.ngauss * self.Ncorr))
minargs["bounds"] = tuple(new_bounds)
self.uparams = Util.t_if(self.minparams, self.scales, Util.f2u)
self._initial_params_set_by_user = True
if minargs.get("method") == "Powell":
minargs["method"] = "L-BFGS-B"
bounds_orig = getattr(self.mog, "_domain_bounds", None)
has_finite_domain = bounds_orig is not None and any(
np.isfinite(bounds_orig[j][0]) or np.isfinite(bounds_orig[j][1])
for j in range(self.nvars)
)
if (
has_finite_domain
and "bounds" not in args
and mode not in ["multimodal", "noisy_multimodal"]
):
domain_widths = [
bounds_orig[j][1] - bounds_orig[j][0]
for j in range(self.nvars)
if np.isfinite(bounds_orig[j][0]) and np.isfinite(bounds_orig[j][1])
]
sigma_hi = 0.99 * self._sigmax
if domain_widths:
width_max = max(domain_widths)
sigma_hi = min(sigma_hi, 0.25 * width_max)
bounds_tuple = self.set_bounds(bounds=(1e-6, sigma_hi))
minargs["bounds"] = bounds_tuple
if minargs.get("method") == "Powell":
minargs["method"] = "L-BFGS-B"
if not getattr(self, "_initial_params_set_by_user", False):
self._init_params_from_grid()
else:
if not getattr(self, "_initial_params_set_by_user", False):
self._init_params_from_function_support()
self.neval = 0
def _advance(x, show=False):
if advance > 0 and (self.neval % advance == 0 or show):
if self.neval == 0:
print("Iterations:")
loss = self._loss_function(x)
print(f" Iter {self.neval}: loss = {loss:.6g}")
self.neval += 1
callback = _advance if advance else None
self.solution = minimize(
self._loss_function,
self.uparams,
callback=callback,
**minargs,
)
if advance:
_advance(self.solution.x, show=True)
self.uparams = self.solution.x
self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f)
params = self.pmap(self.minparams)
self.mog.set_stdcorr(params, self.nvars)
pdf = self.mog.pdf(self.X)
pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300)
self.normalization = float(np.dot(self.F, pdf) / np.dot(pdf, pdf))
def _prune_and_refit_adaptive(self, wtol, verbose, advance, minargs):
"""
Prune Gaussians with small weights and refit with remaining components.
Used in adaptive mode to eliminate spurious Gaussians after convergence.
Parameters
----------
wtol : float
Weight threshold. Gaussians with weight < wtol are removed.
verbose : int
Verbosity level.
advance : int
Print progress every advance iterations.
minargs : dict
Minimization arguments.
"""
# Get current weights
weights = self.mog.weights.ravel()
if verbose > 0:
print(f"\n🔍 Pruning Gaussians with weights < {wtol}:")
print(f" Current weights: {weights}")
# Identify Gaussians to keep (weight >= wtol)
keep_mask = weights >= wtol
n_keep = np.sum(keep_mask)
if n_keep == self.ngauss:
if verbose > 0:
print(
f" ✅ All {self.ngauss} Gaussians have sufficient weight. No pruning needed."
)
return
if n_keep == 0:
if verbose > 0:
print(f" ⚠️ All Gaussians have weight < {wtol}. Keeping at least 1.")
# Keep the Gaussian with largest weight
keep_mask = np.zeros_like(weights, dtype=bool)
keep_mask[np.argmax(weights)] = True
n_keep = 1
# Extract parameters of Gaussians to keep
mus_keep = self.mog.mus[keep_mask]
sigmas_keep = self.mog.sigmas[keep_mask]
weights_keep = weights[keep_mask]
if verbose > 0:
n_removed = self.ngauss - n_keep
print(f" 🗑️ Removing {n_removed} Gaussian(s) with small weights")
print(f" ✅ Keeping {n_keep} Gaussian(s)")
print(f" Refitting with pruned components...")
# Re-initialize with reduced ngauss
self.ngauss = n_keep
self.Ndim = self.ngauss * self.nvars
self.Ncorr = int(self.nvars * (self.nvars - 1) / 2)
self.mog = MixtureOfGaussians(
ngauss=self.ngauss,
nvars=self.nvars,
domain=self.domain,
normalize_weights=False,
)
self.set_params()
# Set initial parameters from pruned components
self.set_initial_params(mus=mus_keep, sigmas=sigmas_keep)
# Perform optimization with standard lsq
self.neval = 0
def _advance_prune(x, show=False):
if advance > 0 and (self.neval % advance == 0 or show):
if self.neval == 0:
print(f" Iterations:")
loss = self._loss_function(x)
print(f" Iter {self.neval}: loss = {loss:.6g}")
self.neval += 1
callback = _advance_prune if advance else None
self.solution = minimize(
self._loss_function,
self.uparams,
callback=callback,
**minargs,
)
if advance:
_advance_prune(self.solution.x, show=True)
self.uparams = self.solution.x
self.minparams = Util.t_if(self.uparams, self.scales, Util.u2f)
params = self.pmap(self.minparams)
self.mog.set_stdcorr(params, self.nvars)
pdf = self.mog.pdf(self.X)
pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300)
self.normalization = float(np.dot(self.F, pdf) / np.dot(pdf, pdf))
if verbose > 0:
out = self.quality_of_fit(verbose=False)
print(f" ✅ Refit complete: ngauss = {self.ngauss}, R² = {out['R2']:.6f}")
[docs]
def quality_of_fit(self, verbose=True):
"""
Report quality-of-fit statistics after fitting.
Computes the coefficient of determination (R²) and the root mean
square error (RMSE) between the mesh function F and the fitted
model (normalization * mog.pdf(X)). Call after fit_data().
R² (coefficient of determination)
R² = 1 - SS_res / SS_tot, where
SS_res = sum((F - F_pred)²), SS_tot = sum((F - mean(F))²).
R² = 1 means a perfect fit; R² = 0 means the model explains
no variance beyond the mean; R² < 0 is possible if the fit
is worse than using the constant mean(F).
RMSE (root mean square error)
RMSE = sqrt(mean((F - F_pred)²)). Same units as F; lower
is better.
Parameters
----------
verbose : bool, optional
If True (default), print a short report.
Returns
-------
dict
Keys: 'R2', 'RMSE', 'SS_res', 'SS_tot', 'n_points'.
Raises
------
ValueError
If fit_data() has not been run (normalization is None).
"""
if self.normalization is None:
raise ValueError(
"quality_of_fit requires a fitted model; run fit_data() first"
)
F_obs = np.asarray(self.F).ravel().astype(float)
pdf = self.mog.pdf(self.X)
pdf = np.maximum(np.atleast_1d(pdf).astype(float), 1e-300)
F_pred = self.normalization * pdf
F_pred = np.asarray(F_pred).ravel().astype(float)
n = F_obs.size
ss_res = float(np.sum((F_obs - F_pred) ** 2))
mean_f = float(np.mean(F_obs))
ss_tot = float(np.sum((F_obs - mean_f) ** 2))
if ss_tot > 0:
r2 = 1.0 - ss_res / ss_tot
else:
r2 = float("nan")
rmse = np.sqrt(np.mean((F_obs - F_pred) ** 2))
out = {
"R2": r2,
}
if verbose:
print("Quality of fit (after fit_data):")
print(f" R² = {out['R2']:.6g} (1 = perfect, 0 = no better than mean)")
return out
[docs]
def plot_fit(
self,
figsize=(8, 5),
xlabel=None,
ylabel=None,
n_curve=300,
sargs=True,
pargs=True,
hargs=None,
largs=None,
dargs=None,
**kwargs,
):
"""
Plot original function (data) and fitted MoG for univariate fits.
Draws the mesh data (X, F) and the fitted model normalization * mog.pdf(X).
Only implemented for nvars == 1.
Ex. F.plot_fit(xlabel=r'$\\tau$', ylabel='ISR', sargs=dict(ms=2, color='C0'))
Parameters
----------
figsize : tuple, optional
Figure size (default (8, 5)).
xlabel : str, optional
Label for the x-axis.
ylabel : str, optional
Label for the y-axis.
n_curve : int, optional
Number of points for the smooth PDF curve (default 300).
sargs : dict or bool or None, optional
Options for the data (points) plot (passed to ax.plot or ax.scatter).
- If True (default): plots data with defaults (marker='+', linestyle='-', label='data').
- If dict: plots data with provided options.
- If None: data is not plotted.
pargs : dict or bool or None, optional
Options for the fit line (norm × MoG PDF).
- If True (default): plots PDF with default options (color='r', lw=2, label='fit').
- If dict: plots PDF with provided options.
- If None: PDF is not plotted.
Passed to ax.plot.
dargs : dict or bool or None, optional
Options for the component lines.
- If True or dict: plots individual components with defaults (color='k', ls='--', alpha=0.3).
- If dict: updates defaults with provided options.
- If None (default): components are not plotted.
hargs : dict, optional
Options for histogram (not currently used in plot_fit, but included for API consistency).
largs : dict, optional
Deprecated. Use pargs instead.
Returns
-------
fig : matplotlib.figure.Figure
Figure handle.
Examples
--------
>>> F.plot_fit(xlabel=r'$\\tau$', ylabel='ISR')
>>> F.plot_fit(sargs=dict(ms=2, color='C0'), pargs=dict(color='C1', lw=3))
"""
if self.nvars != 1:
raise NotImplementedError(
"plot_fit is only implemented for univariate (nvars=1) fits"
)
from matplotlib import pyplot as plt
# Handle deprecation/alias
if pargs is None and largs is not None:
pargs = largs
# Apply defaults for PDF (pargs)
if pargs is True:
pargs = dict(color="r", lw=2, label="fit (norm × MoG PDF)")
elif isinstance(pargs, dict):
pargs = dict(pargs)
pargs.setdefault("label", "fit (norm × MoG PDF)")
# If pargs is None (or False), it remains None/False and we skip plotting
if pargs:
# Default zorder for PDF (background)
pargs.setdefault("zorder", -100)
# Apply defaults for Data (sargs)
if sargs is True:
# User requested default: sargs=dict(marker='+',linestyle='-')
sargs = dict(marker="+", ms=5, linestyle="-", label="data")
elif isinstance(sargs, dict):
sargs = dict(sargs)
sargs.setdefault("label", "data")
sargs.setdefault("marker", "+")
sargs.setdefault("linestyle", "-")
# If sargs is None (or False), it remains None/False and we skip plotting
# Merge kwargs into pargs ?? Or sargs?
# If user passes kwargs, they probably mean to affect the plot being shown (PDF).
if kwargs and pargs:
pargs.update(kwargs)
fig, ax = plt.subplots(1, 1, figsize=figsize)
x = np.asarray(self.X).ravel()
f = np.asarray(self.F).ravel()
# Only plot data if sargs is explicitly provided (or true default)
if sargs:
sargs_defaults = dict(ms=1.5, label="data", marker="+", linestyle="-")
# We use update to overwrite defaults with user provided sargs (which might be the default dict from above)
sargs_defaults.update(sargs)
# Default zorder for data (foreground)
sargs_defaults.setdefault("zorder", -100)
ax.plot(x, f, **sargs_defaults)
# Plot PDF (always plotted unless pargs somehow suppressed, but user wants it default)
if self.normalization is not None:
x_min, x_max = x.min(), x.max()
margin = max(1e-6, 0.05 * (x_max - x_min))
x_curve = np.linspace(x_min - margin, x_max + margin, n_curve)
# Plot components if requested
if dargs is not None and dargs is not False:
dargs_default = dict(color="k", ls="--", alpha=0.3)
if isinstance(dargs, dict):
dargs_default.update(dargs)
dargs_default.setdefault("zorder", -10)
# We need to construct single-component MoGs to use the robust pdf method
# (which handles truncation and normalization correctly)
for k in range(self.mog.ngauss):
# Extract parameters for k-th component
mu_k = self.mog.mus[k : k + 1]
sigma_k = self.mog.Sigmas[k : k + 1]
# Create temporary MoG with this single component
# Note: we use weight=1 here and apply the actual weight manually
# to ensure we get the component shape correctly scaled
mog_k = MixtureOfGaussians(
mus=mu_k,
Sigmas=sigma_k,
weights=[1.0],
domain=self.domain,
normalize_weights=False,
)
pdf_k = mog_k.pdf(x_curve.reshape(-1, 1))
# Scale by normalization and the component's weight
model_vals_k = (
self.normalization * self.mog.weights[k] * np.atleast_1d(pdf_k)
)
# Determine styling for this component
dargs_k = dargs_default.copy()
# Color: cycle if not explicitly provided
if "color" not in dargs and "c" not in dargs:
# Use standard matplotlib cycle C0, C1, etc.
dargs_k["color"] = f"C{k % 10}"
# Label: construct default label with component details
base_label = dargs_default.get("label", f"Comp {k + 1}")
if base_label is None: # explicit None
label = None
else:
mu_val = float(mu_k.ravel()[0])
sigma_val = np.sqrt(float(sigma_k.ravel()[0]))
label = rf"{base_label} ($\mu$={mu_val:.2f}, $\sigma$={sigma_val:.2f})"
dargs_k["label"] = label
ax.plot(x_curve, model_vals_k, **dargs_k)
if pargs:
pdf_vals = self.mog.pdf(x_curve.reshape(-1, 1))
model_vals = self.normalization * np.atleast_1d(pdf_vals)
ax.plot(x_curve, model_vals, **pargs)
if xlabel is not None:
ax.set_xlabel(xlabel)
if ylabel is not None:
ax.set_ylabel(ylabel)
ax.legend(loc="best", frameon=True)
ax.grid(True, alpha=0.3)
ax.set_ylim(0, None)
fig.tight_layout()
if not getattr(self, "_watermark_added_plot_fit", False):
multimin_watermark(ax, frac=0.5)
self._watermark_added_plot_fit = True
self.fig = fig
return fig
def _parse_function_mesh_data(self, Xmesh, Fmesh):
"""
Convert (Xmesh, Fmesh) from mesh/function format to points X (N, nvars) and values F (N,).
For univariate: Xmesh and Fmesh are 1D arrays of the same length.
For multivariate: Xmesh is a tuple of ndarrays (e.g. from np.meshgrid), Fmesh is an array
of the same shape as the grid.
Parameters
----------
Xmesh : array-like or tuple of array-like
Evaluation points. 1D array (n,) for univariate, or tuple of arrays for a grid.
Fmesh : array-like
Function values at those points. Shape (n,) for 1D or same shape as grid for mesh.
Returns
-------
X : numpy.ndarray
Points, shape (N, nvars).
F : numpy.ndarray
Values, shape (N,).
nvars : int
Number of variables.
"""
Fmesh = np.asarray(Fmesh, dtype=float)
if isinstance(Xmesh, (list, tuple)) and len(Xmesh) > 0:
# Multivariate: meshgrid-style (x1, x2, ...)
grids = [np.asarray(x, dtype=float) for x in Xmesh]
nvars = len(grids)
shape = grids[0].shape
for g in grids:
if g.shape != shape:
raise ValueError(
"Xmesh grid components must have the same shape; "
f"got {[g.shape for g in grids]}"
)
if Fmesh.shape != shape:
raise ValueError(
f"Fmesh shape {Fmesh.shape} must match grid shape {shape}"
)
X = np.column_stack([g.ravel() for g in grids])
F = Fmesh.ravel()
else:
# Univariate: Xmesh and Fmesh are 1D
Xmesh = np.asarray(Xmesh, dtype=float)
if Xmesh.ndim != 1:
raise ValueError(
"For univariate, Xmesh must be 1D; for multivariate use a tuple of arrays"
)
n = len(Xmesh)
if Fmesh.size != n:
raise ValueError(
f"Xmesh length ({n}) and Fmesh size ({Fmesh.size}) must match"
)
nvars = 1
X = Xmesh.reshape(-1, 1)
F = np.asarray(Fmesh, dtype=float).ravel()
return X, F, nvars