Source code for welltestpy.data.data_io

# -*- coding: utf-8 -*-
"""
welltestpy subpackage providing input-output routines.

.. currentmodule:: welltestpy.data.data_io

The following functions are provided

.. autosummary::
"""
import os
import csv
import shutil
import zipfile
import tempfile
from io import TextIOWrapper as TxtIO, BytesIO as BytIO

import numpy as np

from . import varlib, campaignlib, testslib


# TOOLS ###


def _formstr(string):
    # remove spaces, tabs, linebreaks and other separators
    return "".join(str(string).split())


def _formname(string):
    # remove slashes
    string = "".join(str(string).split(os.path.sep))
    # remove spaces, tabs, linebreaks and other separators
    return _formstr(string)


def _nextr(data):
    return tuple(filter(None, next(data)))


# SAVE ###


[docs]def save_var(var, path="", name=None): """Save a variable to file. This writes the variable to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Var_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".var"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Var_" + var.name # ensure the name ends with '.var' if name[-4:] != ".var": name += ".var" name = _formname(name) file_path = os.path.join(path, name) # write the csv-file with open(file_path, "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["Variable"]) writer.writerow(["name", var.name]) writer.writerow(["symbol", var.symbol]) writer.writerow(["units", var.units]) writer.writerow(["description", var.description]) if np.asanyarray(var.value).dtype == np.int: writer.writerow(["integer"]) else: writer.writerow(["float"]) if var.scalar: writer.writerow(["scalar"]) writer.writerow(["value", var.value]) else: writer.writerow(["shape"] + list(np.shape(var.value))) tmpvalue = np.reshape(var.value, -1) writer.writerow(["values", len(tmpvalue)]) for val in tmpvalue: writer.writerow([val]) return file_path
[docs]def save_obs(obs, path="", name=None): """Save an observation to file. This writes the observation to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Obs_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".obs"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Obs_" + obs.name # ensure the name ends with '.obs' if name[-4:] != ".obs": name += ".obs" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file # with open(patht+name[:-4]+".csv", 'w') as csvf: with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["Observation"]) writer.writerow(["name", obs.name]) writer.writerow(["state", obs.state]) writer.writerow(["description", obs.description]) if obs.state == "steady": obsname = name[:-4] + "_ObsVar.var" writer.writerow(["observation", obsname]) obs._observation.save(patht, obsname) else: timname = name[:-4] + "_TimVar.var" obsname = name[:-4] + "_ObsVar.var" writer.writerow(["time", timname]) writer.writerow(["observation", obsname]) obs._time.save(patht, timname) obs._observation.save(patht, obsname) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: # zfile.write(patht+name[:-4]+".csv", name[:-4]+".csv") zfile.write(os.path.join(patht, "info.csv"), "info.csv") if obs.state == "transient": zfile.write(os.path.join(patht, timname), timname) zfile.write(os.path.join(patht, obsname), obsname) shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_well(well, path="", name=None): """Save a well to file. This writes the variable to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Well_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".wel"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Well_" + well.name # ensure the name ends with '.csv' if name[-4:] != ".wel": name += ".wel" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file # with open(patht+name[:-4]+".csv", 'w') as csvf: with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["Well"]) writer.writerow(["name", well.name]) # define names for the variable-files radiuname = name[:-4] + "_RadVar.var" coordname = name[:-4] + "_CooVar.var" welldname = name[:-4] + "_WedVar.var" aquifname = name[:-4] + "_AqdVar.var" # save variable-files writer.writerow(["radius", radiuname]) well.wellradius.save(patht, radiuname) writer.writerow(["coordinates", coordname]) well.coordinates.save(patht, coordname) writer.writerow(["welldepth", welldname]) well.welldepth.save(patht, welldname) writer.writerow(["aquiferdepth", aquifname]) well.aquiferdepth.save(patht, aquifname) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: # zfile.write(patht+name[:-4]+".csv", name[:-4]+".csv") zfile.write(os.path.join(patht, "info.csv"), "info.csv") zfile.write(os.path.join(patht, radiuname), radiuname) zfile.write(os.path.join(patht, coordname), coordname) zfile.write(os.path.join(patht, welldname), welldname) zfile.write(os.path.join(patht, aquifname), aquifname) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_campaign(campaign, path="", name=None): """Save the campaign to file. This writes the campaign to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Cmp_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".cmp"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Cmp_" + campaign.name # ensure the name ends with '.csv' if name[-4:] != ".cmp": name += ".cmp" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file # with open(patht+name[:-4]+".csv", 'w') as csvf: with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["Campaign"]) writer.writerow(["name", campaign.name]) writer.writerow(["description", campaign.description]) writer.writerow(["timeframe", campaign.timeframe]) # define names for the variable-files if campaign.fieldsite is not None: fieldsname = name[:-4] + "_Fieldsite.fds" # save variable-files writer.writerow(["fieldsite", fieldsname]) campaign.fieldsite.save(patht, fieldsname) else: writer.writerow(["fieldsite", "None"]) wkeys = tuple(campaign.wells.keys()) writer.writerow(["Wells", len(wkeys)]) wellsname = {} for k in wkeys: wellsname[k] = name[:-4] + "_" + k + "_Well.wel" writer.writerow([k, wellsname[k]]) campaign.wells[k].save(patht, wellsname[k]) tkeys = tuple(campaign.tests.keys()) writer.writerow(["Tests", len(tkeys)]) testsname = {} for k in tkeys: testsname[k] = name[:-4] + "_" + k + "_Test.tst" writer.writerow([k, testsname[k]]) campaign.tests[k].save(patht, testsname[k]) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") if campaign.fieldsite is not None: zfile.write(os.path.join(patht, fieldsname), fieldsname) for k in wkeys: zfile.write(os.path.join(patht, wellsname[k]), wellsname[k]) for k in tkeys: zfile.write(os.path.join(patht, testsname[k]), testsname[k]) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_fieldsite(fieldsite, path="", name=None): """Save a field site to file. This writes the field site to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Field_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".fds"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Field_" + fieldsite.name # ensure the name ends with '.csv' if name[-4:] != ".fds": name += ".fds" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file # with open(patht+name[:-4]+".csv", 'w') as csvf: with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["Fieldsite"]) writer.writerow(["name", fieldsite.name]) writer.writerow(["description", fieldsite.description]) # define names for the variable-files if fieldsite.coordinates is not None: coordname = name[:-4] + "_CooVar.var" # save variable-files writer.writerow(["coordinates", coordname]) fieldsite.coordinates.save(patht, coordname) else: writer.writerow(["coordinates", "None"]) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") if fieldsite.coordinates is not None: zfile.write(os.path.join(patht, coordname), coordname) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
[docs]def save_pumping_test(pump_test, path="", name=None): """Save a pumping test to file. This writes the variable to a csv file. Parameters ---------- path : :class:`str`, optional Path where the variable should be saved. Default: ``""`` name : :class:`str`, optional Name of the file. If ``None``, the name will be generated by ``"Test_"+name``. Default: ``None`` Notes ----- The file will get the suffix ``".tst"``. """ path = os.path.normpath(path) # create the path if not existing if not os.path.exists(path): os.makedirs(path) # create a standard name if None is given if name is None: name = "Test_" + pump_test.name # ensure the name ends with '.csv' if name[-4:] != ".tst": name += ".tst" name = _formname(name) # create temporal directory for the included files patht = tempfile.mkdtemp(dir=path) # write the csv-file # with open(patht+name[:-4]+".csv", 'w') as csvf: with open(os.path.join(patht, "info.csv"), "w") as csvf: writer = csv.writer( csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n" ) writer.writerow(["Testtype", "PumpingTest"]) writer.writerow(["name", pump_test.name]) writer.writerow(["description", pump_test.description]) writer.writerow(["timeframe", pump_test.timeframe]) writer.writerow(["pumpingwell", pump_test.pumpingwell]) # define names for the variable-files (file extension added autom.) pumprname = name[:-4] + "_PprVar" aquidname = name[:-4] + "_AqdVar" aquirname = name[:-4] + "_AqrVar" # save variable-files pumpr_path = pump_test.pumpingrate.save(patht, pumprname) pumpr_base = os.path.basename(pumpr_path) writer.writerow(["pumpingrate", pumpr_base]) aquid_path = pump_test.aquiferdepth.save(patht, aquidname) aquid_base = os.path.basename(aquid_path) writer.writerow(["aquiferdepth", aquid_base]) aquir_path = pump_test.aquiferradius.save(patht, aquirname) aquir_base = os.path.basename(aquir_path) writer.writerow(["aquiferradius", aquir_base]) okeys = tuple(pump_test.observations.keys()) writer.writerow(["Observations", len(okeys)]) obsname = {} for k in okeys: obsname[k] = name[:-4] + "_" + k + "_Obs.obs" writer.writerow([k, obsname[k]]) pump_test.observations[k].save(patht, obsname[k]) # compress everything to one zip-file file_path = os.path.join(path, name) with zipfile.ZipFile(file_path, "w") as zfile: zfile.write(os.path.join(patht, "info.csv"), "info.csv") zfile.write(pumpr_path, pumpr_base) zfile.write(aquir_path, aquir_base) zfile.write(aquid_path, aquid_base) for k in okeys: zfile.write(os.path.join(patht, obsname[k]), obsname[k]) # delete the temporary directory shutil.rmtree(patht, ignore_errors=True) return file_path
# LOAD ###
[docs]def load_var(varfile): """Load a variable from file. This reads a variable from a csv file. Parameters ---------- varfile : :class:`str` Path to the file """ try: with open(varfile, "r") as vfile: data = csv.reader(vfile) if next(data)[0] != "Variable": raise Exception name = next(data)[1] symbol = next(data)[1] units = next(data)[1] description = next(data)[1] integer = next(data)[0] == "integer" shapenfo = _nextr(data) if shapenfo[0] == "scalar": if integer: value = np.int(next(data)[1]) else: value = np.float(next(data)[1]) else: shape = tuple(np.array(shapenfo[1:], dtype=np.int)) vcnt = np.int(next(data)[1]) vlist = [] for __ in range(vcnt): vlist.append(next(data)[0]) if integer: value = np.array(vlist, dtype=np.int).reshape(shape) else: value = np.array(vlist, dtype=np.float).reshape(shape) var = varlib.Variable(name, value, symbol, units, description) except Exception: try: data = csv.reader(varfile) if next(data)[0] != "Variable": raise Exception name = next(data)[1] symbol = next(data)[1] units = next(data)[1] description = next(data)[1] integer = next(data)[0] == "integer" shapenfo = _nextr(data) if shapenfo[0] == "scalar": if integer: value = np.int(next(data)[1]) else: value = np.float(next(data)[1]) else: shape = tuple(np.array(shapenfo[1:], dtype=np.int)) vcnt = np.int(next(data)[1]) vlist = [] for __ in range(vcnt): vlist.append(next(data)[0]) if integer: value = np.array(vlist, dtype=np.int).reshape(shape) else: value = np.array(vlist, dtype=np.float).reshape(shape) var = varlib.Variable(name, value, symbol, units, description) except Exception: raise Exception("loadVar: loading the variable was not possible") return var
[docs]def load_obs(obsfile): """Load an observation from file. This reads a observation from a csv file. Parameters ---------- obsfile : :class:`str` Path to the file """ try: with zipfile.ZipFile(obsfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) if next(data)[0] != "Observation": raise Exception name = next(data)[1] steady = next(data)[1] == "steady" description = next(data)[1] if not steady: timef = next(data)[1] obsf = next(data)[1] if not steady: time = load_var(TxtIO(zfile.open(timef))) else: time = None obs = load_var(TxtIO(zfile.open(obsf))) observation = varlib.Observation(name, obs, time, description) except Exception: raise Exception("loadObs: loading the observation was not possible") return observation
[docs]def load_well(welfile): """Load a well from file. This reads a well from a csv file. Parameters ---------- welfile : :class:`str` Path to the file """ try: with zipfile.ZipFile(welfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) if next(data)[0] != "Well": raise Exception name = next(data)[1] radf = next(data)[1] coordf = next(data)[1] welldf = next(data)[1] aquidf = next(data)[1] rad = load_var(TxtIO(zfile.open(radf))) coord = load_var(TxtIO(zfile.open(coordf))) welld = load_var(TxtIO(zfile.open(welldf))) aquid = load_var(TxtIO(zfile.open(aquidf))) well = varlib.Well(name, rad, coord, welld, aquid) except Exception: raise Exception("loadWell: loading the well was not possible") return well
[docs]def load_campaign(cmpfile): """Load a campaign from file. This reads a campaign from a csv file. Parameters ---------- cmpfile : :class:`str` Path to the file """ try: with zipfile.ZipFile(cmpfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) if next(data)[0] != "Campaign": raise Exception name = next(data)[1] description = next(data)[1] timeframe = next(data)[1] row = _nextr(data) if row[1] == "None": fieldsite = None else: fieldsite = load_fieldsite(BytIO(zfile.read(row[1]))) wcnt = np.int(next(data)[1]) wells = {} for __ in range(wcnt): row = _nextr(data) wells[row[0]] = load_well(BytIO(zfile.read(row[1]))) tcnt = np.int(next(data)[1]) tests = {} for __ in range(tcnt): row = _nextr(data) tests[row[0]] = load_test(BytIO(zfile.read(row[1]))) campaign = campaignlib.Campaign( name, fieldsite, wells, tests, timeframe, description ) except Exception: raise Exception( "loadPumpingTest: loading the pumpingtest " + "was not possible" ) return campaign
[docs]def load_fieldsite(fdsfile): """Load a field site from file. This reads a field site from a csv file. Parameters ---------- fdsfile : :class:`str` Path to the file """ try: with zipfile.ZipFile(fdsfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) if next(data)[0] != "Fieldsite": raise Exception name = next(data)[1] description = next(data)[1] coordinfo = next(data)[1] if coordinfo == "None": coordinates = None else: coordinates = load_var(TxtIO(zfile.open(coordinfo))) fieldsite = campaignlib.FieldSite(name, description, coordinates) except Exception: raise Exception( "loadFieldSite: loading the fieldsite " + "was not possible" ) return fieldsite
[docs]def load_test(tstfile): """Load a test from file. This reads a test from a csv file. Parameters ---------- tstfile : :class:`str` Path to the file """ try: with zipfile.ZipFile(tstfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) row = _nextr(data) if row[0] != "Testtype": raise Exception if row[1] == "PumpingTest": routine = _load_pumping_test else: raise Exception except Exception: raise Exception("loadTest: loading the test " + "was not possible") return routine(tstfile)
def _load_pumping_test(tstfile): """Load a pumping test from file. This reads a pumping test from a csv file. Parameters ---------- tstfile : :class:`str` Path to the file """ try: with zipfile.ZipFile(tstfile, "r") as zfile: info = TxtIO(zfile.open("info.csv")) data = csv.reader(info) if next(data)[1] != "PumpingTest": raise Exception name = next(data)[1] description = next(data)[1] timeframe = next(data)[1] pumpingwell = next(data)[1] rate_raw = TxtIO(zfile.open(next(data)[1])) try: pumpingrate = load_var(rate_raw) except Exception: pumpingrate = load_obs(rate_raw) aquiferdepth = load_var(TxtIO(zfile.open(next(data)[1]))) aquiferradius = load_var(TxtIO(zfile.open(next(data)[1]))) obscnt = np.int(next(data)[1]) observations = {} for __ in range(obscnt): row = _nextr(data) observations[row[0]] = load_obs(BytIO(zfile.read(row[1]))) pumpingtest = testslib.PumpingTest( name, pumpingwell, pumpingrate, observations, aquiferdepth, aquiferradius, description, timeframe, ) except Exception: raise Exception( "loadPumpingTest: loading the pumpingtest " + "was not possible" ) return pumpingtest