Source code for coniii.solvers

# ====================================================================================== #
# ConIII module for algorithms for solving the inverse Ising problem.
#
# Authors: Edward Lee (edlee@alumni.princeton.edu) and Bryan Daniels
#          (bryan.daniels.1@asu.edu)
# ====================================================================================== #
"""Inverse-Ising / maximum-entropy solvers.

This is the central module of ConIII. Each solver takes a sample of
observations (or a system size) and fits the maximum-entropy model
whose pairwise correlations match the data. The solvers share the
:class:`Solver` base class and the ``basic_setup`` machinery; they
differ in the algorithm used to find the Lagrange multipliers.

Public API (see ``__all__``)
----------------------------
:class:`Enumerate`
    Exact solution by enumeration; feasible for small systems.
:class:`SparseEnumerate`
    Enumerate with a restricted set of constrained parameters.
:class:`MPF`
    Minimum Probability Flow.
:class:`MCH`
    Monte Carlo Histogram (Broderick et al., 2007). Relies on the
    samplers in :mod:`coniii.samplers`.
:class:`Pseudo`
    Pseudolikelihood maximization.
:class:`ClusterExpansion`
    Adaptive cluster expansion (Barton & Cocco, 2013).
:class:`RegularizedMeanField`
    Regularized mean-field inversion.

The mean-field helpers used by ``ClusterExpansion`` and
``RegularizedMeanField`` live in :mod:`coniii.legacy.mean_field_ising`.
See ``usage_guide.ipynb`` for worked examples of each solver.
"""
import copy
import warnings as _warnings
import numpy as np
import multiprocess as mp
from warnings import warn
from scipy.optimize import minimize, fmin_ncg, minimize_scalar, root, check_grad
from scipy.spatial.distance import squareform

# mean_field_ising lives in coniii.legacy and emits a DeprecationWarning on
# import; suppress it for internal use because ClusterExpansion and
# RegularizedMeanField legitimately need the module.
with _warnings.catch_warnings():
    _warnings.simplefilter("ignore", DeprecationWarning)
    from .legacy import mean_field_ising

from .utils import *
from .samplers import *
from .models import Ising


__all__ = [
    'Solver',
    'Enumerate', 'SparseEnumerate',
    'MPF',
    'MCH', 'MCHIncompleteData', 'SparseMCH',
    'Pseudo',
    'ClusterExpansion',
    'RegularizedMeanField',
]



[docs] class Solver(): """Base class for declaring common methods and attributes for inverse maxent algorithms. """
[docs] def basic_setup(self, sample_or_n=None, model=None, calc_observables=None, iprint=True, model_kwargs={}): """General routine for setting up a Solver instance. Parameters ---------- sample_or_n : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. iprint : str, True If empty, do not display warning messages. model_kwargs : dict, {} Additional arguments that will be passed to Ising class. These only matter if model is None. Important ones include "n_cpus" and "rng". """ # When neither sampler nor system size are specified if sample_or_n is None: self.sample = None self.n = None self.model = model if calc_observables is None: if iprint: warn("Assuming that calc_observables should be for Ising model.") self.calc_observables = define_ising_helper_functions()[1] else: self.calc_observables = calc_observables self.constraints = None # When system size is specified elif isinstance(sample_or_n, int): assert sample_or_n > 1, "System size must be greater than 1." self.sample = None self.n = sample_or_n if model is None: self.model = Ising(np.zeros((self.n**2+self.n)//2), **model_kwargs) else: self.model = model if calc_observables is None: if iprint: warn("Assuming that calc_observables should be for Ising model.") self.calc_observables = define_ising_helper_functions()[1] else: self.calc_observables = calc_observables self.constraints = None # When data sample is specified else: if not set(np.unique(sample_or_n).tolist())<=set((-1,1)) and iprint: warn("Data is not only (-1, 1) entries.") self.sample = sample_or_n self.n = sample_or_n.shape[1] if model is None: self.model = Ising(np.zeros((self.n**2+self.n)//2), **model_kwargs) else: self.model = model if calc_observables is None: if iprint: warn("Assuming that calc_observables should be for Ising model.") self.calc_observables = define_ising_helper_functions()[1] else: self.calc_observables = calc_observables self.constraints = self.calc_observables(sample_or_n).mean(0) if np.isclose(np.abs(self.constraints), 1, atol=1e-3).any() and iprint: msg = ("Some pairwise correlations have magnitude close to one. Potential for poor "+ "solutions from diverging parameters.") warn(msg) self.iprint = iprint
[docs] def solve(self): """To be defined in derivative classes.""" raise NotImplementedError
[docs] def set_insertion_ix(self): """Calculate indices to fill in with zeros to "fool" code that takes full set of params. """ nParams = self.model.multipliers.size insertion_ix = [0] * self.parameterIx[0] for i, ix in enumerate(self.parameterIx[1:]): insertion_ix.extend([i+1] * (ix-self.parameterIx[i]-1)) if self.parameterIx.size==1: # give i a value since above for loop didn't iterate i = -1 if self.parameterIx[-1]<(nParams - 1): insertion_ix.extend([i+2] * (nParams - self.parameterIx[-1] - 1)) #assert np.insert(initial_guess, insertion_ix, 0).size==nParams self.insertionIx = insertion_ix
[docs] def fill_in(self, x, fill_value=0): """Helper function for filling in missing parameter values. Parameters ---------- x : ndarray fill_value : float, 0 Returns ------- ndarray With missing entries filled in. """ return np.insert(x, self.insertionIx, fill_value)
[docs] def logp(self, sample=None, run_checks=True): """Estimate log likelihood of given set of states using self.model.sample. Parameters ---------- sample : ndarray, None Sample of states for which to estimate log likelihood. Default is to use self.sample. run_checks : bool, True Returns ------- ndarray """ # verify input sample = sample if not sample is None else self.sample if run_checks: assert isinstance(sample, np.ndarray) assert set(np.unique(sample)) <= frozenset((-1,0,1)) if sample.ndim==1: sample = s[None,:] # only iterate over unique states in sample us, uix = np.unique(sample, axis=0, return_inverse=True) logp = np.zeros(sample.shape[0]) for i, s in enumerate(us): # est probability of an observation is frequency of observed subset spin_ix = s!=0 p = (s[spin_ix][None,:]==self.model.sample[:,spin_ix]).all(1).sum() p /= self.model.sample.shape[0] logp[uix==i] = p return np.log(logp)
#end Solver
[docs] class Enumerate(Solver): """Class for solving fully-connected inverse Ising model problem by enumeration of the partition function and then using gradient descent. """ def __init__(self, sample=None, model=None, calc_observables=None, iprint=True, **default_model_kwargs): """ Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. **default_model_kwargs Additional arguments that will be passed to Ising class. These only matter if model is None. """ self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs)
[docs] def solve(self, initial_guess=None, constraints=None, max_param_value=50, full_output=False, use_root=True, scipy_solver_kwargs={'method':'krylov', 'options':{'fatol':1e-13,'xatol':1e-13}}): """Must specify either constraints (the correlations) or samples from which the correlations will be calculated using self.calc_observables. This routine by default uses scipy.optimize.root to find the solution. This is MUCH faster than the scipy.optimize.minimize routine which can be used instead. If still too slow, try adjusting the accuracy. If not converging, try increasing the max number of iterations. If receiving Jacobian error (or some other numerical estimation error), parameter values may be too large for faithful evaluation. Try decreasing max_param_value. Parameters ---------- initial_guess : ndarray, None Initial starting guess for parameters. By default, this will start with all zeros if left unspecified. constraints : ndarray, None Can specify constraints directly instead of using the ones calculated from the sample. This can be useful when the pairwise correlations are known exactly. This will override the self.constraints data member. max_param_value : float, 50 Absolute value of max parameter value. Bounds can also be set in the kwargs passed to the minimizer, in which case this should be set to None. full_output : bool, False If True, return output from scipy.optimize.minimize. use_root : bool, True If False, use scipy.optimize.minimize instead. This is typically much slower. scipy_solver_kwargs : dict, {'method':'krylov', 'options':{'fatol':1e-13,'xatol':1e-13}} High accuracy is slower. Although default accuracy may not be so good, lowering these custom presets will speed things up. Choice of the root finding method can also change runtime and whether a solution is found or not. Recommend playing around with different solvers and tolerances or getting a close approximation using a different method if solution is hard to find. Returns ------- ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. dict, optional Output from scipy.optimize.root. """ if not initial_guess is None: assert initial_guess.size==self.constraints.size else: initial_guess = np.zeros(self.constraints.size) if constraints is None: constraints = self.constraints assert not constraints is None # default solver routine if use_root: if not max_param_value is None: def f(params): if np.any(np.abs(params)>max_param_value): return np.zeros_like(constraints) + 1e30 return self.model.calc_observables(params)-constraints else: def f(params): return self.model.calc_observables(params)-constraints soln = root(f, initial_guess, **scipy_solver_kwargs) else: if not max_param_value is None: def f(params): if np.any(np.abs(params)>max_param_value): return 1e30 return np.linalg.norm( self.model.calc_observables(params)-constraints ) else: def f(params): return np.linalg.norm( self.model.calc_observables(params)-constraints ) soln = minimize(f, initial_guess, **scipy_solver_kwargs) self.multipliers = soln['x'] if full_output: return soln['x'], soln return soln['x']
[docs] def logp(self, sample=None, run_checks=True): """Log likelihood of given set of states using self.model.calc_p(). Parameters ---------- sample : ndarray, None Sample of states for which to estimate log likelihood. Default is to use self.sample. run_checks : bool, True Returns ------- ndarray """ # verify input sample = sample if not sample is None else self.sample if run_checks: assert isinstance(sample, np.ndarray) assert set(np.unique(sample)) <= frozenset((-1,0,1)) if sample.ndim==1: sample = s[None,:] logp = np.zeros(sample.shape[0]) all_states = bin_states(self.n, sym=True) p = self.model.calc_p(self.multipliers) for i, s in enumerate(sample): # est probability of an observation is frequency of observed subset logp[i] = p[(s[None,:]==all_states).all(1)] return np.log(logp)
#end Enumerate
[docs] class SparseEnumerate(Solver): """Class for solving Ising model with a sparse parameter set by enumeration of the partition function and then using gradient descent. Unspecified parameters are implicitly fixed to be zero, which corresponds to leaving the corresponding correlation function unconstrained. """ def __init__(self, sample=None, model=None, calc_observables=None, parameter_ix=None, iprint=True, **default_model_kwargs): """ Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. parameter_ix : ndarray, None Indices of Ising parameters to fit. Ones that are not specified are fixed at zero. Parameters are ordered by default as all fields (indices 0 thru n-1) and then all couplings (as unraveled upper triangular interaction symmetric matrix). **default_model_kwargs Additional arguments that will be passed to Ising class. These only matter if model is None. """ self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs) assert not parameter_ix is None, "Must specify parameter_ix." assert parameter_ix.dtype==np.int64, "parameter_ix must be array of indices." if np.unique(parameter_ix).size != parameter_ix.size and self.iprint: warn("parameter_ix has repeated entries.") self.parameterIx = np.unique(parameter_ix) self.set_insertion_ix()
[docs] def solve(self, initial_guess=None, constraints=None, max_param_value=50, full_output=False, use_root=True, scipy_solver_kwargs={'method':'krylov', 'options':{'fatol':1e-13,'xatol':1e-13}}): """Must specify either constraints (the correlations) or samples from which the correlations will be calculated using self.calc_observables. This routine by default uses scipy.optimize.root to find the solution. This is MUCH faster than the scipy.optimize.minimize routine which can be used instead. If still too slow, try adjusting the accuracy. If not converging, try increasing the max number of iterations. If receiving Jacobian error (or some other numerical estimation error), parameter values may be too large for faithful evaluation. Try decreasing max_param_value. Parameters ---------- initial_guess : ndarray, None Initial starting guess for parameters. By default, this will start with all zeros if left unspecified. constraints : ndarray, None Can specify constraints directly instead of using the ones calculated from the sample. This can be useful when the pairwise correlations are known exactly. This will override the self.constraints data member. max_param_value : float, 50 Absolute value of max parameter value. Bounds can also be set in the kwargs passed to the minimizer, in which case this should be set to None. full_output : bool, False If True, return output from scipy.optimize.minimize. use_root : bool, True If False, use scipy.optimize.minimize instead. This is typically much slower. scipy_solver_kwargs : dict, {'method':'krylov', 'options':{'fatol':1e-13,'xatol':1e-13}} High accuracy is slower. Although default accuracy may not be so good, lowering these custom presets will speed things up. Choice of the root finding method can also change runtime and whether a solution is found or not. Recommend playing around with different solvers and tolerances or getting a close approximation using a different method if solution is hard to find. Returns ------- ndarray Solved multipliers (parameters). dict, optional Output from scipy.optimize.root. """ if not initial_guess is None: assert initial_guess.size==self.parameterIx.size else: initial_guess = np.zeros(self.parameterIx.size) if constraints is None: constraints = self.constraints assert not constraints is None and constraints.size==self.parameterIx.size # default solver routine if use_root: if not max_param_value is None: def f(params): # fix unspecified params to zero params = self.fill_in(params) if np.any(np.abs(params) > max_param_value): return np.zeros_like(constraints) + 1e30 return self.model.calc_observables(params)[self.parameterIx]-constraints else: def f(params): # fix unspecified params to zero params = self.fill_in(params) return self.model.calc_observables(params)[self.parameterIx]-constraints soln = root(f, initial_guess, **scipy_solver_kwargs) else: if not max_param_value is None: def f(params): # fix unspecified params to zero params = self.fill_in(params) if np.any(np.abs(params) > max_param_value): return 1e30 return np.linalg.norm( self.model.calc_observables(params)[self.parameterIx]-constraints ) else: def f(params): # fix unspecified params to zero params = self.fill_in(params) return np.linalg.norm( self.model.calc_observables(params)[self.parameterIx]-constraints ) soln = minimize(f, initial_guess, **scipy_solver_kwargs) self.multipliers = soln['x'] if full_output: return soln['x'], soln return soln['x']
[docs] def logp(self, sample=None, run_checks=True): """Log likelihood of given set of states using self.model.calc_p(). Parameters ---------- sample : ndarray, None Sample of states for which to estimate log likelihood. Default is to use self.sample. run_checks : bool, True Returns ------- ndarray """ # verify input sample = sample if not sample is None else self.sample if run_checks: assert isinstance(sample, np.ndarray) assert set(np.unique(sample)) <= frozenset((-1,0,1)) if sample.ndim==1: sample = s[None,:] logp = np.zeros(sample.shape[0]) all_states = bin_states(self.n, sym=True) p = self.model.calc_p(self.fill_in(self.multipliers)) for i, s in enumerate(sample): # est probability of an observation is frequency of observed subset missing_spin_ix = s==0 if missing_spin_ix.any(): logp[i] = p[(s[~missing_spin_ix][None,:]==all_states[:,~missing_spin_ix]).all(1)].sum() else: logp[i] = p[(s[None,:]==all_states).all(1)] return np.log(logp)
#end SparseEnumerate def unwrap_self_worker_obj(arg, **kwarg): return MPF.worker_objective_task(*arg, **kwarg)
[docs] class MPF(Solver): def __init__(self, sample, model=None, calc_observables=None, calc_de=None, adj=None, iprint=True, **default_model_kwargs): """Parallelized implementation of Minimum Probability Flow algorithm. Most time consuming step is the computation of the energy of a given state. Make this as fast as possible. Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. calc_de : function, None Function for calculating derivative of energy wrt parameters. Takes in 2d state array and index of the parameter. adj : function, None Function for getting all the neighbors of any given state. Note that the backed in self.solvers runs everything in the {0,1} basis for spins, so this needs to find neighboring states in the {0,1} basis. **default_model_kwargs Additional arguments that will be passed to Ising class. These only matter if model is None. """ self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs) # Use the supplied neighbor function, or the default Ising adj from utils. if adj is None: from .utils import adj self.adj = adj # calc_de is optional; None means the calc_de speed-up is disabled. self.calc_de = calc_de
[docs] @staticmethod def worker_objective_task( s, Xcount, adjacentStates, params, calc_e ): return Xcount * np.sum(np.exp( .5*(calc_e(s[None,:],params) - calc_e(adjacentStates,params) ) ))
[docs] def K( self, Xuniq, Xcount, adjacentStates, params ): """Compute objective function. Parameters ---------- Xuniq : ndarray (ndata x ndims) unique states that appear in the data Xcount : ndarray of int number of times that each unique state appears in the data adjacentStates : list of ndarray list of adjacent states for each given unique state params : ndarray parameters for computation of energy Returns ------- K : float """ obj = 0. objGrad = np.zeros((params.size)) for i,s in enumerate(Xuniq): dobj = Xcount[i] * np.exp( .5*(self.model.calc_e(s[None,:], params) - self.model.calc_e(adjacentStates[i], params) ) ) if not self.calc_de is None: for j in range(params.size): if dobj.size != adjacentStates[i].shape[0]: raise Exception("Sizes do not match") objGrad[j] += .5 * (dobj * ( self.calc_de(s[None,:],j) - self.calc_de(adjacentStates[i],j) )).sum() obj += dobj.sum() #else: # # Parallel loop through objective function calculation for each state in the data. # obj = [self.pool.apply( unwrap_self_worker_obj, # args=([Xuniq[i],Xcount[i],adjacentStates[i],params,self.calc_e],) ) # for i in range(Xuniq.shape[0])] # obj = obj.sum() # if not self.calc_de is None: # from warning import warn # warn("Gradient computation not written for parallel loop.") if not self.calc_de is None: return obj / Xcount.sum(), objGrad / Xcount.sum() else: return obj / Xcount.sum()
def _K( self, X, J ): """Translation from Sohl-Dickstein's code K_dk_ising.m. This is here for testing purposes only. Caution: This uses a different convention for negatives and 1/2 factors. To use this properly, all parameters will have an extra negative, the returned J's will be halved and the energy calculation should include a 1/2 factor in front of h's. """ nbatch, ndims = X.shape X = X.T h = J[:ndims] J = squareform( J[ndims:] ) J[diag_indices(ndims)] = h Y = dot(J,X) diagJ = J.diagonal() # XnotX contains (X - [bit flipped X]) XnotX = 2.*X-1; # Kfull is a [ndims, nbatch] matrix containing the contribution to the # objective function from flipping each bit in the rows, for each datapoint # on the columns Kfull = np.exp( XnotX * Y - (1/2)*tile(diagJ[:,None],(1,nbatch)) ) K = sum(Kfull) K = K / nbatch return K
[docs] def logK( self, Xuniq, Xcount, adjacentStates, params ): """ Compute log of objective function. Parameters ---------- Xuniq : ndarray (n_samples, n_dim) unique states that appear in the data Xcount : ndarray of int number of times that each unique state appears in the data adjacentStates : list of ndarray list of adjacent states for each given unique state params : ndarray parameters for computation of energy Returns ------- logK : float """ obj = 0. objGrad = np.zeros((params.size)) power = np.zeros((len(Xuniq), len(adjacentStates[0]))) # energy differences for i,s in enumerate(Xuniq): power[i,:] = .5*( self.model.calc_e(s[None,:], params) - self.model.calc_e(adjacentStates[i], params) ) obj = logsumexp( power + np.log(Xcount)[:,None] -np.log(Xcount.sum()) ) if self.calc_de is None: return obj # coefficients that come out from taking derivative of exp for i in range(params.size): gradcoef = np.zeros((len(Xuniq), len(adjacentStates[0]))) for j,s in enumerate(Xuniq): gradcoef[j,:] = .5 * ( self.calc_de(s[None,:],i) - self.calc_de(adjacentStates[j],i) ) power -= power.max() objGrad[i] = ((gradcoef*np.exp(power)*Xcount[:,None]).sum() / (np.exp(power)*Xcount[:,None]).sum()) objGrad -= np.log(Xcount.sum()) if objGrad.size==1: raise Exception("") return obj, objGrad
[docs] def list_adjacent_states(self, Xuniq, all_connected): """Use self.adj to evaluate all adjacent states in Xuniq. Parameters ---------- Xuniq : ndarray all_connected : bool Returns ------- adjacentStates """ adjacentStates = [] for s in Xuniq: adjacentStates.append( self.adj(s).astype(int) ) # Remove states already in data if not all_connected: ix = np.zeros((s.size))==0 for i,t in enumerate(adjacentStates[-1]): if np.any(np.all(t[None,:]==Xuniq,1)): ix[i] = False if np.sum(ix)==X.shape[1]: raise Exception("This data set does not satisfy MPF assumption that each \ state be connected to at least one non-data state (?)") adjacentStates[-1] = adjacentStates[-1][ix] return adjacentStates
[docs] def solve(self, initial_guess=None, method='L-BFGS-B', full_output=False, all_connected=True, parameter_limits=100, solver_kwargs={'maxiter':100,'ftol':1e-15}, uselog=True): """Minimize MPF objective function using scipy.optimize.minimize. Parameters ---------- initial_guess : ndarray, None method : str, 'L-BFGS-B' Option for scipy.optimize.minimize. full_output : bool, False all_connected : bool, True Switch for summing over all states that data sets could be connected to or just summing over non-data states (second summation in Eq 10 in Sohl-Dickstein 2011). parameter_limits : float, 100 Maximum allowed magnitude of any single parameter. solver_kwargs : dict, {'maxiter':100,'ftol':1e-15} For scipy.optimize.minimize. uselog : bool, True If True, calculate log of the objective function. This can help with numerical precision errors. Returns ------- ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. dict (optional) Output from scipy.optimize.minimize returned if full_output is True. """ assert parameter_limits>0 # Convert from {+/-1} to {0,1} axis. X = (self.sample+1)//2 if not self.calc_de is None: includeGrad = True else: includeGrad = False if initial_guess is None: initial_guess = self.calc_observables(X).mean(0) else: initial_guess = ising_convert_params( split_concat_params(initial_guess, self.n), '01', True) # Get list of unique data states and how frequently they appear. Xuniq, ix, Xcount = np.unique(X, axis=0, return_inverse=True, return_counts=True) adjacentStates = self.list_adjacent_states(Xuniq, all_connected) # Interface to objective function. if uselog: def f(params): return self.logK( Xuniq, Xcount, adjacentStates, params ) else: def f(params): return self.K( Xuniq, Xcount, adjacentStates, params ) # If calc_de has been provided then minimize will use gradient information. soln = minimize( f, initial_guess, bounds=[(-parameter_limits,parameter_limits)]*len(initial_guess), method=method, jac=includeGrad, options=solver_kwargs ) self.multipliers = ising_convert_params( split_concat_params(soln['x'], self.n), '11', True) if full_output: return self.multipliers, soln return ising_convert_params( split_concat_params(soln['x'], self.n), '11', True)
#end MPF
[docs] class MCH(Solver): """Class for solving maxent problems using the Monte Carlo Histogram method. Broderick, T., Dudik, M., Tkacik, G., Schapire, R. E. & Bialek, W. Faster solutions of the inverse pairwise Ising problem. arXiv 1-8 (2007). """ def __init__(self, sample, model=None, calc_observables=None, sample_size=1000, sample_method='metropolis', mch_approximation=None, iprint=True, sampler_kw={}, **default_model_kwargs): """ Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. sample_size : int, 1000 Number of samples to use MCH sampling step. sample_method : str, 'metropolis' Only 'metropolis' allowed currently. mch_approximation : function, None For performing the MCH approximation step. Is specific to the maxent model. iprint : str, True sampler_kw : dict, {} To pass into self.model.setup_sampler(). rng : np.random.RandomState, None Random number generator. n_cpus : int, None If 1 or less no parallel processing, other numbers above 0 specify number of cores to use. **default_model_kwargs Additional arguments that will be passed to Ising class. These only matter if model is None. """ assert sample_size>0 self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs) if sample_size < 1000 and self.iprint: warn("Small sample size will lead to poor convergence.") # Sampling parameters. self.sampleSize = sample_size self.mch_approximation = mch_approximation or define_ising_helper_functions()[-1] self.model.setup_sampler(sample_size=sample_size, sampler_kwargs=sampler_kw)
[docs] def solve(self, initial_guess=None, constraints=None, tol=None, tolNorm=None, n_iters=30, burn_in=30, maxiter=10, custom_convergence_f=None, iprint=False, full_output=False, learn_params_kwargs={'maxdlamda':1, 'eta':1}, generate_kwargs={}): """Solve for maxent model parameters using MCH routine. Parameters ---------- initial_guess : ndarray, None Initial starting point. constraints : ndarray, None For debugging! Vector of correlations to fit. tol : float, None Maximum error allowed in any observable. tolNorm : float, None Norm error allowed in found solution. n_iters : int, 30 Number of iterations to make between samples in MCMC sampling. burn_in : int, 30 Initial burn in from random sample when MC sampling. max_iter : int, 10 Max number of iterations of MC sampling and MCH approximation. custom_convergence_f : function, None Function for determining convergence criterion. At each iteration, this function should return the next set of learn_params_kwargs and optionally the sample size. As an example: def learn_settings(i): ''' Take in the iteration counter and set the maximum change allowed in any given parameter (maxdlamda) and the multiplicative factor eta, where d(parameter) = (error in observable) * eta. Additional option is to also return the sample size for that step by returning a tuple. Larger sample sizes are necessary for higher accuracy. ''' if i<10: return {'maxdlamda':1,'eta':1} else: return {'maxdlamda':.05,'eta':.05} iprint : bool, False full_output : bool, False If True, also return the errflag and error history. learn_parameters_kwargs : dict, {'maxdlamda':1,'eta':1} generate_kwargs : dict, {} Returns ------- ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. int Error flag. 0, converged within given criterion 1, max iterations reached ndarray Log of errors in matching constraints at each step of iteration. """ if (self.n*10)>burn_in and self.iprint: msg = ("Number of burn in MCMC iterations between samples may be too small for "+ "convergence to stationary distribution.") warn(msg) if (self.n*10)>n_iters and self.iprint: msg = ("Number of MCMC iterations between samples may be too small for convergence to "+ "stationary distribution.") warn(msg) if constraints is None: constraints = self.constraints errors = [] # history of errors to track # Set initial guess for parameters. self._multipliers is where the current guess for the # parameters is stored. if not (initial_guess is None): assert len(initial_guess)==len(constraints) self._multipliers = initial_guess.copy() else: self._multipliers = np.zeros((len(constraints))) tol = tol or 1/np.sqrt(self.model.sampleSize) tolNorm = tolNorm or np.sqrt( 1/self.model.sampleSize )*len(self._multipliers) # Redefine function for automatically adjusting learn_params_kwargs so that it returns the # MCH iterator settings and the sample size if it doesn't already. if custom_convergence_f is None: custom_convergence_f = lambda i : (learn_params_kwargs, self.model.sampleSize) if type(custom_convergence_f(0)) is dict: custom_convergence_f_ = custom_convergence_f custom_convergence_f = lambda i : (custom_convergence_f_(i), self.model.sampleSize) assert 'maxdlamda' and 'eta' in list(custom_convergence_f(0)[0].keys()) assert type(custom_convergence_f(0)[1]) is int # Generate initial set of samples. self.model.generate_sample(n_iters, burn_in, multipliers=self._multipliers, generate_kwargs=generate_kwargs) thisConstraints = self.calc_observables(self.model.sample).mean(0) errors.append(thisConstraints - constraints) if iprint=='detailed': print(self._multipliers) # MCH iterations. counter = 0 # number of MCMC and MCH steps keepLooping = True # loop control learn_params_kwargs, self.model.sampleSize = custom_convergence_f(counter) while keepLooping: # MCH step if iprint: print("Iterating parameters with MCH...") self.learn_parameters_mch(thisConstraints, constraints, **learn_params_kwargs) if iprint=='detailed': print("After MCH step, the parameters are...") print(self._multipliers) # MC sampling step if iprint: print("Sampling...") self.model.generate_sample(n_iters, burn_in, multipliers=self._multipliers, generate_kwargs=generate_kwargs) thisConstraints = self.calc_observables(self.model.sample).mean(0) counter += 1 errors.append(thisConstraints - constraints) if iprint=='detailed': print(f"Error is {np.linalg.norm(errors[-1]):.4f}.") # Exit criteria. if (np.linalg.norm(errors[-1]) < tolNorm and np.all(np.abs(thisConstraints - constraints) < tol)): if iprint: print("Solved.") errflag = 0 keepLooping=False elif counter > maxiter: if iprint: print("Over maxiter") errflag = 1 keepLooping=False else: learn_params_kwargs, self.model.sampleSize = custom_convergence_f(counter) self.multipliers = self._multipliers.copy() # generate sample given these multipliers self.model.generate_sample(n_iters, burn_in, multipliers=self._multipliers, generate_kwargs=generate_kwargs) if full_output: return self.multipliers, errflag, np.vstack(errors) return self.multipliers
[docs] def estimate_jac(self, eps=1e-3): """Approximation Jacobian using the MCH approximation. Parameters ---------- eps : float, 1e-3 Returns ------- jac : ndarray Jacobian is an n x n matrix where each row corresponds to the behavior of fvec wrt to a single parameter. """ dlamda = np.zeros(self._multipliers.shape) jac = np.zeros((self._multipliers.size,self._multipliers.size)) print("evaluating jac") for i in range(len(self._multipliers)): dlamda[i] += eps dConstraintsPlus = self.mch_approximation(self.model.sample, dlamda) dlamda[i] -= 2*eps dConstraintsMinus = self.mch_approximation(self.model.sample, dlamda) jac[i,:] = (dConstraintsPlus-dConstraintsMinus)/(2*eps) dlamda[i] += eps return jac
[docs] def learn_parameters_mch(self, estConstraints, constraints, maxdlamda=1, maxdlamdaNorm=1, maxLearningSteps=50, eta=1 ): """ Parameters ---------- estConstraints : ndarray Constraints estimated from MCH approximation. constraints : ndarray maxdlamda : float, 1 Max allowed magnitude for any element of dlamda vector before exiting. maxdlamdaNorm : float, 1 Max allowed norm of dlamda vector before exiting. maxLearningSteps : int max learning steps before ending MCH eta : float, 1 factor for changing dlamda Returns ------- ndarray MCH estimate for constraints from parameters lamda+dlamda. """ keepLearning = True dlamda = np.zeros(constraints.size) learningSteps = 0 distance = 1 while keepLearning: # Get change in parameters. # If observable is too large, then corresponding energy term has to go down # (think of double negative). dlamda += -(estConstraints - constraints) * np.min([distance,1.]) * eta #dMultipliers /= dMultipliers.max() # Predict distribution with new parameters. estConstraints = self.mch_approximation(self.model.sample, dlamda) distance = np.linalg.norm(estConstraints - constraints) # Counter. learningSteps += 1 # Evaluate exit criteria. if np.linalg.norm(dlamda) > maxdlamdaNorm or np.any(np.abs(dlamda) > maxdlamda): keepLearning = False elif learningSteps > maxLearningSteps: keepLearning = False self._multipliers += dlamda return estConstraints
#end MCH MonteCarloHistogram = MCH # alias
[docs] class SparseMCH(Solver): """Class for solving maxent problems on sparse constraints using the Monte Carlo Histogram method. See MCH class. """ def __init__(self, sample, model=None, calc_observables=None, sample_size=1000, sample_method='metropolis', mch_approximation=None, parameter_ix=None, iprint=True, sampler_kw={}, **default_model_kwargs): """ Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. sample_size : int, 1000 Number of samples to use MCH sampling step. sample_method : str, 'metropolis' Only 'metropolis' allowed currently. mch_approximation : function, None For performing the MCH approximation step. Is specific to the maxent model. parameter_ix : ndarray, None Indices of Ising parameters to fit. Ones that are not specified are fixed at zero. Parameters are ordered by default as all fields (indices 0 thru n-1) and then all couplings (as unraveled upper triangular interaction symmetric matrix). iprint : str, True sampler_kw : dict, {} To pass into self.model.setup_sampler(). rng : np.random.RandomState, None Random number generator. n_cpus : int, None If 1 or less no parallel processing, other numbers above 0 specify number of cores to use. **default_model_kwargs Additional arguments that will be passed to Ising class. These only matter if model is None. """ assert sample_size>0 self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs) if sample_size<1000 and self.iprint: warn("Small sample size will lead to poor convergence.") # Sampling parameters. self.sampleSize = sample_size self.mch_approximation = mch_approximation or define_ising_helper_functions()[-1] self.model.setup_sampler(sample_size=sample_size, sampler_kwargs=sampler_kw) # set up members for sparseness constraints assert not parameter_ix is None, "Must specify parameter_ix." assert parameter_ix.dtype==np.int64, "parameter_ix must be array of indices." if np.unique(parameter_ix).size != parameter_ix.size and self.iprint: warn("parameter_ix has repeated entries.") self.parameterIx = np.unique(parameter_ix) self.set_insertion_ix()
[docs] def solve(self, initial_guess=None, constraints=None, tol=None, tolNorm=None, n_iters=30, burn_in=30, maxiter=10, custom_convergence_f=None, iprint=False, full_output=False, learn_params_kwargs={'maxdlamda':1, 'eta':1}, generate_kwargs={}): """Solve for maxent model parameters using MCH routine. Parameters ---------- initial_guess : ndarray, None Initial starting point. constraints : ndarray, None For debugging! Vector of correlations to fit. tol : float, None Maximum error allowed in any observable. tolNorm : float, None Norm error allowed in found solution. n_iters : int, 30 Number of iterations to make between samples in MCMC sampling. burn_in : int, 30 Initial burn in from random sample when MC sampling. max_iter : int, 10 Max number of iterations of MC sampling and MCH approximation. custom_convergence_f : function, None Function for determining convergence criterion. At each iteration, this function should return the next set of learn_params_kwargs and optionally the sample size. As an example: def learn_settings(i): ''' Take in the iteration counter and set the maximum change allowed in any given parameter (maxdlamda) and the multiplicative factor eta, where d(parameter) = (error in observable) * eta. Additional option is to also return the sample size for that step by returning a tuple. Larger sample sizes are necessary for higher accuracy. ''' if i<10: return {'maxdlamda':1,'eta':1} else: return {'maxdlamda':.05,'eta':.05} iprint : bool, False full_output : bool, False If True, also return the errflag and error history. learn_parameters_kwargs : dict, {'maxdlamda':1,'eta':1} generate_kwargs : dict, {} Returns ------- ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. int Error flag. 0, converged within given criterion 1, max iterations reached ndarray Log of errors in matching constraints at each step of iteration. """ # check given arguments if (self.n*10) > burn_in and self.iprint: warn("Number of burn in MCMC iterations between samples may be too small for "+ "convergence to stationary distribution.") if (self.n*10) > n_iters and self.iprint: warn("Number of MCMC iterations between samples may be too small for convergence to "+ "stationary distribution.") if not initial_guess is None: assert initial_guess.size==self.parameterIx.size else: initial_guess = np.zeros(self.parameterIx.size) if constraints is None: constraints = self.constraints assert not constraints is None and constraints.size==self.parameterIx.size # set up for solution errors = [] # history of errors to track # Set initial guess for parameters. self._multipliers is where the current guess # for the parameters is stored. This is the full set of parameters including all # the ones that have been set to 0. if not (initial_guess is None): assert len(initial_guess)==len(constraints) self._multipliers = self.fill_in(initial_guess) else: self._multipliers = self.fill_in(np.zeros(constraints.size)) tol = tol or 1/np.sqrt(self.model.sampleSize) tolNorm = tolNorm or np.sqrt(1/self.model.sampleSize) * len(self._multipliers) # Redefine function for automatically adjusting learn_params_kwargs so that it # returns the MCH iterator settings and the sample size if it doesn't already. if custom_convergence_f is None: custom_convergence_f = lambda i : (learn_params_kwargs, self.model.sampleSize) if type(custom_convergence_f(0)) is dict: custom_convergence_f_ = custom_convergence_f custom_convergence_f = lambda i : (custom_convergence_f_(i), self.model.sampleSize) assert 'maxdlamda' and 'eta' in list(custom_convergence_f(0)[0].keys()) assert type(custom_convergence_f(0)[1]) is int # Generate initial set of samples. self.model.generate_sample(n_iters, burn_in, multipliers=self._multipliers, generate_kwargs=generate_kwargs) thisConstraints = self.calc_observables(self.model.sample).mean(0)[self.parameterIx] errors.append(thisConstraints - constraints) if iprint=='detailed': print(self._multipliers[self.parameterIx]) # MCH iterations. counter = 0 # number of MCMC and MCH steps keepLooping = True # loop control learn_params_kwargs, self.model.sampleSize = custom_convergence_f(counter) while keepLooping: # MCH step if iprint: print("Iterating parameters with MCH...") self.learn_parameters_mch(thisConstraints, constraints, **learn_params_kwargs) if iprint=='detailed': print("After MCH step, the parameters are...") print(self._multipliers[self.parameterIx]) # MC sampling step if iprint: print("Sampling...") self.model.generate_sample( n_iters, burn_in, multipliers=self._multipliers, generate_kwargs=generate_kwargs ) thisConstraints = self.calc_observables(self.model.sample).mean(0)[self.parameterIx] counter += 1 errors.append(thisConstraints - constraints) if iprint=='detailed': print(f"Error is {np.linalg.norm(errors[-1]):.4f}.") # Exit criteria. if ( np.linalg.norm(errors[-1])<tolNorm and np.all(np.abs(thisConstraints - constraints)<tol) ): if iprint: print("Solved.") errflag = 0 keepLooping=False elif counter>maxiter: if iprint: print("Over maxiter") errflag = 1 keepLooping=False else: learn_params_kwargs, self.model.sampleSize = custom_convergence_f(counter) self.multipliers = self._multipliers.copy() if full_output: return self.multipliers[self.parameterIx], errflag, np.vstack((errors)) return self.multipliers[self.parameterIx]
[docs] def learn_parameters_mch(self, estConstraints, constraints, maxdlamda=1, maxdlamdaNorm=1, maxLearningSteps=50, eta=1): """ Parameters ---------- estConstraints : ndarray Constraints estimated from MCH approximation. constraints : ndarray maxdlamda : float, 1 Max allowed magnitude for any element of dlamda vector before exiting. maxdlamdaNorm : float, 1 Max allowed norm of dlamda vector before exiting. maxLearningSteps : int max learning steps before ending MCH eta : float, 1 factor for changing dlamda Returns ------- ndarray MCH estimate for constraints from parameters lamda+dlamda. """ keepLearning = True dlamda = np.zeros(constraints.size) learningSteps = 0 distance = 1 while keepLearning: # Get change in parameters. # If observable is too large, then corresponding energy term has to go down # (think of double negative). dlamda += -(estConstraints - constraints) * np.min([distance, 1.]) * eta #dMultipliers /= dMultipliers.max() # Predict distribution with new parameters. estConstraints = self.mch_approximation(self.model.sample.astype(np.int64), self.fill_in(dlamda))[self.parameterIx] distance = np.linalg.norm(estConstraints - constraints) # Counter. learningSteps += 1 # Evaluate exit criteria. if np.linalg.norm(dlamda) > maxdlamdaNorm or np.any(np.abs(dlamda) > maxdlamda): keepLearning = False elif learningSteps > maxLearningSteps: keepLearning = False self._multipliers += self.fill_in(dlamda) return estConstraints
#end SparseMCH
[docs] class MCHIncompleteData(MCH): """ Class for solving maxent problems using the Monte Carlo Histogram method on incomplete data where some spins may not be visible. Broderick, T., Dudik, M., Tkacik, G., Schapire, R. E. & Bialek, W. Faster solutions of the inverse pairwise Ising problem. arXiv 1-8 (2007). NOTE: This only works for Ising model. Not ready for release. """ def __init__(self, *args, **kwargs): """Not ready for release. """ warn("MCHIncompleteData is not officially released as part of ConIII.") super(MCHIncompleteData,self).__init__(*args,**kwargs) self.condSamples = []
[docs] def solve(self, X=None, constraints=None, initial_guess=None, cond_sample_size=100, cond_sample_iters=100, tol=None, tolNorm=None, n_iters=30, burn_in=30, maxiter=10, disp=False, full_output=False, learn_params_kwargs={}, generate_kwargs={}): """Solve for parameters using MCH routine. Parameters ---------- X : ndarray constraints : ndarray Constraints calculated from the incomplete data (accounting for missing data points). initial_guess : ndarray=None initial starting point cond_sample_size : int or function Number of samples to make for conditional distribution. If function is passed in, it will be passed number of missing spins and must return an int. cond_sample_iters : int or function Number of MC iterations to make between samples. tol : float=None maximum error allowed in any observable tolNorm : float norm error allowed in found solution n_iters : int=30 Number of iterations to make between samples in MCMC sampling. burn_in (int=30) disp : int=0 0, no output 1, some detail 2, most detail full_output : bool,False Return errflag and errors at each iteration if True. learn_parameters_kwargs : dict generate_kwargs : dict Returns ------- parameters : ndarray Found solution. errflag : int errors : ndarray Errors in matching constraints at each step of iteration. """ # Check args. import types assert (not X is None) and (not constraints is None), "Must provide data and constriants." self.constraints = constraints if type(cond_sample_size) is int: f_cond_sample_size = lambda n: cond_sample_size elif type(cond_sample_size) is types.FunctionType: f_cond_sample_size = cond_sample_size if type(cond_sample_iters) is int: f_cond_sample_iters = lambda n: cond_sample_iters elif type(cond_sample_iters) is types.FunctionType: f_cond_sample_iters = cond_sample_iters # Set initial guess for parameters. if not (initial_guess is None): assert len(initial_guess)==len(self.constraints) self._multipliers = initial_guess.copy() else: self._multipliers = np.zeros((len(self.constraints))) tol = tol or 1/np.sqrt(self.sampleSize) tolNorm = tolNorm or np.sqrt( 1/self.sampleSize )*len(self._multipliers) errors = [] # history of errors to track # Get unique incomplete data points. incompleteIx = (X==0).any(1) uIncompleteStates = X[incompleteIx][unique_rows(X[incompleteIx])] # Frequency of each unique state. uIncompleteStatesCount = np.bincount( unique_rows(X[incompleteIx], return_inverse=True) ) fullFraction = (len(X)-incompleteIx.sum())/len(X) if disp: print("There are %d unique states."%len(uIncompleteStatesCount)) # Sample. if disp: print("Sampling...") self.generate_sample(n_iters,burn_in, uIncompleteStates,f_cond_sample_size,f_cond_sample_iters, generate_kwargs=generate_kwargs,disp=disp) thisConstraints = self.calc_observables(self.samples).mean(0) errors.append( thisConstraints-self.constraints ) # MCH iterations. counter = 0 keepLoop = True if disp>=2: print(self._multipliers) while keepLoop: if disp: print("Iterating parameters with MCH...") self.learn_parameters_mch(thisConstraints, fullFraction, uIncompleteStates, uIncompleteStatesCount, **learn_params_kwargs) if disp>=2: print("After MCH step, the parameters are...") print(self._multipliers) # Sample. if disp: print("Sampling...") self.generate_sample(n_iters,burn_in, uIncompleteStates,f_cond_sample_size,f_cond_sample_iters, generate_kwargs=generate_kwargs,disp=disp) thisConstraints = self.calc_observables(self.samples).mean(0) counter += 1 # Exit criteria. errors.append( thisConstraints-self.constraints ) if ( np.linalg.norm(errors[-1])<tolNorm and np.all(np.abs(thisConstraints-self.constraints)<tol) ): print("Solved.") errflag=0 keepLoop=False elif counter>maxiter: print("Over maxiter") errflag=1 keepLoop=False self.multipliers = self._multipliers.copy() if full_output: return self.multipliers,errflag,np.vstack((errors)) return self.multipliers
[docs] def learn_parameters_mch(self, estConstraints, fullFraction, uIncompleteStates, uIncompleteStatesCount, maxdlamda=1, maxdlamdaNorm=1, maxLearningSteps=50, eta=1 ): """ Update parameters with MCH step. Update is proportional to the difference between the observables and the predicted observables after a small change to the parameters. This is calculated from likelihood maximization, and for the incomplete data points this corresponds to the marginal probability distribution weighted with the number of corresponding data points. Parameters ---------- estConstraints : ndarray fullFraction : float Fraction of data points that are complete. uIncompleteStates : list-like Unique incomplete states in data. uIncompleteStatesCount : list-like Frequency of each unique data point. maxdlamda : float,1 maxdlamdaNorm : float,1 maxLearningSteps : int max learning steps before ending MCH eta : float,1 factor for changing dlamda Returns ------- estimatedConstraints : ndarray """ keepLearning = True dlamda = np.zeros((self.constraints.size)) learningSteps = 0 distance = 1 # for each data point, estimate the value of the observables with MCH # take the average of the predictions # minimize the diff btwn that avg and the goal while keepLearning: # Get change in parameters. # If observable is too large, then corresponding energy term has to go down # (think of double negative). dlamda += -(estConstraints-self.constraints) * np.min([distance,1.]) * eta #dMultipliers /= dMultipliers.max() # Predict distribution with new parameters. # MCH approximation with complete data points. if fullFraction>0: estConstraints = self.mch_approximation( self.samples, dlamda ) * fullFraction else: estConstraints = np.zeros_like(dlamda) # MCH approximation with incomplete data points. These will contribute to the likelihood # by the fraction of data points they constitute. So, the total weight per data point is # p(incomplete)*p(state|incomplete) for i,s in enumerate(self.condSamples): estConstraints += ( (1-fullFraction)* (uIncompleteStatesCount[i]/uIncompleteStatesCount.sum())* self.mch_approximation(s,dlamda) ) distance = np.linalg.norm( estConstraints-self.constraints ) # Counter. learningSteps += 1 # Evaluate exit criteria. if np.linalg.norm(dlamda)>maxdlamdaNorm or np.any(np.abs(dlamda)>maxdlamda): keepLearning = False elif learningSteps>maxLearningSteps: keepLearning = False self._multipliers += dlamda return estConstraints
[docs] def generate_sample(self, n_iters, burn_in, uIncompleteStates=None, f_cond_sample_size=None, f_cond_sample_iters=None, sample_size=None, sample_method=None, initial_sample=None, run_regular_sampler=True, run_cond_sampler=True, disp=0, generate_kwargs={}): """Wrapper around generate_sample_parallel() from available samplers. Parameters ---------- n_iters : int burn_in : int I think burn in is handled automatically in REMC. uIncompleteStates : list of unique states f_cond_sample_size : lambda function Given the number of hidden spins, return the number of samples to take. f_cond_sample_iters : lambda function Given the number of hidden spins, return the number of MC iterations to make. sample_size : int sample_method : str initial_sample : ndarray generate_kwargs : dict """ from datetime import datetime # for debugging assert not (self.sampler is None), "Must call setup_sampler() first." sample_method = sample_method or self.sampleMethod sample_size = sample_size or self.sampleSize if initial_sample is None and (not self.samples is None) and len(self.samples)==self.sampleSize: initial_sample = self.samples if sample_method=='metropolis': self.sampler.theta = self._multipliers # Generate samples from full distribution. if run_regular_sampler: # Burn in. self.sampler.generate_sample_parallel( sample_size, n_iters=burn_in, initial_sample=initial_sample ) self.sampler.generate_sample_parallel( sample_size, n_iters=n_iters, initial_sample=self.sampler.samples ) self.samples = self.sampler.samples if run_cond_sampler: # Sample from conditional distribution p(s_unobserved|s_observed) where s_observed # are the spins with data for the incomplete data points. def f(args): """Function for parallelizing sampling of conditional distributions.""" i,s = args frozenSpins = list(zip(np.where(s!=0)[0],s[s!=0])) if disp: start = datetime.now() sample,E = self.sampler.generate_cond_samples(f_cond_sample_size(self.n-len(frozenSpins)), frozenSpins, burn_in=f_cond_sample_iters(self.n-len(frozenSpins)), parallel=False, **generate_kwargs) if disp: print("Done sampling %d out of %d unique states in %1.1f s."%(i+1, len(uIncompleteStates), (datetime.now()-start).total_seconds())) return sample # Parallel sampling of conditional distributions. pool = mp.Pool(self.nCpus) self.condSamples = pool.map( f,list(zip(list(range(len(uIncompleteStates))),uIncompleteStates)) ) pool.close() else: raise NotImplementedError("Unrecognized sampler.")
# End MCHIncompleteData
[docs] class Pseudo(Solver): """ Pseudolikelihood approximation to solving the inverse Ising problem as described in Aurell and Ekeberg, PRL 108, 090201 (2012). """ def __init__(self, sample, model=None, calc_observables=None, get_multipliers_r=None, calc_observables_r=None, k=2, iprint=True, **default_model_kwargs): """For this technique, must specify how to calculate the energy specific to the conditional probability of spin r given the rest of the spins. These will be passed in with "get_observables_r" and "calc_observables_r". Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. get_multipliers_r : function, None Takes index r and multipliers. Defn: lambda r,multipliers : r_multipliers calc_observables_r : function, None Takes index r and samples X. Defn: lambda r,X : r_observable k : int Number of possible states for each spin. This should only be changed for the Potts model. **default_model_kwargs Additional arguments that will be passed to Ising class. These only matter if model is None. """ self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs) if calc_observables_r is None or get_multipliers_r is None: self.get_multipliers_r, self.calc_observables_r = define_pseudo_ising_helper_functions(self.n) else: assert sample.max()<=(k-1) self.k = k self.get_multipliers_r, self.calc_observables_r = get_multipliers_r, calc_observables_r
[docs] def solve(self, force_general=False, **kwargs): """Uses a general all-purpose optimization to solve the problem using functions defined in self.get_multipliers_r and self.calc_observables_r. Parameters ---------- force_general : bool, False If True, force use of "general" algorithm. initial_guess : ndarray, None Initial guess for the parameter values. solver_kwargs : dict, {} kwargs for scipy.minimize(). Returns ------- ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. """ from .models import Potts3, Ising if type(self.model) is Ising and not force_general: return self._solve_ising(**kwargs) elif type(self.model) is Potts3: return self._solve_potts(**kwargs) return self._solve_general(**kwargs)
def _solve_ising(self, initial_guess=None, full_output=False, solver_kwargs={}, scale=100): """Solve for Langrangian parameters according to pseudolikelihood algorithm. Parameters ---------- initial_guess : ndarray, None Initial guess for the parameter values. full_output : bool, False If True, return output from scipy.minimize() routine. solver_kwargs : dict, {} Keyword arguments for scipy.optimize.minimize. Returns ------- ndarray Solved multipliers. dict (optional) Output from scipy.optimize.minimize. """ if initial_guess is None: initial_guess = np.zeros(self.calc_observables(self.sample[0][None,:]).size) # reformat initial_guess for easy looping later initial_guessAsMat = replace_diag(squareform(initial_guess[self.n:]), initial_guess[:self.n]) for i in range(1,self.n): tmp = initial_guessAsMat[i,i] initial_guessAsMat[i,i] = initial_guessAsMat[i,0] initial_guessAsMat[i,0] = tmp # initialize variables that will be used later obs = [self.calc_observables_r(r, self.sample) for r in range(self.n)] soln = [] # list of output from scipy.optimize.minimize for each spin Jmat = np.zeros((self.n, self.n)) # couplings stored in matrix format with fields along diagonal # iterate through each spin and solve for parameters for each one for r in range(self.n): # to use below... multipliersrix = self.get_multipliers_r(r, initial_guess)[1] guess = initial_guess.copy() # params only change the terms relevant to the particular spin being considered def f(params): guess[multipliersrix] = params multipliers = self.get_multipliers_r(r, guess)[0] E = -obs[r].dot(multipliers) # objective value and gradient both describe (negative conditional # log-likelihood) + scale*L1 penalty; the L1 sign on the value used to be # inverted relative to the gradient, which made the optimizer converge to a # poor solution. loglikelihood = -np.log( 1+np.exp(2*E) ).sum() - np.abs(multipliers).sum() * scale dloglikelihood = (( -(1/(1+np.exp(2*E)) * np.exp(2*E))[:,None] * 2*obs[r] ).sum(0) + np.sign(multipliers) * scale) return -loglikelihood, dloglikelihood def hess(params): # Hessian of the (smooth) negative conditional log-likelihood. The L1 term # is piecewise linear and contributes zero. This is the standard logistic # form obs^T diag(4 p (1-p)) obs (Aurell & Ekeberg 2012), enabling a Newton # solve that converges in far fewer iterations. guess[multipliersrix] = params multipliers = self.get_multipliers_r(r, guess)[0] E = -obs[r].dot(multipliers) p = 1 / (1 + np.exp(-2*E)) w = 4 * p * (1-p) return (obs[r] * w[:,None]).T.dot(obs[r]) soln.append(minimize(f, initial_guessAsMat[r], jac=True, hess=hess, **{'method':'Newton-CG', **solver_kwargs})) thisMultipliers = soln[-1]['x'] Jmat[r,r] = thisMultipliers[0] Jmat[r,np.delete(np.arange(self.n),r)] = thisMultipliers[1:] # symmetrize couplings Jmat = (Jmat + Jmat.T)/2 self.multipliers = np.concatenate((Jmat.diagonal(), squareform(zero_diag(Jmat)))) if full_output: return self.multipliers, soln return self.multipliers def _solve_general(self, initial_guess=None, full_output=False, solver_kwargs={}): """Solve for Langrangian parameters according to a variation on the pseudolikelihood algorithm detailed in Aurell and Ekeberg (PRL, 2012). There, the conditional log-likelihoods per spin are minimized independently and then the resulting couplings are combined in a way that ensures that the interactions are symmetric. The generalization is straightforward for higher-order interactions (normalize by the order of the interaction), but here present a different approach that is somewhat computationally simpler. The *sum* of the conditional likelihoods over each spin is minimized, ensuring that the parameters are equal across all conditional likelihood equations by construction. In general, this gives different results from the original pseudolikelihood formulation, but they agree closely in many cases. Parameters ---------- initial_guess : ndarray, None Initial guess for the parameter values. full_output : bool, False If True, return output from scipy.minimize() routine. solver_kwargs : dict, {} Keyword arguments for scipy.optimize.minimize. Returns ------- ndarray Solved multipliers. dict (optional) Output from scipy.optimize.minimize. """ if initial_guess is None: initial_guess = np.zeros(self.calc_observables(self.sample[0][None,:]).size) obs = [self.calc_observables_r(r, self.sample) for r in range(self.n)] def f(params): # running sums of function evaluations over all spins loglikelihood = 0 dloglikelihood = np.zeros_like(initial_guess) # gradient # iterate through each spin for r in range(self.n): multipliers, multipliersrix = self.get_multipliers_r(r, params) E = -obs[r].dot(multipliers) loglikelihood += -np.log( 1+np.exp(2*E) ).sum() dloglikelihood[multipliersrix] += ( -(1/(1+np.exp(2*E)) * np.exp(2*E))[:,None] * 2*obs[r] ).sum(0) return -loglikelihood, dloglikelihood def hess(params): # Full Hessian of the summed negative conditional log-likelihood. Each spin r # contributes obs[r]^T diag(4 p (1-p)) obs[r] to the block of parameters it # involves (its field and couplings), so we scatter that block into place. H = np.zeros((params.size, params.size)) for r in range(self.n): multipliers, multipliersrix = self.get_multipliers_r(r, params) E = -obs[r].dot(multipliers) p = 1 / (1 + np.exp(-2*E)) w = 4 * p * (1-p) Hr = (obs[r] * w[:,None]).T.dot(obs[r]) ix = np.asarray(multipliersrix) H[ix[:,None], ix[None,:]] += Hr return H soln = minimize(f, initial_guess, jac=True, hess=hess, **{'method':'Newton-CG', **solver_kwargs}) self.multipliers = soln['x'] if full_output: return soln['x'], soln return soln['x'] def _solve_potts(self, initial_guess=None, full_output=False, solver_kwargs={}, cost_fcn=None, cost_fcn_jac=None): """Solve Potts model formulation with k-states and non-zero coupling if spins are in the same state. Parameters ---------- initial_guess : ndarray, None Initial guess for the parameter values. full_output : bool, False If True, return output from scipy.minimize() routine. solver_kwargs : dict, {} Keyword arguments for scipy.optimize.minimize. cost_fcn : lambda function, None Takes the given set of parameters and returns a cost that is added to the neg log likelihood. Must be specified along with the jacobian. cost_fcn_jac : lambda function, None Jacobian for above cost function. Returns ------- ndarray Solved multipliers. dict (optional) Output from scipy.optimize.minimize. """ if initial_guess is None: initial_guess = np.zeros(self.calc_observables(self.sample[0][None,:]).size) obs = [] otherobs = [] otherobsstate = [] for r in range(self.n): out = self.calc_observables_r(r, self.sample) obs.append(out[0]) otherobs.append(out[1]) otherobsstate.append(out[2]) if cost_fcn is None: cost_fcn = lambda x: 0 cost_fcn_jac = lambda x: 0 elif cost_fcn: assert cost_fcn_jac, "Must specify jacobian for cost function as well." jacErr = check_grad(cost_fcn, cost_fcn_jac, initial_guess) if jacErr>1e-4: warn("Jacobian fcn is bad. Norm error of %E."%jacErr) def f(params): # running sums of function evaluations over all spins loglikelihood = 0 dloglikelihood = np.zeros_like(initial_guess) # gradient # iterate through each spin for r in range(self.n): multipliers, multipliersrix = self.get_multipliers_r(r, params) # first, calculate the log likelihood E = -obs[r].dot(multipliers) Eother = np.vstack([-o.dot(multipliers) for o in otherobs[r]]).T Edelta = Eother - E[:,None] Edelta = np.hstack((Edelta, np.zeros((self.sample.shape[0],1)))) # add constant term loglikelihoodPerSample = -logsumexp(-Edelta, axis=1) loglikelihood += loglikelihoodPerSample.sum() # calculate log likelihood gradient (take a derivative wrt to each parameter) den = -np.exp(-loglikelihoodPerSample) # iterate over each field # note that we are taking the derivative of the loglikelihood divided by # the term in the numerator, which makes the eqn simpler to handle but # makes keeping track of negatives and zeros a pain, which is mainly what # is happening below for hix in range(self.k): num = np.zeros(self.sample.shape[0]) # iterate over each exponential term that consists of the (k-1) possible other values of # this spin for ix in range(self.k-1): sgn = np.ones(self.sample.shape[0]) currentStateAndSameField = self.sample[:,r]==hix sgn[currentStateAndSameField] = -1 notCurrentStateAndNoField = (~currentStateAndSameField) & (otherobsstate[r][:,ix]!=hix) sgn[notCurrentStateAndNoField] = 0 num += sgn * np.exp(-Edelta[:,ix]) dloglikelihood[multipliersrix[hix]] += (num/den).sum() # derivative wrt to each coupling for i,jix in enumerate(np.delete(range(self.n), r)): num = np.zeros(self.sample.shape[0]) # iterate over each exponential term for ix in range(self.k-1): sgn = np.zeros(self.sample.shape[0]) neighborjIsSameStateAsCounterfactual = otherobsstate[r][:,ix]==self.sample[:,jix] sgn[neighborjIsSameStateAsCounterfactual] = 1 sameStateAsR = self.sample[:,r]==self.sample[:,jix] sgn[sameStateAsR] = -1 num += sgn * np.exp(-Edelta[:,ix]) dloglikelihood[multipliersrix[i+self.k]] += (num/den).sum() return -loglikelihood + cost_fcn(params), -dloglikelihood + cost_fcn_jac(params) #from scipy.optimize import check_grad, approx_fprime #if check_grad(lambda x: f(x)[0], lambda x: f(x)[1], initial_guess)>1e-6: # print("num:",approx_fprime(initial_guess, lambda x: f(x)[0], 1e-7)[9:]) # print("analytic:",f(initial_guess)[1][9:]) # print(approx_fprime(initial_guess, lambda x: f(x)[0], 1e-7) - f(initial_guess)[1]) # raise Exception soln = minimize(f, initial_guess, jac=True, **solver_kwargs) self.multipliers = soln['x'] if full_output: return soln['x'], soln return soln['x']
#end Pseudo
[docs] class ClusterExpansion(Solver): """Implementation of Adaptive Cluster Expansion for solving the inverse Ising problem, as described in John Barton and Simona Cocco, J. of Stat. Mech. P03002 (2013). Specific to pairwise Ising constraints. """ def __init__(self, sample, model=None, calc_observables=None, sample_size=1000, iprint=True, **default_model_kwargs): """ Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. sample_size : int, 1000 Number of MC samples. rng : np.random.RandomState, None Random number generator. n_cpus : int, None If 1 or less no parallel processing, other numbers above 0 specify number of cores to use. **default_model_kwargs Additional arguments that will be passed to Ising class. These only matter if model is None. """ self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs) if sample_size < 1000 and self.iprint: warn("Sample size may be too small for convergence.") self.sampleSize = sample_size self.model.setup_sampler(sample_size=sample_size)
[docs] def S(self, cluster, coocMat, deltaJdict={}, useAnalyticResults=False, priorLmbda=0., numSamples=None): """Calculate pairwise entropy of cluster. (First fits pairwise Ising model.) Parameters ---------- cluster : list List of indices belonging to each cluster. coocMat : ndarray Pairwise correlations. deltaJdict : dict, {} useAnalyticResults : bool, False Probably want False until analytic formulas are changed to include prior on J Returns ------- entropy : float Jfull : ndarray Matrix of couplings. """ if len(cluster) == 0: raise Exception elif (len(cluster) == 1) and useAnalyticResults: p = coocMat[cluster[0],cluster[0]] J = np.array( [ [ -log( p / (1.-p) ) ] ] ) elif (len(cluster) == 2) and useAnalyticResults: i = min(cluster[0],cluster[1]) j = max(cluster[0],cluster[1]) pi = coocMat[i,i] pj = coocMat[j,j] pij = coocMat[i,j] Jii1 = -log( pi / (1.-pi) ) Jjj1 = -log( pj / (1.-pj) ) Jii = -log( (pi - pij)/(1.-pi-pj+pij) ) Jjj = -log( (pj - pij)/(1.-pi-pj+pij) ) Jij = - log( pij ) + log( pi - pij ) + log( pj - pij ) \ - log( 1.-pi-pj+pij ) J = np.array( [ [ Jii, 0.5*Jij ], [ 0.5*Jij, Jjj ] ] ) else: coocMatCluster = mean_field_ising.coocCluster(coocMat,cluster) Jinit = None # <--- potential for speed-up here J = mean_field_ising.findJmatrixAnalytic_CoocMat(coocMatCluster, Jinit=Jinit, priorLmbda=priorLmbda, numSamples=numSamples) # make 'full' version of J (of size NxN) N = len(coocMat) Jfull = mean_field_ising.JfullFromCluster(J,cluster,N) ent = mean_field_ising.analyticEntropy(J) return ent, Jfull
[docs] def Sindependent(self, cluster, coocMat): """Entropy approximation assuming that each cluster appears independently of the others. Parameters ---------- cluster : list coocMat : ndarray Pairwise correlations. Returns ------- float Sind, independent entropy. ndarray Pairwise couplings. """ # sort by cluster indices coocMatCluster = mean_field_ising.coocCluster(coocMat, cluster) # in case we're given an upper-triangular coocMat: coocMatCluster = mean_field_ising.symmetrizeUsingUpper(coocMatCluster) freqs = np.diag(coocMatCluster).copy() h = -np.log(freqs/(1.-freqs)) Jind = np.diag(h) # independent approx Sinds = -freqs*np.log(freqs) - (1.-freqs)*np.log(1.-freqs) Sind = np.sum(Sinds) # make 'full' version of J (of size NfullxNfull) Nfull = len(coocMat) Jfull = mean_field_ising.JfullFromCluster(Jind, cluster, Nfull) return Sind, Jfull
# "Algorithm 1"
[docs] def deltaS(self, cluster, coocMat, deltaSdict=None, deltaJdict=None, iprint=True, meanFieldRef=False, priorLmbda=0., numSamples=None, independentRef=False, meanFieldPriorLmbda=None): """ Parameters ---------- cluster : list List of indices in cluster coocMat : ndarray deltaSdict : dict, None deltaJdict : dict, None iprint : bool, True meanFieldRef : bool, False numSamples : int, None independentRef : bool, False If True, expand about independent entropy meanFieldRef : bool, False If True, expand about mean field entropy Returns ------- float deltaScluster float deltaJcluster """ if deltaSdict is None: deltaSdict = {} if deltaJdict is None: deltaJdict = {} if (independentRef and meanFieldRef) or \ not (independentRef or meanFieldRef): raise Exception if meanFieldPriorLmbda is None: meanFieldPriorLmbda = priorLmbda cID = self.clusterID(cluster) if cID in deltaSdict: #print "deltaS: found answer for",cluster return deltaSdict[cID],deltaJdict[cID] elif iprint: print("deltaS: Calculating entropy for cluster",cluster) # start with full entropy (and J) deltaScluster, deltaJcluster = self.S(cluster,coocMat, deltaJdict, priorLmbda=priorLmbda, numSamples=numSamples) if independentRef: # subtract independent reference entropy S0cluster,J0cluster = self.Sindependent(cluster,coocMat) deltaScluster -= S0cluster deltaJcluster -= J0cluster elif meanFieldRef: # subtract mean field reference entropy S0cluster,J0cluster = SmeanField(cluster,coocMat, meanFieldPriorLmbda,numSamples) deltaScluster -= S0cluster deltaJcluster -= J0cluster # subtract entropies of sub-clusters for size in range(len(cluster)-1,0,-1): subclusters = self.subsets(cluster,size) for subcluster in subclusters: deltaSsubcluster,deltaJsubcluster = \ self.deltaS(subcluster,coocMat,deltaSdict,deltaJdict, iprint=iprint, meanFieldRef=meanFieldRef,priorLmbda=priorLmbda, numSamples=numSamples, independentRef=independentRef, meanFieldPriorLmbda=meanFieldPriorLmbda) deltaScluster -= deltaSsubcluster deltaJcluster -= deltaJsubcluster deltaSdict[cID] = deltaScluster deltaJdict[cID] = deltaJcluster return deltaScluster, deltaJcluster
[docs] def clusterID(self, cluster): return tuple(np.sort(cluster))
[docs] def subsets(self, thisSet, size, sort=False): """Given a list, returns a list of all unique subsets of that list with given size. Parameters ---------- thisSet : list size : int sort : bool, False Returns ------- list All subsets of given size. """ if len(thisSet) != len(np.unique(thisSet)): raise Exception if size == len(thisSet): return [thisSet] if size > len(thisSet): return [] if size <= 0: return [] if size == 1: return [ [s,] for s in thisSet ] sub = [] rest = copy.copy(thisSet) s = rest[0] rest.remove(s) subrest1 = self.subsets(rest,size) sub.extend(subrest1) subrest2 = self.subsets(rest,size-1) [ srest.append(s) for srest in subrest2 ] sub.extend(subrest2) if sort: return np.sort(sub) return sub
# "Algorithm 2" # was "adaptiveClusterExpansion"
[docs] def solve(self, threshold, cluster=None, deltaSdict=None, deltaJdict=None, iprint=True, priorLmbda=0., numSamples=None, meanFieldRef=False, independentRef=True, veryVerbose=False, meanFieldPriorLmbda=None, full_output=False): """ Parameters ---------- threshold : float meanFieldRef : bool, False Expand about mean-field reference. independentRef : bool, True Expand about independent reference. priorLmbda : float, 0. Strength of non-interacting prior. meanFieldPriorLmbda : float, None Strength of non-interacting prior in mean field calculation (defaults to priorLmbda). Returns ------- ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. float (optional, only if full_output=True) Estimated entropy. ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. list (optional, only if full_output=True) List of clusters. dict (optional, only if full_output=True) deltaSdict dict (optional, only if full_output=True) deltaJdict """ # convert input to coocMat coocMat = mean_field_ising.cooccurrence_matrix((self.sample+1)/2) if deltaSdict is None: deltaSdict = {} if deltaJdict is None: deltaJdict = {} if independentRef and meanFieldRef: raise Exception if meanFieldPriorLmbda is None: meanFieldPriorLmbda = priorLmbda N = len(coocMat) T = threshold if cluster is None: cluster = list(range(N)) clusters = {} size = 1 clusters[1] = [ [i] for i in cluster ] while len(clusters[size]) > 0: clusters[ size+1 ] = [] numClusters = len(clusters[size]) if iprint: print("adaptiveClusterExpansion: Clusters of size", size+1) for i in range(numClusters): for j in range(i+1,numClusters): # some are not unique! gamma1 = clusters[size][i] gamma2 = clusters[size][j] gammaI = np.intersect1d(gamma1,gamma2) gammaU = np.sort( np.union1d(gamma1,gamma2) ) gammaU = list(gammaU) if (len(gammaI) == size-1): deltaSgammaU, deltaJgammaU = self.deltaS(gammaU, coocMat, deltaSdict, deltaJdict, iprint=veryVerbose, meanFieldRef=meanFieldRef, priorLmbda=priorLmbda, numSamples=numSamples, independentRef=independentRef, meanFieldPriorLmbda=meanFieldPriorLmbda) if (abs(deltaSgammaU) > T) and (gammaU not in clusters[size+1]): clusters[ size+1 ].append(gammaU) size += 1 if independentRef: ent, J0 = self.Sindependent(cluster, coocMat) elif meanFieldRef: ent, J0 = SmeanField(cluster, coocMat, meanFieldPriorLmbda, numSamples) else: ent = 0. J0 = np.zeros((N,N)) J = J0.copy() for size in list(clusters.keys()): for cluster in clusters[size]: cID = self.clusterID(cluster) ent += deltaSdict[cID] J += deltaJdict[cID] # convert J to {-1,1} basis h = -J.diagonal() J = -zero_diag(J) self.multipliers = convert_params( h, squareform(J)*2, '11', concat=True ) if full_output: return self.multipliers, ent, clusters, deltaSdict, deltaJdict else: return self.multipliers
# end ClusterExpansion
[docs] class RegularizedMeanField(Solver): """Implementation of regularized mean field method for solving the inverse Ising problem, as described in Daniels, Bryan C., David C. Krakauer, and Jessica C. Flack. ``Control of Finite Critical Behaviour in a Small-Scale Social System.'' Nature Communications 8 (2017): 14301. doi:10.1038/ncomms14301 Specific to pairwise Ising constraints. """ def __init__(self, sample, model=None, calc_observables=None, sample_size=1_000, iprint=False, **default_model_kwargs): """ Parameters ---------- sample : ndarray or int, None If ndarray, of dimensions (samples, dimension). If int, specifies system size. If None, many of the default class members cannot be set and then must be set manually. model : class like one from models.py, None By default, will be set to solve Ising model. calc_observables : function, None For calculating observables from a set of samples. sample : ndarray model : class from models.py, None calc_observables : function, None sample_size : int, 1_000 iprint : str, False rng : np.random.RandomState, None Random number generator. n_cpus : int, None If 1 or less no parallel processing, other numbers above 0 specify number of cores to use. **default_model_kwargs : kwargs for default model Additional arguments that will be passed to Ising class. These only matter if model is None. """ assert sample_size > 0 self.basic_setup(sample, model, calc_observables, iprint, model_kwargs=default_model_kwargs) if sample_size < 1000 and self.iprint: warn("Small sample size will lead to poor convergence.") self.sampleSize = sample_size self.iprint = iprint self.model.setup_sampler(sample_size=sample_size)
[docs] def solve(self, n_grid_points=200, min_size=0, reset_rng=True, min_covariance=False, min_independent=True, cooc_cov=None, priorLmbda=0., bracket=None): """Varies the strength of regularization on the mean field J to best fit given cooccurrence data. Parameters ---------- n_grid_points : int, 200 If bracket is given, first test at n_grid_points points evenly spaced in the bracket interval, then give the lowest three points to scipy.optimize.minimize_scalar min_size : int, 0 Use a modified model in which samples with fewer ones than min_size are not allowed. reset_rng: bool, True Reset random number generator seed before sampling to ensure that objective function does not depend on generator state. min_covariance : bool, False ** As of v1.0.3, not currently supported ** Minimize covariance from emperical frequencies (see notes); trying to avoid biases, as inspired by footnote 12 in TkaSchBer06 min_independent : bool, True ** As of v1.0.3, min_independent is the only mode currently supported ** Each <xi> and <xi xj> residual is treated as independent cooc_cov : ndarray, None ** As of v1.0.3, not currently supported ** Provide a covariance matrix for residuals. Should typically be coocSampleCovariance(samples). Only used if min_covariance and min_independent are False. priorLmbda : float,0. ** As of v1.0.3, not currently implemented ** Strength of noninteracting prior. Returns ------- ndarray Solved multipliers (parameters). For Ising problem, these can be converted into matrix format using utils.vec2mat. """ if reset_rng: # return same rng in initial state every time rseed = self.model.rng.randint(2**32-1) get_rng = lambda rseed=rseed: np.random.RandomState(rseed) else: get_rng = lambda: self.model.rng numDataSamples = len(self.sample) # convert data samples to coocMat coocMatData = mean_field_ising.cooccurrence_matrix((self.sample+1)/2) if cooc_cov is None: cooc_cov = mean_field_ising.coocSampleCovariance(self.sample) if priorLmbda != 0.: raise NotImplementedError("priorLmbda is not currently supported") lmbda = priorLmbda / numDataSamples # stuff defining the error model, taken from findJmatrixBruteForce_CoocMat # 3.1.2012 I'm pretty sure the "repeated" line below should have the transpose, but # coocJacobianDiagonal is not sensitive to this. If you use non-diagonal jacobians in the # future and get bad behavior you may want to double-check this. if min_independent: coocStdevs = mean_field_ising.coocStdevsFlat(coocMatData,numDataSamples) coocStdevsRepeated = ( coocStdevs*np.ones((len(coocStdevs),len(coocStdevs))) ).T elif min_covariance: raise Exception("min_covariance is not currently supported") empiricalFreqs = np.diag(coocMatData) covTildeMean = covarianceTildeMatBayesianMean(coocMatData,numDataSamples) covTildeStdevs = covarianceTildeStdevsFlat(coocMatData,numDataSamples, empiricalFreqs) covTildeStdevsRepeated = ( covTildeStdevs*np.ones((len(covTildeStdevs),len(covTildeStdevs))) ).T else: raise NotImplementedError("correlated residuals calculation is not currently supported") if cooc_cov is None: raise Exception cov = cooc_cov # / numDataSamples (can't do this here due to numerical issues) # instead include numDataSamples in the calculation of coocMatMeanZSq # for use in gammaPrime <-> priorLmbda freqsList = np.diag(coocMatData) pmean = np.mean(freqsList) # Generate samples from model (need to translate parameters) def samples(J): self.model.set_rng(get_rng()) if min_covariance: J = tildeJ2normalJ(J, empiricalFreqs) burninDefault = 100*self.n J = J + J.T self.model.set_multipliers(np.concatenate([J.diagonal(), squareform(zero_diag(-J))])) self.model.generate_sample(burninDefault, 1) return self.model.sample # adapted from findJMatrixBruteForce_CoocMat def func(meanFieldGammaPrime): # translate gammaPrime prior strength to lambda prior strength meanFieldPriorLmbda = meanFieldGammaPrime / (pmean**2 * (1.-pmean)**2) # calculate regularized mean field J J = mean_field_ising.JmeanField(coocMatData, meanFieldPriorLmbda=meanFieldPriorLmbda, numSamples=numDataSamples) # sample from J isingSamples = samples(J) # calculate residuals, including prior if necessary if min_independent: # Default dc = mean_field_ising.isingDeltaCooc(isingSamples, coocMatData)/coocStdevs elif min_covariance: dc = isingDeltaCovTilde(isingSamples, covTildeMean, empiricalFreqs)/covTildeStdevs else: dc = mean_field_ising.isingDeltaCooc(isingSamples, coocMatMean) if priorLmbda != 0.: freqs = np.diag(coocMatData) factor = np.outer(freqs*(1.-freqs),freqs*(1.-freqs)) factorFlat = aboveDiagFlat(factor) priorTerm = lmbda * factorFlat * flatJ[ell:]**2 dc = np.concatenate([dc,priorTerm]) if self.iprint: print("RegularizedMeanField.solve: Tried "+str(meanFieldGammaPrime)) print("RegularizedMeanField.solve: sum(dc**2) = "+str(np.sum(dc**2))) return np.sum(dc**2) # The regularization strength is non-negative, and for (nearly) degenerate data # (correlations at +/-1) the mean-field J can blow up to nan/inf for small # gamma, which previously crashed scipy's bracketing routine (issue #16). Wrap # the objective so non-finite values / exceptions become a large finite penalty, # steering the optimizer toward gammas that yield a finite solution. def safe_func(gammaPrime): try: val = func(gammaPrime) except Exception: return 1e30 return val if np.isfinite(val) else 1e30 if bracket is not None: gridPoints = np.linspace(bracket[0], bracket[1], n_grid_points) gridResults = [ safe_func(p) for p in gridPoints ] gridBracket = self.bracket1d(gridPoints, gridResults) solution = minimize_scalar(safe_func, bracket=gridBracket) else: # bounded search avoids the bracketing step that fails on non-finite values solution = minimize_scalar(safe_func, method='bounded', bounds=(0, 1)) gammaPrimeMin = solution['x'] meanFieldPriorLmbdaMin = gammaPrimeMin / (pmean**2 * (1.-pmean)**2) J = mean_field_ising.JmeanField(coocMatData, meanFieldPriorLmbda=meanFieldPriorLmbdaMin, numSamples=numDataSamples) if not np.isfinite(J).all(): raise RuntimeError( "RegularizedMeanField could not find a finite solution; the data may be " "(nearly) degenerate (pairwise correlations at +/-1). Consider removing " "perfectly correlated/anti-correlated spins or regularizing the data.") J = J + J.T # convert J to {-1,1} basis h = -J.diagonal() J = -zero_diag(J) self.multipliers = convert_params( h, squareform(J)*2, '11', concat=True ) return self.multipliers
[docs] def bracket1d(self, xList, funcList): """Assumes xList is monotonically increasing Get bracketed interval (a,b,c) with a < b < c, and f(b) < f(a) and f(c). (Choose b and c to make f(b) and f(c) as small as possible.) If minimum is at one end, raise error. """ gridMinIndex = np.argmin(funcList) gridMin = xList[gridMinIndex] if (gridMinIndex == 0) or (gridMinIndex == len(xList)-1): raise Exception("Minimum at boundary") gridBracket1 = xList[ np.argmin(funcList[:gridMinIndex]) ] gridBracket2 = xList[ gridMinIndex + 1 + np.argmin(funcList[gridMinIndex+1:]) ] gridBracket = (gridBracket1,gridMin,gridBracket2) return gridBracket
#end RegularizedMeanField