Source code for welltestpy.data.data_io

# -*- coding: utf-8 -*-
"""
welltestpy subpackage providing input-output routines.

.. currentmodule:: welltestpy.data.data_io

The following functions are provided

.. autosummary::
"""
import os
import csv
import shutil
import zipfile
import tempfile
from io import TextIOWrapper as TxtIO, BytesIO as BytIO
import numbers
import numpy as np
from packaging.version import parse as version_parse

from . import varlib, campaignlib, testslib

try:
    from .._version import __version__
except ImportError:  # pragma: nocover
    # package is not installed
    __version__ = "0.0.0.dev0"


# TOOLS ###


[docs]class LoadError(Exception): """Loading error for all reading routines.""" pass
def _formstr(string): # remove spaces, tabs, linebreaks and other separators return "".join(str(string).split()) def _formname(string): # remove slashes string = "".join(str(string).split(os.path.sep)) # remove spaces, tabs, linebreaks and other separators return _formstr(string) def _nextr(data): return tuple(filter(None, next(data))) def _check_version(version): """At least check major version.""" if version.major > version_parse(__version__).major: raise ValueError(f"Unknown version '{version.public}'") # SAVE ###
[docs]def save_var(var, path="", name=None): """Save a variable to file. This writes the variable to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Var_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".var"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Var_" + var.name # ensure the name ends with '.var' if name[-4:] != ".var": name += ".var" name = _formname(name) file_path = os.path.join(path, name) # write the csv-file with open(file_path, "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["wtp-version", __version__]) writer.writerow(["Variable"]) writer.writerow(["name", var.name]) writer.writerow(["symbol", var.symbol]) writer.writerow(["units", var.units]) writer.writerow(["description", var.description]) if issubclass(np.asanyarray(var.value).dtype.type, numbers.Integral): writer.writerow(["integer"]) else: writer.writerow(["float"]) if var.scalar: writer.writerow(["scalar"]) writer.writerow(["value", var.value]) else: writer.writerow(["shape"] + list(np.shape(var.value))) tmpvalue = np.reshape(var.value, -1) writer.writerow(["values", len(tmpvalue)]) for val in tmpvalue: writer.writerow([val]) return file_path
[docs]def save_obs(obs, path="", name=None): """Save an observation to file. This writes the observation to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Obs_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".obs"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Obs_" + obs.name # ensure the name ends with '.obs' if name[-4:] != ".obs": name += ".obs" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["wtp-version", __version__]) writer.writerow(["Observation"]) writer.writerow(["name", obs.name]) writer.writerow(["state", obs.state]) writer.writerow(["description", obs.description]) if obs.state == "steady": obsname = name[:-4] + "_ObsVar.var" writer.writerow(["observation", obsname]) obs._observation.save(patht, obsname) else: timname = name[:-4] + "_TimVar.var" obsname = name[:-4] + "_ObsVar.var" writer.writerow(["time", timname]) writer.writerow(["observation", obsname]) obs._time.save(patht, timname) obs._observation.save(patht, obsname) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") if obs.state == "transient": zfile.write(os.path.join(patht, timname), timname) zfile.write(os.path.join(patht, obsname), obsname) shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_well(well, path="", name=None): """Save a well to file. This writes the variable to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Well_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".wel"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Well_" + well.name # ensure the name ends with '.wel' if name[-4:] != ".wel": name += ".wel" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["wtp-version", __version__]) writer.writerow(["Well"]) writer.writerow(["name", well.name]) # define names for the variable-files radiuname = name[:-4] + "_RadVar.var" coordname = name[:-4] + "_CooVar.var" welldname = name[:-4] + "_WedVar.var" aquifname = name[:-4] + "_AqdVar.var" screename = name[:-4] + "_ScrVar.var" # save variable-files writer.writerow(["radius", radiuname]) well.wellradius.save(patht, radiuname) writer.writerow(["coordinates", coordname]) well.coordinates.save(patht, coordname) writer.writerow(["welldepth", welldname]) well.welldepth.save(patht, welldname) writer.writerow(["aquiferdepth", aquifname]) well.aquiferdepth.save(patht, aquifname) writer.writerow(["screensize", screename]) well.screensize.save(patht, screename) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") zfile.write(os.path.join(patht, radiuname), radiuname) zfile.write(os.path.join(patht, coordname), coordname) zfile.write(os.path.join(patht, welldname), welldname) zfile.write(os.path.join(patht, aquifname), aquifname) zfile.write(os.path.join(patht, screename), screename) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_campaign(campaign, path="", name=None): """Save the campaign to file. This writes the campaign to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Cmp_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".cmp"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Cmp_" + campaign.name # ensure the name ends with '.cmp' if name[-4:] != ".cmp": name += ".cmp" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["wtp-version", __version__]) writer.writerow(["Campaign"]) writer.writerow(["name", campaign.name]) writer.writerow(["description", campaign.description]) writer.writerow(["timeframe", campaign.timeframe]) # define names for the variable-files if campaign.fieldsite is not None: fieldsname = name[:-4] + "_Fieldsite.fds" # save variable-files writer.writerow(["fieldsite", fieldsname]) campaign.fieldsite.save(patht, fieldsname) else: writer.writerow(["fieldsite", "None"]) wkeys = tuple(campaign.wells.keys()) writer.writerow(["Wells", len(wkeys)]) wellsname = {} for k in wkeys: wellsname[k] = name[:-4] + "_" + k + "_Well.wel" writer.writerow([k, wellsname[k]]) campaign.wells[k].save(patht, wellsname[k]) tkeys = tuple(campaign.tests.keys()) writer.writerow(["Tests", len(tkeys)]) testsname = {} for k in tkeys: testsname[k] = name[:-4] + "_" + k + "_Test.tst" writer.writerow([k, testsname[k]]) campaign.tests[k].save(patht, testsname[k]) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") if campaign.fieldsite is not None: zfile.write(os.path.join(patht, fieldsname), fieldsname) for k in wkeys: zfile.write(os.path.join(patht, wellsname[k]), wellsname[k]) for k in tkeys: zfile.write(os.path.join(patht, testsname[k]), testsname[k]) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_fieldsite(fieldsite, path="", name=None): """Save a field site to file. This writes the field site to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Field_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".fds"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Field_" + fieldsite.name # ensure the name ends with '.fds' if name[-4:] != ".fds": name += ".fds" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["wtp-version", __version__]) writer.writerow(["Fieldsite"]) writer.writerow(["name", fieldsite.name]) writer.writerow(["description", fieldsite.description]) # define names for the variable-files if fieldsite.coordinates is not None: coordname = name[:-4] + "_CooVar.var" # save variable-files writer.writerow(["coordinates", coordname]) fieldsite.coordinates.save(patht, coordname) else: writer.writerow(["coordinates", "None"]) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") if fieldsite.coordinates is not None: zfile.write(os.path.join(patht, coordname), coordname) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_pumping_test(pump_test, path="", name=None): """Save a pumping test to file. This writes the variable to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Test_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".tst"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Test_" + pump_test.name # ensure the name ends with '.tst' if name[-4:] != ".tst": name += ".tst" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["wtp-version", __version__]) writer.writerow(["Testtype", "PumpingTest"]) writer.writerow(["name", pump_test.name]) writer.writerow(["description", pump_test.description]) writer.writerow(["timeframe", pump_test.timeframe]) writer.writerow(["pumpingwell", pump_test.pumpingwell]) # define names for the variable-files (file extension added autom.) pumprname = name[:-4] + "_PprVar" aquidname = name[:-4] + "_AqdVar" aquirname = name[:-4] + "_AqrVar" # save variable-files pumpr_path = pump_test.pumpingrate.save(patht, pumprname) pumpr_base = os.path.basename(pumpr_path) writer.writerow(["pumpingrate", pumpr_base]) aquid_path = pump_test.aquiferdepth.save(patht, aquidname) aquid_base = os.path.basename(aquid_path) writer.writerow(["aquiferdepth", aquid_base]) aquir_path = pump_test.aquiferradius.save(patht, aquirname) aquir_base = os.path.basename(aquir_path) writer.writerow(["aquiferradius", aquir_base]) okeys = tuple(pump_test.observations.keys()) writer.writerow(["Observations", len(okeys)]) obsname = {} for k in okeys: obsname[k] = name[:-4] + "_" + k + "_Obs.obs" writer.writerow([k, obsname[k]]) pump_test.observations[k].save(patht, obsname[k]) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") zfile.write(pumpr_path, pumpr_base) zfile.write(aquir_path, aquir_base) zfile.write(aquid_path, aquid_base) for k in okeys: zfile.write(os.path.join(patht, obsname[k]), obsname[k]) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
# LOAD ### def _load_var_data(data): # default version string version_string = "1.0.0" first_line = _nextr(data) if first_line[0] == "wtp-version": version_string = first_line[1] header = _nextr(data) else: header = first_line version = version_parse(version_string) _check_version(version) if header[0] != "Variable": raise ValueError( f"load_var: expected 'Variable' but got '{header[0]}'" ) name = next(data)[1] symbol = next(data)[1] units = next(data)[1] description = next(data)[1] integer = next(data)[0] == "integer" shapenfo = _nextr(data) if shapenfo[0] == "scalar": if integer: value = int(next(data)[1]) else: value = float(next(data)[1]) else: shape = tuple(np.array(shapenfo[1:], dtype=int)) vcnt = int(next(data)[1]) vlist = [] for __ in range(vcnt): vlist.append(next(data)[0]) if integer: value = np.array(vlist, dtype=int).reshape(shape) else: value = np.array(vlist, dtype=float).reshape(shape) return varlib.Variable(name, value, symbol, units, description)
[docs]def load_var(varfile): """Load a variable from file. This reads a variable from a csv file. Parameters ---------- varfile : :class:`str` Path to the file """ cleanup = False try: # read file data_file = open(varfile, "r") except TypeError: # if it is an instance of TextIOWrapper try: # read stream data = csv.reader(varfile) except Exception as exc: raise LoadError( f"load_var: couldn't read file '{varfile}'" ) from exc else: data = csv.reader(data_file) cleanup = True try: var = _load_var_data(data) except Exception as exc: raise LoadError( f"load_var: couldn't load variable '{varfile}'" ) from exc if cleanup: data_file.close() return var
[docs]def load_obs(obsfile): """Load an observation from file. This reads a observation from a csv file. Parameters ---------- obsfile : :class:`str` Path to the file """ # default version string version_string = "1.0.0" try: with zipfile.ZipFile(obsfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) first_line = _nextr(data) if first_line[0] == "wtp-version": version_string = first_line[1] header = _nextr(data) else: header = first_line version = version_parse(version_string) _check_version(version) if header[0] != "Observation": raise ValueError( f"load_obs: expected 'Observation' but got '{header[0]}'" ) name = next(data)[1] steady = next(data)[1] == "steady" description = next(data)[1] if not steady: timef = next(data)[1] obsf = next(data)[1] # read time if not steady time = None if not steady: time = load_var(TxtIO(zfile.open(timef))) # read observation obs = load_var(TxtIO(zfile.open(obsf))) # generate observation object observation = varlib.Observation(name, obs, time, description) except Exception as exc: raise LoadError( f"load_obs: couldn't load observation '{obsfile}'" ) from exc return observation
[docs]def load_well(welfile): """Load a well from file. This reads a well from a csv file. Parameters ---------- welfile : :class:`str` Path to the file """ # default version string version_string = "1.0.0" try: with zipfile.ZipFile(welfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) first_line = _nextr(data) if first_line[0] == "wtp-version": version_string = first_line[1] header = _nextr(data) else: header = first_line version = version_parse(version_string) _check_version(version) if header[0] != "Well": raise ValueError( f"load_well: expected 'Well' but got '{header[0]}'" ) name = next(data)[1] # radius radf = next(data)[1] rad = load_var(TxtIO(zfile.open(radf))) # coordinates coordf = next(data)[1] coord = load_var(TxtIO(zfile.open(coordf))) # well depth welldf = next(data)[1] welld = load_var(TxtIO(zfile.open(welldf))) # aquifer depth aquidf = next(data)[1] aquid = load_var(TxtIO(zfile.open(aquidf))) # read screensize implemented in v1.1 screend = None if version.release >= (1, 1): screenf = next(data)[1] screend = load_var(TxtIO(zfile.open(screenf))) well = varlib.Well(name, rad, coord, welld, aquid, screend) except Exception as exc: raise LoadError(f"load_well: couldn't load well '{welfile}'") from exc return well
[docs]def load_campaign(cmpfile): """Load a campaign from file. This reads a campaign from a csv file. Parameters ---------- cmpfile : :class:`str` Path to the file """ # default version string version_string = "1.0.0" try: with zipfile.ZipFile(cmpfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) first_line = _nextr(data) if first_line[0] == "wtp-version": version_string = first_line[1] header = _nextr(data) else: header = first_line version = version_parse(version_string) _check_version(version) if header[0] != "Campaign": raise ValueError( f"load_campaign: expected 'Campaign' but got '{header[0]}'" ) name = next(data)[1] description = next(data)[1] timeframe = next(data)[1] row = _nextr(data) if row[1] == "None": fieldsite = None else: fieldsite = load_fieldsite(BytIO(zfile.read(row[1]))) wcnt = int(next(data)[1]) wells = {} for __ in range(wcnt): row = _nextr(data) wells[row[0]] = load_well(BytIO(zfile.read(row[1]))) tcnt = int(next(data)[1]) tests = {} for __ in range(tcnt): row = _nextr(data) tests[row[0]] = load_test(BytIO(zfile.read(row[1]))) campaign = campaignlib.Campaign( name, fieldsite, wells, tests, timeframe, description ) except Exception as exc: raise LoadError( f"load_campaign: couldn't load campaign '{cmpfile}'" ) from exc return campaign
[docs]def load_fieldsite(fdsfile): """Load a field site from file. This reads a field site from a csv file. Parameters ---------- fdsfile : :class:`str` Path to the file """ # default version string version_string = "1.0.0" try: with zipfile.ZipFile(fdsfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) first_line = _nextr(data) if first_line[0] == "wtp-version": version_string = first_line[1] header = _nextr(data) else: header = first_line version = version_parse(version_string) _check_version(version) if header[0] != "Fieldsite": raise ValueError( "load_fieldsite: expected 'Fieldsite' " f"but got '{header[0]}'" ) name = next(data)[1] description = next(data)[1] coordinfo = next(data)[1] if coordinfo == "None": coordinates = None else: coordinates = load_var(TxtIO(zfile.open(coordinfo))) fieldsite = campaignlib.FieldSite(name, description, coordinates) except Exception as exc: raise LoadError( f"load_fieldsite: couldn't load fieldsite '{fdsfile}'" ) from exc return fieldsite
[docs]def load_test(tstfile): """Load a test from file. This reads a test from a csv file. Parameters ---------- tstfile : :class:`str` Path to the file """ # default version string version_string = "1.0.0" try: with zipfile.ZipFile(tstfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) first_line = _nextr(data) if first_line[0] == "wtp-version": version_string = first_line[1] header = _nextr(data) else: header = first_line version = version_parse(version_string) _check_version(version) if header[0] != "Testtype": raise ValueError( f"load_test: expected 'Testtype' but got '{header[0]}'" ) if header[1] == "PumpingTest": routine = _load_pumping_test else: raise ValueError(f"load_test: unknown test type '{header[1]}'") except Exception as exc: raise LoadError(f"load_test: couldn't load test '{tstfile}'") from exc return routine(tstfile)
def _load_pumping_test(tstfile): """Load a pumping test from file. This reads a pumping test from a csv file. Parameters ---------- tstfile : :class:`str` Path to the file """ # default version string version_string = "1.0.0" try: with zipfile.ZipFile(tstfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) first_line = _nextr(data) if first_line[0] == "wtp-version": version_string = first_line[1] header = _nextr(data) else: header = first_line version = version_parse(version_string) _check_version(version) if header[1] != "PumpingTest": raise ValueError( f"load_test: expected 'PumpingTest' but got '{header[1]}'" ) name = next(data)[1] description = next(data)[1] timeframe = next(data)[1] pumpingwell = next(data)[1] rate_raw = TxtIO(zfile.open(next(data)[1])) try: pumpingrate = load_var(rate_raw) except Exception: pumpingrate = load_obs(rate_raw) aquiferdepth = load_var(TxtIO(zfile.open(next(data)[1]))) aquiferradius = load_var(TxtIO(zfile.open(next(data)[1]))) obscnt = int(next(data)[1]) observations = {} for __ in range(obscnt): row = _nextr(data) observations[row[0]] = load_obs(BytIO(zfile.read(row[1]))) pumpingtest = testslib.PumpingTest( name, pumpingwell, pumpingrate, observations, aquiferdepth, aquiferradius, description, timeframe, ) except Exception as exc: raise LoadError( f"load_test: couldn't load pumpingtest '{tstfile}'" ) from exc return pumpingtest