Source code for gstools.random.rng

"""
GStools subpackage providing the core of the spatial random field generation.

.. currentmodule:: gstools.random.rng

The following classes are provided

.. autosummary::
   RNG
"""
# pylint: disable=E1101
import emcee as mc
import numpy as np
import numpy.random as rand
from emcee.state import State

from gstools.random.tools import MasterRNG, dist_gen

__all__ = ["RNG"]


[docs]class RNG: """ A random number generator for different distributions and multiple streams. Parameters ---------- seed : :class:`int` or :any:`None`, optional The seed of the master RNG, if ``None``, a random seed is used. Default: ``None`` """ def __init__(self, seed=None): # set seed self._master_rng = None self.seed = seed
[docs] def sample_ln_pdf( self, ln_pdf, size=None, sample_around=1.0, nwalkers=50, burn_in=20, oversampling_factor=10, ): """Sample from a distribution given by ln(pdf). This algorithm uses the :class:`emcee.EnsembleSampler` Parameters ---------- ln_pdf : :any:`callable` The logarithm of the Probability density function of the given distribution, that takes a single argument size : :class:`int` or :any:`None`, optional sample size. Default: None sample_around : :class:`float`, optional Starting point for initial guess Default: 1. nwalkers : :class:`int`, optional The number of walkers in the mcmc sampler. Used for the emcee.EnsembleSampler class. Default: 50 burn_in : :class:`int`, optional Number of burn-in runs in the mcmc algorithm. Default: 20 oversampling_factor : :class:`int`, optional To guess the sample number needed for proper results, we use a factor for oversampling. The intern used sample-size is calculated by ``sample_size = max(burn_in, (size/nwalkers)*oversampling_factor)`` So at least, as much as the burn-in runs. Default: 10 """ if size is None: # pragma: no cover sample_size = burn_in else: sample_size = max(burn_in, (size / nwalkers) * oversampling_factor) # sample_size needs to be integer for emcee >= 3.1 sample_size = int(sample_size) # initial guess init_guess = ( self.random.rand(nwalkers).reshape((nwalkers, 1)) * sample_around ) # initialize the sampler sampler = mc.EnsembleSampler(nwalkers, 1, ln_pdf) # burn in phase with saving of last position initial_state = State(init_guess, copy=True) initial_state.random_state = self.random.get_state() burn_in_state = sampler.run_mcmc( initial_state=initial_state, nsteps=burn_in ) # reset after burn_in sampler.reset() # actual sampling initial_state = State(burn_in_state, copy=True) initial_state.random_state = self.random.get_state() sampler.run_mcmc(initial_state=initial_state, nsteps=sample_size) samples = sampler.get_chain(flat=True)[:, 0] # choose samples according to size return self.random.choice(samples, size)
[docs] def sample_dist(self, pdf=None, cdf=None, ppf=None, size=None, **kwargs): """Sample from a distribution given by pdf, cdf and/or ppf. Parameters ---------- pdf : :any:`callable` or :any:`None`, optional Probability density function of the given distribution, that takes a single argument Default: ``None`` cdf : :any:`callable` or :any:`None`, optional Cumulative distribution function of the given distribution, that takes a single argument Default: ``None`` ppf : :any:`callable` or :any:`None`, optional Percent point function of the given distribution, that takes a single argument Default: ``None`` size : :class:`int` or :any:`None`, optional sample size. Default: None **kwargs Keyword-arguments that are forwarded to :any:`scipy.stats.rv_continuous`. Returns ------- samples : :class:`float` or :class:`numpy.ndarray` the samples from the given distribution Notes ----- At least pdf or cdf needs to be given. """ kwargs["seed"] = self.random dist = dist_gen(pdf_in=pdf, cdf_in=cdf, ppf_in=ppf, **kwargs) return dist.rvs(size=size)
[docs] def sample_sphere(self, dim, size=None): """Uniform sampling on a d-dimensional sphere. Parameters ---------- dim : :class:`int` Dimension of the sphere. Just 1, 2, and 3 supported. size : :class:`int`, optional sample size Returns ------- coord : :class:`numpy.ndarray` x[, y[, z]] coordinates on the sphere with shape (dim, size) """ if size is None: # pragma: no cover coord = np.empty((dim, 1), dtype=np.double) else: coord = np.empty( # saver conversion of size to resulting shape (dim,) + tuple(np.atleast_1d(size)), dtype=np.double ) if dim == 1: coord[0] = self.random.choice([-1, 1], size=size) elif dim == 2: ang1 = self.random.uniform(0.0, 2 * np.pi, size) coord[0] = np.cos(ang1) coord[1] = np.sin(ang1) elif dim == 3: ang1 = self.random.uniform(0.0, 2 * np.pi, size) ang2 = self.random.uniform(-1.0, 1.0, size) coord[0] = np.sqrt(1.0 - ang2**2) * np.cos(ang1) coord[1] = np.sqrt(1.0 - ang2**2) * np.sin(ang1) coord[2] = ang2 else: # pragma: no cover # http://corysimon.github.io/articles/uniformdistn-on-sphere/ coord = self.random.normal(size=coord.shape) while True: # loop until all norms are non-zero norm = np.linalg.norm(coord, axis=0) # check for zero norms zero_norms = np.isclose(norm, 0) # exit the loop if all norms are non-zero if not np.any(zero_norms): break # transpose, since the next transpose reverses axis order zero_samples = zero_norms.T.nonzero() # need to transpose to have dim-axis last new_shape = coord.T[zero_samples].shape # resample the zero norm samples coord.T[zero_samples] = self.random.normal(size=new_shape) # project onto sphere coord = coord / norm return np.reshape(coord, dim) if size is None else coord
@property def random(self): """:any:`numpy.random.RandomState`: Randomstate. Get a stream to the numpy Random number generator. You can use this, to call any provided distribution from :any:`numpy.random.RandomState`. """ return rand.RandomState(self._master_rng()) @property # pragma: no cover def seed(self): """:class:`int`: Seed of the master RNG. The setter property not only saves the new seed, but also creates a new master RNG function with the new seed. """ return self._master_rng.seed @seed.setter def seed(self, new_seed=None): self._master_rng = MasterRNG(new_seed) def __repr__(self): """Return String representation.""" return f"RNG(seed={self.seed})"