Source code for welltestpy.estimate.steady_lib

# -*- coding: utf-8 -*-
"""
welltestpy subpackage providing base classe for steady state estimations.

.. currentmodule:: welltestpy.estimate.steady_lib

The following classes are provided

.. autosummary::
   SteadyPumping
"""
from copy import deepcopy as dcopy
import os
import time as timemodule

import numpy as np
import spotpy

from ..data import testslib
from ..process import processlib
from . import spotpylib
from ..tools import plotter

__all__ = [
    "SteadyPumping",
]


[docs]class SteadyPumping: """Class to estimate steady Type-Curve parameters. Parameters ---------- name : :class:`str` Name of the Estimation. campaign : :class:`welltestpy.data.Campaign` The pumping test campaign which should be used to estimate the paramters type_curve : :any:`callable` The given type-curve. Output will be reshaped to flat array. val_ranges : :class:`dict` Dictionary containing the fit-ranges for each value in the type-curve. Names should be as in the type-curve signiture or replaced in val_kw_names. Ranges should be a tuple containing min and max value. make_steady : :class:`bool`, optional State if the tests should be converted to steady observations. See: :any:`PumpingTest.make_steady`. Default: True val_fix : :class:`dict` or :any:`None` Dictionary containing fixed values for the type-curve. Names should be as in the type-curve signiture or replaced in val_kw_names. Default: None fit_type : :class:`dict` or :any:`None` Dictionary containing fitting type for each value in the type-curve. Names should be as in the type-curve signiture or replaced in val_kw_names. fit_type can be "lin", "log" (np.exp(val) will be used) or a callable function. By default, values will be fit linearly. Default: None val_kw_names : :class:`dict` or :any:`None` Dictionary containing keyword names in the type-curve for each value. {value-name: kwargs-name in type_curve} This is usefull if fitting is not done by linear values. By default, parameter names will be value names. Default: None val_plot_names : :class:`dict` or :any:`None` Dictionary containing keyword names in the type-curve for each value. {value-name: string for plot legend} This is usefull to get better plots. By default, parameter names will be value names. Default: None testinclude : :class:`dict`, optional dictonary of which tests should be included. If ``None`` is given, all available tests are included. Default: ``None`` generate : :class:`bool`, optional State if time stepping, processed observation data and estimation setup should be generated with default values. Default: ``False`` """ def __init__( self, name, campaign, type_curve, val_ranges, make_steady=True, val_fix=None, fit_type=None, val_kw_names=None, val_plot_names=None, testinclude=None, generate=False, ): val_fix = {} if val_fix is None else val_fix fit_type = {} if fit_type is None else fit_type val_kw_names = {} if val_kw_names is None else val_kw_names val_plot_names = {} if val_plot_names is None else val_plot_names self.setup_kw = { "type_curve": type_curve, "val_ranges": val_ranges, "val_fix": val_fix, "fit_type": fit_type, "val_kw_names": val_kw_names, "val_plot_names": val_plot_names, } """:class:`dict`: TypeCurve Spotpy Setup definition""" self.name = name """:class:`str`: Name of the Estimation""" self.campaign_raw = dcopy(campaign) """:class:`welltestpy.data.Campaign`:\ Copy of the original input campaign""" self.campaign = dcopy(campaign) """:class:`welltestpy.data.Campaign`:\ Copy of the input campaign to be modified""" self.prate = None """:class:`float`: Pumpingrate at the pumping well""" self.rad = None """:class:`numpy.ndarray`: array of the radii from the wells""" self.data = None """:class:`numpy.ndarray`: observation data""" self.radnames = None """:class:`numpy.ndarray`: names of the radii well combination""" self.r_ref = None """:class:`float`: reference radius of the biggest distance""" self.h_ref = None """:class:`float`: reference head at the biggest distance""" self.estimated_para = {} """:class:`dict`: estimated parameters by name""" self.result = None """:class:`list`: result of the spotpy estimation""" self.sens = None """:class:`dict`: result of the spotpy sensitivity analysis""" self.testinclude = {} """:class:`dict`: dictonary of which tests should be included""" if testinclude is None: tests = list(self.campaign.tests.keys()) self.testinclude = {} for test in tests: self.testinclude[test] = self.campaign.tests[ test ].observationwells elif not isinstance(testinclude, dict): self.testinclude = {} for test in testinclude: self.testinclude[test] = self.campaign.tests[ test ].observationwells else: self.testinclude = testinclude for test in self.testinclude: if not isinstance(self.campaign.tests[test], testslib.PumpingTest): raise ValueError(test + " is not a pumping test.") if make_steady is not False: if make_steady is True: make_steady = "latest" self.campaign.tests[test].make_steady(make_steady) if not self.campaign.tests[test].constant_rate: raise ValueError(test + " is not a constant rate test.") if ( not self.campaign.tests[test].state( wells=self.testinclude[test] ) == "steady" ): raise ValueError(test + ": selection is not steady.") rwell_list = [] rinf_list = [] for test in self.testinclude: pwell = self.campaign.tests[test].pumpingwell rwell_list.append(self.campaign.wells[pwell].radius) rinf_list.append(self.campaign.tests[test].radius) self.rwell = min(rwell_list) """:class:`float`: radius of the pumping wells""" self.rinf = max(rinf_list) """:class:`float`: radius of the furthest wells""" if generate: self.setpumprate() self.gen_data() self.gen_setup()
[docs] def setpumprate(self, prate=-1.0): """Set a uniform pumping rate at all pumpingwells wells. We assume linear scaling by the pumpingrate. Parameters ---------- prate : :class:`float`, optional Pumping rate. Default: ``-1.0`` """ for test in self.testinclude: processlib.normpumptest( self.campaign.tests[test], pumpingrate=prate ) self.prate = prate
[docs] def gen_data(self): """Generate the observed drawdown. It will also generate an array containing all radii of all well combinations. """ rad = np.array([]) data = np.array([]) radnames = [] for test in self.testinclude: pwell = self.campaign.wells[self.campaign.tests[test].pumpingwell] for obs in self.testinclude[test]: temphead = self.campaign.tests[test].observations[obs]() data = np.hstack((data, temphead)) owell = self.campaign.wells[obs] if pwell == owell: temprad = pwell.radius else: temprad = pwell - owell rad = np.hstack((rad, temprad)) tempname = (self.campaign.tests[test].pumpingwell, obs) radnames.append(tempname) # sort everything by the radii idx = rad.argsort() radnames = np.array(radnames) self.rad = rad[idx] self.data = data[idx] self.radnames = radnames[idx] self.r_ref = self.rad[-1] self.h_ref = self.data[-1]
[docs] def gen_setup( self, prate_kw="rate", rad_kw="rad", r_ref_kw="r_ref", h_ref_kw="h_ref", dummy=False, ): """Generate the Spotpy Setup. Parameters ---------- prate_kw : :class:`str`, optional Keyword name for the pumping rate in the used type curve. Default: "rate" rad_kw : :class:`str`, optional Keyword name for the radius in the used type curve. Default: "rad" r_ref_kw : :class:`str`, optional Keyword name for the reference radius in the used type curve. Default: "r_ref" h_ref_kw : :class:`str`, optional Keyword name for the reference head in the used type curve. Default: "h_ref" dummy : :class:`bool`, optional Add a dummy parameter to the model. This could be used to equalize sensitivity analysis. Default: False """ self.extra_kw_names = { "Qw": prate_kw, "rad": rad_kw, "r_ref": r_ref_kw, "h_ref": h_ref_kw, } self.setup_kw["val_fix"].setdefault(prate_kw, self.prate) self.setup_kw["val_fix"].setdefault(rad_kw, self.rad) self.setup_kw["val_fix"].setdefault(r_ref_kw, self.r_ref) self.setup_kw["val_fix"].setdefault(h_ref_kw, self.h_ref) self.setup_kw.setdefault("data", self.data) self.setup_kw["dummy"] = dummy self.setup = spotpylib.TypeCurve(**self.setup_kw)
[docs] def run( self, rep=5000, parallel="seq", run=True, folder=None, dbname=None, traceplotname=None, fittingplotname=None, interactplotname=None, estname=None, plot_style="WTP", ): """Run the estimation. Parameters ---------- rep : :class:`int`, optional The number of repetitions within the SCEua algorithm in spotpy. Default: ``5000`` parallel : :class:`str`, optional State if the estimation should be run in parallel or not. Options: * ``"seq"``: sequential on one CPU * ``"mpi"``: use the mpi4py package Default: ``"seq"`` run : :class:`bool`, optional State if the estimation should be executed. Otherwise all plots will be done with the previous results. Default: ``True`` folder : :class:`str`, optional Path to the output folder. If ``None`` the CWD is used. Default: ``None`` dbname : :class:`str`, optional File-name of the database of the spotpy estimation. If ``None``, it will be the current time + ``"_db"``. Default: ``None`` traceplotname : :class:`str`, optional File-name of the parameter trace plot of the spotpy estimation. If ``None``, it will be the current time + ``"_paratrace.pdf"``. Default: ``None`` fittingplotname : :class:`str`, optional File-name of the fitting plot of the estimation. If ``None``, it will be the current time + ``"_fit.pdf"``. Default: ``None`` interactplotname : :class:`str`, optional File-name of the parameter interaction plot of the spotpy estimation. If ``None``, it will be the current time + ``"_parainteract.pdf"``. Default: ``None`` estname : :class:`str`, optional File-name of the results of the spotpy estimation. If ``None``, it will be the current time + ``"_estimate"``. Default: ``None`` plot_style : str, optional Plot stlye. The default is "WTP". """ if self.setup.dummy: raise ValueError( "Estimate: for parameter estimation" " you can't use a dummy paramter." ) act_time = timemodule.strftime("%Y-%m-%d_%H-%M-%S") # generate the filenames if folder is None: folder = os.path.join(os.getcwd(), self.name) folder = os.path.abspath(folder) if not os.path.exists(folder): os.makedirs(folder) if dbname is None: dbname = os.path.join(folder, act_time + "_db") elif not os.path.isabs(dbname): dbname = os.path.join(folder, dbname) if traceplotname is None: traceplotname = os.path.join(folder, act_time + "_paratrace.pdf") elif not os.path.isabs(traceplotname): traceplotname = os.path.join(folder, traceplotname) if fittingplotname is None: fittingplotname = os.path.join(folder, act_time + "_fit.pdf") elif not os.path.isabs(fittingplotname): fittingplotname = os.path.join(folder, fittingplotname) if interactplotname is None: interactplotname = os.path.join(folder, act_time + "_interact.pdf") elif not os.path.isabs(interactplotname): interactplotname = os.path.join(folder, interactplotname) if estname is None: paraname = os.path.join(folder, act_time + "_estimate.txt") elif not os.path.isabs(estname): paraname = os.path.join(folder, estname) # generate the parameter-names for plotting paranames = dcopy(self.setup.para_names) paralabels = [self.setup.val_plot_names[name] for name in paranames] if parallel == "mpi": # send the dbname of rank0 from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() if rank == 0: print(rank, "send dbname:", dbname) for i in range(1, size): comm.send(dbname, dest=i, tag=0) else: dbname = comm.recv(source=0, tag=0) print(rank, "got dbname:", dbname) else: rank = 0 # initialize the sampler sampler = spotpy.algorithms.sceua( self.setup, dbname=dbname, dbformat="csv", parallel=parallel, save_sim=True, db_precision=np.float64, ) # start the estimation with the sce-ua algorithm if run: sampler.sample(rep, ngs=10, kstop=100, pcento=1e-4, peps=1e-3) if rank == 0: if run: self.result = sampler.getdata() else: self.result = np.genfromtxt( dbname + ".csv", delimiter=",", names=True ) para_opt = spotpy.analyser.get_best_parameterset( self.result, maximize=False ) void_names = para_opt.dtype.names para = [] header = [] for name in void_names: para.append(para_opt[0][name]) header.append(name[3:]) self.estimated_para[header[-1]] = para[-1] np.savetxt(paraname, para, header=" ".join(header)) # plot the estimation-results plotter.plotparatrace( result=self.result, parameternames=paranames, parameterlabels=paralabels, stdvalues=self.estimated_para, plotname=traceplotname, style=plot_style, ) plotter.plotfit_steady( setup=self.setup, data=self.data, para=self.estimated_para, rad=self.rad, radnames=self.radnames, extra=self.extra_kw_names, plotname=fittingplotname, style=plot_style, ) plotter.plotparainteract( self.result, paralabels, interactplotname, style=plot_style )
[docs] def sensitivity( self, rep=None, parallel="seq", folder=None, dbname=None, plotname=None, traceplotname=None, sensname=None, plot_style="WTP", ): """Run the sensitivity analysis. Parameters ---------- rep : :class:`int`, optional The number of repetitions within the FAST algorithm in spotpy. Default: estimated parallel : :class:`str`, optional State if the estimation should be run in parallel or not. Options: * ``"seq"``: sequential on one CPU * ``"mpi"``: use the mpi4py package Default: ``"seq"`` folder : :class:`str`, optional Path to the output folder. If ``None`` the CWD is used. Default: ``None`` dbname : :class:`str`, optional File-name of the database of the spotpy estimation. If ``None``, it will be the current time + ``"_sensitivity_db"``. Default: ``None`` plotname : :class:`str`, optional File-name of the result plot of the sensitivity analysis. If ``None``, it will be the current time + ``"_sensitivity.pdf"``. Default: ``None`` traceplotname : :class:`str`, optional File-name of the parameter trace plot of the spotpy sensitivity analysis. If ``None``, it will be the current time + ``"_senstrace.pdf"``. Default: ``None`` sensname : :class:`str`, optional File-name of the results of the FAST estimation. If ``None``, it will be the current time + ``"_estimate"``. Default: ``None`` plot_style : str, optional Plot stlye. The default is "WTP". """ if len(self.setup.para_names) == 1 and not self.setup.dummy: raise ValueError( "Sensitivity: for estimation with only one parameter" " you have to use a dummy paramter." ) if rep is None: rep = spotpylib.fast_rep( len(self.setup.para_names) + int(self.setup.dummy) ) act_time = timemodule.strftime("%Y-%m-%d_%H-%M-%S") # generate the filenames if folder is None: folder = os.path.join(os.getcwd(), self.name) folder = os.path.abspath(folder) if not os.path.exists(folder): os.makedirs(folder) if dbname is None: dbname = os.path.join(folder, act_time + "_sensitivity_db") elif not os.path.isabs(dbname): dbname = os.path.join(folder, dbname) if plotname is None: plotname = os.path.join(folder, act_time + "_sensitivity.pdf") elif not os.path.isabs(plotname): plotname = os.path.join(folder, plotname) if traceplotname is None: traceplotname = os.path.join(folder, act_time + "_senstrace.pdf") elif not os.path.isabs(traceplotname): traceplotname = os.path.join(folder, traceplotname) if sensname is None: sensname = os.path.join(folder, act_time + "_FAST_estimate.txt") elif not os.path.isabs(sensname): sensname = os.path.join(folder, sensname) sens_base, sens_ext = os.path.splitext(sensname) sensname1 = sens_base + "_S1" + sens_ext # generate the parameter-names for plotting paranames = dcopy(self.setup.para_names) paralabels = [self.setup.val_plot_names[name] for name in paranames] if self.setup.dummy: paranames.append("dummy") paralabels.append("dummy") if parallel == "mpi": # send the dbname of rank0 from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() if rank == 0: print(rank, "send dbname:", dbname) for i in range(1, size): comm.send(dbname, dest=i, tag=0) else: dbname = comm.recv(source=0, tag=0) print(rank, "got dbname:", dbname) else: rank = 0 # initialize the sampler sampler = spotpy.algorithms.fast( self.setup, dbname=dbname, dbformat="csv", parallel=parallel, save_sim=True, db_precision=np.float64, ) sampler.sample(rep) if rank == 0: data = sampler.getdata() parmin = sampler.parameter()["minbound"] parmax = sampler.parameter()["maxbound"] bounds = list(zip(parmin, parmax)) sens_est = sampler.analyze( bounds, np.nan_to_num(data["like1"]), len(paranames), paranames ) self.sens = {} for sen_typ in sens_est: self.sens[sen_typ] = { par: sen for par, sen in zip(paranames, sens_est[sen_typ]) } header = " ".join(paranames) np.savetxt(sensname, sens_est["ST"], header=header) np.savetxt(sensname1, sens_est["S1"], header=header) plotter.plotsensitivity( paralabels, sens_est, plotname, style=plot_style ) plotter.plotparatrace( data, parameternames=paranames, parameterlabels=paralabels, stdvalues=None, plotname=traceplotname, style=plot_style, )