"""welltestpy subpackage providing base classes for transient estimations."""
import os
import time as timemodule
from copy import deepcopy as dcopy
import anaflow as ana
import numpy as np
import spotpy
from ..data import testslib
from ..process import processlib
from ..tools import plotter
from . import spotpylib
__all__ = [
"TransientPumping",
]
[docs]class TransientPumping:
"""Class to estimate transient Type-Curve parameters.
Parameters
----------
name : :class:`str`
Name of the Estimation.
campaign : :class:`welltestpy.data.Campaign`
The pumping test campaign which should be used to estimate the
parameters
type_curve : :any:`callable`
The given type-curve. Output will be reshaped to flat array.
val_ranges : :class:`dict`
Dictionary containing the fit-ranges for each value in the type-curve.
Names should be as in the type-curve signature.
Ranges should be a tuple containing min and max value.
val_fix : :class:`dict` or :any:`None`
Dictionary containing fixed values for the type-curve.
Names should be as in the type-curve signature.
Default: None
val_fit_type : :class:`dict` or :any:`None`
Dictionary containing fitting transformation type for each value.
Names should be as in the type-curve signature.
val_fit_type can be "lin", "log", "exp", "sqrt", "quad", "inv"
or a tuple of two callable functions where the
first is the transformation and the second is its inverse.
"log" is for example equivalent to ``(np.log, np.exp)``.
By default, values will be fitted linear.
Default: None
val_fit_name : :class:`dict` or :any:`None`
Display name of the fitting transformation.
Will be the val_fit_type string if it is a predefined one,
or ``f`` if it is a given callable as default for each value.
Default: None
val_plot_names : :class:`dict` or :any:`None`
Dictionary containing keyword names in the type-curve for each value.
{value-name: string for plot legend}
This is useful to get better plots.
By default, parameter names will be value names.
Default: None
testinclude : :class:`dict`, optional
Dictionary of which tests should be included. If ``None`` is given,
all available tests are included.
Default: ``None``
generate : :class:`bool`, optional
State if time stepping, processed observation data and estimation
setup should be generated with default values.
Default: ``False``
"""
def __init__(
self,
name,
campaign,
type_curve,
val_ranges,
val_fix=None,
val_fit_type=None,
val_fit_name=None,
val_plot_names=None,
testinclude=None,
generate=False,
):
val_fix = val_fix or {}
self.setup_kw = {
"type_curve": type_curve,
"val_ranges": val_ranges,
"val_fix": val_fix,
"val_fit_type": val_fit_type,
"val_fit_name": val_fit_name,
"val_plot_names": val_plot_names,
}
""":class:`dict`: TypeCurve Spotpy Setup definition"""
self.name = name
""":class:`str`: Name of the Estimation"""
self.campaign_raw = dcopy(campaign)
""":class:`welltestpy.data.Campaign`:\
Copy of the original input campaign"""
self.campaign = dcopy(campaign)
""":class:`welltestpy.data.Campaign`:\
Copy of the input campaign to be modified"""
self.prate = None
""":class:`float`: Pumpingrate at the pumping well"""
self.time = None
""":class:`numpy.ndarray`: time points of the observation"""
self.rad = None
""":class:`numpy.ndarray`: array of the radii from the wells"""
self.data = None
""":class:`numpy.ndarray`: observation data"""
self.radnames = None
""":class:`numpy.ndarray`: names of the radii well combination"""
self.estimated_para = {}
""":class:`dict`: estimated parameters by name"""
self.result = None
""":class:`list`: result of the spotpy estimation"""
self.sens = None
""":class:`dict`: result of the spotpy sensitivity analysis"""
self.testinclude = {}
""":class:`dict`: dictionary of which tests should be included"""
if testinclude is None:
tests = list(self.campaign.tests.keys())
self.testinclude = {}
for test in tests:
self.testinclude[test] = self.campaign.tests[
test
].observationwells
elif not isinstance(testinclude, dict):
self.testinclude = {}
for test in testinclude:
self.testinclude[test] = self.campaign.tests[
test
].observationwells
else:
self.testinclude = testinclude
for test in self.testinclude:
if not isinstance(self.campaign.tests[test], testslib.PumpingTest):
raise ValueError(test + " is not a pumping test.")
if not self.campaign.tests[test].constant_rate:
raise ValueError(test + " is not a constant rate test.")
if (
not self.campaign.tests[test].state(
wells=self.testinclude[test]
)
== "transient"
):
raise ValueError(test + ": selection is not transient.")
rwell_list = []
rinf_list = []
for test in self.testinclude:
pwell = self.campaign.tests[test].pumpingwell
rwell_list.append(self.campaign.wells[pwell].radius)
rinf_list.append(self.campaign.tests[test].radius)
self.rwell = min(rwell_list)
""":class:`float`: radius of the pumping wells"""
self.rinf = max(rinf_list)
""":class:`float`: radius of the furthest wells"""
if generate:
self.setpumprate()
self.settime()
self.gen_data()
self.gen_setup()
[docs] def setpumprate(self, prate=-1.0):
"""Set a uniform pumping rate at all pumpingwells wells.
We assume linear scaling by the pumpingrate.
Parameters
----------
prate : :class:`float`, optional
Pumping rate. Default: ``-1.0``
"""
for test in self.testinclude:
processlib.normpumptest(
self.campaign.tests[test], pumpingrate=prate
)
self.prate = prate
[docs] def settime(self, time=None, tmin=10.0, tmax=np.inf, typ="quad", steps=10):
"""Set uniform time points for the observations.
Parameters
----------
time : :class:`numpy.ndarray`, optional
Array of specified time points. If ``None`` is given, they will
be determined by the observation data.
Default: ``None``
tmin : :class:`float`, optional
Minimal time value. It will set a minimal value of 10s.
Default: ``10``
tmax : :class:`float`, optional
Maximal time value.
Default: ``inf``
typ : :class:`str` or :class:`float`, optional
Typ of the time selection. You can select from:
* ``"exp"``: for exponential behavior
* ``"log"``: for logarithmic behavior
* ``"geo"``: for geometric behavior
* ``"lin"``: for linear behavior
* ``"quad"``: for quadratic behavior
* ``"cub"``: for cubic behavior
* :class:`float`: here you can specifi any exponent
("quad" would be equivalent to 2)
Default: "quad"
steps : :class:`int`, optional
Number of generated time steps. Default: 10
"""
if time is None:
for test in self.testinclude:
for obs in self.testinclude[test]:
_, temptime = self.campaign.tests[test].observations[obs]()
tmin = max(tmin, temptime.min())
tmax = min(tmax, temptime.max())
tmin = tmax if tmin > tmax else tmin
time = ana.specialrange(tmin, tmax, steps, typ)
for test in self.testinclude:
for obs in self.testinclude[test]:
processlib.filterdrawdown(
self.campaign.tests[test].observations[obs], tout=time
)
self.time = time
[docs] def gen_data(self):
"""Generate the observed drawdown at given time points.
It will also generate an array containing all radii of all well
combinations.
"""
rad = np.array([])
data = None
radnames = []
for test in self.testinclude:
pwell = self.campaign.wells[self.campaign.tests[test].pumpingwell]
for obs in self.testinclude[test]:
temphead, _ = self.campaign.tests[test].observations[obs]()
temphead = np.array(temphead).reshape(-1)[np.newaxis].T
if data is None:
data = dcopy(temphead)
else:
data = np.hstack((data, temphead))
owell = self.campaign.wells[obs]
if pwell == owell:
temprad = pwell.radius
else:
temprad = pwell - owell
rad = np.hstack((rad, temprad))
tempname = (self.campaign.tests[test].pumpingwell, obs)
radnames.append(tempname)
# sort everything by the radii
idx = rad.argsort()
radnames = np.array(radnames)
self.rad = rad[idx]
self.data = data[:, idx]
self.radnames = radnames[idx]
[docs] def gen_setup(
self, prate_kw="rate", rad_kw="rad", time_kw="time", dummy=False
):
"""Generate the Spotpy Setup.
Parameters
----------
prate_kw : :class:`str`, optional
Keyword name for the pumping rate in the used type curve.
Default: "rate"
rad_kw : :class:`str`, optional
Keyword name for the radius in the used type curve.
Default: "rad"
time_kw : :class:`str`, optional
Keyword name for the time in the used type curve.
Default: "time"
dummy : :class:`bool`, optional
Add a dummy parameter to the model. This could be used to equalize
sensitivity analysis.
Default: False
"""
self.extra_kw_names = {"rad": rad_kw, "time": time_kw}
setup_kw = dcopy(self.setup_kw)
setup_kw["val_fix"].setdefault(prate_kw, self.prate)
setup_kw["val_fix"].setdefault(rad_kw, self.rad)
setup_kw["val_fix"].setdefault(time_kw, self.time)
setup_kw.setdefault("data", self.data)
setup_kw["dummy"] = dummy
self.setup = spotpylib.TypeCurve(**setup_kw)
[docs] def run(
self,
rep=5000,
parallel="seq",
run=True,
folder=None,
dbname=None,
traceplotname=None,
fittingplotname=None,
interactplotname=None,
estname=None,
plot_style="WTP",
):
"""Run the estimation.
Parameters
----------
rep : :class:`int`, optional
The number of repetitions within the SCEua algorithm in spotpy.
Default: ``5000``
parallel : :class:`str`, optional
State if the estimation should be run in parallel or not. Options:
* ``"seq"``: sequential on one CPU
* ``"mpi"``: use the mpi4py package
Default: ``"seq"``
run : :class:`bool`, optional
State if the estimation should be executed. Otherwise all plots
will be done with the previous results.
Default: ``True``
folder : :class:`str`, optional
Path to the output folder. If ``None`` the CWD is used.
Default: ``None``
dbname : :class:`str`, optional
File-name of the database of the spotpy estimation.
If ``None``, it will be the current time +
``"_db"``.
Default: ``None``
traceplotname : :class:`str`, optional
File-name of the parameter trace plot of the spotpy estimation.
If ``None``, it will be the current time +
``"_paratrace.pdf"``.
Default: ``None``
fittingplotname : :class:`str`, optional
File-name of the fitting plot of the estimation.
If ``None``, it will be the current time +
``"_fit.pdf"``.
Default: ``None``
interactplotname : :class:`str`, optional
File-name of the parameter interaction plot
of the spotpy estimation.
If ``None``, it will be the current time +
``"_parainteract.pdf"``.
Default: ``None``
estname : :class:`str`, optional
File-name of the results of the spotpy estimation.
If ``None``, it will be the current time +
``"_estimate"``.
Default: ``None``
plot_style : str, optional
Plot style. The default is "WTP".
"""
if self.setup.dummy:
raise ValueError(
"Estimate: for parameter estimation"
" you can't use a dummy paramter."
)
act_time = timemodule.strftime("%Y-%m-%d_%H-%M-%S")
# generate the filenames
if folder is None:
folder = os.path.join(os.getcwd(), self.name)
folder = os.path.abspath(folder)
if not os.path.exists(folder):
os.makedirs(folder)
if dbname is None:
dbname = os.path.join(folder, act_time + "_db")
elif not os.path.isabs(dbname):
dbname = os.path.join(folder, dbname)
if traceplotname is None:
traceplotname = os.path.join(folder, act_time + "_paratrace.pdf")
elif not os.path.isabs(traceplotname):
traceplotname = os.path.join(folder, traceplotname)
if fittingplotname is None:
fittingplotname = os.path.join(folder, act_time + "_fit.pdf")
elif not os.path.isabs(fittingplotname):
fittingplotname = os.path.join(folder, fittingplotname)
if interactplotname is None:
interactplotname = os.path.join(folder, act_time + "_interact.pdf")
elif not os.path.isabs(interactplotname):
interactplotname = os.path.join(folder, interactplotname)
if estname is None:
paraname = os.path.join(folder, act_time + "_estimate.txt")
elif not os.path.isabs(estname):
paraname = os.path.join(folder, estname)
# generate the parameter-names for plotting
paranames = dcopy(self.setup.para_names)
paralabels = []
for name in paranames:
p_label = self.setup.val_plot_names[name]
fit_n = self.setup.val_fit_name[name]
paralabels.append(f"{fit_n}({p_label})" if fit_n else p_label)
if parallel == "mpi":
# send the dbname of rank0
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
print(rank, "send dbname:", dbname)
for i in range(1, size):
comm.send(dbname, dest=i, tag=0)
else:
dbname = comm.recv(source=0, tag=0)
print(rank, "got dbname:", dbname)
else:
rank = 0
# initialize the sampler
sampler = spotpy.algorithms.sceua(
self.setup,
dbname=dbname,
dbformat="csv",
parallel=parallel,
save_sim=True,
db_precision=np.float64,
)
# start the estimation with the sce-ua algorithm
if run:
sampler.sample(rep, ngs=10, kstop=100, pcento=1e-4, peps=1e-3)
if rank == 0:
# save best parameter-set
if run:
self.result = sampler.getdata()
else:
self.result = np.genfromtxt(
dbname + ".csv", delimiter=",", names=True
)
para_opt = spotpy.analyser.get_best_parameterset(
self.result, maximize=False
)
void_names = para_opt.dtype.names
para = []
header = []
for name in void_names:
para.append(para_opt[0][name])
fit_n = self.setup.val_fit_name[name[3:]]
header.append(f"{fit_n}-{name[3:]}" if fit_n else name[3:])
self.estimated_para[name[3:]] = para[-1]
np.savetxt(paraname, para, header=" ".join(header))
# plot the estimation-results
plotter.plotparatrace(
self.result,
parameternames=paranames,
parameterlabels=paralabels,
stdvalues=self.estimated_para,
plotname=traceplotname,
style=plot_style,
)
plotter.plotfit_transient(
setup=self.setup,
data=self.data,
para=self.estimated_para,
rad=self.rad,
time=self.time,
radnames=self.radnames,
extra=self.extra_kw_names,
plotname=fittingplotname,
style=plot_style,
)
plotter.plotparainteract(
self.result, paralabels, interactplotname, style=plot_style
)
[docs] def sensitivity(
self,
rep=None,
parallel="seq",
folder=None,
dbname=None,
plotname=None,
traceplotname=None,
sensname=None,
plot_style="WTP",
):
"""Run the sensitivity analysis.
Parameters
----------
rep : :class:`int`, optional
The number of repetitions within the FAST algorithm in spotpy.
Default: estimated
parallel : :class:`str`, optional
State if the estimation should be run in parallel or not. Options:
* ``"seq"``: sequential on one CPU
* ``"mpi"``: use the mpi4py package
Default: ``"seq"``
folder : :class:`str`, optional
Path to the output folder. If ``None`` the CWD is used.
Default: ``None``
dbname : :class:`str`, optional
File-name of the database of the spotpy estimation.
If ``None``, it will be the current time +
``"_sensitivity_db"``.
Default: ``None``
plotname : :class:`str`, optional
File-name of the result plot of the sensitivity analysis.
If ``None``, it will be the current time +
``"_sensitivity.pdf"``.
Default: ``None``
traceplotname : :class:`str`, optional
File-name of the parameter trace plot of the spotpy sensitivity
analysis.
If ``None``, it will be the current time +
``"_senstrace.pdf"``.
Default: ``None``
sensname : :class:`str`, optional
File-name of the results of the FAST estimation.
If ``None``, it will be the current time +
``"_estimate"``.
Default: ``None``
plot_style : str, optional
Plot style. The default is "WTP".
"""
if len(self.setup.para_names) == 1 and not self.setup.dummy:
raise ValueError(
"Sensitivity: for estimation with only one parameter"
" you have to use a dummy parameter."
)
if rep is None:
rep = spotpylib.fast_rep(
len(self.setup.para_names) + int(self.setup.dummy)
)
act_time = timemodule.strftime("%Y-%m-%d_%H-%M-%S")
# generate the filenames
if folder is None:
folder = os.path.join(os.getcwd(), self.name)
folder = os.path.abspath(folder)
if not os.path.exists(folder):
os.makedirs(folder)
if dbname is None:
dbname = os.path.join(folder, act_time + "_sensitivity_db")
elif not os.path.isabs(dbname):
dbname = os.path.join(folder, dbname)
if plotname is None:
plotname = os.path.join(folder, act_time + "_sensitivity.pdf")
elif not os.path.isabs(plotname):
plotname = os.path.join(folder, plotname)
if traceplotname is None:
traceplotname = os.path.join(folder, act_time + "_senstrace.pdf")
elif not os.path.isabs(traceplotname):
traceplotname = os.path.join(folder, traceplotname)
if sensname is None:
sensname = os.path.join(folder, act_time + "_FAST_estimate.txt")
elif not os.path.isabs(sensname):
sensname = os.path.join(folder, sensname)
sens_base, sens_ext = os.path.splitext(sensname)
sensname1 = sens_base + "_S1" + sens_ext
# generate the parameter-names for plotting
paranames = dcopy(self.setup.para_names)
paralabels = []
for name in paranames:
p_label = self.setup.val_plot_names[name]
fit_n = self.setup.val_fit_name[name]
paralabels.append(f"{fit_n}({p_label})" if fit_n else p_label)
if self.setup.dummy:
paranames.append("dummy")
paralabels.append("dummy")
if parallel == "mpi":
# send the dbname of rank0
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
print(rank, "send dbname:", dbname)
for i in range(1, size):
comm.send(dbname, dest=i, tag=0)
else:
dbname = comm.recv(source=0, tag=0)
print(rank, "got dbname:", dbname)
else:
rank = 0
# initialize the sampler
sampler = spotpy.algorithms.fast(
self.setup,
dbname=dbname,
dbformat="csv",
parallel=parallel,
save_sim=True,
db_precision=np.float64,
)
sampler.sample(rep)
if rank == 0:
data = sampler.getdata()
parmin = sampler.parameter()["minbound"]
parmax = sampler.parameter()["maxbound"]
bounds = list(zip(parmin, parmax))
sens_est = sampler.analyze(
bounds, np.nan_to_num(data["like1"]), len(paranames), paranames
)
self.sens = {}
for sen_typ in sens_est:
self.sens[sen_typ] = {
par: sen for par, sen in zip(paranames, sens_est[sen_typ])
}
header = " ".join(paranames)
np.savetxt(sensname, sens_est["ST"], header=header)
np.savetxt(sensname1, sens_est["S1"], header=header)
plotter.plotsensitivity(
paralabels, sens_est, plotname, style=plot_style
)
plotter.plotparatrace(
data,
parameternames=paranames,
parameterlabels=paralabels,
stdvalues=None,
plotname=traceplotname,
style=plot_style,
)