# -*- coding: utf-8 -*-
"""
Tools for ogs5py output files (independent from VTK package).
.. currentmodule:: ogs5py.tools.output
Helpers
^^^^^^^
.. autosummary::
:toctree:
get_output_files
readpvd_single
split_ply_path
split_pnt_path
"""
import glob
import os
import re
import xml.etree.ElementTree as ET
import numpy as np
from ogs5py.tools.types import PCS_TYP
###############################################################################
# retrieve infos from ogs-filenames
###############################################################################
def split_pnt_path(
    infile,
    task_id=None,
    pnt_name=None,
    PCS_name=None,
    split_extra=False,
    guess_PCS=False,
):
    """
    Retrieve ogs-infos from a filename of tecplot-point output.

    The expected file name layout is::

        {id}_time_{pnt}[_{PCS+extra}].tec

    Parameters
    ----------
    infile : string
        Path of the file. Only the basename is analysed.
    task_id : string or None, optional
        Task id the file name has to start with. If None, everything in
        front of the first ``"_time_"`` is used as id. Default: None
    pnt_name : string or None, optional
        Point name that has to follow ``"_time_"``. If None, the point name
        is derived from the file name. Default: None
    PCS_name : string or None, optional
        PCS type that has to be present in the file name. An empty string
        means "no PCS in the file name". If None, the known types in
        ``PCS_TYP[1:]`` are searched for. Default: None
    split_extra : bool, optional
        Whether a suffix following the PCS type should be returned
        separately (True) or kept as part of the PCS string (False).
        Default: False
    guess_PCS : bool, optional
        Only used if neither ``pnt_name`` nor ``PCS_name`` is given and no
        known PCS type was found: take the part up to the first ``"_"`` as
        point name and the rest as PCS. Default: False

    Returns
    -------
    tuple
        ``(id, pnt, PCS, extra)``. All four entries are None if the file
        name does not fit the given constraints.
    """
    no_match = 4 * (None,)

    # create a workaround for empty PCS string (which is valid)
    if PCS_name == "":
        temp_id, temp_pnt, temp_PCS, __ = split_pnt_path(
            infile=infile,
            task_id=task_id,
            pnt_name=None,
            PCS_name=None,
            split_extra=False,
            guess_PCS=False,
        )
        if temp_id is None:
            return no_match
        # NOTE(review): pnt and PCS are re-joined without the "_" that
        # separated them in the file name - confirm this is intended.
        endstring = temp_pnt + temp_PCS
        if pnt_name is None:
            if split_extra:
                # here we have to guess the POINT name and maybe an extra
                # suffix: POINT name is guessed as a name without "_",
                # the rest will be set as extra
                pnt, __, extra = endstring.partition("_")
            else:
                pnt, extra = endstring, ""
        else:
            if not endstring.startswith(pnt_name):
                # the requested point is not part of this file name
                return no_match
            pnt = pnt_name
            extra = endstring[len(pnt_name) :]
            if not split_extra and extra != "":
                return no_match
        return temp_id, pnt, "", extra

    # remove the directory-part from the filepath to get the basename
    name = os.path.basename(infile)
    # search for the suffix (aka file ending); without it nothing can match
    suffix_match = re.search(r"\.tec$", name)
    if suffix_match is None:
        return no_match
    suffix_start = suffix_match.start()

    # check for the task_id
    if task_id is None:
        prefix_match = re.search("_time_", name)
    else:
        prefix_match = re.search("^" + re.escape(task_id) + "_time_", name)
    if prefix_match is None:
        return no_match
    id_name = name[: prefix_match.start()] if task_id is None else task_id
    prefix_end = prefix_match.end()

    if pnt_name is not None:
        # the point name has to be followed directly by "_" or "."
        midtrm_match = re.search(
            "^"
            + re.escape(id_name)
            + "_time_"
            + re.escape(pnt_name)
            + r"[\._]",
            name,
        )
        if midtrm_match is None:
            return no_match
        pnt = pnt_name
        PCS = name[midtrm_match.end() : suffix_start]
        # check PCS
        if PCS_name is None:
            extra = ""
            for pcs_sgl in PCS_TYP[1:]:
                if PCS.startswith(pcs_sgl):
                    extra = PCS[len(pcs_sgl) :]
                    PCS = pcs_sgl
                    break
        elif PCS.startswith(PCS_name):
            extra = PCS[len(PCS_name) :]
            PCS = PCS_name
            if not split_extra and extra != "":
                return no_match
        else:
            return no_match
    elif PCS_name is None:
        # search for the PCS
        pcs_found = False
        for pcs_sgl in PCS_TYP[1:]:
            # create a pattern to search the actual pcs_type
            midtrm_match = re.search(
                "^"
                + re.escape(id_name)
                + "_time_[^_]+.*_"
                + re.escape(pcs_sgl),
                name,
            )
            if midtrm_match is None:
                continue
            pcs_found = True
            pcs_start = midtrm_match.end() - len(pcs_sgl)
            # everything behind the PCS type is the extra suffix
            extra = name[midtrm_match.end() : suffix_start]
            PCS = pcs_sgl
            # the point name ends in front of the "_" preceding the PCS
            pnt = name[prefix_end : pcs_start - 1]
            break
        if not pcs_found:
            if guess_PCS:
                # here we have to guess the POINT name and maybe a PCS type
                # POINT name is guessed as a name without "_"
                # the rest will be set as PCS
                midtrm_match = re.search(
                    "^" + re.escape(id_name) + r"_time_[^_]+[\._]", name
                )
                if midtrm_match is None:
                    return no_match
                pnt = name[prefix_end : midtrm_match.end() - 1]
                PCS = name[midtrm_match.end() : suffix_start]
                extra = ""
            else:
                pnt = name[prefix_end:suffix_start]
                PCS = ""
                extra = ""
    else:
        PCS_match = re.search("_" + re.escape(PCS_name), name)
        if PCS_match is None:
            return no_match
        pnt = name[prefix_end : PCS_match.start()]
        PCS = PCS_name
        extra = name[PCS_match.end() : suffix_start]
        # PCS was given, extras should not be split and extra != ""
        # thus we get a contradiction
        if not split_extra and extra != "":
            return no_match

    if not split_extra:
        PCS = PCS + extra
        extra = ""
    elif extra.startswith("_"):
        extra = extra[1:]
    elif extra != "":
        # if PCS starts with given PCS but there's an extra suffix not
        # separated by an "_" return None
        return no_match
    return id_name, pnt, PCS, extra
def split_ply_path(
    infile, task_id=None, line_name=None, PCS_name=None, split_extra=False
):
    """
    Retrieve ogs-infos from a filename of tecplot-polyline output.

    The expected file name layout is::

        {id}_ply_{line}_t{n}[_{PCS+extra}].tec

    Parameters
    ----------
    infile : string
        Path of the file. Only the basename is analysed.
    task_id : string or None, optional
        Task id the file name has to start with. If None, everything in
        front of the first ``"_ply_"`` is used as id. Default: None
    line_name : string or None, optional
        If given, the line name found in the file name has to be equal to
        it. Default: None
    PCS_name : string or None, optional
        PCS type the PCS part of the file name has to start with.
        If None, the known types in ``PCS_TYP[1:]`` are tried.
        Default: None
    split_extra : bool, optional
        Whether a suffix following the PCS type should be returned
        separately (True) or kept as part of the PCS string (False).
        Default: False

    Returns
    -------
    tuple
        ``(id, line, step, PCS, extra)`` with ``step`` as integer.
        All five entries are None if the file name does not fit the
        given constraints.
    """
    no_match = 5 * (None,)
    # remove the directory-part from the filepath to get the basename
    name = os.path.basename(infile)

    # check for the task_id
    if task_id is None:
        prefix_match = re.search("_ply_", name)
    else:
        prefix_match = re.search("^" + re.escape(task_id) + "_ply_", name)
    suffix_match = re.search(r"\.tec$", name)
    # if anything was not found, return None for everything
    if prefix_match is None or suffix_match is None:
        return no_match
    # the time-step marker is only valid behind the "_ply_" prefix, so a
    # task id that itself contains "_t<digits>_" is not taken for it
    midtrm_match = re.compile(r"_t(\d+)[\._]").search(
        name, prefix_match.end()
    )
    if midtrm_match is None:
        return no_match

    # get the infos from the file-name
    id_name = name[: prefix_match.start()] if task_id is None else task_id
    line = name[prefix_match.end() : midtrm_match.start()]
    step = int(midtrm_match.group(1))
    PCS = name[midtrm_match.end() : suffix_match.start()]

    if line_name is not None and line_name != line:
        return no_match

    if PCS_name is None:
        extra = ""
        for pcs_sgl in PCS_TYP[1:]:
            if PCS.startswith(pcs_sgl):
                extra = PCS[len(pcs_sgl) :]
                PCS = pcs_sgl
                break
    elif PCS.startswith(PCS_name):
        extra = PCS[len(PCS_name) :]
        PCS = PCS_name
        # PCS was given, extras should not be split but there is one
        if not split_extra and extra != "":
            return no_match
    else:
        return no_match

    if not split_extra:
        PCS = PCS + extra
        extra = ""
    elif extra.startswith("_"):
        extra = extra[1:]
    elif extra != "" and PCS_name != "":
        # if PCS starts with given PCS (not "") but there's an extra suffix
        # not separated by an "_" return None
        return no_match
    return id_name, line, step, PCS, extra
def readpvd_single(infile):
    """
    Read a paraview pvd file.

    Collect the files referenced by the pvd file together with the
    attributes stored for each of them.

    Parameters
    ----------
    infile : string
        Path of the pvd file.

    Returns
    -------
    dict
        Empty if ``infile`` is not an existing file. Otherwise it contains:

        - ``"pvd_info"``: attributes of the XML root element
        - ``"files"``: list with the ``file`` attribute of each data set
        - ``"infos"``: list with the remaining attributes of each data set
          (``timestep`` converted to float, ``part`` converted to int)

    Raises
    ------
    KeyError
        If a data set has no ``file`` attribute.
    """
    output = {}
    # read the pvd file as XML and extract the needed file infos
    if not os.path.isfile(infile):
        return output
    info_root = ET.parse(infile).getroot()
    files = []
    infos = []
    # the data sets are the children of the first child of the root;
    # a root without children simply provides no data sets
    collection = info_root[0] if len(info_root) > 0 else ()
    for dataset in collection:
        # work on a copy so the parsed XML element stays untouched
        info = dict(dataset.attrib)
        files.append(info.pop("file"))
        if "timestep" in info:
            info["timestep"] = float(info["timestep"])
        if "part" in info:
            info["part"] = int(info["part"])
        infos.append(info)
    output["pvd_info"] = info_root.attrib
    output["files"] = files
    output["infos"] = infos
    return output
def get_output_files(task_root, task_id, pcs=None, typ="VTK", element=None):
    r"""
    Get a list of output file paths.

    Parameters
    ----------
    task_root : string
        string containing the path to the directory containing the ogs output
    task_id : string
        string containing the file name of the ogs task without extension
    pcs : string or None, optional
        specify the PCS type that should be collected
        Possible values are:

            - None/"" (no PCS_TYPE specified in \*.out)
            - "NO_PCS"
            - "GROUNDWATER_FLOW"
            - "LIQUID_FLOW"
            - "RICHARDS_FLOW"
            - "AIR_FLOW"
            - "MULTI_PHASE_FLOW"
            - "PS_GLOBAL"
            - "HEAT_TRANSPORT"
            - "DEFORMATION"
            - "MASS_TRANSPORT"
            - "OVERLAND_FLOW"
            - "FLUID_MOMENTUM"
            - "RANDOM_WALK"

        Default : None
    typ : string, optional
        Type of the output ("VTK", "PVD", "TEC_POINT" or "TEC_POLYLINE").
        The comparison is case insensitive.
        Default : "VTK"
    element : string or None, optional
        For tecplot output you can specify the name of the output element.
        (Point-name of Line-name from GLI file)
        Default: None

    Returns
    -------
    list of string
        For "VTK" and "TEC_POINT" the matching paths sorted by name, for
        "TEC_POLYLINE" the matching paths sorted by line name and time step
        and for "PVD" the file entries of the pvd file sorted by their
        time step.

    Raises
    ------
    ValueError
        If ``pcs`` is "ALL" or ``typ`` is unknown.
    """
    typ = typ.upper()
    if pcs is None:
        pcs = ""
    # iterating over all known PCS types is not supported here
    if pcs == "ALL":
        raise ValueError("get_output_files: specifiy a single PCS not 'ALL'.")
    # format task_root proper as directory path
    task_root = os.path.normpath(task_root)
    # escape glob magic characters ("[", "?", "*") of the user given parts,
    # so only the wildcards appended below are interpreted as pattern
    base = glob.escape(os.path.join(task_root, task_id))

    if typ == "VTK":
        # in the filename, there is a underscore before the PCS-type
        if pcs != "":
            pcs = "_" + pcs
        # YEAHAA.. inconsistency
        if pcs == "_RANDOM_WALK":
            pcs = "_RWPT"
        # get a list of all output files "{id}0000.vtk" ... "{id}999[...]9.vtk"
        # if pcs is RWPT the name-sheme is different
        if pcs == "_RWPT":
            wildcard = "_[0-9]*.particles.vtk"
        else:
            wildcard = "[0-9][0-9][0-9]*[0-9].vtk"
        files = sorted(glob.glob(base + glob.escape(pcs) + wildcard))

    elif typ == "PVD":
        # in the filename, there is a underscore before the PCS-type
        if pcs != "":
            pcs = "_" + pcs
        infile = os.path.join(task_root, task_id + pcs + ".pvd")
        # get the pvd information about the concerned files
        pvd_info = readpvd_single(infile)
        # if pvd is empty: return
        if not pvd_info:
            return []
        # order the files by their output-time
        time = np.array([info["timestep"] for info in pvd_info["infos"]])
        files = [pvd_info["files"][pos] for pos in np.argsort(time)]

    elif typ == "TEC_POINT":
        # find point output by keyword "time"
        infiles = sorted(glob.glob(base + "_time_*.tec"))
        files = []
        for infile in infiles:
            # get the information from the file-name
            _, pnt_name, file_pcs, _ = split_pnt_path(infile, task_id)
            # check if the given PCS type matches, else skip the file
            if file_pcs == pcs and (element is None or element == pnt_name):
                files.append(infile)

    elif typ == "TEC_POLYLINE":
        infiles = sorted(glob.glob(base + "_ply_?*_t[0-9]*.tec"))
        selected = []
        for infile in infiles:
            # get the information from the file-name
            _, line_name, step, file_pcs, _ = split_ply_path(infile, task_id)
            # check if the given PCS type matches, else skip the file
            if file_pcs == pcs and (element is None or element == line_name):
                selected.append((line_name, step, infile))
        # sorting by name puts "t10" in front of "t2", therefore the files
        # are ordered by line and by the numeric time step instead
        selected.sort()
        files = [infile for _, _, infile in selected]

    else:
        raise ValueError(f"Unknown output typ: '{typ}'")
    return files