Source code for welltestpy.estimate.steady_lib

"""welltestpy subpackage providing base classes for steady state estimations."""
import os
import time as timemodule
from copy import deepcopy as dcopy

import numpy as np
import spotpy

from ..data import testslib
from ..process import processlib
from ..tools import plotter
from . import spotpylib

__all__ = [
    "SteadyPumping",
]


[docs]class SteadyPumping: """Class to estimate steady Type-Curve parameters. Parameters ---------- name : :class:`str` Name of the Estimation. campaign : :class:`welltestpy.data.Campaign` The pumping test campaign which should be used to estimate the parameters type_curve : :any:`callable` The given type-curve. Output will be reshaped to flat array. val_ranges : :class:`dict` Dictionary containing the fit-ranges for each value in the type-curve. Names should be as in the type-curve signature. Ranges should be a tuple containing min and max value. make_steady : :class:`bool`, optional State if the tests should be converted to steady observations. See: :any:`PumpingTest.make_steady`. Default: True val_fix : :class:`dict` or :any:`None` Dictionary containing fixed values for the type-curve. Names should be as in the type-curve signature. Default: None val_fit_type : :class:`dict` or :any:`None` Dictionary containing fitting transformation type for each value. Names should be as in the type-curve signature. val_fit_type can be "lin", "log", "exp", "sqrt", "quad", "inv" or a tuple of two callable functions where the first is the transformation and the second is its inverse. "log" is for example equivalent to ``(np.log, np.exp)``. By default, values will be fitted linear. Default: None val_fit_name : :class:`dict` or :any:`None` Display name of the fitting transformation. Will be the val_fit_type string if it is a predefined one, or ``f`` if it is a given callable as default for each value. Default: None val_plot_names : :class:`dict` or :any:`None` Dictionary containing keyword names in the type-curve for each value. {value-name: string for plot legend} This is useful to get better plots. By default, parameter names will be value names. Default: None testinclude : :class:`dict`, optional Dictionary of which tests should be included. If ``None`` is given, all available tests are included. Default: ``None`` generate : :class:`bool`, optional State if time stepping, processed observation data and estimation setup should be generated with default values. Default: ``False`` """ def __init__( self, name, campaign, type_curve, val_ranges, make_steady=True, val_fix=None, val_fit_type=None, val_fit_name=None, val_plot_names=None, testinclude=None, generate=False, ): val_fix = {} if val_fix is None else val_fix self.setup_kw = { "type_curve": type_curve, "val_ranges": val_ranges, "val_fix": val_fix, "val_fit_type": val_fit_type, "val_fit_name": val_fit_name, "val_plot_names": val_plot_names, } """:class:`dict`: TypeCurve Spotpy Setup definition""" self.name = name """:class:`str`: Name of the Estimation""" self.campaign_raw = dcopy(campaign) """:class:`welltestpy.data.Campaign`:\ Copy of the original input campaign""" self.campaign = dcopy(campaign) """:class:`welltestpy.data.Campaign`:\ Copy of the input campaign to be modified""" self.prate = None """:class:`float`: Pumpingrate at the pumping well""" self.rad = None """:class:`numpy.ndarray`: array of the radii from the wells""" self.data = None """:class:`numpy.ndarray`: observation data""" self.radnames = None """:class:`numpy.ndarray`: names of the radii well combination""" self.r_ref = None """:class:`float`: reference radius of the biggest distance""" self.h_ref = None """:class:`float`: reference head at the biggest distance""" self.estimated_para = {} """:class:`dict`: estimated parameters by name""" self.result = None """:class:`list`: result of the spotpy estimation""" self.sens = None """:class:`dict`: result of the spotpy sensitivity analysis""" self.testinclude = {} """:class:`dict`: dictionary of which tests should be included""" if testinclude is None: tests = list(self.campaign.tests.keys()) self.testinclude = {} for test in tests: self.testinclude[test] = self.campaign.tests[ test ].observationwells elif not isinstance(testinclude, dict): self.testinclude = {} for test in testinclude: self.testinclude[test] = self.campaign.tests[ test ].observationwells else: self.testinclude = testinclude for test in self.testinclude: if not isinstance(self.campaign.tests[test], testslib.PumpingTest): raise ValueError(test + " is not a pumping test.") if make_steady is not False: if make_steady is True: make_steady = "latest" self.campaign.tests[test].make_steady(make_steady) if not self.campaign.tests[test].constant_rate: raise ValueError(test + " is not a constant rate test.") if ( not self.campaign.tests[test].state( wells=self.testinclude[test] ) == "steady" ): raise ValueError(test + ": selection is not steady.") rwell_list = [] rinf_list = [] for test in self.testinclude: pwell = self.campaign.tests[test].pumpingwell rwell_list.append(self.campaign.wells[pwell].radius) rinf_list.append(self.campaign.tests[test].radius) self.rwell = min(rwell_list) """:class:`float`: radius of the pumping wells""" self.rinf = max(rinf_list) """:class:`float`: radius of the furthest wells""" if generate: self.setpumprate() self.gen_data() self.gen_setup()
[docs] def setpumprate(self, prate=-1.0): """Set a uniform pumping rate at all pumpingwells wells. We assume linear scaling by the pumpingrate. Parameters ---------- prate : :class:`float`, optional Pumping rate. Default: ``-1.0`` """ for test in self.testinclude: processlib.normpumptest( self.campaign.tests[test], pumpingrate=prate ) self.prate = prate
[docs] def gen_data(self): """Generate the observed drawdown. It will also generate an array containing all radii of all well combinations. """ rad = np.array([]) data = np.array([]) radnames = [] for test in self.testinclude: pwell = self.campaign.wells[self.campaign.tests[test].pumpingwell] for obs in self.testinclude[test]: temphead = self.campaign.tests[test].observations[obs]() data = np.hstack((data, temphead)) owell = self.campaign.wells[obs] if pwell == owell: temprad = pwell.radius else: temprad = pwell - owell rad = np.hstack((rad, temprad)) tempname = (self.campaign.tests[test].pumpingwell, obs) radnames.append(tempname) # sort everything by the radii idx = rad.argsort() radnames = np.array(radnames) self.rad = rad[idx] self.data = data[idx] self.radnames = radnames[idx] self.r_ref = self.rad[-1] self.h_ref = self.data[-1]
[docs] def gen_setup( self, prate_kw="rate", rad_kw="rad", r_ref_kw="r_ref", h_ref_kw="h_ref", dummy=False, ): """Generate the Spotpy Setup. Parameters ---------- prate_kw : :class:`str`, optional Keyword name for the pumping rate in the used type curve. Default: "rate" rad_kw : :class:`str`, optional Keyword name for the radius in the used type curve. Default: "rad" r_ref_kw : :class:`str`, optional Keyword name for the reference radius in the used type curve. Default: "r_ref" h_ref_kw : :class:`str`, optional Keyword name for the reference head in the used type curve. Default: "h_ref" dummy : :class:`bool`, optional Add a dummy parameter to the model. This could be used to equalize sensitivity analysis. Default: False """ self.extra_kw_names = {"rad": rad_kw} setup_kw = dcopy(self.setup_kw) # create a copy here setup_kw["val_fix"].setdefault(prate_kw, self.prate) setup_kw["val_fix"].setdefault(rad_kw, self.rad) setup_kw["val_fix"].setdefault(r_ref_kw, self.r_ref) setup_kw["val_fix"].setdefault(h_ref_kw, self.h_ref) setup_kw.setdefault("data", self.data) setup_kw["dummy"] = dummy self.setup = spotpylib.TypeCurve(**setup_kw)
[docs] def run( self, rep=5000, parallel="seq", run=True, folder=None, dbname=None, traceplotname=None, fittingplotname=None, interactplotname=None, estname=None, plot_style="WTP", ): """Run the estimation. Parameters ---------- rep : :class:`int`, optional The number of repetitions within the SCEua algorithm in spotpy. Default: ``5000`` parallel : :class:`str`, optional State if the estimation should be run in parallel or not. Options: * ``"seq"``: sequential on one CPU * ``"mpi"``: use the mpi4py package Default: ``"seq"`` run : :class:`bool`, optional State if the estimation should be executed. Otherwise all plots will be done with the previous results. Default: ``True`` folder : :class:`str`, optional Path to the output folder. If ``None`` the CWD is used. Default: ``None`` dbname : :class:`str`, optional File-name of the database of the spotpy estimation. If ``None``, it will be the current time + ``"_db"``. Default: ``None`` traceplotname : :class:`str`, optional File-name of the parameter trace plot of the spotpy estimation. If ``None``, it will be the current time + ``"_paratrace.pdf"``. Default: ``None`` fittingplotname : :class:`str`, optional File-name of the fitting plot of the estimation. If ``None``, it will be the current time + ``"_fit.pdf"``. Default: ``None`` interactplotname : :class:`str`, optional File-name of the parameter interaction plot of the spotpy estimation. If ``None``, it will be the current time + ``"_parainteract.pdf"``. Default: ``None`` estname : :class:`str`, optional File-name of the results of the spotpy estimation. If ``None``, it will be the current time + ``"_estimate"``. Default: ``None`` plot_style : str, optional Plot style. The default is "WTP". """ if self.setup.dummy: raise ValueError( "Estimate: for parameter estimation" " you can't use a dummy parameter." ) act_time = timemodule.strftime("%Y-%m-%d_%H-%M-%S") # generate the filenames if folder is None: folder = os.path.join(os.getcwd(), self.name) folder = os.path.abspath(folder) if not os.path.exists(folder): os.makedirs(folder) if dbname is None: dbname = os.path.join(folder, act_time + "_db") elif not os.path.isabs(dbname): dbname = os.path.join(folder, dbname) if traceplotname is None: traceplotname = os.path.join(folder, act_time + "_paratrace.pdf") elif not os.path.isabs(traceplotname): traceplotname = os.path.join(folder, traceplotname) if fittingplotname is None: fittingplotname = os.path.join(folder, act_time + "_fit.pdf") elif not os.path.isabs(fittingplotname): fittingplotname = os.path.join(folder, fittingplotname) if interactplotname is None: interactplotname = os.path.join(folder, act_time + "_interact.pdf") elif not os.path.isabs(interactplotname): interactplotname = os.path.join(folder, interactplotname) if estname is None: paraname = os.path.join(folder, act_time + "_estimate.txt") elif not os.path.isabs(estname): paraname = os.path.join(folder, estname) # generate the parameter-names for plotting paranames = dcopy(self.setup.para_names) paralabels = [] for name in paranames: p_label = self.setup.val_plot_names[name] fit_n = self.setup.val_fit_name[name] paralabels.append(f"{fit_n}({p_label})" if fit_n else p_label) if parallel == "mpi": # send the dbname of rank0 from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() if rank == 0: print(rank, "send dbname:", dbname) for i in range(1, size): comm.send(dbname, dest=i, tag=0) else: dbname = comm.recv(source=0, tag=0) print(rank, "got dbname:", dbname) else: rank = 0 # initialize the sampler sampler = spotpy.algorithms.sceua( self.setup, dbname=dbname, dbformat="csv", parallel=parallel, save_sim=True, db_precision=np.float64, ) # start the estimation with the sce-ua algorithm if run: sampler.sample(rep, ngs=10, kstop=100, pcento=1e-4, peps=1e-3) if rank == 0: if run: self.result = sampler.getdata() else: self.result = np.genfromtxt( dbname + ".csv", delimiter=",", names=True ) para_opt = spotpy.analyser.get_best_parameterset( self.result, maximize=False ) void_names = para_opt.dtype.names para = [] header = [] for name in void_names: para.append(para_opt[0][name]) fit_n = self.setup.val_fit_name[name[3:]] header.append(f"{fit_n}-{name[3:]}" if fit_n else name[3:]) self.estimated_para[name[3:]] = para[-1] np.savetxt(paraname, para, header=" ".join(header)) # plot the estimation-results plotter.plotparatrace( result=self.result, parameternames=paranames, parameterlabels=paralabels, stdvalues=self.estimated_para, plotname=traceplotname, style=plot_style, ) plotter.plotfit_steady( setup=self.setup, data=self.data, para=self.estimated_para, rad=self.rad, radnames=self.radnames, extra=self.extra_kw_names, plotname=fittingplotname, style=plot_style, ) plotter.plotparainteract( self.result, paralabels, interactplotname, style=plot_style )
[docs] def sensitivity( self, rep=None, parallel="seq", folder=None, dbname=None, plotname=None, traceplotname=None, sensname=None, plot_style="WTP", ): """Run the sensitivity analysis. Parameters ---------- rep : :class:`int`, optional The number of repetitions within the FAST algorithm in spotpy. Default: estimated parallel : :class:`str`, optional State if the estimation should be run in parallel or not. Options: * ``"seq"``: sequential on one CPU * ``"mpi"``: use the mpi4py package Default: ``"seq"`` folder : :class:`str`, optional Path to the output folder. If ``None`` the CWD is used. Default: ``None`` dbname : :class:`str`, optional File-name of the database of the spotpy estimation. If ``None``, it will be the current time + ``"_sensitivity_db"``. Default: ``None`` plotname : :class:`str`, optional File-name of the result plot of the sensitivity analysis. If ``None``, it will be the current time + ``"_sensitivity.pdf"``. Default: ``None`` traceplotname : :class:`str`, optional File-name of the parameter trace plot of the spotpy sensitivity analysis. If ``None``, it will be the current time + ``"_senstrace.pdf"``. Default: ``None`` sensname : :class:`str`, optional File-name of the results of the FAST estimation. If ``None``, it will be the current time + ``"_estimate"``. Default: ``None`` plot_style : str, optional Plot style. The default is "WTP". """ if len(self.setup.para_names) == 1 and not self.setup.dummy: raise ValueError( "Sensitivity: for estimation with only one parameter" " you have to use a dummy parameter." ) if rep is None: rep = spotpylib.fast_rep( len(self.setup.para_names) + int(self.setup.dummy) ) act_time = timemodule.strftime("%Y-%m-%d_%H-%M-%S") # generate the filenames if folder is None: folder = os.path.join(os.getcwd(), self.name) folder = os.path.abspath(folder) if not os.path.exists(folder): os.makedirs(folder) if dbname is None: dbname = os.path.join(folder, act_time + "_sensitivity_db") elif not os.path.isabs(dbname): dbname = os.path.join(folder, dbname) if plotname is None: plotname = os.path.join(folder, act_time + "_sensitivity.pdf") elif not os.path.isabs(plotname): plotname = os.path.join(folder, plotname) if traceplotname is None: traceplotname = os.path.join(folder, act_time + "_senstrace.pdf") elif not os.path.isabs(traceplotname): traceplotname = os.path.join(folder, traceplotname) if sensname is None: sensname = os.path.join(folder, act_time + "_FAST_estimate.txt") elif not os.path.isabs(sensname): sensname = os.path.join(folder, sensname) sens_base, sens_ext = os.path.splitext(sensname) sensname1 = sens_base + "_S1" + sens_ext # generate the parameter-names for plotting paranames = dcopy(self.setup.para_names) paralabels = [] for name in paranames: p_label = self.setup.val_plot_names[name] fit_n = self.setup.val_fit_name[name] paralabels.append(f"{fit_n}({p_label})" if fit_n else p_label) if self.setup.dummy: paranames.append("dummy") paralabels.append("dummy") if parallel == "mpi": # send the dbname of rank0 from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() if rank == 0: print(rank, "send dbname:", dbname) for i in range(1, size): comm.send(dbname, dest=i, tag=0) else: dbname = comm.recv(source=0, tag=0) print(rank, "got dbname:", dbname) else: rank = 0 # initialize the sampler sampler = spotpy.algorithms.fast( self.setup, dbname=dbname, dbformat="csv", parallel=parallel, save_sim=True, db_precision=np.float64, ) sampler.sample(rep) if rank == 0: data = sampler.getdata() parmin = sampler.parameter()["minbound"] parmax = sampler.parameter()["maxbound"] bounds = list(zip(parmin, parmax)) sens_est = sampler.analyze( bounds, np.nan_to_num(data["like1"]), len(paranames), paranames ) self.sens = {} for sen_typ in sens_est: self.sens[sen_typ] = { par: sen for par, sen in zip(paranames, sens_est[sen_typ]) } header = " ".join(paranames) np.savetxt(sensname, sens_est["ST"], header=header) np.savetxt(sensname1, sens_est["S1"], header=header) plotter.plotsensitivity( paralabels, sens_est, plotname, style=plot_style ) plotter.plotparatrace( data, parameternames=paranames, parameterlabels=paralabels, stdvalues=None, plotname=traceplotname, style=plot_style, )