# -*- coding: utf-8 -*-
"""
welltestpy subpackage providing input-output routines.
.. currentmodule:: welltestpy.data.data_io
The following functions are provided
.. autosummary::
"""
import os
import csv
import shutil
import zipfile
import tempfile
from io import TextIOWrapper as TxtIO, BytesIO as BytIO
import numbers
import numpy as np
from packaging.version import parse as version_parse
from . import varlib, campaignlib, testslib
try:
from .._version import __version__
except ImportError: # pragma: nocover
# package is not installed
__version__ = "0.0.0.dev0"
# TOOLS ###
class LoadError(Exception):
    """Error raised by the reading routines when loading fails."""
def _formstr(string):
    """Return ``str(string)`` with every whitespace character removed."""
    # str.split() without arguments splits on any run of whitespace
    # (spaces, tabs, line breaks, ...), so re-joining drops all of it
    pieces = str(string).split()
    return "".join(pieces)
def _formname(string):
    """Return ``str(string)`` without path separators and whitespace."""
    # drop every occurrence of the platform path separator first ...
    without_sep = str(string).replace(os.path.sep, "")
    # ... then remove spaces, tabs, line breaks and other whitespace
    return "".join(without_sep.split())
def _nextr(data):
    """Return the next row of ``data`` as a tuple without falsy cells."""
    row = next(data)
    # empty cells (e.g. from trailing separators) are dropped
    return tuple(cell for cell in row if cell)
def _check_version(version):
    """Reject files whose major version is newer than this package.

    Only the major component is compared against the major component of
    the module's ``__version__``.

    Raises
    ------
    ValueError
        If ``version.major`` is greater than the package major version.
    """
    if version.major <= version_parse(__version__).major:
        return
    raise ValueError(f"Unknown version '{version.public}'")
# SAVE ###
def save_var(var, path="", name=None):
    """Save a variable to file.

    This writes the variable to a csv file.

    Parameters
    ----------
    var : object
        The variable to save. Its attributes ``name``, ``symbol``,
        ``units``, ``description``, ``value`` and ``scalar`` are written.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Var_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".var"``.
    """
    path = os.path.normpath(path)
    # exist_ok avoids the race of a separate "exists" check before creation
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Var_" + var.name
    # ensure the name ends with '.var'
    if not name.endswith(".var"):
        name += ".var"
    name = _formname(name)
    file_path = os.path.join(path, name)
    is_integer = issubclass(
        np.asanyarray(var.value).dtype.type, numbers.Integral
    )
    # newline="" leaves line endings to the csv writer, so the explicit
    # lineterminator is not translated by the platform's text layer
    with open(file_path, "w", newline="") as csvf:
        writer = csv.writer(
            csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
        )
        writer.writerow(["wtp-version", __version__])
        writer.writerow(["Variable"])
        writer.writerow(["name", var.name])
        writer.writerow(["symbol", var.symbol])
        writer.writerow(["units", var.units])
        writer.writerow(["description", var.description])
        writer.writerow(["integer" if is_integer else "float"])
        if var.scalar:
            writer.writerow(["scalar"])
            writer.writerow(["value", var.value])
        else:
            # arrays are stored as shape + flattened values, one per row
            writer.writerow(["shape"] + list(np.shape(var.value)))
            flat = np.reshape(var.value, -1)
            writer.writerow(["values", len(flat)])
            for val in flat:
                writer.writerow([val])
    return file_path
def save_obs(obs, path="", name=None):
    """Save an observation to file.

    This writes the observation to a zip archive containing an
    ``info.csv`` and the variable files of the observation.

    Parameters
    ----------
    obs : object
        The observation to save. Its attributes ``name``, ``state`` and
        ``description`` are written to ``info.csv``; ``_observation`` (and
        ``_time`` unless the state is ``"steady"``) are saved through their
        own ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Obs_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".obs"``.
    """
    path = os.path.normpath(path)
    # exist_ok avoids the race of a separate "exists" check before creation
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Obs_" + obs.name
    # ensure the name ends with '.obs'
    if not name.endswith(".obs"):
        name += ".obs"
    name = _formname(name)
    stem = name[:-4]
    file_path = os.path.join(path, name)
    # temporary directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        # every file that is saved is recorded here, so the archive always
        # contains exactly what info.csv refers to
        members = []
        # newline="" leaves line endings to the csv writer
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Observation"])
            writer.writerow(["name", obs.name])
            writer.writerow(["state", obs.state])
            writer.writerow(["description", obs.description])
            if obs.state != "steady":
                timname = stem + "_TimVar.var"
                writer.writerow(["time", timname])
                obs._time.save(patht, timname)
                members.append(timname)
            obsname = stem + "_ObsVar.var"
            writer.writerow(["observation", obsname])
            obs._observation.save(patht, obsname)
            members.append(obsname)
        # pack everything into one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            for member in members:
                zfile.write(os.path.join(patht, member), member)
    finally:
        # remove the temporary directory even if saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
def save_well(well, path="", name=None):
    """Save a well to file.

    This writes the well to a zip archive containing an ``info.csv`` and
    one variable file per well property.

    Parameters
    ----------
    well : object
        The well to save. Its ``name`` is written to ``info.csv``; the
        attributes ``wellradius``, ``coordinates``, ``welldepth``,
        ``aquiferdepth`` and ``screensize`` are saved through their own
        ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Well_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".wel"``.
    """
    path = os.path.normpath(path)
    # exist_ok avoids the race of a separate "exists" check before creation
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Well_" + well.name
    # ensure the name ends with '.wel'
    if not name.endswith(".wel"):
        name += ".wel"
    name = _formname(name)
    stem = name[:-4]
    file_path = os.path.join(path, name)
    # temporary directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        members = []
        # newline="" leaves line endings to the csv writer
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Well"])
            writer.writerow(["name", well.name])
            # (info.csv key, variable, file-name suffix); the row order is
            # the order in which load_well reads the entries back
            parts = (
                ("radius", well.wellradius, "_RadVar.var"),
                ("coordinates", well.coordinates, "_CooVar.var"),
                ("welldepth", well.welldepth, "_WedVar.var"),
                ("aquiferdepth", well.aquiferdepth, "_AqdVar.var"),
                ("screensize", well.screensize, "_ScrVar.var"),
            )
            for key, variable, suffix in parts:
                member = stem + suffix
                writer.writerow([key, member])
                variable.save(patht, member)
                members.append(member)
        # pack everything into one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            for member in members:
                zfile.write(os.path.join(patht, member), member)
    finally:
        # remove the temporary directory even if saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
def save_campaign(campaign, path="", name=None):
    """Save the campaign to file.

    This writes the campaign to a zip archive containing an ``info.csv``
    and the files of the field site, the wells and the tests.

    Parameters
    ----------
    campaign : object
        The campaign to save. Its attributes ``name``, ``description`` and
        ``timeframe`` are written to ``info.csv``; ``fieldsite`` (if not
        ``None``) and every entry of the ``wells`` and ``tests`` mappings
        are saved through their own ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Cmp_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Raises
    ------
    ValueError
        If two well keys (or two test keys) result in the same file name
        once whitespace and path separators are removed.

    Notes
    -----
    The file will get the suffix ``".cmp"``.
    """
    path = os.path.normpath(path)
    # exist_ok avoids the race of a separate "exists" check before creation
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Cmp_" + campaign.name
    # ensure the name ends with '.cmp'
    if not name.endswith(".cmp"):
        name += ".cmp"
    name = _formname(name)
    stem = name[:-4]
    file_path = os.path.join(path, name)
    # temporary directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        members = []
        # newline="" leaves line endings to the csv writer
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Campaign"])
            writer.writerow(["name", campaign.name])
            writer.writerow(["description", campaign.description])
            writer.writerow(["timeframe", campaign.timeframe])
            if campaign.fieldsite is not None:
                fieldsname = stem + "_Fieldsite.fds"
                writer.writerow(["fieldsite", fieldsname])
                campaign.fieldsite.save(patht, fieldsname)
                members.append(fieldsname)
            else:
                writer.writerow(["fieldsite", "None"])
            groups = (
                ("Wells", campaign.wells, "_Well.wel", "well"),
                ("Tests", campaign.tests, "_Test.tst", "test"),
            )
            for title, items, suffix, label in groups:
                keys = tuple(items.keys())
                writer.writerow([title, len(keys)])
                used = set()
                for k in keys:
                    # The nested save routine passes the name through
                    # _formname; do the same here so the recorded member
                    # name matches the file that is actually written.
                    member = _formname(stem + "_" + k + suffix)
                    if member in used:
                        raise ValueError(
                            f"save_campaign: {label} key '{k}' collides "
                            f"with another key in file name '{member}'"
                        )
                    used.add(member)
                    writer.writerow([k, member])
                    items[k].save(patht, member)
                    members.append(member)
        # pack everything into one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            for member in members:
                zfile.write(os.path.join(patht, member), member)
    finally:
        # remove the temporary directory even if saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
def save_fieldsite(fieldsite, path="", name=None):
    """Save a field site to file.

    This writes the field site to a zip archive containing an ``info.csv``
    and, if present, the coordinates variable file.

    Parameters
    ----------
    fieldsite : object
        The field site to save. Its attributes ``name`` and ``description``
        are written to ``info.csv``; ``coordinates`` (if not ``None``) is
        saved through its own ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Field_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".fds"``.
    """
    path = os.path.normpath(path)
    # exist_ok avoids the race of a separate "exists" check before creation
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Field_" + fieldsite.name
    # ensure the name ends with '.fds'
    if not name.endswith(".fds"):
        name += ".fds"
    name = _formname(name)
    file_path = os.path.join(path, name)
    # temporary directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        coordname = None
        # newline="" leaves line endings to the csv writer
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Fieldsite"])
            writer.writerow(["name", fieldsite.name])
            writer.writerow(["description", fieldsite.description])
            if fieldsite.coordinates is not None:
                coordname = name[:-4] + "_CooVar.var"
                writer.writerow(["coordinates", coordname])
                fieldsite.coordinates.save(patht, coordname)
            else:
                # the literal "None" marks missing coordinates for the loader
                writer.writerow(["coordinates", "None"])
        # pack everything into one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            if coordname is not None:
                zfile.write(os.path.join(patht, coordname), coordname)
    finally:
        # remove the temporary directory even if saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
def save_pumping_test(pump_test, path="", name=None):
    """Save a pumping test to file.

    This writes the pumping test to a zip archive containing an
    ``info.csv`` and the files of its variables and observations.

    Parameters
    ----------
    pump_test : object
        The pumping test to save. Its attributes ``name``, ``description``,
        ``timeframe`` and ``pumpingwell`` are written to ``info.csv``;
        ``pumpingrate``, ``aquiferdepth``, ``aquiferradius`` and every entry
        of the ``observations`` mapping are saved through their own
        ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Test_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Raises
    ------
    ValueError
        If two observation keys result in the same file name once
        whitespace and path separators are removed.

    Notes
    -----
    The file will get the suffix ``".tst"``.
    """
    path = os.path.normpath(path)
    # exist_ok avoids the race of a separate "exists" check before creation
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Test_" + pump_test.name
    # ensure the name ends with '.tst'
    if not name.endswith(".tst"):
        name += ".tst"
    name = _formname(name)
    stem = name[:-4]
    file_path = os.path.join(path, name)
    # temporary directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        # newline="" leaves line endings to the csv writer
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Testtype", "PumpingTest"])
            writer.writerow(["name", pump_test.name])
            writer.writerow(["description", pump_test.description])
            writer.writerow(["timeframe", pump_test.timeframe])
            writer.writerow(["pumpingwell", pump_test.pumpingwell])
            # The file extension is added by the respective save routine,
            # so the member name is taken from the returned path.
            pumpr_path = pump_test.pumpingrate.save(patht, stem + "_PprVar")
            pumpr_base = os.path.basename(pumpr_path)
            writer.writerow(["pumpingrate", pumpr_base])
            aquid_path = pump_test.aquiferdepth.save(patht, stem + "_AqdVar")
            aquid_base = os.path.basename(aquid_path)
            writer.writerow(["aquiferdepth", aquid_base])
            aquir_path = pump_test.aquiferradius.save(
                patht, stem + "_AqrVar"
            )
            aquir_base = os.path.basename(aquir_path)
            writer.writerow(["aquiferradius", aquir_base])
            okeys = tuple(pump_test.observations.keys())
            writer.writerow(["Observations", len(okeys)])
            obsname = {}
            for k in okeys:
                # save_obs passes the name through _formname; do the same
                # here so the recorded member name matches the written file
                member = _formname(stem + "_" + k + "_Obs.obs")
                if member in obsname.values():
                    raise ValueError(
                        f"save_pumping_test: observation key '{k}' collides "
                        f"with another key in file name '{member}'"
                    )
                obsname[k] = member
                writer.writerow([k, member])
                pump_test.observations[k].save(patht, member)
        # pack everything into one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            zfile.write(pumpr_path, pumpr_base)
            zfile.write(aquir_path, aquir_base)
            zfile.write(aquid_path, aquid_base)
            for member in obsname.values():
                zfile.write(os.path.join(patht, member), member)
    finally:
        # remove the temporary directory even if saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path
# LOAD ###
def _load_var_data(data):
    """Build a variable from the rows of an opened csv reader.

    Parameters
    ----------
    data : iterator
        Row iterator (e.g. a :class:`csv.reader`) positioned at the first
        row of a variable file.

    Returns
    -------
    The object created by ``varlib.Variable``.

    Raises
    ------
    ValueError
        If the header is not ``"Variable"`` or the version is not accepted
        by ``_check_version``.
    """
    # files without a version row are treated as version 1.0.0
    version_string = "1.0.0"
    header = _nextr(data)
    if header[0] == "wtp-version":
        version_string = header[1]
        header = _nextr(data)
    _check_version(version_parse(version_string))
    if header[0] != "Variable":
        raise ValueError(
            f"load_var: expected 'Variable' but got '{header[0]}'"
        )
    # four "key, value" rows: name, symbol, units, description
    name, symbol, units, description = [next(data)[1] for __ in range(4)]
    # the type row selects the conversion for scalar and array values
    dtype = int if next(data)[0] == "integer" else float
    layout = _nextr(data)
    if layout[0] == "scalar":
        value = dtype(next(data)[1])
    else:
        # array: shape row, value-count row, then one value per row
        shape = tuple(np.array(layout[1:], dtype=int))
        count = int(next(data)[1])
        flat = [next(data)[0] for __ in range(count)]
        value = np.array(flat, dtype=dtype).reshape(shape)
    return varlib.Variable(name, value, symbol, units, description)
def load_var(varfile):
    """Load a variable from file.

    This reads a variable from a csv file.

    Parameters
    ----------
    varfile : :class:`str` or text stream
        Path to the file, or an already opened text stream (the other
        loading routines pass a ``TextIOWrapper`` around an archive member).

    Returns
    -------
    The variable created by ``_load_var_data``.

    Raises
    ------
    LoadError
        If the content could not be read as a variable.
    """
    data_file = None
    try:
        # newline="" is the documented way to open files for the csv module
        data_file = open(varfile, "r", newline="")
    except TypeError:
        # not a path: treat it as an opened text stream
        try:
            data = csv.reader(varfile)
        except Exception as exc:
            raise LoadError(
                f"load_var: couldn't read file '{varfile}'"
            ) from exc
    else:
        data = csv.reader(data_file)
    try:
        return _load_var_data(data)
    except Exception as exc:
        raise LoadError(
            f"load_var: couldn't load variable '{varfile}'"
        ) from exc
    finally:
        # close only what was opened here, also when loading failed;
        # streams passed in by the caller stay open
        if data_file is not None:
            data_file.close()
def load_obs(obsfile):
    """Load an observation from file.

    This reads an observation from a zip archive written by ``save_obs``.

    Parameters
    ----------
    obsfile : :class:`str` or binary file object
        Path to the file, or an opened binary stream of the archive.

    Returns
    -------
    The object created by ``varlib.Observation``.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    # files without a version row are treated as version 1.0.0
    version_string = "1.0.0"
    try:
        with zipfile.ZipFile(obsfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))
            header = _nextr(rows)
            if header[0] == "wtp-version":
                version_string = header[1]
                header = _nextr(rows)
            _check_version(version_parse(version_string))
            if header[0] != "Observation":
                raise ValueError(
                    f"load_obs: expected 'Observation' but got '{header[0]}'"
                )
            name = next(rows)[1]
            is_steady = next(rows)[1] == "steady"
            description = next(rows)[1]
            # a time row only exists for non-steady observations
            time_member = None if is_steady else next(rows)[1]
            obs_member = next(rows)[1]
            if is_steady:
                time = None
            else:
                time = load_var(TxtIO(archive.open(time_member)))
            values = load_var(TxtIO(archive.open(obs_member)))
            observation = varlib.Observation(name, values, time, description)
    except Exception as exc:
        raise LoadError(
            f"load_obs: couldn't load observation '{obsfile}'"
        ) from exc
    return observation
def load_well(welfile):
    """Load a well from file.

    This reads a well from a zip archive written by ``save_well``.

    Parameters
    ----------
    welfile : :class:`str` or binary file object
        Path to the file, or an opened binary stream of the archive.

    Returns
    -------
    The object created by ``varlib.Well``.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    # files without a version row are treated as version 1.0.0
    version_string = "1.0.0"
    try:
        with zipfile.ZipFile(welfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))

            def read_member():
                # next info row names the archive member holding a variable
                return load_var(TxtIO(archive.open(next(rows)[1])))

            header = _nextr(rows)
            if header[0] == "wtp-version":
                version_string = header[1]
                header = _nextr(rows)
            version = version_parse(version_string)
            _check_version(version)
            if header[0] != "Well":
                raise ValueError(
                    f"load_well: expected 'Well' but got '{header[0]}'"
                )
            name = next(rows)[1]
            radius = read_member()
            coordinates = read_member()
            well_depth = read_member()
            aquifer_depth = read_member()
            # the screen size entry only exists since file version 1.1
            if version.release >= (1, 1):
                screen_size = read_member()
            else:
                screen_size = None
            well = varlib.Well(
                name,
                radius,
                coordinates,
                well_depth,
                aquifer_depth,
                screen_size,
            )
    except Exception as exc:
        raise LoadError(f"load_well: couldn't load well '{welfile}'") from exc
    return well
def load_campaign(cmpfile):
    """Load a campaign from file.

    This reads a campaign from a zip archive written by ``save_campaign``.

    Parameters
    ----------
    cmpfile : :class:`str`
        Path to the file

    Returns
    -------
    The object created by ``campaignlib.Campaign``.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    # files without a version row are treated as version 1.0.0
    version_string = "1.0.0"
    try:
        with zipfile.ZipFile(cmpfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))

            def read_group(loader):
                # a count row followed by that many "key, member" rows;
                # each member is a nested archive handed over in memory
                count = int(next(rows)[1])
                loaded = {}
                for __ in range(count):
                    entry = _nextr(rows)
                    loaded[entry[0]] = loader(BytIO(archive.read(entry[1])))
                return loaded

            header = _nextr(rows)
            if header[0] == "wtp-version":
                version_string = header[1]
                header = _nextr(rows)
            _check_version(version_parse(version_string))
            if header[0] != "Campaign":
                raise ValueError(
                    f"load_campaign: expected 'Campaign' but got '{header[0]}'"
                )
            name = next(rows)[1]
            description = next(rows)[1]
            timeframe = next(rows)[1]
            site_row = _nextr(rows)
            # the literal "None" marks a campaign without field site
            if site_row[1] != "None":
                fieldsite = load_fieldsite(BytIO(archive.read(site_row[1])))
            else:
                fieldsite = None
            wells = read_group(load_well)
            tests = read_group(load_test)
            campaign = campaignlib.Campaign(
                name, fieldsite, wells, tests, timeframe, description
            )
    except Exception as exc:
        raise LoadError(
            f"load_campaign: couldn't load campaign '{cmpfile}'"
        ) from exc
    return campaign
def load_fieldsite(fdsfile):
    """Load a field site from file.

    This reads a field site from a zip archive written by
    ``save_fieldsite``.

    Parameters
    ----------
    fdsfile : :class:`str` or binary file object
        Path to the file, or an opened binary stream of the archive.

    Returns
    -------
    The object created by ``campaignlib.FieldSite``.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    # files without a version row are treated as version 1.0.0
    version_string = "1.0.0"
    try:
        with zipfile.ZipFile(fdsfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))
            header = _nextr(rows)
            if header[0] == "wtp-version":
                version_string = header[1]
                header = _nextr(rows)
            _check_version(version_parse(version_string))
            if header[0] != "Fieldsite":
                raise ValueError(
                    "load_fieldsite: expected 'Fieldsite' "
                    f"but got '{header[0]}'"
                )
            name = next(rows)[1]
            description = next(rows)[1]
            coord_member = next(rows)[1]
            # the literal "None" marks a field site without coordinates
            if coord_member != "None":
                coordinates = load_var(TxtIO(archive.open(coord_member)))
            else:
                coordinates = None
            fieldsite = campaignlib.FieldSite(name, description, coordinates)
    except Exception as exc:
        raise LoadError(
            f"load_fieldsite: couldn't load fieldsite '{fdsfile}'"
        ) from exc
    return fieldsite
def load_test(tstfile):
    """Load a test from file.

    This reads the test type from the archive's ``info.csv`` and hands the
    file over to the matching loading routine.

    Parameters
    ----------
    tstfile : :class:`str` or binary file object
        Path to the file, or an opened binary stream of the archive.

    Returns
    -------
    The test object returned by the type specific loading routine.

    Raises
    ------
    LoadError
        If the header could not be read or the test type is unknown.
    """
    # files without a version row are treated as version 1.0.0
    version_string = "1.0.0"
    # known test types and their loading routines
    loaders = {"PumpingTest": _load_pumping_test}
    try:
        with zipfile.ZipFile(tstfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))
            header = _nextr(rows)
            if header[0] == "wtp-version":
                version_string = header[1]
                header = _nextr(rows)
            _check_version(version_parse(version_string))
            if header[0] != "Testtype":
                raise ValueError(
                    f"load_test: expected 'Testtype' but got '{header[0]}'"
                )
            if header[1] not in loaders:
                raise ValueError(f"load_test: unknown test type '{header[1]}'")
            routine = loaders[header[1]]
    except Exception as exc:
        raise LoadError(f"load_test: couldn't load test '{tstfile}'") from exc
    # the selected routine re-opens the archive and reports its own errors
    return routine(tstfile)
def _load_pumping_test(tstfile):
    """Load a pumping test from file.

    This reads a pumping test from a zip archive written by
    ``save_pumping_test``.

    Parameters
    ----------
    tstfile : :class:`str` or binary file object
        Path to the file, or an opened binary stream of the archive.

    Returns
    -------
    The object created by ``testslib.PumpingTest``.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    # files without a version row are treated as version 1.0.0
    version_string = "1.0.0"
    try:
        with zipfile.ZipFile(tstfile, "r") as zfile:
            info = TxtIO(zfile.open("info.csv"))
            data = csv.reader(info)
            first_line = _nextr(data)
            if first_line[0] == "wtp-version":
                version_string = first_line[1]
                header = _nextr(data)
            else:
                header = first_line
            version = version_parse(version_string)
            _check_version(version)
            if header[1] != "PumpingTest":
                raise ValueError(
                    f"load_test: expected 'PumpingTest' but got '{header[1]}'"
                )
            name = next(data)[1]
            description = next(data)[1]
            timeframe = next(data)[1]
            pumpingwell = next(data)[1]
            # The pumping rate is stored either as a variable (csv) or as
            # an observation (zip). Read the member's bytes once and give
            # each loader its own fresh stream: a text wrapper already
            # consumed by load_var cannot be reopened as a zip archive.
            rate_raw = zfile.read(next(data)[1])
            try:
                pumpingrate = load_var(TxtIO(BytIO(rate_raw)))
            except LoadError:
                pumpingrate = load_obs(BytIO(rate_raw))
            aquiferdepth = load_var(TxtIO(zfile.open(next(data)[1])))
            aquiferradius = load_var(TxtIO(zfile.open(next(data)[1])))
            obscnt = int(next(data)[1])
            observations = {}
            for __ in range(obscnt):
                row = _nextr(data)
                # observations are nested archives, handed over in memory
                observations[row[0]] = load_obs(BytIO(zfile.read(row[1])))
            pumpingtest = testslib.PumpingTest(
                name,
                pumpingwell,
                pumpingrate,
                observations,
                aquiferdepth,
                aquiferradius,
                description,
                timeframe,
            )
    except Exception as exc:
        raise LoadError(
            f"load_test: couldn't load pumpingtest '{tstfile}'"
        ) from exc
    return pumpingtest