Source code for ogs5py.reader.reader

# -*- coding: utf-8 -*-
"""Reader for the OGS5 Output."""


import glob
import os

import numpy as np
from vtk import (
    vtkDataReader,
    vtkOutputWindow,
    vtkStringOutputWindow,
    vtkXMLFileReadTester,
)

from ogs5py.reader.techelper import readtec_multi_table, readtec_single_table
from ogs5py.reader.vtkhelper import XMLreader_dict, vtkreader_dict
from ogs5py.tools.output import readpvd_single, split_ply_path, split_pnt_path
from ogs5py.tools.tools import split_file_path
from ogs5py.tools.types import PCS_TYP

# redirect VTK error to a string
VTK_ERR = vtkStringOutputWindow()
"""VTK Error output. Accessible with ``VTK_ERR.GetOutput()``"""

VTK_STD_OUT = vtkOutputWindow()
VTK_STD_OUT.SetInstance(VTK_ERR)
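
# Usage sketch (illustration only, not part of the original module): because
# the VTK error output is redirected above, failed reads stay silent on the
# terminal; the collected messages can be retrieved from the string window.
def _example_vtk_error_log():
    """Return the VTK error/warning messages collected so far."""
    return VTK_ERR.GetOutput()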


###############################################################################
# vtk readers
###############################################################################


def readvtk_single(infile):
    """Read an arbitrary vtk/vtkXML file to a dictionary wtih its data."""
    xml_checker = vtkXMLFileReadTester()
    xml_checker.SetFileName(infile)
    is_xml = bool(xml_checker.TestReadFile())
    if is_xml:
        # check for vtk-XML-type
        xml_type = xml_checker.GetFileDataType()
        if xml_type in XMLreader_dict:
            reader = XMLreader_dict[xml_type]
        else:
            print(
                infile + ": XML file not valid (Type '" + str(xml_type) + "')"
            )
            if xml_type == "Collection":
                print("...try the 'readpvd' function")
            return {}
    else:
        # check for vtk-type
        checker = vtkDataReader()
        checker.SetFileName(infile)
        # get the right reader depending on the datasettype
        reader_found = False
        for datasettype in vtkreader_dict:
            if checker.IsFileValid(datasettype):
                reader = vtkreader_dict[datasettype]
                reader_found = True
                break
        if not reader_found:
            print(infile + ": vtk file not valid")
            checker.CloseVTKFile()
            return {}
        checker.CloseVTKFile()

    # read in the vtk object
    vtk_reader = reader[0]()
    vtk_reader.SetFileName(infile)
    if not is_xml:
        # https://stackoverflow.com/a/35018175/6696397
        vtk_reader.ReadAllScalarsOn()
        vtk_reader.ReadAllVectorsOn()
        vtk_reader.ReadAllNormalsOn()
        vtk_reader.ReadAllTensorsOn()
        vtk_reader.ReadAllColorScalarsOn()
        vtk_reader.ReadAllTCoordsOn()
        vtk_reader.ReadAllFieldsOn()
    vtk_reader.Update()
    file_obj = vtk_reader.GetOutput()

    # convert vtk object to a dictionary of numpy arrays
    output = reader[1](file_obj)
    if not is_xml:
        output["header"] = vtk_reader.GetHeader()
    else:
        output["header"] = "vtk-XML file"

    return output
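
# Usage sketch (hypothetical file name, illustration only): read one legacy
# vtk or vtk-XML file and inspect the returned dictionary; the available
# keys depend on the dataset type of the file.
def _example_readvtk_single():
    out = readvtk_single("model_out/model0000.vtk")  # assumed path
    print(out.get("header"))  # "vtk-XML file" for XML input, else the vtk header
    print(sorted(out))  # available data blocks depend on the dataset type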


def readvtk(task_root=".", task_id=None, pcs="ALL", single_file=None):
    r"""
    A general reader for OGS vtk output files.

    Gives a dictionary containing their data.

    The filenames are structured the following way:
    {task_id}[_{PCS}]xxxx.vtk
    thereby the "_{PCS}" is optional and only present if a PCS_TYPE is
    specified in the \*.out file.

    Parameters
    ----------
    task_root : string, optional
        string containing the path to the directory containing the ogs output
        Default : "."
    task_id : string, optional
        string containing the file name of the ogs task without extension
        Default : None
    pcs : string or None, optional
        specify the PCS type that should be collected
        Possible values are:

            - None/"" (no PCS_TYPE specified in \*.out)
            - "NO_PCS"
            - "GROUNDWATER_FLOW"
            - "LIQUID_FLOW"
            - "RICHARDS_FLOW"
            - "AIR_FLOW"
            - "MULTI_PHASE_FLOW"
            - "PS_GLOBAL"
            - "HEAT_TRANSPORT"
            - "DEFORMATION"
            - "MASS_TRANSPORT"
            - "OVERLAND_FLOW"
            - "FLUID_MOMENTUM"
            - "RANDOM_WALK"

        You can get the output for all known PCS-types by setting pcs="ALL"
        Default : None
    single_file : string or None, optional
        If you want to read just a single file, you can set the path here.
        Default : None

    Returns
    -------
    result : dict
        dictionary with "TIME" (sorted time steps) and "DATA" (one vtk-data
        dictionary per time step)
        if pcs="ALL", the output is a dictionary with the PCS-types as keys
    """
    # for a single file return the output immediately
    if single_file is not None:
        return readvtk_single(single_file)
    if pcs is None:
        pcs = ""
    # if pcs is "ALL" iterate over all known PCS types
    if pcs == "ALL":
        out = {}
        for pcs_single in PCS_TYP:
            out_single = readvtk(task_root, task_id, pcs_single)
            if out_single:
                out[pcs_single] = out_single
        return out
    # in the filename, there is an underscore before the PCS-type
    if pcs != "":
        pcs = "_" + pcs
    # naming inconsistency in OGS5: RANDOM_WALK output files are labeled RWPT
    if pcs == "_RANDOM_WALK":
        pcs = "_RWPT"
    output = {}
    # format task_root properly as directory path
    task_root = os.path.normpath(task_root)
    # get a list of all output files "{id}0000.vtk" ... "{id}999[...]9.vtk"
    # if pcs is RWPT the name-scheme is different
    if pcs == "_RWPT":
        infiles = glob.glob(
            os.path.join(task_root, task_id + pcs + "_[0-9]*.particles.vtk")
        )
    else:
        infiles = glob.glob(
            os.path.join(
                task_root, task_id + pcs + "[0-9][0-9][0-9]*[0-9].vtk"
            )
        )
    # sort input files by name, since they are sorted by timesteps
    infiles.sort()
    # iterate over all input files
    time = []
    data = []
    if not infiles:
        return output
    for infile in infiles:
        # read the single vtk-file
        out = readvtk_single(infile)
        # in the RWPT files the TIME is not given as field_data but in header
        if pcs == "_RWPT" and "header" in out:
            if out["header"] and "=" in out["header"]:
                # ndmin = 1 to match the standard format
                out["field_data"]["TIME"] = np.array(
                    float(out["header"].split("=")[1]), ndmin=1
                )
        if "field_data" in out and "TIME" in out["field_data"]:
            time.append(out["field_data"]["TIME"])
            data.append(out)
        else:
            raise ValueError("vtk file not valid: " + infile)
    # sort the time-steps
    time = np.squeeze(time)
    time_sort = np.argsort(time)
    time = time[time_sort]
    data_sort = []
    for new_pos in time_sort:
        data_sort.append(data[new_pos])
    # sort output by timesteps
    output["TIME"] = time
    output["DATA"] = data_sort

    return output

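# Usage sketch (assumed task_root/task_id, illustration only): collect all
# GROUNDWATER_FLOW vtk output of one task, sorted by time step.
def _example_readvtk():
    res = readvtk(
        task_root="model_dir", task_id="model", pcs="GROUNDWATER_FLOW"
    )  # assumed locations
    if res:
        print(res["TIME"])  # sorted time steps
        print(len(res["DATA"]))  # one vtk-data dictionary per time step
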
###############################################################################
# pvd readers
###############################################################################

def readpvd(task_root=".", task_id=None, pcs="ALL", single_file=None):
    r"""
    Read a paraview pvd file.

    Convert all concerned files to a dictionary containing their data.

    The filename of the pvd is structured the following way:
    {task_id}[_{PCS}].pvd
    thereby the "_{PCS}" is optional and only present if a PCS_TYPE is
    specified in the \*.out file.

    Parameters
    ----------
    task_root : string, optional
        string containing the path to the directory containing the ogs output
        Default : "."
    task_id : string, optional
        string containing the file name of the ogs task without extension
        Default : None
    pcs : string or None, optional
        specify the PCS type that should be collected
        Possible values are:

            - None/"" (no PCS_TYPE specified in \*.out)
            - "NO_PCS"
            - "GROUNDWATER_FLOW"
            - "LIQUID_FLOW"
            - "RICHARDS_FLOW"
            - "AIR_FLOW"
            - "MULTI_PHASE_FLOW"
            - "PS_GLOBAL"
            - "HEAT_TRANSPORT"
            - "DEFORMATION"
            - "MASS_TRANSPORT"
            - "OVERLAND_FLOW"
            - "FLUID_MOMENTUM"
            - "RANDOM_WALK"

        You can get the output for all known PCS-types by setting pcs="ALL"
        Default : None
    single_file : string or None, optional
        If you want to read just a single file, you can set the path here.
        Default : None

    Returns
    -------
    result : dict
        dictionary with "TIME" (sorted time steps) and "DATA" (one vtk-data
        dictionary per time step)
        if pcs="ALL", the output is a dictionary with the PCS-types as keys
    """
    # for a single file return the output immediately
    if single_file is not None:
        root, ext = os.path.splitext(single_file)
        if ext != ".pvd":
            print("not a '*.pvd' file")
            return {}
        task_root, task_id = os.path.split(root)
        if task_root == "":
            task_root = "."
        return readpvd(task_root, task_id, pcs="")
    if pcs is None:
        pcs = ""
    # if pcs is "ALL" iterate over all known PCS types
    if pcs == "ALL":
        out = {}
        for pcs_single in PCS_TYP:
            out_single = readpvd(task_root, task_id, pcs_single)
            if out_single != {}:
                out[pcs_single] = out_single
        return out
    # in the filename, there is an underscore before the PCS-type
    if pcs != "":
        pcs = "_" + pcs
    output = {}
    # format task_root properly as directory path
    task_root = os.path.normpath(task_root)
    infile = os.path.join(task_root, task_id + pcs + ".pvd")
    # get the pvd information about the concerned files
    pvd_info = readpvd_single(infile)
    # if pvd is empty: return
    if not pvd_info:
        return output
    # collect the time steps given in the pvd file and sort them
    time = []
    for info in pvd_info["infos"]:
        time.append(info["timestep"])
    time = np.array(time)
    time_sort = np.argsort(time)
    time = time[time_sort]
    files = []
    for new_pos in time_sort:
        files.append(pvd_info["files"][new_pos])
    data = []
    # iterate over all input files
    for file_i in files:
        # format the file-path
        if split_file_path(file_i)[0] in ["", "."]:
            file_i = os.path.join(
                task_root, "".join(split_file_path(file_i)[1:])
            )
        # read the file
        data.append(readvtk_single(file_i))
    # store the sorted time steps and the corresponding data
    output["TIME"] = time
    output["DATA"] = data

    return output

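# Usage sketch (assumed pvd path, illustration only): a single "*.pvd" file
# can be read directly via the single_file argument; the result again holds
# "TIME" and "DATA".
def _example_readpvd():
    res = readpvd(single_file="model_dir/model.pvd")  # assumed path
    for step, dat in zip(res.get("TIME", []), res.get("DATA", [])):
        print(step, sorted(dat))
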
###############################################################################
# High level Tecplot reader
###############################################################################

def readtec_point(task_root=".", task_id=None, pcs="ALL", single_file=None):
    r"""
    Collect TECPLOT point output from OGS5.

    The filenames are structured the following way:
    {task_id}_time_{NAME}[_{PCS+extra}].tec
    thereby the "_{PCS}" is optional and only present if a PCS_TYPE is
    specified in the \*.out file.
    The "extra" will not be recognized and will break the search process.

    Parameters
    ----------
    task_root : string, optional
        string containing the path to the directory containing the ogs output
        Default : "."
    task_id : string, optional
        string containing the file name of the ogs task without extension
        Default : None
    pcs : string or None, optional
        specify the PCS type that should be collected
        Possible values are:

            - None/"" (no PCS_TYPE specified in \*.out)
            - "NO_PCS"
            - "GROUNDWATER_FLOW"
            - "LIQUID_FLOW"
            - "RICHARDS_FLOW"
            - "AIR_FLOW"
            - "MULTI_PHASE_FLOW"
            - "PS_GLOBAL"
            - "HEAT_TRANSPORT"
            - "DEFORMATION"
            - "MASS_TRANSPORT"
            - "OVERLAND_FLOW"
            - "FLUID_MOMENTUM"
            - "RANDOM_WALK"

        You can get the output for all known PCS-types by setting pcs="ALL"
        Default : None
    single_file : string or None, optional
        If you want to read just a single file, you can set the path here.
        Default : None

    Returns
    -------
    result : dict
        keys are the point names and the items are the data from the
        corresponding files
        if pcs="ALL", the output is a dictionary with the PCS-types as keys
    """
    # for a single file return the output immediately
    if single_file is not None:
        return readtec_single_table(single_file)
    assert task_id is not None, "You need to specify a task_id"
    # check PCS
    if pcs is None:
        pcs = ""
    # if pcs is "ALL" iterate over all known PCS types
    if pcs == "ALL":
        out = {}
        for pcs_single in PCS_TYP:
            out_single = readtec_point(task_root, task_id, pcs_single)
            if out_single != {}:
                out[pcs_single] = out_single
        return out
    task_root = os.path.normpath(task_root)
    # find point output by keyword "time"
    infiles = glob.glob(os.path.join(task_root, task_id + "_time_*.tec"))
    out = {}
    for infile in infiles:
        # get the information from the file-name
        _, pnt_name, file_pcs, _ = split_pnt_path(infile, task_id)
        # check if the given PCS type matches, else skip the file
        if file_pcs != pcs:
            continue
        # read the file
        out[pnt_name] = readtec_single_table(infile)
    return out

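# Usage sketch (assumed task setup, illustration only): collect the tecplot
# time curves of all observation points for one process.
def _example_readtec_point():
    res = readtec_point(
        task_root="model_dir", task_id="model", pcs="GROUNDWATER_FLOW"
    )  # assumed locations
    for pnt_name, table in res.items():
        print(pnt_name, sorted(table))  # available data columns per point
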
def readtec_polyline(
    task_root=".", task_id=None, pcs="ALL", single_file=None, trim=True
):
    r"""
    Collect TECPLOT polyline output from OGS5.

    The filenames are structured the following way:
    {task_id}_ply_{NAME}_t{ply_id}[_{PCS}].tec
    thereby the "_{PCS}" is optional and only present if a PCS_TYPE is
    specified in the \*.out file.

    Parameters
    ----------
    task_root : string, optional
        string containing the path to the directory containing the ogs output
        Default : "."
    task_id : string, optional
        string containing the file name of the ogs task without extension
        Default : None
    pcs : string or None, optional
        specify the PCS type that should be collected
        Possible values are:

            - None/"" (no PCS_TYPE specified in \*.out)
            - "NO_PCS"
            - "GROUNDWATER_FLOW"
            - "LIQUID_FLOW"
            - "RICHARDS_FLOW"
            - "AIR_FLOW"
            - "MULTI_PHASE_FLOW"
            - "PS_GLOBAL"
            - "HEAT_TRANSPORT"
            - "DEFORMATION"
            - "MASS_TRANSPORT"
            - "OVERLAND_FLOW"
            - "FLUID_MOMENTUM"
            - "RANDOM_WALK"

        You can get the output for all known PCS-types by setting pcs="ALL"
        Default : None
    single_file : string or None, optional
        If you want to read just a single file, you can set the path here.
        Default : None
    trim : bool, optional
        If the ply_ids are not continuous, there will be None values in the
        output list. If trim is True, these values will be removed.
        If there is just one output for a polyline, the list will be dropped
        and the output will be the single dict.
        Default : True

    Returns
    -------
    result : dict
        keys are the polyline names and the items are lists sorted by the
        ply_id (it is assumed that the ply_ids are continuous; if not, the
        corresponding list entries are None)
        if pcs="ALL", the output is a dictionary with the PCS-types as keys
    """
    # for a single file return the output immediately
    if single_file is not None:
        return readtec_multi_table(single_file)
    if pcs is None:
        pcs = ""
    # if pcs is "ALL" iterate over all known PCS types
    if pcs == "ALL":
        out = {}
        for pcs_single in PCS_TYP:
            out_single = readtec_polyline(task_root, task_id, pcs_single)
            if out_single != {}:
                out[pcs_single] = out_single
        return out
    # format the root_path
    task_root = os.path.normpath(task_root)
    infiles = glob.glob(
        os.path.join(task_root, task_id + "_ply_?*_t[0-9]*.tec")
    )
    # sort the infiles by name to sort them by timestep (pitfall!!!)
    infiles.sort()
    out = {}
    for infile in infiles:
        # get the information from the file-name
        _, line_name, time_step, file_pcs, _ = split_ply_path(infile, task_id)
        # check if the given PCS type matches, else skip the file
        if file_pcs != pcs:
            continue
        # check if the given polyline is already listed
        if line_name in out:
            cnt = len(out[line_name])
            if cnt <= time_step:
                # if the timesteps are not continuous, insert None-values
                out[line_name] += (1 + time_step - cnt) * [None]
        else:
            out[line_name] = (1 + time_step) * [None]
        # add the actual file-data
        out[line_name][time_step] = readtec_multi_table(infile)
    if trim:
        for line in out:
            out[line] = [line_i for line_i in out[line] if line_i is not None]
            if len(out[line]) == 1:
                out[line] = out[line][0]
    return out

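# Usage sketch (assumed task setup, illustration only): collect polyline
# output; with trim=True an entry is either a single table or a list of
# tables sorted by time step.
def _example_readtec_polyline():
    res = readtec_polyline(
        task_root="model_dir", task_id="model", pcs="HEAT_TRANSPORT"
    )  # assumed locations
    for line_name, tables in res.items():
        steps = len(tables) if isinstance(tables, list) else 1
        print(line_name, steps)
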
def readtec_domain():
    """
    A dummy for the TECPLOT-domain output of OGS.

    It is not implemented, because the output is separated by element types,
    where VTK works out much better.
    """
    raise NotImplementedError(
        "Reader for Tecplot domain output not yet implemented "
        "....please(!) generate vtk/pvd here"
    )