Source code for gstools.random.rng

# -*- coding: utf-8 -*-
"""
GStools subpackage providing the core of the spatial random field generation.

.. currentmodule:: gstools.random.rng

The following classes are provided

.. autosummary::
   RNG
"""
# pylint: disable=no-member
from __future__ import division, absolute_import, print_function

import numpy as np
import numpy.random as rand
import emcee as mc
from gstools.random.tools import MasterRNG, dist_gen

__all__ = ["RNG"]

MC_VER = int(mc.__version__.split(".")[0])


[docs]class RNG(object): """ A random number generator for different distributions and multiple streams. Parameters ---------- seed : :class:`int` or :any:`None`, optional The seed of the master RNG, if ``None``, a random seed is used. Default: ``None`` """ def __init__(self, seed=None): """Initialize a random number generator""" # set seed self._master_rng = None self.seed = seed
[docs] def sample_ln_pdf( self, ln_pdf, size=None, sample_around=1.0, nwalkers=50, burn_in=20, oversampling_factor=10, ): """Sample from a distribution given by ln(pdf) This algorithm uses the :any:`emcee.EnsembleSampler` Parameters ---------- ln_pdf : :any:`callable` The logarithm of the Probability density function of the given distribution, that takes a single argument size : :class:`int` or :any:`None`, optional sample size. Default: None sample_around : :class:`float`, optional Starting point for initial guess Default: 1. nwalkers : :class:`int`, optional The number of walkers in the mcmc sampler. Used for the emcee.EnsembleSampler class. Default: 100 burn_in : :class:`int`, optional Number of burn-in runs in the mcmc algorithm. Default: 100 oversampling_factor : :class:`int`, optional To guess the sample number needed for proper results, we use a factor for oversampling. The intern used sample-size is calculated by ``sample_size = max(burn_in, (size/nwalkers)*oversampling_factor)`` So at least, as much as the burn-in runs. Default: 10 """ if size is None: sample_size = burn_in else: sample_size = max(burn_in, (size / nwalkers) * oversampling_factor) # initial guess init_guess = ( self.random.rand(nwalkers).reshape((nwalkers, 1)) * sample_around ) # initialize the sampler sampler = mc.EnsembleSampler(nwalkers, 1, ln_pdf) # burn in phase with saving of last position ##################### mc 2 and 3 compatibility if MC_VER < 3: # pragma: no cover burn_in_state, __, __ = sampler.run_mcmc( pos0=init_guess, N=burn_in, rstate0=self.random.get_state() ) else: # pragma: no cover from emcee.state import State initial_state = State(init_guess, copy=True) initial_state.random_state = self.random.get_state() burn_in_state = sampler.run_mcmc( initial_state=initial_state, nsteps=burn_in ) ##################### mc 2 and 3 compatibility # reset after burn_in sampler.reset() # actual sampling ##################### mc 2 and 3 compatibility if MC_VER < 3: # pragma: no cover sampler.run_mcmc( pos0=burn_in_state, N=sample_size, rstate0=self.random.get_state(), ) samples = sampler.flatchain[:, 0] else: # pragma: no cover from emcee.state import State initial_state = State(burn_in_state, copy=True) initial_state.random_state = self.random.get_state() sampler.run_mcmc(initial_state=initial_state, nsteps=sample_size) samples = sampler.get_chain(flat=True)[:, 0] ##################### mc 2 and 3 compatibility # choose samples according to size return self.random.choice(samples, size)
[docs] def sample_dist(self, pdf=None, cdf=None, ppf=None, size=None, **kwargs): """Sample from a distribution given by pdf, cdf and/or ppf Parameters ---------- pdf : :any:`callable` or :any:`None`, optional Probability density function of the given distribution, that takes a single argument Default: ``None`` cdf : :any:`callable` or :any:`None`, optional Cumulative distribution function of the given distribution, that takes a single argument Default: ``None`` ppf : :any:`callable` or :any:`None`, optional Percent point function of the given distribution, that takes a single argument Default: ``None`` size : :class:`int` or :any:`None`, optional sample size. Default: None **kwargs Keyword-arguments that are forwarded to :any:`scipy.stats.rv_continuous`. Returns ------- samples : :class:`float` or :class:`numpy.ndarray` the samples from the given distribution Notes ----- At least pdf or cdf needs to be given. """ kwargs["seed"] = self.random dist = dist_gen(pdf_in=pdf, cdf_in=cdf, ppf_in=ppf, **kwargs) return dist.rvs(size=size)
[docs] def sample_sphere(self, dim, size=None): """Uniform sampling on a d-dimensional sphere Parameters ---------- dim : :class:`int` Dimension of the sphere. Just 1, 2, and 3 supported. size : :class:`int`, optional sample size Returns ------- coord : :class:`numpy.ndarray` x[, y[, z]] coordinates on the sphere with shape (dim, size) """ if size is None: coord = np.empty(dim, dtype=float) else: coord = np.empty((dim, size), dtype=float) if dim == 1: coord[0] = self.random.choice([-1, 1], size=size) elif dim == 2: ang1 = self.random.uniform(0.0, 2 * np.pi, size) coord[0] = np.cos(ang1) coord[1] = np.sin(ang1) elif dim == 3: ang1 = self.random.uniform(0.0, 2 * np.pi, size) ang2 = self.random.uniform(-1.0, 1.0, size) coord[0] = np.sqrt(1.0 - ang2 ** 2) * np.cos(ang1) coord[1] = np.sqrt(1.0 - ang2 ** 2) * np.sin(ang1) coord[2] = ang2 return coord
@property def random(self): """:any:`numpy.random.RandomState`: Get a stream to the numpy Random number generator You can use this, to call any provided distribution from :any:`numpy.random.RandomState`. """ return rand.RandomState(self._master_rng()) @property # pragma: no cover def seed(self): """:class:`int`: the seed of the master RNG The setter property not only saves the new seed, but also creates a new master RNG function with the new seed. """ return self._master_rng.seed @seed.setter def seed(self, new_seed=None): self._master_rng = MasterRNG(new_seed) def __str__(self): return self.__repr__() def __repr__(self): return "RNG(seed={})".format(self.seed)
if __name__ == "__main__": # pragma: no cover import doctest doctest.testmod()