# Source code for welltestpy.data.data_io

"""welltestpy subpackage providing input-output routines."""
import csv
import numbers
import os
import shutil
import tempfile
import zipfile
from io import BytesIO as BytIO
from io import TextIOWrapper as TxtIO

import numpy as np
from packaging.version import parse as version_parse

from . import campaignlib, testslib, varlib

# Resolve the package version. It is written as the "wtp-version" row of every
# saved file and is the reference that _check_version compares against.
# NOTE(review): with the fallback below the reference major version is 0, so
# _check_version rejects every file whose major version is >= 1 (including
# the "1.0.0" default the loaders assume for files without a version row).
try:
    from .._version import __version__
except ImportError:  # pragma: nocover
    # package is not installed
    __version__ = "0.0.0.dev0"


# TOOLS ###


class LoadError(Exception):
    """Error raised by the reading routines when a file cannot be loaded."""


def _formstr(string):
    # remove spaces, tabs, linebreaks and other separators
    return "".join(str(string).split())


def _formname(string):
    # remove slashes
    string = "".join(str(string).split(os.path.sep))
    # remove spaces, tabs, linebreaks and other separators
    return _formstr(string)


def _nextr(data):
    return tuple(filter(None, next(data)))


def _check_version(version):
    """Check that a file version is not newer than this package (major only).

    Parameters
    ----------
    version : parsed version
        Version object as returned by ``version_parse``; its ``major`` and
        ``public`` attributes are used.

    Raises
    ------
    ValueError
        If the major version of ``version`` is greater than the major
        version of the running package.
    """
    supported_major = version_parse(__version__).major
    if version.major > supported_major:
        raise ValueError(f"Unknown version '{version.public}'")


# SAVE ###


def save_var(var, path="", name=None):
    """Save a variable to file.

    This writes the variable to a csv file.

    Parameters
    ----------
    var : variable object
        Variable to store. Its ``name``, ``symbol``, ``units``,
        ``description``, ``value`` and ``scalar`` attributes are written.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Var_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".var"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing (exist_ok avoids a check-then-create
    # race when several saves target the same new directory)
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Var_" + var.name
    # ensure the name ends with '.var'
    if not name.endswith(".var"):
        name += ".var"
    name = _formname(name)
    file_path = os.path.join(path, name)
    # write the csv-file; newline="" stops the text layer from translating
    # the explicit "\n" line terminator (it would become "\r\n" on Windows)
    with open(file_path, "w", newline="") as csvf:
        writer = csv.writer(
            csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
        )
        writer.writerow(["wtp-version", __version__])
        writer.writerow(["Variable"])
        writer.writerow(["name", var.name])
        writer.writerow(["symbol", var.symbol])
        writer.writerow(["units", var.units])
        writer.writerow(["description", var.description])
        # the loader uses this row to pick int or float conversion
        value_type = np.asanyarray(var.value).dtype.type
        if issubclass(value_type, numbers.Integral):
            writer.writerow(["integer"])
        else:
            writer.writerow(["float"])
        if var.scalar:
            writer.writerow(["scalar"])
            writer.writerow(["value", var.value])
        else:
            # arrays are stored flattened, one value per row, together with
            # the shape needed to restore them
            writer.writerow(["shape"] + list(np.shape(var.value)))
            tmpvalue = np.reshape(var.value, -1)
            writer.writerow(["values", len(tmpvalue)])
            writer.writerows([val] for val in tmpvalue)
    return file_path


def save_obs(obs, path="", name=None):
    """Save an observation to file.

    This writes the observation to a zip archive containing an ``info.csv``
    and one variable file for the observation (plus one for the time, if the
    observation is not steady).

    Parameters
    ----------
    obs : observation object
        Observation to store. Its ``name``, ``state`` and ``description``
        attributes are written; ``_observation`` (and ``_time``) are saved
        through their ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Obs_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".obs"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Obs_" + obs.name
    # ensure the name ends with '.obs'
    if not name.endswith(".obs"):
        name += ".obs"
    name = _formname(name)
    base = name[:-4]
    # one flag drives both the csv rows and the archive content, so the
    # time file is zipped exactly when it was written
    steady = obs.state == "steady"
    obsname = base + "_ObsVar.var"
    timname = None if steady else base + "_TimVar.var"
    file_path = os.path.join(path, name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        # write the csv-file (newline="" keeps the "\n" line terminator)
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Observation"])
            writer.writerow(["name", obs.name])
            writer.writerow(["state", obs.state])
            writer.writerow(["description", obs.description])
            if not steady:
                writer.writerow(["time", timname])
                obs._time.save(patht, timname)
            writer.writerow(["observation", obsname])
            obs._observation.save(patht, obsname)
        # compress everything to one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            if not steady:
                zfile.write(os.path.join(patht, timname), timname)
            zfile.write(os.path.join(patht, obsname), obsname)
    finally:
        # delete the temporary directory, also when saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path


def save_well(well, path="", name=None):
    """Save a well to file.

    This writes the well to a zip archive containing an ``info.csv`` and one
    variable file per well property.

    Parameters
    ----------
    well : well object
        Well to store. Its ``name`` is written and its ``wellradius``,
        ``coordinates``, ``welldepth``, ``aquiferdepth`` and ``screensize``
        attributes are saved through their ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Well_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".wel"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Well_" + well.name
    # ensure the name ends with '.wel'
    if not name.endswith(".wel"):
        name += ".wel"
    name = _formname(name)
    base = name[:-4]
    # (info.csv key, variable, file name) in the order load_well reads them
    parts = (
        ("radius", well.wellradius, base + "_RadVar.var"),
        ("coordinates", well.coordinates, base + "_CooVar.var"),
        ("welldepth", well.welldepth, base + "_WedVar.var"),
        ("aquiferdepth", well.aquiferdepth, base + "_AqdVar.var"),
        ("screensize", well.screensize, base + "_ScrVar.var"),
    )
    file_path = os.path.join(path, name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        # write the csv-file (newline="" keeps the "\n" line terminator)
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Well"])
            writer.writerow(["name", well.name])
            # save variable-files
            for key, variable, fname in parts:
                writer.writerow([key, fname])
                variable.save(patht, fname)
        # compress everything to one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            for _key, _variable, fname in parts:
                zfile.write(os.path.join(patht, fname), fname)
    finally:
        # delete the temporary directory, also when saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path


def save_campaign(campaign, path="", name=None):
    """Save the campaign to file.

    This writes the campaign to a zip archive containing an ``info.csv``,
    the field site file (if any) and one file per well and per test.

    Parameters
    ----------
    campaign : campaign object
        Campaign to store. Its ``name``, ``description`` and ``timeframe``
        are written; ``fieldsite`` and the entries of the ``wells`` and
        ``tests`` dictionaries are saved through their ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Cmp_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Raises
    ------
    ValueError
        If two wells (or two tests) would end up in the same file because
        their keys only differ in whitespace or path separators.

    Notes
    -----
    The file will get the suffix ``".cmp"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Cmp_" + campaign.name
    # ensure the name ends with '.cmp'
    if not name.endswith(".cmp"):
        name += ".cmp"
    name = _formname(name)
    base = name[:-4]
    file_path = os.path.join(path, name)
    # file names inside the temporary directory, in archive order
    members = []
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        # write the csv-file (newline="" keeps the "\n" line terminator)
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Campaign"])
            writer.writerow(["name", campaign.name])
            writer.writerow(["description", campaign.description])
            writer.writerow(["timeframe", campaign.timeframe])
            if campaign.fieldsite is not None:
                fieldsname = base + "_Fieldsite.fds"
                writer.writerow(["fieldsite", fieldsname])
                campaign.fieldsite.save(patht, fieldsname)
                members.append(fieldsname)
            else:
                writer.writerow(["fieldsite", "None"])

            groups = (
                ("Wells", campaign.wells, "_Well.wel"),
                ("Tests", campaign.tests, "_Test.tst"),
            )
            for title, group, suffix in groups:
                keys = tuple(group.keys())
                writer.writerow([title, len(keys)])
                for k in keys:
                    # The save routines of this module sanitize the file
                    # name with _formname (assuming .save() delegates to
                    # them). Sanitize here as well, so the name recorded in
                    # info.csv and put into the archive is the name of the
                    # file that is actually written, even if the key holds
                    # whitespace or a path separator.
                    fname = _formname(base + "_" + k + suffix)
                    if fname in members:
                        raise ValueError(
                            f"save_campaign: file name '{fname}' is not "
                            f"unique for key '{k}'"
                        )
                    writer.writerow([k, fname])
                    group[k].save(patht, fname)
                    members.append(fname)

        # compress everything to one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            for fname in members:
                zfile.write(os.path.join(patht, fname), fname)
    finally:
        # delete the temporary directory, also when saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path


def save_fieldsite(fieldsite, path="", name=None):
    """Save a field site to file.

    This writes the field site to a zip archive containing an ``info.csv``
    and, if present, the coordinates variable file.

    Parameters
    ----------
    fieldsite : field site object
        Field site to store. Its ``name`` and ``description`` are written;
        ``coordinates`` (if not ``None``) is saved through its ``save``
        method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Field_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Notes
    -----
    The file will get the suffix ``".fds"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Field_" + fieldsite.name
    # ensure the name ends with '.fds'
    if not name.endswith(".fds"):
        name += ".fds"
    name = _formname(name)
    # None means: no coordinates file is written or archived
    coordname = None
    if fieldsite.coordinates is not None:
        coordname = name[:-4] + "_CooVar.var"
    file_path = os.path.join(path, name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        # write the csv-file (newline="" keeps the "\n" line terminator)
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Fieldsite"])
            writer.writerow(["name", fieldsite.name])
            writer.writerow(["description", fieldsite.description])
            if coordname is not None:
                # save variable-files
                writer.writerow(["coordinates", coordname])
                fieldsite.coordinates.save(patht, coordname)
            else:
                writer.writerow(["coordinates", "None"])
        # compress everything to one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            if coordname is not None:
                zfile.write(os.path.join(patht, coordname), coordname)
    finally:
        # delete the temporary directory, also when saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path


def save_pumping_test(pump_test, path="", name=None):
    """Save a pumping test to file.

    This writes the pumping test to a zip archive containing an
    ``info.csv``, the files of the pumping rate, aquifer depth and aquifer
    radius, and one file per observation.

    Parameters
    ----------
    pump_test : pumping test object
        Test to store. Its ``name``, ``description``, ``timeframe`` and
        ``pumpingwell`` are written; ``pumpingrate``, ``aquiferdepth``,
        ``aquiferradius`` and the entries of the ``observations``
        dictionary are saved through their ``save`` method.
    path : :class:`str`, optional
        Path where the variable should be saved. Default: ``""``
    name : :class:`str`, optional
        Name of the file. If ``None``, the name will be generated by
        ``"Test_"+name``. Default: ``None``

    Returns
    -------
    :class:`str`
        Path of the written file.

    Raises
    ------
    ValueError
        If two observations would end up in the same file because their
        keys only differ in whitespace or path separators.

    Notes
    -----
    The file will get the suffix ``".tst"``.
    """
    path = os.path.normpath(path)
    # create the path if not existing
    os.makedirs(path, exist_ok=True)
    # create a standard name if None is given
    if name is None:
        name = "Test_" + pump_test.name
    # ensure the name ends with '.tst'
    if not name.endswith(".tst"):
        name += ".tst"
    name = _formname(name)
    base = name[:-4]
    file_path = os.path.join(path, name)
    # create temporal directory for the included files
    patht = tempfile.mkdtemp(dir=path)
    try:
        info_path = os.path.join(patht, "info.csv")
        # write the csv-file (newline="" keeps the "\n" line terminator)
        with open(info_path, "w", newline="") as csvf:
            writer = csv.writer(
                csvf, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n"
            )
            writer.writerow(["wtp-version", __version__])
            writer.writerow(["Testtype", "PumpingTest"])
            writer.writerow(["name", pump_test.name])
            writer.writerow(["description", pump_test.description])
            writer.writerow(["timeframe", pump_test.timeframe])
            writer.writerow(["pumpingwell", pump_test.pumpingwell])
            # save variable-files; the names carry no extension here, the
            # save call adds it and returns the final path, whose base name
            # is what gets recorded in info.csv
            pumpr_path = pump_test.pumpingrate.save(patht, base + "_PprVar")
            pumpr_base = os.path.basename(pumpr_path)
            writer.writerow(["pumpingrate", pumpr_base])
            aquid_path = pump_test.aquiferdepth.save(patht, base + "_AqdVar")
            aquid_base = os.path.basename(aquid_path)
            writer.writerow(["aquiferdepth", aquid_base])
            aquir_path = pump_test.aquiferradius.save(patht, base + "_AqrVar")
            aquir_base = os.path.basename(aquir_path)
            writer.writerow(["aquiferradius", aquir_base])
            okeys = tuple(pump_test.observations.keys())
            writer.writerow(["Observations", len(okeys)])
            obsname = {}
            for k in okeys:
                # The save routines of this module sanitize the file name
                # with _formname (assuming .save() delegates to them).
                # Sanitize here as well, so the recorded and archived name
                # is the name of the file that is actually written, even if
                # the key holds whitespace or a path separator.
                fname = _formname(base + "_" + k + "_Obs.obs")
                if fname in obsname.values():
                    raise ValueError(
                        f"save_pumping_test: file name '{fname}' is not "
                        f"unique for key '{k}'"
                    )
                obsname[k] = fname
                writer.writerow([k, fname])
                pump_test.observations[k].save(patht, fname)
        # compress everything to one zip-file
        with zipfile.ZipFile(file_path, "w") as zfile:
            zfile.write(info_path, "info.csv")
            zfile.write(pumpr_path, pumpr_base)
            zfile.write(aquir_path, aquir_base)
            zfile.write(aquid_path, aquid_base)
            for k in okeys:
                zfile.write(os.path.join(patht, obsname[k]), obsname[k])
    finally:
        # delete the temporary directory, also when saving failed
        shutil.rmtree(patht, ignore_errors=True)
    return file_path


# LOAD ###


def _load_var_data(data):
    """Build a variable from the rows of a csv reader.

    Parameters
    ----------
    data : iterator
        Row iterator (e.g. a ``csv.reader``) positioned at the first row of
        a variable file.

    Returns
    -------
    varlib.Variable
        Variable created from name, value, symbol, units and description.

    Raises
    ------
    ValueError
        If the header row is not ``"Variable"`` or the version check fails.
    """
    first_line = _nextr(data)
    if first_line[0] == "wtp-version":
        version_string = first_line[1]
        header = _nextr(data)
    else:
        # a file without a leading version row is treated as version 1.0.0
        version_string = "1.0.0"
        header = first_line
    _check_version(version_parse(version_string))
    if header[0] != "Variable":
        raise ValueError(
            f"load_var: expected 'Variable' but got '{header[0]}'"
        )
    # the four meta rows come in a fixed order, value in the second column
    name = next(data)[1]
    symbol = next(data)[1]
    units = next(data)[1]
    description = next(data)[1]
    # the type row decides how the stored text is converted
    convert = int if next(data)[0] == "integer" else float
    shapenfo = _nextr(data)
    if shapenfo[0] == "scalar":
        value = convert(next(data)[1])
    else:
        shape = tuple(np.array(shapenfo[1:], dtype=int))
        count = int(next(data)[1])
        # one flattened value per row, restored to the stored shape
        cells = [next(data)[0] for __ in range(count)]
        value = np.array(cells, dtype=convert).reshape(shape)

    return varlib.Variable(name, value, symbol, units, description)


def load_var(varfile):
    """Load a variable from file.

    This reads a variable from a csv file.

    Parameters
    ----------
    varfile : :class:`str` or text stream
        Path to the file. An already opened text stream (for example a
        ``TextIOWrapper`` around an archive member) is accepted as well and
        is read directly; it is not closed by this function.

    Returns
    -------
    varlib.Variable
        The loaded variable.

    Raises
    ------
    LoadError
        If the stream cannot be read or the content is not a valid variable
        file.
    """
    # stays None when ``varfile`` is a stream owned by the caller
    data_file = None
    try:
        # read file
        data_file = open(varfile, "r")
    except TypeError:  # if it is an instance of TextIOWrapper
        try:
            # read stream
            data = csv.reader(varfile)
        except Exception as exc:
            raise LoadError(
                f"load_var: couldn't read file '{varfile}'"
            ) from exc
    else:
        data = csv.reader(data_file)

    try:
        return _load_var_data(data)
    except Exception as exc:
        raise LoadError(
            f"load_var: couldn't load variable '{varfile}'"
        ) from exc
    finally:
        # close the file opened here on success AND on failure; previously
        # the handle leaked whenever parsing raised
        if data_file is not None:
            data_file.close()
def load_obs(obsfile):
    """Load an observation from file.

    This reads an observation from a zip archive holding an ``info.csv``
    and the referenced variable files.

    Parameters
    ----------
    obsfile : :class:`str`
        Path to the file. Anything accepted by ``zipfile.ZipFile`` works;
        ``_load_pumping_test`` passes an in-memory ``BytesIO``.

    Returns
    -------
    varlib.Observation
        The loaded observation.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    try:
        with zipfile.ZipFile(obsfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))
            top = _nextr(rows)
            if top[0] == "wtp-version":
                version_string = top[1]
                header = _nextr(rows)
            else:
                # no version row: assume the default version
                version_string = "1.0.0"
                header = top
            _check_version(version_parse(version_string))
            if header[0] != "Observation":
                raise ValueError(
                    f"load_obs: expected 'Observation' but got '{header[0]}'"
                )
            name = next(rows)[1]
            transient = next(rows)[1] != "steady"
            description = next(rows)[1]
            # the time row only exists for non-steady observations
            time_file = next(rows)[1] if transient else None
            obs_file = next(rows)[1]
            time = None
            if transient:
                time = load_var(TxtIO(archive.open(time_file)))
            obs = load_var(TxtIO(archive.open(obs_file)))
            return varlib.Observation(name, obs, time, description)
    except Exception as exc:
        raise LoadError(
            f"load_obs: couldn't load observation '{obsfile}'"
        ) from exc
def load_well(welfile):
    """Load a well from file.

    This reads a well from a zip archive holding an ``info.csv`` and the
    referenced variable files.

    Parameters
    ----------
    welfile : :class:`str`
        Path to the file. Anything accepted by ``zipfile.ZipFile`` works;
        ``load_campaign`` passes an in-memory ``BytesIO``.

    Returns
    -------
    varlib.Well
        The loaded well.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    try:
        with zipfile.ZipFile(welfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))

            def next_var():
                # take the file name from the next info row and load it
                member = next(rows)[1]
                return load_var(TxtIO(archive.open(member)))

            top = _nextr(rows)
            if top[0] == "wtp-version":
                version_string = top[1]
                header = _nextr(rows)
            else:
                # no version row: assume the default version
                version_string = "1.0.0"
                header = top
            version = version_parse(version_string)
            _check_version(version)
            if header[0] != "Well":
                raise ValueError(
                    f"load_well: expected 'Well' but got '{header[0]}'"
                )
            name = next(rows)[1]
            radius = next_var()
            coordinates = next_var()
            welldepth = next_var()
            aquiferdepth = next_var()
            # the screensize row is only read for files of version >= 1.1
            screensize = next_var() if version.release >= (1, 1) else None
            return varlib.Well(
                name, radius, coordinates, welldepth, aquiferdepth, screensize
            )
    except Exception as exc:
        raise LoadError(f"load_well: couldn't load well '{welfile}'") from exc
def load_campaign(cmpfile):
    """Load a campaign from file.

    This reads a campaign from a zip archive holding an ``info.csv`` and
    the nested field site, well and test archives.

    Parameters
    ----------
    cmpfile : :class:`str`
        Path to the file

    Returns
    -------
    campaignlib.Campaign
        The loaded campaign.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    try:
        with zipfile.ZipFile(cmpfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))

            def read_members(loader):
                # a count row followed by that many (key, file name) rows;
                # each nested archive is handed over as in-memory bytes
                count = int(next(rows)[1])
                members = {}
                for __ in range(count):
                    entry = _nextr(rows)
                    members[entry[0]] = loader(BytIO(archive.read(entry[1])))
                return members

            top = _nextr(rows)
            if top[0] == "wtp-version":
                version_string = top[1]
                header = _nextr(rows)
            else:
                # no version row: assume the default version
                version_string = "1.0.0"
                header = top
            _check_version(version_parse(version_string))
            if header[0] != "Campaign":
                raise ValueError(
                    f"load_campaign: expected 'Campaign' but got '{header[0]}'"
                )
            name = next(rows)[1]
            description = next(rows)[1]
            timeframe = next(rows)[1]
            site_row = _nextr(rows)
            # the literal text "None" marks a campaign without field site
            if site_row[1] != "None":
                fieldsite = load_fieldsite(BytIO(archive.read(site_row[1])))
            else:
                fieldsite = None
            wells = read_members(load_well)
            tests = read_members(load_test)
            return campaignlib.Campaign(
                name, fieldsite, wells, tests, timeframe, description
            )
    except Exception as exc:
        raise LoadError(
            f"load_campaign: couldn't load campaign '{cmpfile}'"
        ) from exc
def load_fieldsite(fdsfile):
    """Load a field site from file.

    This reads a field site from a zip archive holding an ``info.csv`` and,
    if present, the coordinates variable file.

    Parameters
    ----------
    fdsfile : :class:`str`
        Path to the file. Anything accepted by ``zipfile.ZipFile`` works;
        ``load_campaign`` passes an in-memory ``BytesIO``.

    Returns
    -------
    campaignlib.FieldSite
        The loaded field site.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    try:
        with zipfile.ZipFile(fdsfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))
            top = _nextr(rows)
            if top[0] == "wtp-version":
                version_string = top[1]
                header = _nextr(rows)
            else:
                # no version row: assume the default version
                version_string = "1.0.0"
                header = top
            _check_version(version_parse(version_string))
            if header[0] != "Fieldsite":
                raise ValueError(
                    "load_fieldsite: expected 'Fieldsite' "
                    f"but got '{header[0]}'"
                )
            name = next(rows)[1]
            description = next(rows)[1]
            coord_file = next(rows)[1]
            # the literal text "None" marks a site without coordinates
            coordinates = None
            if coord_file != "None":
                coordinates = load_var(TxtIO(archive.open(coord_file)))
            return campaignlib.FieldSite(name, description, coordinates)
    except Exception as exc:
        raise LoadError(
            f"load_fieldsite: couldn't load fieldsite '{fdsfile}'"
        ) from exc
def load_test(tstfile):
    """Load a test from file.

    This reads the test type from the ``info.csv`` of the archive and hands
    the file over to the matching loading routine.

    Parameters
    ----------
    tstfile : :class:`str`
        Path to the file. Anything accepted by ``zipfile.ZipFile`` works;
        ``load_campaign`` passes an in-memory ``BytesIO``.

    Returns
    -------
    test object
        Whatever the loading routine for the stored test type returns.

    Raises
    ------
    LoadError
        If the archive cannot be read or the test type is unknown.
    """
    # loading routine per test type named in the header row
    routines = {"PumpingTest": _load_pumping_test}
    try:
        with zipfile.ZipFile(tstfile, "r") as archive:
            rows = csv.reader(TxtIO(archive.open("info.csv")))
            top = _nextr(rows)
            if top[0] == "wtp-version":
                version_string = top[1]
                header = _nextr(rows)
            else:
                # no version row: assume the default version
                version_string = "1.0.0"
                header = top
            _check_version(version_parse(version_string))
            if header[0] != "Testtype":
                raise ValueError(
                    f"load_test: expected 'Testtype' but got '{header[0]}'"
                )
            if header[1] not in routines:
                raise ValueError(f"load_test: unknown test type '{header[1]}'")
            routine = routines[header[1]]
    except Exception as exc:
        raise LoadError(f"load_test: couldn't load test '{tstfile}'") from exc
    # the routine opens the archive again on its own
    return routine(tstfile)
def _load_pumping_test(tstfile):
    """Load a pumping test from file.

    This reads a pumping test from a zip archive holding an ``info.csv``,
    the pumping rate, aquifer depth and aquifer radius files and the nested
    observation archives.

    Parameters
    ----------
    tstfile : :class:`str`
        Path to the file. Anything accepted by ``zipfile.ZipFile`` works;
        ``load_campaign`` (via ``load_test``) passes an in-memory
        ``BytesIO``.

    Returns
    -------
    testslib.PumpingTest
        The loaded pumping test.

    Raises
    ------
    LoadError
        If anything goes wrong while reading the archive.
    """
    # default version string
    version_string = "1.0.0"
    try:
        with zipfile.ZipFile(tstfile, "r") as zfile:
            info = TxtIO(zfile.open("info.csv"))
            data = csv.reader(info)
            first_line = _nextr(data)
            if first_line[0] == "wtp-version":
                version_string = first_line[1]
                header = _nextr(data)
            else:
                header = first_line
            version = version_parse(version_string)
            _check_version(version)
            if header[1] != "PumpingTest":
                raise ValueError(
                    f"load_test: expected 'PumpingTest' but got '{header[1]}'"
                )
            name = next(data)[1]
            description = next(data)[1]
            timeframe = next(data)[1]
            pumpingwell = next(data)[1]
            # The pumping rate is stored either as a variable (csv text) or
            # as an observation (nested zip archive). Keep the raw bytes so
            # each attempt gets a fresh stream of the right kind: load_obs
            # needs a seekable binary file, not the text wrapper that the
            # failed load_var attempt has already consumed.
            rate_raw = zfile.read(next(data)[1])
            try:
                pumpingrate = load_var(TxtIO(BytIO(rate_raw)))
            except LoadError:
                pumpingrate = load_obs(BytIO(rate_raw))
            aquiferdepth = load_var(TxtIO(zfile.open(next(data)[1])))
            aquiferradius = load_var(TxtIO(zfile.open(next(data)[1])))
            obscnt = int(next(data)[1])
            observations = {}
            for __ in range(obscnt):
                row = _nextr(data)
                observations[row[0]] = load_obs(BytIO(zfile.read(row[1])))

            pumpingtest = testslib.PumpingTest(
                name,
                pumpingwell,
                pumpingrate,
                observations,
                aquiferdepth,
                aquiferradius,
                description,
                timeframe,
            )
    except Exception as exc:
        raise LoadError(
            f"load_test: couldn't load pumpingtest '{tstfile}'"
        ) from exc
    return pumpingtest