Source code for coniii.samplers

# ====================================================================================== #
# Classes for sampling from Boltzmann type models.
# Released with ConIII package.
# Author : Eddie Lee, edlee@alumni.princeton.edu

# MIT License
# 
# Copyright (c) 2020 Edward D. Lee, Bryan C. Daniels
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ====================================================================================== #
"""Monte Carlo samplers for Boltzmann-type (maxent) models.

This module provides the sampling machinery used by the iterative
solvers in :mod:`coniii.solvers` (notably :class:`~coniii.solvers.MCH`)
and exposed to users through the convenience function
:func:`sample_ising`.

Public API (see ``__all__``)
----------------------------
:class:`Sampler`
    Abstract base class for all samplers.
:class:`Metropolis`
    Single-spin-flip Metropolis sampler for the Ising model. The
    workhorse; accelerated by the optional Boost C++ extension
    (``coniii.samplers_ext``) when it is compiled, otherwise a
    numba-jitted pure-Python path is used.
:class:`WolffIsing`
    Wolff cluster sampler.
:class:`ParallelTempering`
    Replica-exchange (parallel tempering) wrapper around Metropolis.
:class:`Potts3`
    Three-state Potts variant of :class:`Metropolis`.
:func:`sample_ising`
    One-call helper to draw Metropolis samples from an Ising model.

Work-in-progress samplers (``SWIsing``, ``HamiltonianMC``,
``Heisenberg3DSampler``) live in :mod:`coniii.experimental.samplers`.
"""
import numpy as np
from numba import jit, njit, float64, int64
from numpy import sin, cos, exp
from scipy.spatial.distance import squareform
import multiprocess as mp
from datetime import datetime
from multiprocess import Pool, cpu_count
from warnings import warn
from itertools import chain
from numba.typed import List

from .utils import *
try:
    from .samplers_ext import BoostIsing, BoostPotts3
    IMPORTED_SAMPLERS_EXT = True
except (ModuleNotFoundError, ImportError):
    # ModuleNotFoundError: the extension was never built (pure-Python install).
    # ImportError: the extension exists but a runtime dependency (e.g. a Boost
    # shared library) cannot be loaded. Either way, fall back to pure Python.
    IMPORTED_SAMPLERS_EXT = False


# Public API. BoostIsing / BoostPotts3 are not listed because their
# availability depends on whether the optional C++ extension compiled.
# Users who need them should import them directly:
#     from coniii.samplers_ext import BoostIsing
__all__ = [
    'Sampler',
    'Metropolis',
    'WolffIsing',
    'ParallelTempering',
    'Potts3',
    # Convenience entry point:
    'sample_ising',
]
# SWIsing, HamiltonianMC, Heisenberg3DSampler now live in
# coniii.experimental.samplers (work-in-progress).



[docs] class Sampler(): """Base class for MCMC sampling.""" def __init__(self, n, theta, **kwargs): """ Parameters ---------- n : int System size. theta : ndarray Lagrangian multipliers. """ assert type(n) is int, "n must be of type int." self.n = n self.theta = theta return
[docs] def update_parameters(self, new_parameters): return
[docs] def generate_sample(self, sample_size, **kwargs): """ Parameters ---------- sample_size : int """ return
[docs] def generate_sample_parallel(self, sample_size, **kwargs): """ Parameters ---------- sample_size : int """ return
[docs] def sample_metropolis(self, s, energy): """ Parameters ---------- s : ndarray State to perturb randomly. energy : float Energy of configuration. """ return
#end Sampler
[docs] class WolffIsing(Sampler): def __init__(self, J, h): """ Wolff cluster sampling for +/-1 Ising model. NOTE: This has not been properly tested. Parameters ---------- J : ndarray Couplings as vector or matrix. h : ndarray Local fields. """ assert len(J)==(len(h)*(len(h)-1)//2) from scipy.spatial.distance import squareform self.update_parameters(J,h) self.n=len(h) self.rng=np.random.RandomState()
[docs] def update_parameters(self, J, h): if J.ndim==1: J=squareform(J) self.J=J self.h=h
[docs] def generate_sample(self, samplesize, n_iters, initialSample=None, save_history=False, ): """ Generate samples by starting from random initial states. """ if initialSample is None: sample = self.rng.choice([-1.,1.],size=(samplesize,self.n)) else: sample = initialSample if save_history: history = np.zeros((samplesize,self.n,n_iters+1)) history[:,:,0] = sample.copy() for i,s in enumerate(sample): for j in range(n_iters): self.one_step(s,initialsite=j%self.n) history[i,:,j+1] = s.copy() return sample,history else: for i,s in enumerate(sample): for j in range(n_iters): self.one_step(s) return sample
[docs] def generate_sample_parallel(self,samplesize,n_iters, initialSample=None): """ Generate samples by starting from random or given initial states. """ if initialSample is None: sample = self.rng.choice([-1.,1.],size=(samplesize,self.n)) else: sample = initialSample #if (n_iters%self.n)!=0: # n_iters = (n_iters//self.n+1)*self.n # hit each spin the same number of times # print "Increased n_iters to %d"%n_iters def f(args): s,seed=args self.rng=np.random.RandomState(seed) for j in range(n_iters): self.one_step(s,initialsite=j%self.n) return s pool=mp.Pool(mp.cpu_count()) sample=np.vstack(pool.map(f,list(zip(sample,np.random.randint(2**31-1,size=samplesize))))) pool.close() return sample
def _generate_sample(self,samplesize,sampleseparation): """ Generate samples by evolving a single state. """ n=self.n state=self.rng.randint(2,size=n)*2-1. sample=np.zeros((samplesize,n)) nsamplesCounter=0 counter=0 while nsamplesCounter<samplesize: self.one_step(state) if (counter%sampleseparation)==0: sample[nsamplesCounter]=state.copy() nsamplesCounter+=1 counter+=1 return sample
[docs] def one_step(self,state,initialsite=None): """Run one iteration of the Wolff algorithm that involves finding a cluster and possibly flipping it. """ n,J,h=self.n,self.J,self.h self.expdJ = np.exp(-2*state[:,None]*J*state[None,:]) # Choose random site. initialsite = initialsite or self.rng.randint(n) cluster = self.build_cluster(state,initialsite) # Flip cluster? # The first line is my addition to the algorithm that introduces a factor into the acceptance ratio # such that detailed balance is met and agrees with the ratio of the probabiltiies of the two possible # orientations of the cluster. if self.rng.rand()<np.exp(-2*h[cluster].dot(state[cluster])): state[cluster] *= -1
[docs] def build_cluster(self,state,initialsite): """ Grow cluster from initial site. """ n=self.n marked,newSites=[],[] newSites.append(initialsite) # list of sites to continue exploring marked.append(initialsite) # list of sites in the cluster # While there are new neighboring sites to explore from the ones already marked, keep going. while len(newSites)>0: newSites += self.find_neighbors(state,newSites[0],marked) newSites.pop(0) return marked
[docs] def find_neighbors(self,state,site,alreadyMarked): """Return neighbors of given site that need to be visited excluding sites that have already been visited. This is the implementation of the Wolff algorithm for finding neighbors such that detailed balance is satisfied. I have modified to include random fields such tha the probability of adding a neighbors depends both on its coupling with the current site and the neighbor's magnetic field. Parameters ---------- state site alreadyMarked """ # Find neighbors in cluster. # If spins were parallel in original system, then the probability that a bond forms between is # (1-exp(-2*J)) # Find neighbors but keep only neighbors that have not already been visited. ix = np.zeros((self.n),dtype=bool) ix[alreadyMarked] = True ix[site] = True neighbors = iterate_neighbors(self.n,ix,self.expdJ[:,site],self.rng.rand(self.n)).tolist() alreadyMarked += neighbors # Add to list inplace. return neighbors
# Helper functions for WolffIsing. @jit def iterate_neighbors(n,ix,expdJ,r): """Iterate through all neighbors of a particular site and see if a bond should be formed between them. Parameters ---------- n : int System size. ix : ndarray of bool Indices of sites that have already been visited. expdJ : ndarray np.exp( -2*state[:,None]*state[None,:]*J ) r : ndarray Array of random numbers. """ counter=0 neighbors=np.zeros(n, dtype=np.int64) for i in range(n): # Don't include neighbors that are already marked. # Check against probability as specified in Wolff's paper for forming link. # Must index r with i and not counter because otherwise we end up with cases where we are repeating # comparisons betweeen multiple i. if not ix[i] and r[i]<(1-expdJ[i]): neighbors[counter]=i counter += 1 return neighbors[:counter] #end WolffIsing # ====================== # # Swendsen-Wang sampler. # # ====================== #
[docs] class ParallelTempering(Sampler): def __init__(self, n, theta, calc_e, n_replicas, Tbds=(1.,3.), sample_size=1000, replica_burnin=None, rep_ex_burnin=None, rng=None): """Run multiple replicas in parallel at different temperatures using Metropolis sampling to equilibrate. Hukushima, K, and K Nemoto. “Exchange Monte Carlo Method and Application to Spin Glass Simulations.” Journal of the Physical Society of Japan 65 (1996): 1604–1608. Parameters ---------- n : int Number of elements in system. theta : ndarray Concatenated vector of the field hi and couplings Jij. They should be ordered in ascending order 0<=i<n and for Jij in order of 0<=i<j<n. calc_e : function For calculating energy. n_replicas : int Number of replicas. Tbds : duple, (1,3) Lowest and highest temperatures to start with. Lowest temperature will not change. sample_size : int, 1000 replica_burnin : int, n*50 Default number of burn in iterations for each replica when first initialising (from completely uniform distribution). rep_ex_burnin : int, n*10 Default number of Metropolis steps after exchange. rng : RandomState, None """ assert type(n) is int, "n must be of type int." assert Tbds[0]<Tbds[1] and Tbds[0]>0 assert n_replicas>1 self.n = n self.theta = theta self.calc_e = calc_e self.nReplicas = n_replicas self.sampleSize = sample_size self.sample = None self.replicaBurnin = replica_burnin or n*50 self.repExBurnin = rep_ex_burnin or n*10 self.rng = rng or np.random.RandomState() self.beta = self.initialize_beta(1/Tbds[1], 1/Tbds[0], n_replicas) self.setup_replicas() assert (np.diff(self.beta)>0).all(), self.beta
[docs] def update_replica_parameters(self): """Update parameters for each replica. Remember that the parameters include the factor of beta. """ for b,rep in zip(self.beta, self.replicas): rep.theta = self.theta*b
[docs] def setup_replicas(self): """Initialise a set of replicas at different temperatures using the Metropolis algorithm and optimize the temperatures. Replicas are burned in and ready to sample. """ self.replicas = [] for i,b in enumerate(self.beta): self.replicas.append( Metropolis(self.n, self.theta*b, self.calc_e, boost=False) ) # give each replica an index self.replicas[i].index = i self.burn_in_replicas() self.optimize_beta(10, self.n*10) self.burn_in_replicas()
[docs] def burn_in_replicas(self, pool=None, close_pool=True, n_iters=None): """Run each replica separately. Parameters ---------- pool : multiprocess.Pool, None close_pool : bool, True If True, call pool.close() at end. n_iters : int, None Default value is self.replicaBurnin. """ n_iters = n_iters or self.replicaBurnin def f(args): rep, nIters = args rep.generate_sample(1, n_iters=nIters, systematic_iter=True) return rep pool = pool or Pool() self.replicas = pool.map(f, zip(self.replicas, [n_iters]*len(self.replicas))) if close_pool: pool.close()
[docs] def burn_and_exchange(self, pool): """ Parameters ---------- pool : mp.multiprocess.Pool """ self.burn_in_replicas(pool=pool, close_pool=False, n_iters=self.repExBurnin) for i in range(self.nReplicas-1): exchangeProb = self._acceptance_ratio(1, 0, pairs=[(i,i+1)]) if self.rng.rand()<exchangeProb: # swap replicas (only need to swap samples) temp = self.replicas[i].sample self.replicas[i].sample = self.replicas[i+1].sample self.replicas[i+1].sample = temp temp = self.replicas[i]._samples self.replicas[i]._samples = self.replicas[i+1]._samples self.replicas[i+1]._samples = temp temp = self.replicas[i].index self.replicas[i].index = self.replicas[i+1].index self.replicas[i+1].index = temp # must recalculate energies with new temperature temp = self.replicas[i].E self.replicas[i].E = self.replicas[i+1].E/self.beta[i+1]*self.beta[i] self.replicas[i+1].E = temp/self.beta[i]*self.beta[i+1]
[docs] def generate_sample(self, sample_size, save_exchange_trajectory=False): """ Burn in, run replica exchange simulation, then sample. Parameters ---------- sample_size : int Number of samples to take for each replica. save_exchange_trajectory : bool, False If True, keep track of the location of each replica in beta space and return the history. Returns ------- ndarray, optional Trajectory of each replica through beta space. Each row is tells where each index is located in beta space. """ self.sample = [np.zeros((sample_size,self.n), dtype=int) for i in range(self.nReplicas)] pool = Pool() if save_exchange_trajectory: replicaIndexHistory = np.zeros((sample_size, self.nReplicas), dtype=int) for i in range(sample_size): self.burn_and_exchange(pool) # save samples for j in range(self.nReplicas): self.sample[j][i,:] = self.replicas[j].sample[0,:] replicaIndexHistory[i,j] = self.replicas[j].index pool.close() return replicaIndexHistory else: for i in range(sample_size): self.burn_and_exchange(pool) # save samples for j in range(self.nReplicas): self.sample[j][i,:] = self.replicas[j].sample[0,:] pool.close()
[docs] @staticmethod def initialize_beta(b0, b1, n_replicas): """Use linear interpolation of temperature range.""" return np.linspace(b0, b1, n_replicas)
[docs] @staticmethod def iterate_beta(beta, acceptance_ratio): """Apply algorithm from Hukushima but reversed to maintain one replica at T=1. Parameters ---------- beta : ndarray Inverse temperature. acceptance_ratio : ndarray Estimate of acceptance ratio. Returns ------- ndarray New beta. """ assert (len(acceptance_ratio)+1)==len(beta) assert (acceptance_ratio<=1).all() newBeta = beta.copy() avg = acceptance_ratio.mean() for i in range(len(beta)-1, 0, -1): newBeta[i-1] = newBeta[i] - (beta[i]-beta[i-1]) * acceptance_ratio[i-1]/avg return newBeta
[docs] def optimize_beta(self, n_samples, n_iters, tol=.01, max_iter=10): """Find suitable temperature range for replicas. Sets self.beta. Parameters ---------- n_samples : int Number of samples to use to estimate acceptance ratio. Acceptance ratio is estimated as the average of these samples. n_iters : int Number of sampling iterations for each replica. tol : float, .1 Average change in beta to reach before stopping. max_iter : int, 10 Number of times to iterate algorithm for beta. Each iteration involves sampling from replicas. """ beta = self.beta oldBeta = np.zeros_like(beta) + np.inf counter = 0 while counter<max_iter and tol<np.abs(oldBeta-beta).mean(): acceptanceRatio = self._acceptance_ratio(n_samples, n_iters) oldBeta = beta beta = self.iterate_beta(beta, acceptanceRatio) # change beta in the replicas for sampling with them self.beta = beta self.update_replica_parameters() counter += 1 #print(beta, acceptanceRatio) if counter==max_iter: print("Optimization for beta did not converge.") # TODO: smooth by spline interpolation self.beta = beta
def _acceptance_ratio(self, n_samples, n_iters, pairs=None): """Estimate acceptance ratio as an average over multiple Metropolis samples. Parameters ---------- n_samples : int Number of Metropolis samples to use for averaging the ratio. n_iters : int Number of MC steps between samples. Bigger is better. pairs : list of duples, None Pairs for which to compute the acceptance ratio. If not given, all pairs are compared. Returns ------- ndarray Estimate of acceptance ratio for each pair. """ assert all([len(r.sample)==1 for r in self.replicas]) if pairs is None: pairs = [(i,i+1) for i in range(self.nReplicas-1)] acceptanceRatio = np.zeros((len(pairs),n_samples)) # estimate acceptance probabilities if n_iters>0: pool = Pool() for i in range(n_samples): self.burn_in_replicas(pool=pool, close_pool=False, n_iters=n_iters) for j,p in enumerate(pairs): # must divide out self.beta to get energies; each replica holds a # single current sample, so .E is length-1 -> take the scalar dE = self.replicas[p[0]].E[0]/self.beta[p[0]] - self.replicas[p[1]].E[0]/self.beta[p[1]] acceptanceRatio[j,i] = min(1, np.exp( dE * (self.beta[p[0]]-self.beta[p[1]]) )) pool.close() else: for j,p in enumerate(pairs): # must divide out self.beta to get energies; each replica holds a # single current sample, so .E is length-1 -> take the scalar dE = self.replicas[p[0]].E[0]/self.beta[p[0]] - self.replicas[p[1]].E[0]/self.beta[p[1]] acceptanceRatio[j,0] = min(1, np.exp( dE * (self.beta[p[0]]-self.beta[p[1]]) )) return acceptanceRatio.mean(1)
#end ParallelTempering #class FastMCIsing(Sampler): # def __init__(self, n, theta, # n_cpus=None, # rng=None): # """ # MC sample on Ising model with +/-1 formulation. Fast metropolis sampling by assuming form of # Hamiltonian is an Ising model. # # Parameters # ---------- # n : int # Number of elements in system. # theta : ndarray # Concatenated vector of the field hi and couplings Jij. They should be ordered in # ascending order 0<=i<n and for Jij in order of 0<=i<j<n. # n_cpus : int, 0 # If None, then will use all available CPUs minus 1. # rng : RandomState, None # """ # # warn("At the moment, Metropolis is much faster for sampling.") # self.n = n # self.update_parameters(theta) # self.nCpus = n_cpus or mp.cpu_count()-1 # # self.calc_e, calc_observables, mchApproximation = define_ising_helper_functions() # self.sample_metropolis = _jit_sample_metropolis # self.rng = rng or np.random.RandomState() # self._samples = None # # def update_parameters(self, theta): # self.theta = theta # self.h, self.J = theta[:self.n], squareform(theta[self.n:]) # # def generate_sample(self, # sample_size, # n_iters=1000, # saveHistory=False, # initial_sample=None, # systematic_iter=False): # """ # Generate Metropolis samples using a for loop and save samples in self.sample. # # Parameters # ---------- # sample_size : int # Number of samples to take. # n_iters : int, 1000 # Number of iterations to run Metropolis sampler. # saveHistory : bool, False # If True, the energy of the system after each Metropolis sampling step will be recorded. # This can be useful for making sure that the system has reached equilibrium. # initial_sample : ndarray, None # If this is given, then this is used as the starting point for the Markov chain instead # of a random sample. # systematic_iter : bool, False # If True, will iterate through each spin in a fixed sequence. This ensures that all spins # receive in equal number of chances to flip. # """ # # sample_metropolis = self.sample_metropolis # alias # # if (initial_sample is None and # (self._samples is None or len(self._samples)!=1)): # self._samples = self.rng.choice([-1,1], size=(1, self.n)) # elif not initial_sample is None: # msg = "Sequential sample generation requires initial sample of dim (1, n)." # assert np.array_equal((1,self.n), initial_sample.shape), msg # self._samples = initial_sample.astype(int) # # E = self.calc_e( self._samples, self.theta ) # self.E = np.zeros(sample_size) # self.sample = np.zeros((sample_size, self.n), dtype=int) # h, J = self.h, self.J # n = self.n # # if systematic_iter: # @jit(forceobj=True) # def get_ix(j, rng): # return j%n # else: # @jit(forceobj=True) # def get_ix(j, rng): # return rng.randint(n) # # if saveHistory: # @jit('float64[:,:]()', locals={'_samples':int64[:,:], 'E':float64[:]}, forceobj=True) # def sample(seed, E=E, _samples=self._samples): # # set rng for jit environment # np.random.seed(seed) # # history = np.zeros(sample_size*n_iters+1) # history[0] = E # counter = 1 # for i in range(sample_size): # for j in range(n_iters): # de = sample_metropolis( _samples[0], h, J, get_ix(j, np.random), np.random ) # E += de # history[counter] = E # counter += 1 # self.E[i] = E # self.sample[i,:] = _samples[:] # return history # else: # @jit(locals={'_samples':int64[:,:], 'E':float64[:]}, forceobj=True) # def sample(seed, E=E, _samples=self._samples): # # set rng for jit environment # np.random.seed(seed) # # for i in range(sample_size): # for j in range(n_iters): # de = sample_metropolis( _samples[0], h, J, get_ix(j, np.random), np.random ) # E += de # self.E[i] = E # self.sample[i,:] = _samples[:] # return sample(self.rng.randint(2**32-1)) # # def generate_sample_parallel(self, # sample_size, # n_iters=1000, # n_cpus=None, # initial_sample=None, # systematic_iter=False): # """ # Metropolis sample multiple states in parallel and save them into self.sample. # # Parameters # ---------- # sample_size : int # Number of samples to take. # n_iters : int, 1000 # Number of iterations to run Metropolis sampler. # initial_sample : ndarray, None # If this is given, then this is used as the starting point for the Markov chain instead # of a random sample. # systematic_iter : bool, False # If True, will iterate through each spin in a fixed sequence. This ensures that all spins # receive in equal number of chances to flip. # """ # # n_cpus = self.nCpus # alias # assert n_cpus>=2 # assert sample_size>n_cpus, "Parallelization only helps if many samples are generated per thread." # if (initial_sample is None and # (self._samples is None or len(self._samples)!=n_cpus)): # self._samples = self.rng.choice([-1,1], size=(n_cpus, self.n)) # elif not initial_sample is None: # assert np.array_equal((n_cpus,self.n), initial_sample.shape), "initial_sample wrong size" # self._samples = initial_sample.astype(int) # # n_cpus = n_cpus or self.nCpus # sample_metropolis = _jit_sample_metropolis # h, J = self.h, self.J # n = self.n # calc_e = self.calc_e # theta = self.theta # # if systematic_iter: # @jit(forceobj=True) # def get_ix(j, rng): # return j%n # else: # @jit(forceobj=True) # def get_ix(j, rng): # return rng.randint(n) # # @njit(locals={'seed':int64, 'theta':float64[:], 's':int64[:,:], 'E':float64[:]}) # def f(args): # seed, theta, s = args # np.random.seed(seed) # # E = calc_e(s, theta) # # for j in range(n_iters): # de = sample_metropolis( s[0], h, J, get_ix(j, np.random), np.random ) # E += de # return s, E # # args=((self.rng.randint(2**32-1),theta,self._samples[i][None,:]) for i in range(n_cpus)) # # # run sampling # pool = mp.Pool(n_cpus) # self.sample, self.E = list(zip(*pool.map(f, args))) # pool.close() # # # save results of sampling into instance data members # self.sample = np.vstack(self.sample) # self.E = np.array(self.E) # # def _sample_metropolis(self, sample0, rng=None, flip_site=None): # """ # Metropolis sampling. # """ # # rng = rng or self.rng # flipSite = flip_site or rng.randint(sample0.size) # sample0[flipSite] *= -1 # de = -2*sample0[flipSite]*self.h[flipSite] - 2*self.J[flipSite].dot(sample0[flipSite]*sample0) # # # Only accept flip if dE<=0 or probability exp(-dE) # # Thus reject flip if dE>0 and with probability (1-exp(-dE)) # if de<0: # return de # elif rng.rand()>np.exp(-de): # sample0[flipSite] *= -1 # return 0. # else: # return de #end FastMCIsing @jit(forceobj=True) def _jit_sample_metropolis(sample0, h, J, flip_site, rng): """ Metropolis sampling. """ sample0[flip_site] *= -1 de = -2*sample0[flip_site]*h[flip_site] - np.dot(2*J[flip_site], 1.*sample0[flip_site]*sample0) # Only accept flip if dE<=0 or probability exp(-dE) # Thus reject flip if dE>0 and with probability (1-exp(-dE)) if de < 0: return de elif rng.rand() > np.exp(-de): sample0[flip_site] *= -1 return 0. else: return de
[docs] class Metropolis(Sampler): def __init__(self, n, theta, calc_e=None, rng=None, boost=True, iprint=True): """MC sample on Ising model with +/-1 formulation. Attempts to use Boost library but defaults back to pure Python methods if it is unavailable. Parameters ---------- n : int Number of elements in system. theta : ndarray Vector of parameters in Hamiltonian. calc_e : function, None Function for calculating the energy of any given state. Only required if default Boost library for sampling from Ising model is not being used. Of form f( states, params ) rng : np.random.RandomState, None Random number generator. boost : bool, True If True, use boost library. iprint : bool, True If True, print details when code is run. """ assert type(n) is int, "n must be of type int." self.n = n self.theta = np.asarray(theta, dtype=float) # int theta gave wrong energies/sampling (#34) self.calc_e = calc_e self.rng = rng or np.random.RandomState() self._samples = None # use boost by default for Ising model if boost and IMPORTED_SAMPLERS_EXT and self.theta.size==(n*(n-1)//2+n): if iprint: warn("Assuming that the model is Ising.") # use boost library for fast sampling self.generate_sample = self.generate_sample_boost self.generate_sample_parallel = self.generate_sample_parallel_boost self.generate_cond_sample = self.generate_cond_sample_boost else: if boost and iprint: warn("Boost C++ implementation not available. Defaulting to slower sampling methods.") assert not self.calc_e is None self.generate_sample = self.generate_sample_py self.generate_sample_parallel = self.generate_sample_parallel_py self.generate_cond_sample = self.generate_cond_sample_py
[docs] def generate_sample_boost(self, sample_size, n_iters=1000, burn_in=None, systematic_iter=False): """Generate Metropolis samples using C++ and boost. Parameters ---------- sample_size : int Number of samples. n_iters : int, 1000 Number of Metropolis iterations between samples. burn_in : int, None If not set, will be the same value as n_iters. systematic_iter : bool, False If True, iterate through each element of system by increment index by one. Returns ------- ndarray, optional Saved array of energies at each sampling step. """ burn_in = burn_in or n_iters assert sample_size>0 and n_iters>-1 and burn_in>-1 bsampler = BoostIsing(self.n, self.theta, int(self.rng.randint(0, 2**31-1))) bsampler.generate_sample(sample_size, burn_in, n_iters, systematic_iter) self.sample = bsampler.fetch_sample().astype(int)
[docs] def generate_sample_py(self, sample_size, n_iters=1000, burn_in=None, systematic_iter=False, saveHistory=False, initial_sample=None): """Generate Metropolis samples using a for loop. Parameters ---------- sample_size : int Number of samples. n_iters : int, 1000 Number of iterations to run the sampler floor. burn_in : int, None If None, n_iters is used. systematic_iter : bool, False If True, iterate through each element of system by increment index by one. saveHistory : bool, False If True, also save the energy of each sample at each sampling step. initial_sample : ndarray, None Start with this sample (i.e. to avoid warming up). Otherwise, self._samples is the initial sample. Returns ------- ndarray, optional Saved array of energies at each sampling step. """ burn_in = burn_in or n_iters if (initial_sample is None and (self._samples is None or len(self._samples)!=sample_size)): self._samples = self.random_sample(1) elif not initial_sample is None: msg = "Initial sample can only be one state for sequential sampling." assert np.array_equal((1,self.n), initial_sample.shape), msg self._samples = initial_sample.astype(int) E = self.calc_e( self._samples, self.theta )[0] # scalar energy of the single replica self.sample = np.zeros((sample_size, self.n), dtype=int) self.E = np.zeros(sample_size) # burn in if systematic_iter: for i in range(burn_in): de = self.sample_metropolis( self._samples[0], E, flip_site=i%self.n ) E += de else: for i in range(burn_in): de = self.sample_metropolis( self._samples[0], E ) E += de if saveHistory: history = np.zeros(sample_size*n_iters+1) if systematic_iter: counter = 0 for i in range(sample_size): for j in range(n_iters): de = self.sample_metropolis( self._samples[0], E, flip_site=j%self.n ) E += de history[counter] = E counter += 1 self.sample[i,:] = self._samples[:] self.E[i] = E else: counter = 0 for i in range(sample_size): for j in range(n_iters): de = self.sample_metropolis( self._samples[0], E ) E += de history[counter]=E counter += 1 self.sample[i,:] = self._samples[:] self.E[i] = E return history else: if systematic_iter: counter = 0 for i in range(sample_size): for j in range(n_iters): de = self.sample_metropolis( self._samples[0], E, flip_site=j%self.n ) E += de counter +=1 self.sample[i,:] = self._samples[:] self.E[i] = E else: counter = 0 for i in range(sample_size): for j in range(n_iters): de = self.sample_metropolis( self._samples[0], E ) E += de counter += 1 self.sample[i,:] = self._samples[:] self.E[i] = E
[docs] def generate_sample_parallel_boost(self, sample_size, n_iters=1000, burn_in=None, systematic_iter=False): """Generate samples in parallel. Each replica in self._samples runs on its own thread and a sample is generated every n_iters. In order to control the random number generator, we pass in seeds that are samples from the class instance's rng. Parameters ---------- sample_size : int Number of samples. n_iters : int, 1000 Number of iterations between taking a random sample. burn_in : int, None If None, n_iters is used. systematic_iter : bool, False If True, iterate through spins systematically instead of choosing them randomly. """ burn_in = burn_in or n_iters # Parallel sample. Each thread needs to return sample_size/n_cpus samples. def f(args, n=self.n, multipliers=self.theta): sample_size, seed = args bsampler = BoostIsing(n, multipliers, int(seed)) bsampler.generate_sample(sample_size, burn_in, n_iters, systematic_iter) return bsampler.fetch_sample().astype(int) with mp.Pool() as pool: n_cpus = mp.cpu_count() self.sample = np.vstack( list(pool.map(f, zip([int(np.ceil(sample_size/n_cpus))]*n_cpus, self.rng.randint(2**31-1, size=n_cpus)))) ) self.sample = np.vstack(self.sample)[:sample_size]
[docs] def generate_sample_parallel_py(self, sample_size, n_iters=1000, burn_in=None, initial_sample=None, systematic_iter=False): """ Generate samples in parallel. Each replica in self._samples runs on its own thread and a sample is generated every n_iters. In order to control the random number generator, we pass in seeds that are samples from the class instance's rng. Parameters ---------- sample_size : int Number of samples. n_iters : int, 1000 Number of iterations between taking a random sample. burn_in : int, None If None, n_iters is used. initial_sample : ndarray, None Starting set of replicas otherwise self._samples is used. systematic_iter : bool, False If True, iterate through spins systematically instead of choosing them randomly. """ burn_in = burn_in or n_iters n_cpus = mp.cpu_count() if (initial_sample is None and (self._samples is None or len(self._samples)!=n_cpus)): self._samples = self.random_sample(n_cpus) elif not initial_sample is None: assert np.array_equal((n_cpus,self.n), initial_sample.shape), "initial_sample wrong size" self._samples = initial_sample.astype(int) E = self.calc_e( self._samples, self.theta ) # one energy per replica (length n_cpus) self.sample = None # delete this to speed up pickling for multiprocess # Parallel sample. Each thread needs to return sample_size/n_cpus samples. if not systematic_iter: def f(args): s, E, nSamples, seed = args rng = np.random.RandomState(seed) samples = np.zeros((nSamples, self.n), dtype=np.int8) for i in range(burn_in): de = self.sample_metropolis( s, E, rng=rng ) E += de for i in range(nSamples): for j in range(n_iters): de = self.sample_metropolis( s, E, rng=rng ) E += de samples[i,:] = s[:] return samples, s, E else: def f(args): s, E, nSamples, seed = args rng = np.random.RandomState(seed) samples = np.zeros((nSamples, self.n), dtype=np.int8) for i in range(burn_in): de = self.sample_metropolis( s, E, rng=rng, flip_site=i%self.n ) E += de for i in range(nSamples): for j in range(n_iters): de = self.sample_metropolis( s, E, rng=rng, flip_site=j%self.n ) E += de samples[i,:] = s[:] return samples, s, E with mp.Pool() as pool: self.sample, self._samples, self.E = list(zip(*pool.map(f,zip(self._samples, E, [int(np.ceil(sample_size/n_cpus))]*n_cpus, self.rng.randint(2**31-1,size=n_cpus))))) self.sample = np.vstack(self.sample)[:sample_size] self._samples = np.vstack(self._samples)
[docs] def generate_cond_sample_boost(self, sample_size, fixed_subset, burn_in=1000, n_iters=1000, initial_sample=None, systematic_iter=False, parallel=True): """Generate samples from conditional distribution (while a subset of the spins are held fixed). Parameters ---------- sample_size : int fixed_subset : list of duples Each duple is the index of the spin and the value to fix it at. These should be ordered by spin index. burn_in : int, 1000 Burn in. n_iters : int, 1000 initial_sample : ndarray, None Option to set initial random sample. systematic_iter : bool, False Iterate through spins systematically instead of choosing them randomly. parallel : bool, True If True, use parallelized routine. Returns ------- ndarray Samples from distribution. ndarray Energy of each sample. """ burn_in = burn_in or n_iters assert sample_size>0 and n_iters>-1 and burn_in>-1 fixed_subset_ = list(zip(*fixed_subset)) assert (np.diff(fixed_subset_[0]) > 0).all() assert frozenset((-1,1)) >= set(fixed_subset_[1]) # reformat for C++ module, note that Boost still relies on 32-bit ints fixed_subset = list(zip(*fixed_subset)) fixed_subset = np.array(fixed_subset[0], dtype=np.int32), np.array(fixed_subset[1], dtype=np.int32) assert (np.diff(fixed_subset[0]) > 0).all() assert frozenset((-1,1)) >= set(fixed_subset[1].tolist()) if not parallel: bsampler = BoostIsing(self.n, self.theta, int(self.rng.randint(0, 2**31-1))) bsampler.generate_cond_sample(fixed_subset[0], fixed_subset[1], sample_size, burn_in, n_iters, systematic_iter) self.sample = bsampler.fetch_sample().astype(int) else: # Parallel sample. Each thread needs to return sample_size/n_cpus samples. def f(args, n=self.n, multipliers=self.theta): sample_size, seed = args bsampler = BoostIsing(n, multipliers, int(seed)) bsampler.generate_cond_sample(fixed_subset[0], fixed_subset[1], sample_size, burn_in, n_iters, systematic_iter) return bsampler.fetch_sample().astype(int) n_cpus = mp.cpu_count() with mp.Pool() as pool: self.sample = np.vstack( list(pool.map(f, zip([int(np.ceil(sample_size/n_cpus))]*n_cpus, self.rng.randint(2**31-1, size=n_cpus)))) ) self.sample = np.vstack(self.sample)[:sample_size]
[docs] def generate_cond_sample_py(self, sample_size, fixed_subset, burn_in=1000, n_iters=1000, initial_sample=None, systematic_iter=False, parallel=False): """ Generate samples from conditional distribution (while a subset of the spins are held fixed). Samples are generated in parallel. NOTE: There is a bug with multiprocess where many calls to the parallel sampling routine in a row leads to increasingly slow evaluation of the code. Parameters ---------- sample_size : int fixed_subset : list of duples Each duple is the index of the spin and the value to fix it at. These should be ordered by spin index. burn_in : int, 1000 Burn in. n_iters : int, 1000 initial_sample : ndarray, None Option to set initial random sample. systematic_iter : bool, False Iterate through spins systematically instead of choosing them randomly. parallel : bool, False If True, use parallelized routine. Returns ------- ndarray Samples from distribution. ndarray Energy of each sample. """ nSubset = self.n - len(fixed_subset) fixed_subset_ = list(zip(*fixed_subset)) assert (np.diff(fixed_subset_[0]) > 0).all() assert frozenset((-1,1)) >= set(fixed_subset_[1]) # Initialize sampler. if initial_sample is None: self.sample = self.rng.choice([-1,1], size=(sample_size, nSubset)) else: self.sample = initial_sample # Redefine calc_e to calculate energy and putting back in the fixed spins. def cond_calc_e(state, theta): fullstate = np.zeros((1, self.n), dtype=np.int64) i0 = 0 stateix = 0 # Fill all spins in between fixed ones. for i, s in fixed_subset: for ii in range(i0, i): fullstate[0,ii] = state[0,stateix] stateix += 1 fullstate[0,i] = s i0 = i+1 # Any remaining spots to fill. for ii in range(i0, self.n): fullstate[0,ii] = state[0,stateix] stateix += 1 return self.calc_e(fullstate, theta) self.E = np.array([cond_calc_e(s[None,:], self.theta) for s in self.sample]) # Parallel sample. if parallel: # need to recode this to use replicas raise NotImplementedError if not systematic_iter: def f(args): s, E, seed = args rng = np.random.RandomState(seed) for j in range(burn_in): de = self.sample_metropolis(s, E, rng=rng, calc_e=cond_calc_e) E += de return s, E else: def f(args): s, E, seed = args rng = np.random.RandomState(seed) for j in range(burn_in): de = self.sample_metropolis(s, E, rng=rng, flip_site=j%nSubset, calc_e=cond_calc_e) E += de return s, E # avoid pickling a copy of self.sample into every thread samples = self.sample self.sample = None with mp.Pool() as pool: # evolve each sample for burn_in steps args = zip(samples, self.E, self.rng.randint(0, 2**31-1, size=sample_size)) self.sample, self.E = list(zip(*pool.map(f, args))) self.sample = np.vstack(self.sample) else: if not systematic_iter: def loop(): s = self.rng.choice([-1,1], size=nSubset) E = cond_calc_e(s[None,:], self.theta) for j in range(burn_in): de = self.sample_metropolis(s, E, rng=self.rng, calc_e=cond_calc_e) E += de counter = 0 for i in range(sample_size): for j in range(n_iters): de = self.sample_metropolis(s, E, rng=self.rng, calc_e=cond_calc_e) E += de counter += 1 self.sample[i,:] = s[:] self.E[i] = E else: def loop(): s = self.rng.choice([-1,1], size=nSubset) E = cond_calc_e(s[None,:], self.theta) for j in range(burn_in): de = self.sample_metropolis(s, E, rng=self.rng, flip_site=j%nSubset, calc_e=cond_calc_e) E += de counter = 0 for i in range(sample_size): for j in range(n_iters): de = self.sample_metropolis(s, E, rng=self.rng, flip_site=j%nSubset, calc_e=cond_calc_e) E += de counter += 1 self.sample[i,:] = s[:] self.E[i] = E # run sampling loop() # Insert fixed spins back in. counter = 0 for i, s in fixed_subset: if i==0: self.sample = np.insert(self.sample, list(range(i, self.sample.size, nSubset+counter)), s) else: self.sample = np.insert(self.sample, list(range(i, self.sample.size+1, nSubset+counter)), s) counter += 1 self.sample = np.reshape(self.sample, (sample_size, self.n)) self.E = np.concatenate(self.E)
[docs] def sample_metropolis(self, sample0, E0, rng=None, flip_site=None, calc_e=None): """Metropolis sampling given an arbitrary energy function. Parameters ---------- sample0 : ndarray Sample to start with. Passed by ref and changed. E0 : ndarray Initial energy of state. rng : np.random.RandomState Random number generator. flip_site : int Site to flip. calc_e : function If another function to calculate energy should be used Returns ------- float delta energy. """ rng = rng or self.rng if flip_site is None: flip_site = rng.randint(sample0.size) calc_e = calc_e or self.calc_e sample0[flip_site] *= -1 E1 = calc_e( sample0[None,:], self.theta )[0] # scalar energy of single state de = E1-E0 # Only accept flip if dE<=0 or probability exp(-dE) # Thus reject flip if dE>0 and with probability (1-exp(-dE)) if ( de>0 and (rng.rand()>np.exp(-de)) ): sample0[flip_site] *= -1 return 0. else: return de
[docs] def random_sample(self, n_samples): return self.rng.choice([-1,1], size=(n_samples, self.n))
[docs] def update_parameters(self, theta): assert self.theta.size==theta.size, "New parameters must be of same size." theta = np.asarray(theta, dtype=float) # ensure float (#34) self.theta = theta.copy()
def define_jit_cond_calc_e(n, calc_e, fixed_subset): """ Redefine calc_e to calculate energy and putting back in the fixed spins. Parameters ---------- state : ndarray Free spins (not fixed). theta : ndarray Parameters. Returns ------- float Energy of state with fixed spins. """ @njit def cond_calc_e(state, theta, fixed_subset=fixed_subset): fullstate = np.zeros((1, n), dtype=np.int64) i0 = 0 stateix = 0 # Fill all spins in between fixed ones. for i in range(0, fixed_subset.size, 2): i = fixed_subset[i] s = fixed_subset[i+1] for ii in range(i0, i): fullstate[0,ii] = state[0,stateix] stateix += 1 fullstate[0,i] = s i0 = i+1 # Any remaining spots to fill. for ii in range(i0, n): fullstate[0,ii] = state[0,stateix] stateix += 1 return calc_e(fullstate, theta) return cond_calc_e #end Metropolis
[docs] class Potts3(Metropolis): def __init__(self, n, theta, calc_e=None, n_cpus=None, rng=None, boost=True): """MC sample 3-state Potts model. Attempts to use Boost library but defaults back to pure Python methods if it is unavailable. Parameters ---------- n : int Number of elements in system. theta : ndarray Vector of parameters in Hamiltonian. calc_e : function, None Function for calculating the energy of any given state. Only required if default Boost library for sampling from Ising model is not being used. Of form f( states, params ) n_cpus : int, None If None, then will use all available CPUs. rng : np.random.RandomState, None Random number generator. boost : bool, True If True, use boost library. """ assert type(n) is int, "n must be of type int." if type(theta) is np.ndarray: assert theta.size==(n*3+n*(n-1)//2) else: assert theta[0].size==(3*n) and theta[1].size==n*(n-1)/2 self.n = n self.theta = theta self.nCpus = n_cpus or mp.cpu_count()-1 assert type(self.nCpus) is int, "n_cpus must be of type int." self.calc_e = calc_e self.rng = rng or np.random.RandomState() self._samples = None # use boost by default if boost and IMPORTED_SAMPLERS_EXT: self.bsampler = BoostPotts3(n, theta, self.rng.randint(2**31-1)) # use boost library for fast sampling self.generate_sample = self.generate_sample_boost self.generate_sample_parallel = self.generate_sample_parallel_boost else: if boost: warn("Boost library not available. Defaulting to slower sampling methods.") assert not self.calc_e is None self.generate_sample = self.generate_sample_py self.generate_sample_parallel = self.generate_sample_parallel_py
[docs] def generate_sample_parallel_boost(self, sample_size, n_iters=1000, burn_in=None, systematic_iter=False): """Generate samples in parallel. Each replica in self._samples runs on its own thread and a sample is generated every n_iters. In order to control the random number generator, we pass in seeds that are samples from the class instance's rng. Parameters ---------- sample_size : int Number of samples. n_iters : int, 1000 Number of iterations between taking a random sample. burn_in : int, None If None, n_iters is used. systematic_iter : bool, False If True, iterate through spins systematically instead of choosing them randomly. """ burn_in = burn_in or n_iters n_cpus = self.nCpus # alias assert n_cpus>=2, "Sampler is not set up for multiprocessing, nCpus<=1." assert sample_size>n_cpus, "Parallelization only helps if many samples are generated per thread." # Parallel sample. Each thread needs to return sample_size/n_cpus samples. def f(args, n=self.n, multipliers=self.theta): nSamples, seed = args bsampler = BoostPotts3(n, multipliers, int(seed)) bsampler.generate_sample(nSamples, burn_in, n_iters, systematic_iter) return bsampler.fetch_sample().astype(int) pool = mp.Pool(n_cpus) self.sample = np.vstack( list(pool.map(f, zip([int(np.ceil(sample_size/n_cpus))]*n_cpus, self.rng.randint(2**31-1, size=n_cpus)))) ) pool.close() self.sample = np.vstack(self.sample)[:sample_size]
[docs] def sample_metropolis(self, sample0, E0, rng=None, flip_site=None, calc_e=None): """Metropolis sampling given an arbitrary sampling function. Parameters ---------- sample0 : ndarray Sample to start with. Passed by ref and changed. E0 : ndarray Initial energy of state. rng : np.random.RandomState Random number generator. flip_site : int Site to flip. calc_e : function If another function to calculate energy should be used Returns ------- float delta energy. """ rng = rng or self.rng if flip_site is None: flip_site = rng.randint(sample0.size) calc_e = calc_e or self.calc_e oState = sample0[flip_site] sample0[flip_site] = (oState+rng.randint(1,3))%3 E1 = calc_e( sample0[None,:], self.theta )[0] # scalar energy of single state de = E1-E0 # Only accept flip if dE<=0 or probability exp(-dE) # Thus reject flip if dE>0 and with probability (1-exp(-dE)) if ( de>0 and (rng.rand()>np.exp(-de)) ): sample0[flip_site] = oState return 0. else: return de
[docs] def random_sample(self, n_samples): return self.rng.randint(0, 3, size=(n_samples, self.n))
#end Potts3
[docs] def sample_ising(multipliers, n_samples, seed=None, parallel=True, generate_sample_kw={}): """Easy way to Metropolis sample from Ising model. Parameters ---------- multipliers : ndarray or twople N individual fields followed by N(N-1)/2 pairwise couplings. n_samples : int Number of samples to take. seed : int, None Random number generator seed. parallel : bool, True If True, use parallelized sampling routine. generate_sample_kw : dict, {} Any extra arguments to send into Metropolis sampler. Default args are n_iters=1000 systematic_iter=False saveHistory=False initial_sample=None Returns ------- ndarray Matrix of dimensions (n_samples, n). """ if len(multipliers)==2: multipliers = np.concatenate(multipliers) else: multipliers = np.array(multipliers) rng = np.random.RandomState(seed=seed) n = 0.5 * (-1 + np.sqrt(1 + 8*len(multipliers)) ) assert n == int(n),"The length of multipliers vector does not correspond to an integer number of spins." calc_e = define_ising_helper_functions()[0] sampler = Metropolis(int(n), multipliers, rng=rng, iprint=False, calc_e=calc_e) # generate samples if parallel: sampler.generate_sample_parallel(n_samples, **generate_sample_kw) else: sampler.generate_sample(n_samples, **generate_sample_kw) return sampler.sample