"""Experimental / work-in-progress samplers.
This module collects three samplers whose implementations have not
been validated end-to-end. Constructing any of them currently raises
:class:`NotImplementedError`:
- :class:`SWIsing` — Swendsen-Wang cluster sampler for the Ising
model (+/-1 basis).
- :class:`HamiltonianMC` — Hamiltonian Monte Carlo sampler whose
default leapfrog helper is hardcoded for Heisenberg energies.
- :class:`Heisenberg3DSampler` — Heisenberg-3D MCMC sampler with
spherical-vector moves.
Module-level helpers (``calc_e``, ``grad_e``, ``grad_e_theta``,
``cross``, ``cross_``, ``jit_sample``, ``jit_sample_nearby_vector``,
``pairwise_prod``, ``sample_bonds``, ``iter_cluster``,
``spec_cluster``, ``check_e_logp``) are kept here because they are
only consumed by the experimental classes above.
"""
import numpy as np
import multiprocess as mp
from numba import jit, njit
from numpy import sin, cos
from scipy.spatial.distance import squareform
from scipy.optimize import minimize, fmin
from ..samplers import Sampler
from ..utils import sub_to_ind, unique_rows
# --------------------------------------------------------------------------- #
# Heisenberg-model energy and gradient (used by HamiltonianMC.jit_sample and
# referenced by Heisenberg3DSampler).
# --------------------------------------------------------------------------- #
[docs]
@njit
def calc_e(theta, x):
"""Heisenberg model energy.
Parameters
----------
theta : ndarray
List of couplings Jij.
x : ndarray
List of angles (theta_0, phi_0, theta_1, phi_1, ...).
"""
n = len(x)//2 # number of spins
E = 0.
k = 0
for i in range(0, 2*(n-1), 2):
for j in range(i+2, 2*n, 2):
E += theta[k] * (sin(x[i])*sin(x[j]) *
(cos(x[i+1])*cos(x[j+1]) + sin(x[i+1])*sin(x[j+1])) +
cos(x[i])*cos(x[j]))
k += 1
return -E
[docs]
@njit
def grad_e(theta, x):
"""Derivatives wrt the angles of the spins."""
n = len(x)//2
g = np.zeros((len(x)))
for i in range(0, 2*n, 2):
for j in range(0, 2*n, 2):
if i != j:
k = sub_to_ind(n, i//2, j//2)
# Derivative wrt angle theta_i
g[i] += theta[k] * (cos(x[i])*sin(x[j]) *
(cos(x[i+1])*cos(x[j+1]) + sin(x[i+1])*sin(x[j+1])) -
sin(x[i])*cos(x[j]))
# Derivative wrt angle phi_i
g[i+1] += theta[k] * (sin(x[i])*sin(x[j]) *
(-sin(x[i+1])*cos(x[j+1]) + cos(x[i+1])*sin(x[j+1])))
return -g
[docs]
@njit
def grad_e_theta(theta, x):
"""Derivatives wrt the couplings theta."""
n = len(x)//2
g = np.zeros((len(theta)))
k = 0
for i in range(0, 2*(n-1), 2):
for j in range(i+2, 2*n, 2):
g[k] = (sin(x[i])*sin(x[j]) *
(cos(x[i+1])*cos(x[j+1]) + sin(x[i+1])*sin(x[j+1])) +
cos(x[i])*cos(x[j]))
k += 1
return -g
# --------------------------------------------------------------------------- #
# SWIsing
# --------------------------------------------------------------------------- #
[docs]
class SWIsing(Sampler):
def __init__(self, n, theta, calc_e, nCpus=None, rng=None):
"""
Swendsen-Wang sampler on Ising model with +/-1 formulation.
NOTE: This has not been properly tested.
Parameters
----------
n : int
Number of elements in system.
theta : ndarray
Vector of parameters in Hamiltonian.
calc_e : function
f( states, params )
nCpus : int,0
If None, then will use all available CPUs.
rng : RandomState,None
"""
raise NotImplementedError
self.n = n
self.theta = theta
self.h,self.J = theta[:n],theta[n:]
self.nCpus = nCpus or mp.cpu_count()
self.calc_e = calc_e
if rng is None:
self.rng=np.random.RandomState()
[docs]
def generate_sample_parallel(self,n_samples,n_iters,
initial_state=None,
n_cpus=None):
"""
Parameters
----------
n_samples
n_iters
initial_state : ndarray,None
"""
n_cpus = n_ncpus or self.nCpus
if initial_state is None:
sample = self.rng.choice([-1,1],size=(n_samples,self.n))
else:
sample = initial_state
def f(params):
i,seed,state = params
self.rng = np.random.RandomState(seed)
for j in range(n_iters):
self.one_step(state)
return state
p = mp.Pool(n_cpus)
self.sample = np.vstack( p.map(f,list(zip(list(range(n_samples)),
self.rng.randint(2**31,size=n_samples),
sample))) )
p.close()
[docs]
def generate_sample(self, n_samples, n_iters, initial_state=None):
"""
Parameters
----------
n_samples
n_iters
initial_state : ndarray,None
"""
if initial_state is None:
sample = self.rng.choice([-1,1],size=(n_samples,self.n))
else:
sample = initial_state
for i in range(n_samples):
for j in range(n_iters):
self.one_step(sample[i])
self.sample = sample
[docs]
def one_step(self,state):
self._clusters = self.get_clusters(state)
self.randomly_flip_clusters(state,self._clusters)
return state
[docs]
def get_clusters(self, state):
"""
Get a random sample of clusters.
"""
n,J = self.n,self.J
# Generate random edges between neighbors.
p = 1-np.exp(-2*self.J*pairwise_prod(state))
adj = squareform( sample_bonds(p,self.rng.rand(len(J)),state,J) )
return iter_cluster(adj)
[docs]
def randomly_flip_clusters(self, state, clusters):
for cls in clusters:
if self.rng.rand()>(1/(1+np.exp(-2*self.h[cls].dot(state[cls])))):
state[cls] *= -1
[docs]
def print_cluster_size(self, n_iters):
n_samples = 1
sample = self.rng.choice([-1,1],size=(n_samples,self.n))
for j in range(n_iters):
self.one_step(sample[0])
print(np.mean([len(c) for c in self._clusters]))
[docs]
@njit
def pairwise_prod(state):
counter = 0
n = len(state)
prod = np.zeros((n*(n-1)//2))
for i in range(n-1):
for j in range(i+1,n):
prod[counter] = state[i]*state[j]
counter += 1
return prod
[docs]
@njit
def sample_bonds(p, r, state, J):
"""
Parameters
----------
p : ndarray
Probability of bond formation.
r : ndarray
Random numbers.
state
J
"""
n = len(state)
bonds = np.zeros((len(J)))
counter=0
for i in range(n-1):
for j in range(i+1,n):
if J[counter]<0 and state[i]*state[j]<0 and r[counter]<p[counter]:
bonds[counter] = 1
elif J[counter]>0 and state[i]*state[j]>0 and r[counter]<p[counter]:
bonds[counter] = 1
counter += 1
return bonds
[docs]
def iter_cluster(adj):
"""
Cycle through all spins to get clusters.
"""
initialSites = set(range(len(adj)))
marked = set()
clusters = []
while initialSites:
thisCluster = []
newSites = [initialSites.pop()]
marked.add(newSites[0])
while len(newSites)>0:
thisCluster.append(newSites[0])
neighbors = set(np.nonzero(adj[newSites[0]])[0].tolist())
neighbors.difference_update(marked)
newSites += neighbors
marked.update(neighbors)
newSite0 = newSites.pop(0)
clusters.append(thisCluster)
initialSites.difference_update(marked)
return clusters
[docs]
def spec_cluster(L, exact=True):
"""
Parameters
----------
L : ndarray
Graph Laplacian
"""
from scipy.linalg import eig
eig, v = eig(L)
if exact:
clusterix = np.nonzero(eig == 0)[0]
else:
clusterix = np.nonzero(np.isclose(eig, 0))[0]
cluster = []
for i in clusterix:
cluster.append(np.nonzero(np.isclose(v[:, i], 0) == 0)[0])
return cluster
# --------------------------------------------------------------------------- #
# HamiltonianMC
# --------------------------------------------------------------------------- #
[docs]
class HamiltonianMC(Sampler):
def __init__(self, n, theta, calc_e, random_sample,
grad_e=None,
dt=.01,
leapfrogN=20,
nCpus=0):
"""
NOTE: This has not been properly tested.
Parameters
----------
n : int
Number of elements in system.
theta : ndarray
Vector of parameters in Hamiltonian.
calc_e : function
Must take in straight vector of numbers.
grad_e : function,None
dt : float,.01
Momentum step length.
leapfrogN : int,20
"""
raise NotImplementedError("This hasn't been properly tested.")
self.dt = dt
self.leapfrogN = leapfrogN
self.n = n
self.theta = theta
if nCpus is None:
self.nCpus = mp.cpu_count()
else:
self.nCpus = nCpus
self.calc_e = calc_e
if grad_e is None:
self.grad_e = Gradient(lambda x: calc_e(self.theta,x))
else:
self.grad_e = grad_e
self.random_sample = random_sample
[docs]
def sample(self, x0, nBurn, saveHistory=False):
"""
Get a single sample by MC sampling from this Hamiltonian. Slow method
"""
if saveHistory:
history = [x0]
x = x0
E = self.calc_e(self.theta,x0)
g = self.grad_e(self.theta,x0)
converged = False
counter = 0
while not converged:
# Randomly sample momentum.
p = np.random.normal(size=self.n)
# Current Hamiltonian on joint space.
H = p.dot(p)/2 + E
# Leapfrog. Involves sandwiching gradient descent on q around x gradient descent.
xnew = x
gnew = g
for t in range(self.leapfrogN):
p -= self.dt*gnew/2
xnew += self.dt*p
gnew = self.grad_e(self.theta,xnew)
p -= self.dt*gnew/2
# Compute new energies.
Enew = self.calc_e(self.theta,xnew)
Hnew = p.dot(p)/2 + Enew
dH = Hnew - H
# MC sample.
if dH<0 or np.random.rand()<np.exp(-dH):
g = gnew
E = Enew
x = xnew
if saveHistory:
history.append(x.copy())
if counter>nBurn:
converged = True
counter += 1
if saveHistory:
return x,history
return x
[docs]
def generate_sample(self, nSamples, nBurn=100, fast=True, x0=None):
"""Generate nSamples from this Hamiltonian starting from random initial conditions
from each sample.
"""
if x0 is None:
x0 = self.random_sample(nSamples)
if self.nCpus==0:
if fast:
dt,leapfrogN,theta = self.dt,self.leapfrogN,self.theta
@jit
def f(x0):
for i in range(nSamples):
r = np.random.normal(size=(nBurn,x0.shape[1]))
#r[:,::2] /= 2.
jit_sample( theta,x0[i],nBurn,dt,leapfrogN,
r,
np.random.rand(nBurn) )
return x0
return f(x0)
else:
for i in range(nSamples):
x = self.sample(x0[i],nBurn)
samples[i,:] = x[:]
return samples
else:
if fast:
dt,leapfrogN,theta = self.dt,self.leapfrogN,self.theta
@jit
def f(x0):
rng = np.random.RandomState()
for i in range(nSamples):
x = jit_sample(theta,x0,nBurn,dt,leapfrogN,
rng.normal(size=(nBurn,len(x0))),rng.rand(nBurn))
for j in range(len(x)):
samples[i,j] = x[j]
return samples
else:
def f(x0):
return self.sample(x0,nBurn)
p = mp.Pool(self.nCpus)
samples = np.vstack(( p.map(f,x0) ))
p.close()
return samples
[docs]
@njit
def jit_sample(theta, x0, nBurn, dt, leapfrogN, randNormal, randUnif):
"""
Get a single sample by MC sampling from this Hamiltonian.
Parameters
----------
theta : ndarray
Parameters
x0 : ndarray
Sample
nBurn : int
dt : float
leapfrogN : int
randNormal : ndarray
nBurn x ndim
randUnif : ndarray
nBurn
"""
x = x0
E = calc_e(theta,x0)
g = grad_e(theta,x0)
# Randomly sample momenta.
p = np.zeros_like(x0)
for counter in range(nBurn):
# Read in previously generated random momenta.
for i in range(len(x0)):
p[i] = randNormal[counter,i]
# Current Hamiltonian on joint space.
H = (p*p).sum()/2. + E
# Leapfrog. Involves sandwiching gradient descent on q around x gradient descent.
xnew = x
gnew = g
for t in range(leapfrogN):
p -= dt*gnew/2.
xnew += dt*p
gnew = grad_e(theta,xnew)
p -= dt*gnew/2.
# Compute new energies.
Enew = calc_e(theta,xnew)
Hnew = (p*p).sum()/2. + Enew
dH = Hnew - H
# MC sample.
if (dH<0) or (randUnif[counter]<np.exp(-dH)):
g = gnew
E = Enew
x = xnew
# --------------------------------------------------------------------------- #
# Heisenberg3DSampler
# --------------------------------------------------------------------------- #
[docs]
class Heisenberg3DSampler(Sampler):
"""
Simple MC Sampling from Heisenberg model with a lot of helpful functions.
Methods
-------
generate_sample()
equilibrate_samples()
sample_metropolis()
sample_energy_min()
"""
def __init__(self, J, calc_e, random_sample):
"""
NOTE: This has not been properly tested.
Parameters
----------
J : ndarray
vector of coupling parameters
calc_e : lambda
Function for calculating energies of array of given states with args
(J,states). States must be array with dimensions (nSamples,nSpins,3).
random_sample (lambda) Function for returning random samples with args
(rng,n_samples).
"""
raise NotImplementedError("This hasn't been properly tested.")
self.J = J
self.Jmat = squareform(J)
self.Jrows = np.zeros((self.Jmat.shape[0],self.Jmat.shape[0]-1))
for i in range(self.Jmat.shape[0]):
ix = np.zeros((self.Jmat.shape[0]))==0
ix[i] = False
self.Jrows[i] = self.Jmat[i][ix]
self.calc_e = calc_e
self.random_sample = random_sample
self.rng = np.random.RandomState()
[docs]
def generate_sample(self, nSamples, n_iters=100, **kwargs):
# Initialize random samples
samples = self.random_sample(self.rng,nSamples)
self.equilibrate_samples( samples, n_iters,**kwargs )
return samples
[docs]
def equilibrate_samples(self, samples, n_iters, method='mc', nCpus=0):
if nCpus is None:
nCpus = mp.cpu_count()
print("Using all cores.")
else:
print("Using %d cores."%nCpus)
if method=='mc':
def sample_method(s,E):
for i in range(n_iters):
_,E = self.sample_metropolis(s,E)
return E
else:
raise Exception("Unsupported sampling method.")
if nCpus==0:
E = np.zeros((len(samples)))
for i in range(len(samples)):
E[i] = self.calc_e( self.J,[samples[i]] )
E[i] = sample_method(samples[i],E[i])
return
else:
p = mp.Pool(nCpus)
def f(args):
sample,J = args
E = self.calc_e( J,[sample] )
self.rng = np.random.RandomState()
E = sample_method(sample,E)
return E,sample
E,samples[:] = list(zip( *p.map(f,list(zip(samples,[self.J]*len(samples)))) ))
p.close()
[docs]
def sample_metropolis(self, oldState, E0):
newState = self.sample_nearby_sample(oldState,sigma=.3)
E1 = self.calc_e( self.J, [newState] )
de = E1-E0
if (de>0 and (self.rng.rand()>np.exp(-de))):
return oldState,E0
oldState[:] = newState[:]
return newState,E1
[docs]
def sample_nearby_vector(self, v, nSamples=1, otheta=None, ophi=None, sigma=.1):
"""
Sample random vector that is nearby. It is important how you choose the width sigma.
NOTE: code might be simplified by using arctan2 instead of arctan
Parameters
----------
v : ndarray
xyz vector about which to sample random vectors
nSamples : int,1
number of random samples
otheta : float,None
polar angle for v
ophi : float,None
azimuthal angle for v
sigma : float,.1
width of Gaussian about v
"""
if otheta is None or ophi is None:
r = np.sqrt( v[0]*v[0] + v[1]*v[1] )
otheta = np.pi/2 - np.arctan( v[2]/r )
if v[1]>=0:
ophi = np.arccos( v[0]/r )
else:
ophi = 2*np.pi - np.arccos( v[0]/r )
return jit_sample_nearby_vector( self.rng.randint(2**32-1),v,nSamples,otheta,ophi,sigma )
def _sample_nearby_vector(self, v, nSamples=1, otheta=None, ophi=None, sigma=.1):
"""
Deprecated: old slower way. Sample random vector that is nearby.
"""
if otheta is None or ophi is None:
r = np.sqrt( v[0]*v[0] + v[1]*v[1] )
otheta = np.pi/2 - np.arctan( v[2]/r )
if v[1]>=0:
ophi = np.arccos( v[0]/r )
else:
ophi = 2*np.pi - np.arccos( v[0]/r )
# Generate random vector with roughly Gaussian distribution on spherical surface about z-axis.
if sigma==0:
theta = 0
else:
theta = self.rng.normal(scale=sigma,size=nSamples)
phi = self.rng.uniform(0,2*np.pi,size=nSamples)
randv = np.vstack([np.sin(theta)*np.cos(phi),np.sin(theta)*np.sin(phi),np.cos(theta)]).T
# Rotate into v frame using Rodrigues' formula.
k = np.array([-np.sin(ophi),np.cos(ophi),0])
return np.array([(r * np.cos(otheta) + np.cross(k,r)*np.sin(otheta) + k*np.dot(k,r)*(1-np.cos(otheta)))
for r in randv])
[docs]
def sample_nearby_sample(self, X, **kwargs):
"""Randomly move given state around for new metropolis sample."""
return np.vstack([self.sample_nearby_vector(x,**kwargs) for x in X])
[docs]
def grad_E(self, X):
"""
Gradient wrt theta and phi.
Parameters
----------
X : ndarray
with dims (nSpins,2) with angles theta and phi
"""
assert X.ndim==2
g = np.zeros((X.shape[0],2))
for i in range(X.shape[0]):
for j in range(X.shape[0]):
if i!=j:
g[i,0] += -( self.Jmat[i,j]*( np.cos(X[i,0])*np.sin(X[j,0])*np.cos(X[i,1]-X[j,1]) -
np.sin(X[i,0])*np.cos(X[j,0]) ) )
g[i,1] += self.Jmat[i,j] * np.sin(X[j,0]) * np.sin(X[i,1]-X[j,1])
return g
[docs]
def sample_energy_min(self,
nFixed=0,
rng=np.random.RandomState(),
initialState=None,
method='powell',
**kwargs):
"""Find local energy minimum given state in angular form."""
n = self.Jmat.shape[0]
if initialState is None:
initialState = np.zeros((1,n-nFixed,2))
initialState[0,:,0] = rng.uniform(0,np.pi,size=n-nFixed)
initialState[0,:,1] = rng.uniform(0,2*np.pi,size=n-nFixed)
def f(sample):
if np.any(sample<0):
return np.inf
sample = sample.reshape(1,n-nFixed,2)
if np.any(sample[0,:,0]>np.pi):
return np.inf
sample = self.convert_to_xyz(sample)
return self.calc_e(self.J,sample)
if method=='fmin':
return fmin(f,initialState.ravel())
return minimize(f,initialState.ravel(),method=method,**kwargs)
[docs]
@classmethod
def to_dict(cls,data,names):
"""Arrange 3d samples into a dict of n x 3 arrays."""
from collections import OrderedDict
X = OrderedDict()
for i,k in enumerate(names):
X[k] = np.vstack([d[i,:] for d in data])
return X
[docs]
@jit
def cross(vec1, vec2):
"""Cross product of two 3D vectors."""
result = np.zeros((3))
return cross_(vec1, vec2, result)
[docs]
@njit
def cross_(vec1, vec2, result):
"""Cross product of two 3D vectors (in-place)."""
a1, a2, a3 = vec1[0], vec1[1], vec1[2]
b1, b2, b3 = vec2[0], vec2[1], vec2[2]
result[0] = a2*b3 - a3*b2
result[1] = a3*b1 - a1*b3
result[2] = a1*b2 - a2*b1
return result
[docs]
@njit
def jit_sample_nearby_vector(rseed, v, nSamples, otheta, ophi, sigma):
np.random.seed(rseed)
# Generate random vector with roughly Gaussian distribution on spherical surface about z-axis.
if sigma == 0:
theta = np.zeros((1))
else:
theta = np.zeros((nSamples))
for i in range(nSamples):
theta[i] = np.random.randn()*sigma
phi = np.zeros((nSamples))
for i in range(nSamples):
phi[i] = np.random.rand()*2*np.pi
randv = np.zeros((nSamples, 3))
for i in range(nSamples):
randv[i, 0] = np.sin(theta[i])*np.cos(phi[i])
randv[i, 1] = np.sin(theta[i])*np.sin(phi[i])
randv[i, 2] = np.cos(theta[i])
# Rotate into v frame using Rodrigues' formula.
k = np.zeros((3))
k[0] = -np.sin(ophi)
k[1] = np.cos(ophi)
rotatedv = np.zeros_like(randv)
for i in range(len(randv)):
rotatedv_ = (randv[i] * np.cos(otheta) + cross(k, randv[i])*np.sin(otheta)
+ k*np.dot(k, randv[i])*(1-np.cos(otheta)))
for j in range(randv.shape[1]):
rotatedv[i, j] = rotatedv_[j]
return rotatedv
[docs]
def check_e_logp(sample, calc_e):
"""Boltzmann-type model with discrete state space should have E proportional
to -logP. Calculate these for comparison.
"""
uniqueix = unique_rows(sample, return_inverse=True)
uniqStates = sample[unique_rows(sample)]
stateCount = np.bincount(uniqueix)
stateCount = stateCount/stateCount.sum()
uniqueE = calc_e(uniqStates)
return uniqStates, uniqueE, np.log(stateCount)