"""Parameter conversion and maxent helper-function factories.
Internal module split out of the original coniii.utils. Public names
are re-exported by :mod:`coniii.utils`.
"""
import numpy as np
from numba import jit, njit
from itertools import combinations
from scipy.special import logsumexp, binom
from scipy.spatial.distance import squareform
from ._indexing import sub_to_ind, bin_states, unravel_index, multinomial
from ._correlations import pair_corr, calc_overlap
[docs]
def convert_params(h, J, convert_to, concat=False):
"""Convert Ising model fields and couplings from {0,1} basis to {-1,1} and vice versa.
Parameters
----------
h : ndarray
Fields.
J : ndarray
Couplings.
convert_to : str
Either '01' or '11'.
concat : bool, False
If True, return a vector concatenating fields and couplings.
Returns
-------
ndarray
Mean bias h vector. Concatenated vector of h and J if concat is True.
ndarray, optional
Vector of J.
"""
if len(J.shape)!=2:
Jmat = squareform(J)
else:
Jmat = J
J = squareform(J)
if convert_to=='11':
# Convert from 0,1 to -/+1
Jp = J/4.
hp = h/2 + np.sum(Jmat,1)/4.
elif convert_to=='01':
# Convert from -/+1 to 0,1
hp = 2.*(h - np.sum(Jmat,1))
Jp = J*4.
else:
raise Exception("Invalid choice for convert_to. Must be '01' or '11'.")
if concat:
return np.concatenate((hp, Jp))
return hp, Jp
[docs]
def ising_convert_params(oparams, convert_to, concat=False):
"""General conversion of parameters from 01 to 11 basis.
Take set of Ising model parameters up to nth order interactions in either {0,1} or
{-1,1} basis and convert to other basis.
Parameters
----------
oparams : tuple of lists
Tuple of lists of interactions between spins starting with the lowest order
interactions. Each list should consist of all interactions of that order such that
the length of each list should be binomial(n,i) for all i starting with i>=1.
convert_to : str
concat : bool,False
Returns
-------
params : tuple of lists or list
New parameters in order of lowest to highest order interactions to mean biases.
Can all be concatenated together if concat switch is True.
"""
oparams = oparams[::-1]
n = len(oparams[-1])
params = [np.zeros(int(binom(n,i))) for i in range(len(oparams),0,-1)]
if convert_to=='01':
# basically need to expand polynomials to all lower order terms
# start at the highest order terms
for counter,order in enumerate(range(len(oparams),0,-1)):
# iterate through all combinations of indices of order
for ijkcounter,ijk in enumerate(combinations(range(n), order)):
ijkcoeff = oparams[counter][ijkcounter]
# same order term is only multiplied by powers of two
params[counter][ijkcounter] += 2**order * ijkcoeff
# break this term down to lower order terms in the new basis
for subcounter,suborder in enumerate(range(order-1,0,-1)):
for subijk in combinations(ijk, suborder):
ix = unravel_index(subijk, n)
params[subcounter+counter+1][ix] += ijkcoeff * 2**suborder * (-1)**(order-suborder)
elif convert_to=='11':
# basically need to expand polynomials to all lower order terms
# start at the highest order terms
for counter,order in enumerate(range(len(oparams),0,-1)):
# iterate through all combinations of indices of order
for ijkcounter,ijk in enumerate(combinations(range(n), order)):
ijkcoeff = oparams[counter][ijkcounter]
# same order term is only multiplied by powers of two
params[counter][ijkcounter] += 2**-order * ijkcoeff
# break this term down to lower order terms in the new basis
for subcounter,suborder in enumerate(range(order-1,0,-1)):
for subijk in combinations(ijk, suborder):
ix = unravel_index(subijk, n)
params[subcounter+counter+1][ix] += ijkcoeff * 2**-order
else:
raise Exception("Invalid choice for convert_to. Must be either '01' or '11'.")
if concat:
return np.concatenate(params[::-1])
return params[::-1]
def _expand_binomial(a, b, n=2):
"""Expand a product of binomials that have the same coefficients given by a and b.
E.g. (a*x0 + b) * (a*x1 + b) * ... * (a*xn + b)
Parameters
----------
a : float
b : float
n : int, 2
"""
coeffs=[]
for i in range(n+1):
coeffs.extend( [a**(n-i)*b**i]*int(binom(n,i)) )
return coeffs
[docs]
def split_concat_params(p, n):
"""Split parameters for Ising model that have all been concatenated together into a
single list into separate lists. Assumes that the parameters are increasing in order
of interaction and that all parameters are present.
Parameters
----------
p : list-like
Returns
-------
list of list-like
Parameters increasing in order: (h, Jij, Kijk, ... ).
"""
splitp = []
counter = 0
i = 1
while counter<len(p):
splitp.append( p[counter:counter+int(binom(n,i))] )
counter += int(binom(n,i))
i += 1
return splitp
[docs]
def define_pseudo_ising_helper_functions(N):
"""Define helper functions for using Pseudo method on Ising model.
Parameters
----------
N : int
System size.
Returns
-------
function
get_multipliers_r
function
calc_observables_r
"""
@njit
def get_multipliers_r(r, multipliers, N=N):
"""Return r's field and all couplings to spin r.
Parameters
----------
r : int
multipliers : ndarray
All fields and couplings concatenated together.
Returns
-------
ndarray
Relevant multipliers.
list
Index of where multipliers appear in full multipliers array.
"""
ix = [r]
multipliersr = np.zeros(N)
multipliersr[0] = multipliers[r] # first entry is the biasing field
# fill in the couplings
ixcounter = 1
for i in range(N):
if i!=r:
if i<r:
ix.append( sub_to_ind(N, i, r) + N )
multipliersr[ixcounter] = multipliers[ix[ixcounter]]
else:
ix.append( sub_to_ind(N, r, i) + N )
multipliersr[ixcounter] = multipliers[ix[ixcounter]]
ixcounter += 1
return multipliersr, ix
@njit
def calc_observables_r(r, X, N=N):
"""Return the observables relevant for calculating the conditional probability of
spin r.
Parameters
----------
r : int
Spin index.
X : ndarray
Data samples of dimensions (n_samples, n_dim).
Returns
-------
ndarray
observables
"""
obs = np.zeros((X.shape[0],N))
for rowix in range(X.shape[0]):
ixcounter = 1
obs[rowix,0] = X[rowix,r]
for i in range(N-1):
for j in range(i+1,N):
if i==r or j==r:
obs[rowix,ixcounter] = X[rowix,i]*X[rowix,j]
ixcounter += 1
return obs
return get_multipliers_r, calc_observables_r
[docs]
def define_pseudo_potts_helper_functions(n, k):
"""Define helper functions for using Pseudo method on Potts model with simple form for
couplings that are only nonzero when the spins are occupying the same state.
Parameters
----------
n : int
System size.
k : int
Number of possible configurations in Potts model.
Returns
-------
function
get_multipliers_r
function
calc_observables_r
"""
assert n>1
assert k>1
@njit
def get_multipliers_r(r, multipliers, n=n, k=k):
"""Return r's field and all couplings to spin r.
Parameters
----------
r : int
multipliers : ndarray
All fields and couplings concatenated together.
Returns
-------
ndarray
Relevant multipliers.
list
Index of where multipliers appear in full multipliers array.
"""
ix = [r+n*i for i in range(k)]
multipliersr = np.zeros(k-1+n)
for i in range(k):
multipliersr[i] = multipliers[r+n*i]
# fill in the couplings
ixcounter = k
for i in range(n):
if i!=r:
if i<r:
ix.append( sub_to_ind(n, i, r) + k*n )
multipliersr[ixcounter] = multipliers[ix[ixcounter]]
else:
ix.append( sub_to_ind(n, r, i) + k*n )
multipliersr[ixcounter] = multipliers[ix[ixcounter]]
ixcounter += 1
return multipliersr, ix
@njit
def calc_observables_r(r, X, n=n, k=k):
"""Return the observables relevant for calculating the conditional probability of
spin r.
Parameters
----------
r : int
Spin index.
X : ndarray
Data samples of dimensions (n_samples, n_dim).
Returns
-------
ndarray
observables
list of ndarray
observables if spin r were to occupy all other possible states
ndarray
Each col details the occupied by spin r in each array of the previous return
value, i.e., the first col of this array tells me what r has been changed to
in the first array in the above list.
"""
obs = np.zeros((X.shape[0],k-1+n), dtype=np.int8)
# keep another copy of observables where the spin iterates thru all other possible states
otherobs = [np.zeros((X.shape[0],k-1+n), dtype=np.int8)
for i in range(k-1)]
# note the hypothetical states occupied by spin r. this makes it easier to keep track of things later
otherstates = np.zeros((X.shape[0],k-1), dtype=np.int8)
# for each data sample in X
for rowix in range(X.shape[0]):
counter = 0
# record state of spin r in obs and hypothetical scenarios when it is another state in otherobs
for i in range(k):
if X[rowix,r]==i:
obs[rowix,i] = 1
else:
otherobs[counter][rowix,i] = 1
otherstates[rowix,counter] = i
counter += 1
ixcounter = k
for i in range(n-1):
for j in range(i+1,n):
if i==r:
obs[rowix,ixcounter] = X[rowix,i]==X[rowix,j]
kcounter = 0
for state in range(k):
if state!=X[rowix,r]:
otherobs[kcounter][rowix,ixcounter] = X[rowix,j]==state
kcounter += 1
ixcounter += 1
elif j==r:
obs[rowix,ixcounter] = X[rowix,i]==X[rowix,j]
kcounter = 0
for state in range(k):
if state!=X[rowix,r]:
otherobs[kcounter][rowix,ixcounter] = X[rowix,i]==state
kcounter += 1
ixcounter += 1
return obs, otherobs, otherstates
return get_multipliers_r, calc_observables_r
[docs]
def define_ising_helper_functions():
"""Functions for plugging into solvers for +/-1 Ising model with fields h_i and
couplings J_ij.
Returns
-------
function
calc_e
function
calc_observables
function
mch_approximation
"""
@njit(cache=True)
def fast_sum(J, s):
"""Helper function for calculating energy in calc_e(). Iterates couplings J."""
e = np.zeros(s.shape[0])
for n in range(s.shape[0]):
k = 0
for i in range(s.shape[1]-1):
for j in range(i+1,s.shape[1]):
e[n] += J[k]*s[n,i]*s[n,j]
k += 1
return e
@njit
def calc_e(s, params):
"""
Parameters
----------
s : 2D ndarray of ints
state either {0,1} or {+/-1}; any integer width
params : ndarray
(h, J) vector
Returns
-------
E : ndarray
Energies of all given states.
"""
e = -fast_sum(params[s.shape[1]:],s)
e -= np.sum(s*params[:s.shape[1]],1)
return e
def mch_approximation(samples, dlamda):
"""Function for making MCH approximation step for Ising model."""
dE = calc_e(samples, dlamda)
ZFraction = len(dE) / np.exp(logsumexp(-dE))
predsisj = pair_corr(samples, weights=np.exp(-dE)/len(dE), concat=True) * ZFraction
assert not (np.any(predsisj < -1.00000001) or
np.any(predsisj>1.000000001)),"Predicted values are beyond limits, (%1.6f,%1.6f)"%(predsisj.min(),
predsisj.max())
return predsisj
@njit(cache=True)
def calc_observables(samples):
"""Observables for Ising model."""
n = samples.shape[1]
obs = np.zeros((samples.shape[0], n+n*(n-1)//2))
k = 0
for i in range(n):
obs[:,i] = samples[:,i]
for j in range(i+1,n):
obs[:,n+k] = samples[:,i] * samples[:,j]
k += 1
return obs
return calc_e, calc_observables, mch_approximation
[docs]
def define_ising_helper_functions_sym():
"""Functions for plugging into solvers for +/-1 Ising model with couplings J_ij and no
fields.
Returns
-------
function
calc_e
function
calc_observables
function
mch_approximation
"""
@njit("float64[:](int64[:],float64[:,:])", cache=True)
def fast_sum(J,s):
"""Helper function for calculating energy in calc_e(). Iterates couplings J."""
e = np.zeros((s.shape[0]))
for n in range(s.shape[0]):
k = 0
for i in range(s.shape[1]-1):
for j in range(i+1,s.shape[1]):
e[n] += J[k]*s[n,i]*s[n,j]
k += 1
return e
@njit
def calc_e(s, params):
"""
Parameters
----------
s : 2D ndarray
state either {0,1} or {+/-1}; any integer width
params : ndarray
(h,J) vector
Returns
-------
E : ndarray
"""
return -fast_sum(params,s)
def mch_approximation( samples, dlamda ):
"""Function for making MCH approximation step for symmetrized Ising model."""
dE = calc_e(samples,dlamda)
dE -= dE.min()
ZFraction = 1. / np.mean(np.exp(-dE))
predsisj = pair_corr( samples, weights=np.exp(-dE)/len(dE) )[1] * ZFraction
assert not (np.any(predsisj<-1.00000001) or
np.any(predsisj>1.000000001)),"Predicted values are beyond limits, (%1.6f,%1.6f)"%(predsisj.min(),
predsisj.max())
return predsisj
@njit
def calc_observables(samples):
"""Observables for symmetrized Ising model."""
n = samples.shape[1]
obs = np.zeros((samples.shape[0],n*(n-1)//2))
k = 0
for i in range(n):
for j in range(i+1,n):
obs[:,k] = samples[:,i]*samples[:,j]
k += 1
return obs
return calc_e, calc_observables, mch_approximation
[docs]
def define_triplet_helper_functions():
@njit
def calc_observables(X):
"""Triplet order model consists of constraining all the correlations up to third order."""
n = X.shape[1]
Y = np.zeros((len(X), n+n*(n-1)//2+n*(n-1)*(n-2)//6))
# average orientation (magnetization)
counter = 0
for i in range(n):
Y[:,counter] = X[:,i]
counter += 1
# pairwise correlations
for i in range(n-1):
for j in range(i+1, n):
Y[:,counter] = X[:,i]*X[:,j]
counter += 1
# triplet correlations
for i in range(n-2):
for j in range(i+1, n-1):
for k in range(j+1, n):
Y[:,counter] = X[:,i]*X[:,j]*X[:,k]
counter += 1
return Y
def calc_e(X, multipliers):
return -calc_observables(X).dot(multipliers)
return calc_e, calc_observables
[docs]
def define_ternary_helper_functions():
@njit
def calc_observables(X):
"""Triplet order model consists of constraining all the correlations up to third
order.
"""
n = X.shape[1]
Y = np.zeros((len(X), n*3+n*(n-1)//2))
# average orientation (magnetization)
counter = 0
for i in range(3*n):
Y[:,counter] = X[:,i]
counter += 1
# pairwise correlations
for i in range(n-1):
for j in range(i+1, n):
Y[:,counter] = X[:,i]*X[:,j]
counter += 1
return Y
def calc_e(X, multipliers):
return -calc_observables(X).dot(multipliers)
return calc_e, calc_observables
[docs]
def define_potts_helper_functions(k):
"""Helper functions for calculating quantities in k-state Potts model.
Parameters
----------
k : int
Number of possible states.
Returns
-------
function
calc_e
function
calc_observables
function
mch_approximation
"""
@njit
def calc_observables(X, k=k):
"""
Parameters
----------
X : ndarray of dtype np.int64
Dimensions (n_samples, n_spins).
Returns
-------
ndarray
Dimensions (n_samples, n_observables).
"""
n = X.shape[1]
Y = np.zeros((len(X), n*k+n*(n-1)//2), dtype=np.int8)
# average orientation (magnetization)
# note that fields for the third state are often set to 0
counter = 0
for i in range(k):
for j in range(n):
Y[:,counter] = X[:,j]==i
counter += 1
# pairwise correlations
for i in range(n-1):
for j in range(i+1, n):
Y[:,counter] = X[:,i]==X[:,j]
counter += 1
return Y
def calc_e(X, multipliers, k=k, calc_observables=calc_observables):
"""
Parameters
----------
X : ndarray of dtype np.int64
Dimensions (n_samples, n_spins).
multipliers : ndarray of dtype np.float64
Returns
-------
ndarray
Energies of each observable.
"""
return -calc_observables(X, k).dot(multipliers)
def mch_approximation(sample, dlamda, calc_e=calc_e):
"""Function for making MCH approximation step for Potts model.
Parameters
----------
sample : ndarray
Of dimensions (n_sample, n_spins).
dlamda : ndarray
Change in parameters.
Returns
-------
ndarray
Predicted correlations.
"""
dE = calc_e(sample, dlamda)
ZFraction = len(dE) / np.exp(logsumexp(-dE))
predsisj = (np.exp(-dE[:,None]) / len(dE) * calc_observables(sample)).sum(0) * ZFraction
assert not ((predsisj<0).any() or
(predsisj>(1+1e-10)).any()),"Predicted values are beyond limits, (%E,%E)"%(predsisj.min(),
predsisj.max())
return predsisj
return calc_e, calc_observables, mch_approximation