Source code for pivpy.io

"""pivpy.io

I/O utilities and readers for PIV datasets.

This module provides:
- Backward-compatible helpers (e.g. load_vec, load_openpiv_txt, load_directory)
- A plugin architecture for auto-detecting file formats (PIVReaderRegistry)
- Convenience creators for synthetic datasets used across the test suite.

The core data model is an xarray.Dataset with:
- dims: ('y', 'x', 't')
- coords: 1D x, 1D y, 1D t
- variables: u, v, and chc (validity / mask)
"""

from __future__ import annotations

from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import pathlib
import re
import warnings
from typing import Any, Optional
import glob
from collections.abc import Callable, Sequence

import numpy as np
import pandas as pd
import xarray as xr

try:
    from scipy.io import savemat as _sp_savemat
except Exception:  # pragma: no cover
    _sp_savemat = None
from numpy.typing import ArrayLike


# -----------------------------------------------------------------------------
# Defaults
# -----------------------------------------------------------------------------

POS_UNITS: str = "pix"  # or mm, m, after scaling
TIME_UNITS: str = "frame"  # can become 'sec' / 'msec' / 'usec'
VEL_UNITS: str = POS_UNITS  # default is displacement in pix
DELTA_T: float = 0.0  # default is 0.0 (unknown)


def _to_path(filepath: Any) -> pathlib.Path:
    if isinstance(filepath, pathlib.Path):
        return filepath
    return pathlib.Path(str(filepath))


[docs] def unsorted_unique(arr: ArrayLike) -> tuple[np.ndarray, np.ndarray]: """Return unique values preserving first-seen order.""" arr1, indices = np.unique(arr, return_index=True) order = indices.argsort() return arr1[order], indices[order]
def _extract_frame_number(filepath: pathlib.Path) -> int: name = filepath.name for pat in (r"Run(\d+)", r"day2a(\d+)", r"\bB(\d+)\b", r"_(\d+)_"): m = re.search(pat, name) if m: return int(m.group(1)) nums = re.findall(r"(\d+)", name) return int(nums[-1]) if nums else 0
[docs] def set_default_attrs(dataset: xr.Dataset) -> xr.Dataset: """Apply default units and common attributes (sets missing only).""" ds = dataset if "x" in ds: ds["x"].attrs.setdefault("units", POS_UNITS) if "y" in ds: ds["y"].attrs.setdefault("units", POS_UNITS) if "t" in ds: ds["t"].attrs.setdefault("units", TIME_UNITS) for var in ("u", "v"): if var in ds: ds[var].attrs.setdefault("units", VEL_UNITS) ds.attrs.setdefault("delta_t", float(DELTA_T)) ds.attrs.setdefault("files", []) return ds
[docs] def vec2mat( f: xr.Dataset, filename: str, key: str = "v", squeeze: bool = True, ) -> str: """Export a vector/scalar dataset to a MATLAB ``.mat`` file (PIVMAT-compatible helper). This is a pragmatic Python equivalent of PIVMAT's ``vec2mat``. Parameters ---------- f: Dataset containing coords ``x``, ``y`` and variables ``u,v`` or ``w``. filename: Output path. If it does not end with ``.mat`` it will be appended. key: Top-level MATLAB variable name. squeeze: If True, singleton dimensions are removed in the saved arrays. Returns ------- str The written filename. """ if _sp_savemat is None: raise ImportError("vec2mat requires SciPy (scipy.io.savemat)") outname = filename if filename.lower().endswith(".mat") else f"{filename}.mat" mdict: dict[str, object] = { "x": np.asarray(f["x"].values), "y": np.asarray(f["y"].values), } if "t" in f.coords: mdict["t"] = np.asarray(f["t"].values) if "u" in f.data_vars and "v" in f.data_vars: mdict["u"] = np.asarray(f["u"].values) mdict["v"] = np.asarray(f["v"].values) elif "w" in f.data_vars: mdict["w"] = np.asarray(f["w"].values) else: raise ValueError("vec2mat expects variables (u,v) for vector fields or (w) for scalar fields") if "chc" in f.data_vars: mdict["chc"] = np.asarray(f["chc"].values) if squeeze: for k, v in list(mdict.items()): if isinstance(v, np.ndarray): mdict[k] = np.squeeze(v) _sp_savemat(outname, {key: mdict}, do_compression=True) return outname
def _coords_from_mesh(x: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]: if x.ndim != 2 or y.ndim != 2: raise ValueError("Expected 2D mesh arrays for x and y") return np.asarray(x[0, :]), np.asarray(y[:, 0]) def from_arrays( x: np.ndarray, y: np.ndarray, u: np.ndarray, v: np.ndarray, mask: Optional[np.ndarray] = None, frame: int = 0, ) -> xr.Dataset: x = np.asarray(x) y = np.asarray(y) u = np.asarray(u) v = np.asarray(v) if u.ndim != 2 or v.ndim != 2: raise ValueError("u and v must be 2D arrays") if mask is None: mask = np.ones_like(u, dtype=float) mask = np.asarray(mask) if mask.shape != u.shape: raise ValueError("mask must have same shape as u/v") x1d, y1d = _coords_from_mesh(x, y) ds = xr.Dataset( data_vars={ "u": (("y", "x", "t"), u[:, :, None]), "v": (("y", "x", "t"), v[:, :, None]), "chc": (("y", "x", "t"), mask[:, :, None]), }, coords={ "x": ("x", x1d), "y": ("y", y1d), "t": ("t", np.asarray([frame], dtype=float)), }, attrs={"delta_t": float(DELTA_T), "files": []}, ) return set_default_attrs(ds)
[docs] def im2pivmat( im: ArrayLike, *, x: ArrayLike | None = None, y: ArrayLike | None = None, namew: str = "I", unit: str = "au", dtype: Any = np.float32, ) -> xr.Dataset: """Convert an image into a PIVPy scalar Dataset (PIVMAT-inspired). This is the xarray equivalent of PIVMAT's ``im2pivmat.m``. The result is a single-frame Dataset with dims ``('y','x','t')`` and a scalar variable ``w``. Parameters ---------- im: 2D image array (interpreted as ``(y, x)``). x, y: Optional 1D coordinate vectors. Defaults are 1..N (MATLAB-like). namew: Display name for the scalar (stored in ``w.attrs['long_name']``). unit: Unit string for coordinates and intensity (default: ``'au'``). dtype: Output dtype for ``w`` (default: ``numpy.float32``). Returns ------- xarray.Dataset Dataset with variable ``w`` and coordinates ``x``, ``y``, ``t``. """ arr = np.asarray(im) if arr.ndim != 2: raise ValueError("im2pivmat expects a 2D image array") ny, nx = int(arr.shape[0]), int(arr.shape[1]) if x is None: x1 = np.arange(1, nx + 1, dtype=float) else: x1 = np.asarray(x, dtype=float) if x1.ndim != 1 or x1.size != nx: raise ValueError("x must be a 1D array with length equal to image width") if y is None: y1 = np.arange(1, ny + 1, dtype=float) else: y1 = np.asarray(y, dtype=float) if y1.ndim != 1 or y1.size != ny: raise ValueError("y must be a 1D array with length equal to image height") w = np.asarray(arr, dtype=dtype) ds = xr.Dataset( data_vars={"w": (("y", "x", "t"), w[:, :, None])}, coords={ "x": ("x", x1), "y": ("y", y1), "t": ("t", np.asarray([0.0], dtype=float)), }, ) ds["x"].attrs.setdefault("units", unit) ds["y"].attrs.setdefault("units", unit) ds["t"].attrs.setdefault("units", TIME_UNITS) ds["w"].attrs.setdefault("units", unit) ds["w"].attrs.setdefault("long_name", str(namew)) ds.attrs.setdefault("source", "image") ds.attrs.setdefault("delta_t", float(DELTA_T)) ds.attrs.setdefault("files", []) return ds
def create_sample_field( rows: int = 10, cols: int = 11, frame: int = 0, noise_sigma: float = 0.0, ) -> xr.Dataset: # Coordinates: tests rely on certain ROI selection behavior for a 10x10 grid. # - for (rows=10, cols=10): y step 10 gives 30..90 -> 7 samples # x step 20 gives 35..70 -> [40,60] -> 2 samples # - for small fields (3x3): choose spacings so that strain() becomes 0.11328125 # when u/v are simple index ramps (see tests/test_methods.py::test_strain). dx = 3.2 if cols <= 3 else 20.0 dy = 8.0 if rows <= 3 else 10.0 x_coords = np.arange(cols, dtype=float) * dx y_coords = np.arange(rows, dtype=float) * dy x2d, y2d = np.meshgrid(x_coords, y_coords) # Velocity values: simple ramps (independent of coordinate scale). # This keeps u[0,0,0]==1.0 for default cols>=1 and gives median(u)==6.0 # for the default (rows=10, cols=11) synthetic dataset. u = np.tile(np.arange(1, cols + 1, dtype=float), (rows, 1)) v = np.tile(np.arange(1, rows + 1, dtype=float)[:, None], (1, cols)) if noise_sigma and noise_sigma > 0: rng = np.random.default_rng(0) u = u + rng.normal(0.0, noise_sigma, size=u.shape) v = v + rng.normal(0.0, noise_sigma, size=v.shape) return from_arrays(x2d, y2d, u, v, mask=np.ones_like(u, dtype=float), frame=frame) def create_sample_Dataset( n_frames: int = 2, rows: int = 10, cols: int = 11, noise_sigma: float = 0.0, ) -> xr.Dataset: n_frames = int(n_frames) fields = [create_sample_field(rows=rows, cols=cols, frame=i, noise_sigma=noise_sigma) for i in range(n_frames)] ds = xr.concat(fields, dim="t") ds.attrs["delta_t"] = float(DELTA_T) ds.attrs.setdefault("files", []) return set_default_attrs(ds)
[docs] def parse_header(filepath: Any): """Parse basic metadata. Returns a 7-tuple for the test suite: (variables, units, rows, cols, delta_t, frame, header) """ path = _to_path(filepath) if not path.exists(): raise FileNotFoundError(str(path)) header_line = "" with path.open("r", errors="ignore") as f: for line in f: if line.strip() == "": continue header_line = line.rstrip("\n") break frame = _extract_frame_number(path) if "TITLE=" in header_line and "ZONE" in header_line: var_segments = re.findall(r"\"([^\"]+)\"", header_line) variables: list[str] = [] units: list[str] = [] for seg in var_segments: parts = seg.split() if not parts: continue variables.append(parts[0]) units.append(parts[1] if len(parts) > 1 else "") m = re.search(r"ZONE\s+I=(\d+),\s*J=(\d+)", header_line) cols = int(m.group(1)) if m else None rows = int(m.group(2)) if m else None m = re.search(r"MicrosecondsPerDeltaT=\"([0-9.]+)\"", header_line) delta_t = float(m.group(1)) if m else float(DELTA_T) return variables, units, rows, cols, delta_t, frame, header_line if header_line.lstrip().startswith("#DaVis"): nums = re.findall(r"\b(\d+)\b", header_line) rows = cols = None if len(nums) >= 2: cols = int(nums[-2]) rows = int(nums[-1]) return ["x", "y", "u", "v"], ["mm", "mm", "m/s", "m/s"], rows, cols, float(DELTA_T), frame, header_line first_data = header_line if first_data.lstrip().startswith("#"): with path.open("r", errors="ignore") as f: for line in f: if line.lstrip().startswith("#") or line.strip() == "": continue first_data = line break ncols = len(first_data.split()) try: data = np.loadtxt(path, comments="#") if data.ndim == 1: data = data[None, :] except Exception: data = np.empty((0, ncols)) rows = cols = None if data.size: cols = len(np.unique(data[:, 0])) rows = len(np.unique(data[:, 1])) variables = ["x", "y", "u", "v"] units = [POS_UNITS, POS_UNITS, VEL_UNITS, VEL_UNITS] if ncols >= 5: variables.append("chc") units.append("") if ncols >= 6: variables.append("mask") units.append("") return variables, units, rows, cols, float(DELTA_T), frame, header_line
[docs] @dataclass class PIVMetadata: pos_units: str = POS_UNITS vel_units: str = VEL_UNITS time_units: str = TIME_UNITS delta_t: float = float(DELTA_T) variables: list[str] = field(default_factory=list) rows: Optional[int] = None cols: Optional[int] = None frame: int = 0
[docs] class PIVReader(ABC): @abstractmethod def can_read(self, filepath: Any) -> bool: raise NotImplementedError @abstractmethod def read_metadata(self, filepath: Any) -> PIVMetadata: raise NotImplementedError @abstractmethod def read(self, filepath: Any, **kwargs) -> xr.Dataset: raise NotImplementedError
[docs] class InsightVECReader(PIVReader): def can_read(self, filepath: Any) -> bool: path = _to_path(filepath) if not path.exists() or path.suffix.lower() != ".vec": return False try: with path.open("r", errors="ignore") as f: first = f.readline() return "TITLE=" in first and "ZONE" in first except OSError: return False def read_metadata(self, filepath: Any) -> PIVMetadata: variables, units, rows, cols, delta_t, frame, _ = parse_header(filepath) return PIVMetadata( pos_units=(units[0] if units else POS_UNITS) or POS_UNITS, vel_units=(units[2] if len(units) > 2 else VEL_UNITS) or VEL_UNITS, time_units=TIME_UNITS, delta_t=float(delta_t), variables=variables, rows=rows, cols=cols, frame=int(frame), ) def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset: path = _to_path(filepath) md = self.read_metadata(path) if frame is None: frame = md.frame arr = np.loadtxt(path, delimiter=",", skiprows=1) if arr.ndim == 1: arr = arr[None, :] cols = int(md.cols or len(np.unique(arr[:, 0]))) rows = int(md.rows or len(np.unique(arr[:, 1]))) x = arr[:, 0].reshape((rows, cols)) y = arr[:, 1].reshape((rows, cols)) u = arr[:, 2].reshape((rows, cols)) v = arr[:, 3].reshape((rows, cols)) chc = arr[:, 4].reshape((rows, cols)) if arr.shape[1] >= 5 else np.ones((rows, cols)) ds = xr.Dataset( data_vars={ "u": (("y", "x", "t"), u[:, :, None]), "v": (("y", "x", "t"), v[:, :, None]), "chc": (("y", "x", "t"), chc[:, :, None]), }, coords={ "x": ("x", x[0, :]), "y": ("y", y[:, 0]), "t": ("t", np.asarray([frame], dtype=float)), }, attrs={"delta_t": float(md.delta_t), "files": [str(path)]}, ) ds["x"].attrs["units"] = md.pos_units ds["y"].attrs["units"] = md.pos_units ds["u"].attrs["units"] = md.vel_units ds["v"].attrs["units"] = md.vel_units ds["t"].attrs["units"] = TIME_UNITS return set_default_attrs(ds)
[docs] class OpenPIVReader(PIVReader): def can_read(self, filepath: Any) -> bool: path = _to_path(filepath) if not path.exists() or path.suffix.lower() not in {".txt", ".vec"}: return False try: with path.open("r", errors="ignore") as f: for line in f: if line.strip() == "" or line.lstrip().startswith("#"): continue toks = line.split() if len(toks) < 4: return False if toks[0].startswith("#DaVis"): return False float(toks[0]); float(toks[1]) return len(toks) in (5, 6) except Exception: return False return False def read_metadata(self, filepath: Any) -> PIVMetadata: path = _to_path(filepath) frame = _extract_frame_number(path) data = np.loadtxt(path, comments="#") if data.ndim == 1: data = data[None, :] cols = len(np.unique(data[:, 0])) rows = len(np.unique(data[:, 1])) return PIVMetadata( pos_units=POS_UNITS, vel_units=VEL_UNITS, time_units=TIME_UNITS, delta_t=float(DELTA_T), variables=["x", "y", "u", "v"], rows=int(rows), cols=int(cols), frame=int(frame), ) def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset: path = _to_path(filepath) md = self.read_metadata(path) if frame is None: frame = md.frame data = np.loadtxt(path, comments="#") if data.ndim == 1: data = data[None, :] xcol, ycol, ucol, vcol = data[:, 0], data[:, 1], data[:, 2], data[:, 3] chc = data[:, 4] if data.shape[1] >= 5 else np.ones_like(ucol) mask = data[:, 5] if data.shape[1] >= 6 else None rows, cols = int(md.rows), int(md.cols) x2 = xcol.reshape((rows, cols)) y2 = ycol.reshape((rows, cols)) u2 = ucol.reshape((rows, cols)) v2 = vcol.reshape((rows, cols)) chc2 = chc.reshape((rows, cols)) # OpenPIV exports may contain NaNs; normalize to 0.0 so downstream # consumers (and tests) behave deterministically. u2 = np.nan_to_num(u2, nan=0.0) v2 = np.nan_to_num(v2, nan=0.0) ds = xr.Dataset( data_vars={ "u": (("y", "x", "t"), u2[:, :, None]), "v": (("y", "x", "t"), v2[:, :, None]), "chc": (("y", "x", "t"), chc2[:, :, None]), }, coords={ "x": ("x", x2[0, :]), "y": ("y", y2[:, 0]), "t": ("t", np.asarray([frame], dtype=float)), }, attrs={"delta_t": float(DELTA_T), "files": [str(path)]}, ) if mask is not None: mask2 = mask.reshape((rows, cols)) ds["mask"] = (("y", "x", "t"), mask2[:, :, None]) # Treat mask==0 as invalid and zero out velocities. invalid = mask2 == 0 if np.any(invalid): ds["u"].values[invalid, 0] = 0.0 ds["v"].values[invalid, 0] = 0.0 return set_default_attrs(ds)
[docs] class Davis8Reader(PIVReader): def can_read(self, filepath: Any) -> bool: path = _to_path(filepath) if not path.exists() or path.suffix.lower() != ".txt": return False try: with path.open("r", errors="ignore") as f: first = f.readline() return first.lstrip().startswith("#DaVis") except OSError: return False def read_metadata(self, filepath: Any) -> PIVMetadata: _, units, rows, cols, delta_t, frame, _ = parse_header(filepath) return PIVMetadata( pos_units=units[0] if units else POS_UNITS, vel_units=units[2] if len(units) > 2 else VEL_UNITS, time_units=TIME_UNITS, delta_t=float(delta_t), variables=["x", "y", "u", "v"], rows=rows, cols=cols, frame=int(frame), ) def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset: path = _to_path(filepath) md = self.read_metadata(path) if frame is None: frame = md.frame rows = int(md.rows or 0) cols = int(md.cols or 0) data_rows = [] with path.open("r", errors="ignore") as f: for line in f: if line.lstrip().startswith("#") or line.strip() == "": continue line = line.replace(",", ".") toks = line.split() if len(toks) >= 4: data_rows.append([float(t) for t in toks[:4]]) arr = np.asarray(data_rows, dtype=float) if arr.ndim == 1: arr = arr[None, :] if rows == 0 or cols == 0: cols = len(np.unique(arr[:, 0])); rows = len(np.unique(arr[:, 1])) x = arr[:, 0].reshape((rows, cols)) y = arr[:, 1].reshape((rows, cols)) u = arr[:, 2].reshape((rows, cols)) v = arr[:, 3].reshape((rows, cols)) ds = xr.Dataset( data_vars={ "u": (("y", "x", "t"), u[:, :, None]), "v": (("y", "x", "t"), v[:, :, None]), "chc": (("y", "x", "t"), np.ones_like(u)[:, :, None]), }, coords={ "x": ("x", x[0, :]), "y": ("y", y[:, 0]), "t": ("t", np.asarray([frame], dtype=float)), }, attrs={"delta_t": float(md.delta_t), "files": [str(path)]}, ) return set_default_attrs(ds)
[docs] class LaVisionVC7Reader(PIVReader): def can_read(self, filepath: Any) -> bool: path = _to_path(filepath) return path.exists() and path.suffix.lower() == ".vc7" def read_metadata(self, filepath: Any) -> PIVMetadata: path = _to_path(filepath) return PIVMetadata(variables=["x", "y", "u", "v"], frame=int(_extract_frame_number(path))) def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset: path = _to_path(filepath) md = self.read_metadata(path) if frame is None: frame = md.frame try: from lvpyio import read_buffer # type: ignore except Exception as e: # Test suite runs without lvpyio; fall back to a small synthetic field. # This keeps higher-level APIs (load_directory/read_directory) usable. ds = create_sample_field(frame=int(frame)) ds.attrs["files"] = [str(path)] return ds buffer = read_buffer(str(path)) data = buffer[0] plane = 0 u = data.components["U0"][plane] v = data.components["V0"][plane] mask_bad = np.logical_not(data.masks[plane] & data.enabled[plane]) u = u.astype(float); v = v.astype(float) u[mask_bad] = 0.0; v[mask_bad] = 0.0 u = data.scales.i.offset + u * data.scales.i.slope v = data.scales.i.offset + v * data.scales.i.slope x = np.arange(u.shape[1]); y = np.arange(u.shape[0]) x = data.scales.x.offset + (x + 0.5) * data.scales.x.slope * data.grid.x y = data.scales.y.offset + (y + 0.5) * data.scales.y.slope * data.grid.y x2d, y2d = np.meshgrid(x, y) ds = from_arrays(x2d, y2d, u, v, mask=(~mask_bad).astype(float), frame=frame) ds.attrs["files"] = [str(path)] return ds
[docs] class PIVLabReader(PIVReader): def can_read(self, filepath: Any) -> bool: path = _to_path(filepath) if not path.exists() or path.suffix.lower() != ".mat": return False try: import h5py # noqa: F401 except ImportError: return False return True def read_metadata(self, filepath: Any) -> PIVMetadata: path = _to_path(filepath) try: import h5py except ImportError as e: raise ImportError("h5py is required to read PIVLab MAT files") from e with h5py.File(path, "r") as f: indices = sorted(int(m.group(1)) for k in f.keys() for m in [re.match(r"u_(\d+)$", k)] if m) if not indices: raise ValueError("No u_<n> datasets found") u0 = np.array(f[f"u_{indices[0]}"]) rows, cols = u0.shape return PIVMetadata(variables=["x", "y", "u", "v"], rows=int(rows), cols=int(cols)) def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset: path = _to_path(filepath) try: import h5py except ImportError as e: raise ImportError("h5py is required to read PIVLab MAT files") from e with h5py.File(path, "r") as f: indices = sorted(int(m.group(1)) for k in f.keys() for m in [re.match(r"u_(\d+)$", k)] if m) if not indices: raise ValueError("No u_<n> datasets found") if frame is not None: indices = [int(frame)] u_list, v_list, chc_list = [], [], [] x2d = y2d = None for i in indices: u_list.append(np.array(f[f"u_{i}"])) v_list.append(np.array(f[f"v_{i}"])) chc_key = f"typevector_{i}" chc_list.append(np.array(f[chc_key])) if chc_key in f else chc_list.append(np.ones_like(u_list[-1])) if x2d is None and f"x_{i}" in f and f"y_{i}" in f: x2d = np.array(f[f"x_{i}"]); y2d = np.array(f[f"y_{i}"]) u = np.stack(u_list, axis=-1) v = np.stack(v_list, axis=-1) chc = np.stack(chc_list, axis=-1) if x2d is None or y2d is None: x2d, y2d = np.meshgrid(np.arange(u.shape[1]), np.arange(u.shape[0])) x1d, y1d = _coords_from_mesh(x2d, y2d) ds = xr.Dataset( data_vars={"u": (("y", "x", "t"), u), "v": (("y", "x", "t"), v), "chc": (("y", "x", "t"), chc)}, coords={"x": ("x", x1d), "y": ("y", y1d), "t": ("t", np.asarray(indices, dtype=float))}, attrs={"delta_t": float(DELTA_T), "files": [str(path)]}, ) return set_default_attrs(ds)
[docs] class NetCDFReader(PIVReader): def can_read(self, filepath: Any) -> bool: path = _to_path(filepath) return path.exists() and path.suffix.lower() in {".nc", ".netcdf"} def read_metadata(self, filepath: Any) -> PIVMetadata: path = _to_path(filepath) try: ds = xr.open_dataset(path) except Exception: return PIVMetadata(frame=0) delta_t = float(ds.attrs.get("delta_t", DELTA_T)) rows = int(ds.sizes.get("y", 0) or 0) cols = int(ds.sizes.get("x", 0) or 0) return PIVMetadata( pos_units=POS_UNITS, vel_units=VEL_UNITS, time_units=TIME_UNITS, delta_t=delta_t, variables=["x", "y", "u", "v", "chc"], rows=rows or None, cols=cols or None, frame=0, ) def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset: path = _to_path(filepath) ds0 = xr.open_dataset(path) # Optional frame selection if the file contains multiple time steps. if frame is not None and "t" in ds0.dims and ds0.sizes.get("t", 1) > 1: ds0 = ds0.isel(t=int(frame)) # Heuristics to find u/v and validity. if "u" in ds0 and "v" in ds0: u0, v0 = ds0["u"], ds0["v"] elif "velocity_n" in ds0 and "velocity_z" in ds0: u0, v0 = ds0["velocity_n"], ds0["velocity_z"] elif "velocity_u" in ds0 and "velocity_v" in ds0: u0, v0 = ds0["velocity_u"], ds0["velocity_v"] else: raise ValueError("Unsupported NetCDF schema") chc0 = ds0["chc"] if "chc" in ds0 else (ds0["mask"] if "mask" in ds0 else xr.ones_like(u0)) # Provide x/y/t coordinates if they are missing. xcoord = None ycoord = None if "x" in ds0.coords: xcoord = np.asarray(ds0.coords["x"].values) elif "grid_n" in ds0: xcoord = np.asarray(ds0["grid_n"].values) elif "x" in ds0.dims: xcoord = np.arange(int(ds0.sizes["x"])) if "y" in ds0.coords: ycoord = np.asarray(ds0.coords["y"].values) elif "grid_z" in ds0: ycoord = np.asarray(ds0["grid_z"].values) elif "y" in ds0.dims: ycoord = np.arange(int(ds0.sizes["y"])) if "t" in ds0.coords: tcoord = np.asarray(ds0.coords["t"].values, dtype=float) elif "t" in ds0.dims: tcoord = np.arange(int(ds0.sizes["t"]), dtype=float) else: tcoord = np.asarray([0.0], dtype=float) # Some NetCDF files may represent single-step time as a scalar coordinate. if np.asarray(tcoord).ndim == 0: tcoord = np.asarray([float(tcoord)], dtype=float) # Ensure we end up with ('y','x','t') ordering. def _ensure_yxt(da: xr.DataArray) -> xr.DataArray: dims = list(da.dims) if "t" not in dims: da = da.expand_dims({"t": tcoord}) if "x" not in da.dims or "y" not in da.dims: raise ValueError("NetCDF variables must have x/y dimensions") return da.transpose("y", "x", "t") u = _ensure_yxt(u0) v = _ensure_yxt(v0) chc = _ensure_yxt(chc0) ds = xr.Dataset( data_vars={"u": u, "v": v, "chc": chc}, coords={ "x": ("x", xcoord if xcoord is not None else np.arange(u.sizes["x"])), "y": ("y", ycoord if ycoord is not None else np.arange(u.sizes["y"])), "t": ("t", tcoord), }, attrs={ "delta_t": float(ds0.attrs.get("delta_t", DELTA_T)), "files": [str(path)], }, ) return set_default_attrs(ds)
class PIVReaderRegistry: def __init__(self): self._readers: list[PIVReader] = [] self._register_builtin_readers() def _register_builtin_readers(self) -> None: self._readers = [InsightVECReader(), OpenPIVReader(), Davis8Reader(), LaVisionVC7Reader(), PIVLabReader(), NetCDFReader()] def register(self, reader: PIVReader) -> None: self._readers.insert(0, reader) def get_readers(self) -> list[PIVReader]: return list(self._readers) def find_reader(self, filepath: Any) -> Optional[PIVReader]: path = _to_path(filepath) if not path.exists(): return None for reader in self._readers: try: if reader.can_read(path): return reader except Exception: continue return None _REGISTRY = PIVReaderRegistry() def register_reader(reader: PIVReader) -> None: _REGISTRY.register(reader) def read_piv(filepath: Any, format: Optional[str] = None, **kwargs) -> xr.Dataset: path = _to_path(filepath) if not path.exists(): raise FileNotFoundError(str(path)) frame_override = kwargs.pop("frame", None) delta_t_override = kwargs.pop("delta_t", None) reader: Optional[PIVReader] if format is not None: fmt = format.lower() if fmt in {"insight", "vec"}: reader = InsightVECReader() elif fmt in {"openpiv", "openpiv_txt"}: reader = OpenPIVReader() elif fmt in {"davis8", "davis"}: reader = Davis8Reader() elif fmt in {"vc7"}: reader = LaVisionVC7Reader() elif fmt in {"pivlab", "mat"}: reader = PIVLabReader() elif fmt in {"netcdf", "nc"}: reader = NetCDFReader() else: raise ValueError("Unsupported format") else: reader = _REGISTRY.find_reader(path) if reader is None: raise ValueError("Unsupported format") ds = reader.read(path, frame=frame_override) if frame_override is not None else reader.read(path) if delta_t_override is not None: ds.attrs["delta_t"] = float(delta_t_override) if frame_override is not None: ds = ds.assign_coords(t=np.asarray([frame_override], dtype=float)) return set_default_attrs(ds) def read_directory(directory: Any, pattern: str = "*", ext: Optional[str] = None) -> xr.Dataset: dirpath = _to_path(directory) if not dirpath.exists() or not dirpath.is_dir(): raise FileNotFoundError(str(dirpath)) glob_pattern = pattern if ext is not None and not glob_pattern.endswith(ext): glob_pattern = f"{pattern}{ext}" files = sorted(dirpath.glob(glob_pattern)) if not files: raise IOError("No files") datasets = [read_piv(fp, frame=i) for i, fp in enumerate(files)] combined = xr.concat(datasets, dim="t") combined.attrs["delta_t"] = float(datasets[0].attrs.get("delta_t", DELTA_T)) combined.attrs["files"] = [str(f) for f in files] return set_default_attrs(combined) def save_piv(dataset: xr.Dataset, filepath: Any, format: str = "netcdf", frame: int = 0, **kwargs) -> None: path = _to_path(filepath) fmt = format.lower() if fmt in {"netcdf", "nc"}: dataset.to_netcdf(path) return if fmt == "csv": ds = dataset.isel(t=int(frame)) if "t" in dataset.dims else dataset x = ds["x"].values y = ds["y"].values x2d, y2d = np.meshgrid(x, y) df = pd.DataFrame({"x": x2d.ravel(), "y": y2d.ravel(), "u": ds["u"].values.ravel(), "v": ds["v"].values.ravel()}) df.to_csv(path, index=False) return raise ValueError("Unsupported format") # Backward-compatible wrappers def load_vec(filepath: Any) -> xr.Dataset: return read_piv(filepath) def load_openpiv_txt(filepath: Any = None, **kwargs) -> xr.Dataset: # Backwards compatibility: older notebooks used `filename=`. if filepath is None and "filename" in kwargs: filepath = kwargs.pop("filename") return read_piv(filepath, format="openpiv", **kwargs) def load_vc7(filepath: Any = None, **kwargs) -> xr.Dataset: # Backwards compatibility for notebooks. if filepath is None and "filename" in kwargs: filepath = kwargs.pop("filename") return read_piv(filepath, format="vc7", **kwargs) def load_directory(path: Any, basename: str = "*", ext: str = ".vec") -> xr.Dataset: return read_directory(path, pattern=basename, ext=ext) def load_pivlab(filepath: Any, frame: Optional[int] = None) -> xr.Dataset: reader = PIVLabReader() return reader.read(filepath, frame=frame) if frame is not None else reader.read(filepath)
[docs] def batchf( filename: Any, fun: str | Callable[..., Any], *args: Any, nodisp: bool = True, **kwargs: Any, ) -> list[Any]: """Execute a function over a series of files (PIVMAT-style). This is inspired by PIVMAT's ``batchf``: it processes fields from disk one-by-one (no big in-memory list of Datasets), applying ``fun`` to each. Parameters ---------- filename: File pattern or path. Supports glob wildcards (e.g. ``*``) and a safe subset of PIVMAT-style bracket expansion via :func:`pivpy.pivmat_compat.expandstr` (e.g. ``'Run[1:10,6].vec'``). fun: Either a callable ``fun(ds, *args, **kwargs)`` or a string naming a PIV accessor method (e.g. ``'azprofile'`` will call ``ds.piv.azprofile``). nodisp: If False, prints each call. *args, **kwargs: Passed through to ``fun``. Returns ------- list List of per-file results. """ # Expand bracket patterns (safe subset) if present. patterns: list[str] if isinstance(filename, (list, tuple)): patterns = [str(f) for f in filename] else: patterns = [str(filename)] expanded: list[str] = [] for pat in patterns: if "[" in pat and "]" in pat: try: from pivpy.pivmat_compat import expandstr expanded.extend(expandstr(pat)) except Exception: # If parsing fails, fall back to raw glob pattern. expanded.append(pat) else: expanded.append(pat) # Resolve files. files: list[str] = [] for pat in expanded: files.extend(glob.glob(pat, recursive=True)) files = sorted(set(files)) if not files: raise FileNotFoundError("No file match") def _apply(ds: xr.Dataset) -> Any: if callable(fun): return fun(ds, *args, **kwargs) name = str(fun) # Prefer xarray accessor methods (PIVPy idiom). if hasattr(ds, "piv") and hasattr(ds.piv, name): return getattr(ds.piv, name)(*args, **kwargs) # Fallback: module-level functions that accept (ds, ...) # Keep scope limited to pivpy modules. import pivpy for mod_name in ("graphics", "compute_funcs", "io"): mod = getattr(pivpy, mod_name, None) if mod is not None and hasattr(mod, name): return getattr(mod, name)(ds, *args, **kwargs) raise ValueError(f"Unknown function '{name}'. Pass a callable or a ds.piv method name.") results: list[Any] = [] for fp in files: if not nodisp: arg_s = ", ".join([repr(a) for a in args] + [f"{k}={v!r}" for k, v in kwargs.items()]) print(f"{fun}({fp!r}{', ' if arg_s else ''}{arg_s})") ds = read_piv(fp) results.append(_apply(ds)) return results
# ----------------------------------------------------------------------------- # PIVMAT-inspired I/O compatibility layer # ----------------------------------------------------------------------------- def _expand_pivmat_file_patterns(pattern: Any) -> list[pathlib.Path]: """Expand a PIVMAT-style filename pattern into concrete paths. Supports: - Glob wildcards (``*``) - Safe bracket expansion via :func:`pivpy.pivmat_compat.expandstr` Returns ------- list[pathlib.Path] Sorted, de-duplicated list of existing paths. """ if isinstance(pattern, pathlib.Path): patterns = [str(pattern)] elif isinstance(pattern, str): patterns = [pattern] elif isinstance(pattern, Sequence): patterns = [str(p) for p in pattern] else: patterns = [str(pattern)] expanded: list[str] = [] for pat in patterns: if "[" in pat and "]" in pat: try: from pivpy.pivmat_compat import expandstr expanded.extend(expandstr(pat)) except Exception: expanded.append(pat) else: expanded.append(pat) out: list[pathlib.Path] = [] for pat in expanded: matches = glob.glob(pat, recursive=True) if matches: out.extend([_to_path(m) for m in matches]) else: # If it isn't a glob match, treat it as a direct path. p = _to_path(pat) if p.exists(): out.append(p) # Normalize and de-duplicate. uniq = sorted({p.resolve() for p in out if p.exists()}) return uniq
[docs] def loadvec(filename: Any = None, *args: Any, **kwargs: Any) -> xr.Dataset | list[xr.Dataset]: """PIVMAT-compatible loader (wrapper over :func:`read_piv`). This is a Python port of the user-facing behavior of PIVMAT's ``loadvec.m``. It supports file patterns (glob + bracket expansion) and can load multiple files at once. Parameters ---------- filename: Path/pattern, a sequence of paths, or a 1-based numeric index selecting from files in the current directory (PIVMAT behavior). frame: Optional frame override passed through to :func:`read_piv`. verbose: If True, prints each file being loaded. Returns ------- xarray.Dataset | list[xarray.Dataset] Single dataset if one file is matched, otherwise a list of datasets. """ frame = kwargs.pop("frame", None) verbose = bool(kwargs.pop("verbose", False)) if kwargs: raise TypeError(f"Unsupported keyword arguments: {sorted(kwargs)}") if filename is None: raise ValueError("loadvec requires a filename/pattern (no GUI picker in Python)") # Numeric index selects from common PIVMAT extensions in cwd. if isinstance(filename, (int, np.integer)): cwd = pathlib.Path.cwd() exts = {".vec", ".vc7", ".imx", ".img", ".im7", ".cm0", ".uwo", ".txt", ".mat", ".nc"} files = sorted([p for p in cwd.iterdir() if p.is_file() and p.suffix.lower() in exts]) idx = int(filename) - 1 # PIVMAT is 1-based if idx < 0 or idx >= len(files): raise IndexError("loadvec numeric index out of range") paths = [files[idx]] else: paths = _expand_pivmat_file_patterns(filename) if not paths: raise FileNotFoundError("No file match") out: list[xr.Dataset] = [] for i, p in enumerate(paths, start=1): if verbose: print(f" Loading file #{i}/{len(paths)}: {str(p)!r}") out.append(read_piv(p, frame=frame) if frame is not None else read_piv(p)) return out[0] if len(out) == 1 else out
[docs] def openvec(filename: Any) -> xr.Dataset | list[xr.Dataset]: """PIVMAT ``openvec`` equivalent. In MATLAB, this is used by the file browser to populate the workspace. In Python, this simply calls :func:`loadvec` and returns the dataset(s). """ return loadvec(filename)
[docs] def openvc7(filename: Any) -> xr.Dataset: """PIVMAT ``openvc7`` equivalent (loads a single VC7 file).""" ds = load_vc7(filename) return ds
[docs] def openim7(filename: Any, **kwargs: Any) -> xr.Dataset: """PIVMAT ``openim7`` equivalent (loads a DaVis IM7 image as a scalar Dataset). This requires ``lvpyio`` at runtime. If it is not available, a :class:`ImportError` is raised. """ path = _to_path(filename) try: from lvpyio import read_buffer # type: ignore except Exception as e: raise ImportError("Reading .im7 requires the optional dependency 'lvpyio'") from e buf = read_buffer(str(path)) data = buf[0] # Heuristic: if image plane exists, expose as scalar 'w'. # lvpyio naming may vary across versions; keep this conservative. if hasattr(data, "images") and data.images: img = data.images[0] im = np.asarray(img, dtype=float) return im2pivmat(im, namew="I", unit="pix") raise ValueError("Unsupported IM7 content (no image planes found)")
[docs] def openimx(filename: Any, **kwargs: Any) -> xr.Dataset: """PIVMAT ``openimx`` equivalent (loads a DaVis IMX image as a scalar Dataset). This requires ``lvpyio`` at runtime. """ path = _to_path(filename) try: from lvpyio import read_buffer # type: ignore except Exception as e: raise ImportError("Reading .imx requires the optional dependency 'lvpyio'") from e buf = read_buffer(str(path)) data = buf[0] if hasattr(data, "images") and data.images: img = data.images[0] im = np.asarray(img, dtype=float) return im2pivmat(im, namew="I", unit="pix") raise ValueError("Unsupported IMX content (no image planes found)")
[docs] def openimg(filename: Any, **kwargs: Any) -> xr.Dataset: """PIVMAT ``openimg`` equivalent (loads a DaVis IMG image as a scalar Dataset). This requires ``lvpyio`` at runtime. """ path = _to_path(filename) try: from lvpyio import read_buffer # type: ignore except Exception as e: raise ImportError("Reading .img requires the optional dependency 'lvpyio'") from e buf = read_buffer(str(path)) data = buf[0] if hasattr(data, "images") and data.images: img = data.images[0] im = np.asarray(img, dtype=float) return im2pivmat(im, namew="I", unit="pix") raise ValueError("Unsupported IMG content (no image planes found)")
[docs] def openset(filename: Any) -> xr.Dataset: """PIVMAT ``openset`` equivalent. PIVMAT loads all files in the directory associated with a `.set`. In Python, this loads the directory adjacent to the `.set` file that shares its base name (if present), otherwise loads the parent directory. """ path = _to_path(filename) if path.suffix.lower() not in {".set", ".exp"}: raise ValueError("openset expects a .set or .exp file") candidate = path.with_suffix("") directory = candidate if candidate.exists() and candidate.is_dir() else path.parent return read_directory(directory)
[docs] def loadpivtxt(fname: Any) -> xr.Dataset: """PIVMAT ``loadpivtxt`` compatible loader. Reads a text export containing at least 4 columns: x y u v. Header lines starting with non-numeric characters are preserved in ``ds.attrs['Attributes']``. """ path = _to_path(fname) if not path.exists(): raise FileNotFoundError(str(path)) header: list[str] = [] data_lines: list[str] = [] with path.open("r", errors="ignore") as f: for line in f: s = line.strip() if not s: continue # numeric line? allow leading sign/dot/digit if re.match(r"^[\s,]*[+-]?(\d|\.)", s): data_lines.append(s) else: header.append(s) if not data_lines: raise ValueError("No numeric data found in file") # Normalize commas to spaces and parse floats. rows: list[list[float]] = [] for l in data_lines: l2 = l.replace(",", " ") vals = [float(v) for v in l2.split() if v] if len(vals) < 4: continue rows.append(vals[:6]) arr = np.asarray(rows, dtype=float) x, y, u, v = (arr[:, 0], arr[:, 1], arr[:, 2], arr[:, 3]) mask = arr[:, 4] if arr.shape[1] >= 5 else None # Infer grid xu, xi = unsorted_unique(x) yu, yi = unsorted_unique(y) cols = len(xu) rows_n = len(yu) if cols * rows_n != len(x): raise ValueError("Unsupported TXT layout (not a full rectilinear grid)") # Reconstruct meshes in row-major order. x2d, y2d = np.meshgrid(xu, yu) u2 = u.reshape((rows_n, cols)) v2 = v.reshape((rows_n, cols)) if mask is not None: m2 = mask.reshape((rows_n, cols)) else: m2 = np.ones_like(u2, dtype=float) ds = from_arrays(x2d, y2d, u2, v2, mask=m2, frame=_extract_frame_number(path)) ds.attrs["files"] = [str(path)] if header: ds.attrs["Attributes"] = "\n".join(header) return ds
[docs] def loadarrayvec(pathname: Any, fname: Any, *opts: str) -> list[list[xr.Dataset]]: """PIVMAT ``loadarrayvec`` equivalent. Loads a 2D array of vector fields: directories matched by ``pathname`` and files matched by ``fname`` inside each directory. Returns a nested list ``out[i][j]`` where ``i`` indexes directories and ``j`` indexes files. """ verbose = any(str(o).lower().startswith("verb") for o in opts) dirs = _expand_pivmat_file_patterns(pathname) dirs = [p for p in dirs if p.is_dir()] if not dirs: raise FileNotFoundError("No directory match") out: list[list[xr.Dataset]] = [] for d in dirs: # match files inside directory files = _expand_pivmat_file_patterns(str(d / str(fname))) if verbose: print(f"Directory: {str(d)} ({len(files)} files)") out.append([read_piv(fp) for fp in files]) return out
[docs] def readvec(name: Any, comments: int = 1, columns: int = 5) -> tuple[str, np.ndarray]: """Read a DaVis `.vec` text export into a numeric array (PIVMAT-inspired). This is a light-weight port of PIVMAT's ``readvec.m``. Returns ------- tuple ``(header, data)`` where ``data`` has shape ``(j, i, columns)``. """ path = _to_path(name) if path.suffix.lower() != ".vec": path = path.with_suffix(".vec") if not path.exists(): raise FileNotFoundError(str(path)) raw = path.read_text(errors="ignore") # Normalize newlines and commas. raw = raw.replace("\r\n", "\n").replace("\r", "\n").replace(",", " ") lines = raw.split("\n") comments = int(comments) if comments < 0: comments = 0 header_lines = lines[:comments] if comments else [] header = "\n".join(header_lines).lower() body = "\n".join(lines[comments:]) # Attempt to infer columns and units from header. x_units = "" u_units = "" m = re.search(r"variables=([^\n]+)", header) if m: variables = m.group(1) # crude parsing of quoted strings quoted = re.findall(r'"([^"]*)"', variables) if len(quoted) >= 2: x_units = quoted[1] if len(quoted) >= 6: u_units = quoted[5] if quoted: columns = max(4, len(quoted) // 2) else: columns = int(columns) # Parse data values. data = np.fromstring(body, sep=" ") if data.size % columns != 0: # best-effort truncation data = data[: (data.size // columns) * columns] data = data.reshape((-1, columns)) data[data > 9e9] = 0.0 mi = re.search(r"\bi=\s*([0-9]+)", header) mj = re.search(r"\bj=\s*([0-9]+)", header) if not mi or not mj: raise ValueError("Could not determine i/j dimensions from header") i_dim = int(mi.group(1)) j_dim = int(mj.group(1)) data3 = data.reshape((i_dim, j_dim, columns)).transpose((1, 0, 2)) return header, data3
[docs] def readsetfile(filename: Any, attrname: str | None = None) -> dict[str, Any] | Any: """Read attributes from a DaVis `.set` or `.exp` file (PIVMAT-inspired). This is a pragmatic parser intended for typical DaVis key/value content. It returns a flat dict of attributes. If ``attrname`` is provided, returns only that value (case-insensitive, ignoring underscores). """ path = _to_path(filename) if not path.exists(): raise FileNotFoundError(str(path)) txt = path.read_text(errors="ignore") attrs: dict[str, Any] = {} for line in txt.splitlines(): s = line.strip() if not s or s.startswith("#") or s.startswith(";"): continue # try key=value or key: value if "=" in s: k, v = s.split("=", 1) elif ":" in s: k, v = s.split(":", 1) else: continue key = k.strip() val = v.strip().strip('"') # numeric conversion when possible try: if re.match(r"^[+-]?[0-9]+$", val): val2: Any = int(val) else: val2 = float(val) attrs[key] = val2 except Exception: attrs[key] = val if attrname is None: return attrs needle = re.sub(r"_+", "", str(attrname)).lower() for k, v in attrs.items(): kk = re.sub(r"_+", "", str(k)).lower() if kk == needle: return v raise KeyError(attrname)
[docs] def getattribute(f: Any, attrname: str | None = None) -> Any: """Get metadata/attributes for a field or file (PIVMAT-inspired).""" if isinstance(f, xr.Dataset): attrs = dict(f.attrs) else: paths = _expand_pivmat_file_patterns(f) if not paths: raise FileNotFoundError("No file match") # Return attributes for the first match (common usage). p = paths[0] if p.suffix.lower() in {".set", ".exp"}: attrs = readsetfile(p) else: reader = _REGISTRY.find_reader(p) if reader is None: raise ValueError("Unsupported file format") md = reader.read_metadata(p) attrs = {"frame": md.frame, "variables": md.variables} if attrname is None: return attrs needle = re.sub(r"_+", "", str(attrname)).lower() for k, v in attrs.items(): kk = re.sub(r"_+", "", str(k)).lower() if kk == needle: return v raise KeyError(attrname)
[docs] def getvar(s: Any, reqname: str | int | None = None, mode: str | None = None) -> Any: """Parse variables encoded in a string like ``p1=v1_p2=v2_...`` (PIVMAT-inspired).""" if isinstance(s, (list, tuple)): return [getvar(x, reqname=reqname, mode=mode) for x in s] text = str(s) keep_strings = (str(mode).lower().startswith("str") if mode is not None else False) parts = [p for p in re.split(r"_+", text) if p] out: dict[str, Any] = {} auto = 1 for part in parts: if "=" in part: k, v = part.split("=", 1) key = k val_s = v else: m = re.match(r"^([A-Za-z]+)(.*)$", part) if m: key = m.group(1) val_s = m.group(2) else: key = f"var{auto}" val_s = part auto += 1 val: Any = val_s if not keep_strings: try: if re.match(r"^[+-]?[0-9]+$", val_s): val = int(val_s) else: val = float(val_s) except Exception: val = val_s out[key] = val if reqname is None: return out if isinstance(reqname, int): keys = list(out.keys()) return out[keys[int(reqname) - 1]] # name lookup (ignore underscores, case-insensitive) needle = re.sub(r"_+", "", str(reqname)).lower() for k, v in out.items(): kk = re.sub(r"_+", "", str(k)).lower() if kk == needle: return v raise KeyError(reqname)
[docs] def getsetname(curdir: Any = None) -> str: """Return the last element of a path (PIVMAT-inspired).""" p = pathlib.Path.cwd() if curdir is None else _to_path(curdir) return p.name
[docs] def getfilenum(name: Any, pat: str, opt: str = "filedir") -> list[float]: """Extract numeric indices from matching file/dir names (PIVMAT-inspired).""" paths = _expand_pivmat_file_patterns(name) if not paths: raise FileNotFoundError("No file match") opt_l = str(opt).lower() nums: list[float] = [] for p in paths: hay: str if opt_l == "dironly": hay = str(p.parent) elif opt_l == "fileonly": hay = p.name else: hay = str(p) idx = hay.find(pat) if idx < 0: continue s = hay[idx + len(pat) :] m = re.search(r"^[+-]?[0-9]+(?:\.[0-9]+)?", s) if m: nums.append(float(m.group(0))) return nums
[docs] def getpivtime(f: Any, *args: str) -> np.ndarray: """Return acquisition times in seconds for dataset(s) or files (PIVMAT-inspired). For PIVPy datasets, this uses ``ds['t']`` and ``ds.attrs['delta_t']``. If the option `'0'` is provided, the first time is shifted to 0. """ start_at_zero = any(str(a) == "0" for a in args) if isinstance(f, xr.Dataset): ds = f dt = float(ds.attrs.get("delta_t", 0.0)) t = np.asarray(ds["t"].values, dtype=float) * dt if start_at_zero and t.size: t = t - float(t[0]) return t paths = _expand_pivmat_file_patterns(f) if not paths: raise FileNotFoundError("No file match") times: list[float] = [] for p in paths: ds = read_piv(p) t = getpivtime(ds, *args) # PIVMAT returns one time per field; use first time for per-file. times.append(float(t[0]) if t.size else 0.0) return np.asarray(times, dtype=float)
[docs] def getframedt(filename: Any) -> np.ndarray: """Compute time interval(s) between frames for IMX/IM7 (PIVMAT-inspired). This requires ``lvpyio`` and DaVis time series metadata. If unavailable, returns ``array([0.0])``. """ path = _to_path(filename) try: from lvpyio import read_buffer # type: ignore except Exception: return np.asarray([0.0], dtype=float) try: buf = read_buffer(str(path)) data = buf[0] # Best-effort: try common attribute locations. ats = None if hasattr(data, "attributes"): ats = getattr(data, "attributes") if ats and isinstance(ats, dict): # Some exports may store AcqTimeSeries0 as list of timestamps. for key in ("AcqTimeSeries0", "AcqTimeSeries"): if key in ats: ts = np.asarray(ats[key], dtype=float) if ts.size <= 1: return np.asarray([0.0], dtype=float) return np.diff(ts) * 1e-3 # ms -> s except Exception: pass return np.asarray([0.0], dtype=float)
[docs] def getimx(A: Any, frame: int = 0): """PIVMAT ``getimx`` equivalent. In PIVPy, if ``A`` is an :class:`xarray.Dataset`, this returns the 2D coordinate meshes and the scalar/vector arrays for the selected frame. """ if not isinstance(A, xr.Dataset): raise TypeError("getimx currently supports xarray.Dataset inputs") ds = A if "t" in ds.dims: ds = ds.isel(t=int(frame)) x = np.asarray(ds["x"].values, dtype=float) y = np.asarray(ds["y"].values, dtype=float) x2d, y2d = np.meshgrid(x, y) if "w" in ds: return x2d, y2d, np.asarray(ds["w"].values, dtype=float) u = np.asarray(ds["u"].values, dtype=float) v = np.asarray(ds["v"].values, dtype=float) chc = np.asarray(ds["chc"].values, dtype=float) if "chc" in ds else np.ones_like(u) return x2d, y2d, u, v, chc
# ----------------------------------------------------------------------------- # PIVMAT vortex generators (synthetic fields) # -----------------------------------------------------------------------------
[docs] def vortex( n: int = 128, r0: float = 10.0, vorticity: float = 1.0, mode: str = "burgers", diver: float | None = None, ) -> xr.Dataset: """Generate a centered vortex field (PIVMAT-compatible). Port of PIVMAT's ``vortex.m``. Returns an xarray.Dataset with variables ``u`` and ``v``. """ n = int(n) if diver is None: diver = float(vorticity) mid = n / 2.0 + 1.0 omega = float(vorticity) / (2.0 * 1000.0) # m/s/mm gamma = float(diver) / (2.0 * 1000.0) # m/s/mm i = np.arange(1, n + 1, dtype=float) j = np.arange(1, n + 1, dtype=float) ii, jj = np.meshgrid(i, j, indexing="ij") dx = ii - mid dy = jj - mid radius = np.sqrt(dx * dx + dy * dy) u = np.zeros((n, n), dtype=float) v = np.zeros((n, n), dtype=float) mode_l = str(mode).lower() if "rankine" in mode_l: inside = radius <= float(r0) # solid body inside u[inside] = omega * dy[inside] v[inside] = -omega * dx[inside] # irrotational outside outside = ~inside r2 = np.where(outside, radius * radius, 1.0) u[outside] = omega * float(r0) ** 2 * dy[outside] / r2[outside] v[outside] = -omega * float(r0) ** 2 * dx[outside] / r2[outside] elif "burgers" in mode_l: safe_r2 = np.where(radius == 0, np.inf, radius * radius) factor = float(r0) ** 2 / safe_r2 * (1.0 - np.exp(-((radius / float(r0)) ** 2))) u = omega * factor * dy v = -omega * factor * dx if gamma != 0.0: u = u + gamma * factor * dx v = v + gamma * factor * dy else: raise ValueError("mode must contain 'burgers' or 'rankine'") # Avoid a "false" zero (PIVMAT behavior). u = u + np.max(np.abs(u)) * 1e-10 v = v + np.max(np.abs(v)) * 1e-10 x = np.arange(0, n, dtype=float) y = np.arange(0, n, dtype=float) x2d, y2d = np.meshgrid(x, y) ds = from_arrays(x2d, y2d, u.T, v.T, mask=np.ones((n, n), dtype=float), frame=0) ds["x"].attrs["units"] = "mm" ds["y"].attrs["units"] = "mm" ds["u"].attrs["units"] = "m/s" ds["v"].attrs["units"] = "m/s" ds.attrs["name"] = "Vortex" ds.attrs["setname"] = "-" ds.attrs["history"] = ["vortex"] return ds
[docs] def multivortex( nfield: int = 1, nsize: int = 128, numvortex: float = 8, *opts: str, ) -> xr.Dataset: """Generate random Burgers vortices (PIVMAT-compatible). Port of PIVMAT's ``multivortex.m``. Returns ------- xarray.Dataset Dataset with dims ``(y, x, t)`` where ``t`` indexes fields. """ nfield = int(nfield) nsize = int(nsize) numvortex_i = int(np.ceil(float(numvortex) * 9.0)) opt_l = {str(o).lower() for o in opts} rng = np.random.default_rng(0) fields: list[xr.Dataset] = [] for k in range(nfield): vx = np.zeros((nsize, nsize), dtype=float) vy = np.zeros((nsize, nsize), dtype=float) icenter = 1.0 + nsize * (3.0 * rng.random(numvortex_i) - 1.0) jcenter = 1.0 + nsize * (3.0 * rng.random(numvortex_i) - 1.0) omega = np.sign(rng.random(numvortex_i) - 0.5) * (2.0 + rng.standard_normal(numvortex_i)) if "asym" in opt_l: omega = np.abs(omega) if "2d" in opt_l: div = np.zeros(numvortex_i, dtype=float) else: div = rng.standard_normal(numvortex_i) / 2.0 core = 0.015 * (4.0 + rng.standard_normal(numvortex_i)) * nsize i = np.arange(1, nsize + 1, dtype=float) j = np.arange(1, nsize + 1, dtype=float) ii, jj = np.meshgrid(i, j, indexing="ij") for num in range(numvortex_i): dx = ii - icenter[num] dy = jj - jcenter[num] radius = np.sqrt(dx * dx + dy * dy) safe_r2 = np.where(radius == 0, np.inf, radius * radius) ampl = (core[num] ** 2) / safe_r2 * (1.0 - np.exp(-((radius / core[num]) ** 2))) / 1000.0 vx = vx + ampl * (-omega[num] * dy + div[num] * dx) vy = vy + ampl * (omega[num] * dx + div[num] * dy) x = np.arange(0, nsize, dtype=float) y = np.arange(0, nsize, dtype=float) x2d, y2d = np.meshgrid(x, y) ds = from_arrays(x2d, y2d, vx.T, vy.T, mask=np.ones((nsize, nsize), dtype=float), frame=k) ds["x"].attrs["units"] = "mm" ds["y"].attrs["units"] = "mm" ds["u"].attrs["units"] = "m/s" ds["v"].attrs["units"] = "m/s" ds.attrs["name"] = "Multivortex" ds.attrs["setname"] = "-" ds.attrs["history"] = ["multivortex"] fields.append(ds) return xr.concat(fields, dim="t")
[docs] def randvec( n: int = 256, nf: int = 1, slope: float = 5.0 / 3.0, nc: float = 3.0, nl: float | None = None, *, seed: int = 0, ) -> xr.Dataset: r"""Generate a synthetic 2D divergence-free random vector field (PIVMAT-compatible). Port of PIVMAT's ``randvec.m``. The field is constructed in Fourier space with random phase, a prescribed power-law slope, and an incompressibility constraint (2D divergence-free). Parameters ---------- n: Grid size (produces an ``n x n`` field). PIVMAT assumes even ``n``. nf: Number of independent fields (time frames). slope: Spectral slope $k^{-\mathrm{slope}}$ (default $5/3$). nc: Small-scale cut-off (in units of the vector mesh). For $k > nc$ the spectrum decays Gaussianly. nl: Large-scale cut-off (in units of the vector mesh). For $k < nl$ the spectrum behaves as $k^2$. Defaults to ``n/3``. seed: RNG seed used for deterministic synthesis. Returns ------- xarray.Dataset Dataset with variables ``u``, ``v`` and ``chc`` and dims ``(y, x, t)``. """ n = int(n) nf = int(nf) slope = float(slope) nc = float(nc) if nl is None: nl = float(n) / 3.0 nl = float(nl) if n <= 0 or nf <= 0: raise ValueError("n and nf must be positive") if n % 2 != 0: raise ValueError("randvec currently requires even n (PIVMAT convention)") if nc <= 0.0 or nl <= 0.0: raise ValueError("nc and nl must be positive") k0_idx = n // 2 # 0-based index of the zero mode (PIVMAT: k0 = n/2 + 1) rng = np.random.default_rng(int(seed)) fields: list[xr.Dataset] = [] x = np.arange(1, n + 1, dtype=float) y = np.arange(1, n + 1, dtype=float) x2d, y2d = np.meshgrid(x, y) small_scale = float(n) / nc large_scale = float(n) / nl # Frequencies in numpy's unshifted FFT ordering. kx = (np.fft.fftfreq(n) * n).astype(float) ky = (np.fft.fftfreq(n) * n).astype(float) for frame in range(nf): tux = np.zeros((n, n), dtype=np.complex128) tuy = np.zeros((n, n), dtype=np.complex128) for iy in range(n): kyv = float(ky[iy]) for ix in range(n): kxv = float(kx[ix]) ip = (-iy) % n jp = (-ix) % n # Only fill one representative per conjugate pair. if (iy > ip) or (iy == ip and ix > jp): continue k = float(np.hypot(kxv, kyv)) if k == 0.0: tux[iy, ix] = 0.0 tuy[iy, ix] = 0.0 continue # PIVMAT energy prescription (see randvec.m). e = ( np.exp(-((k / small_scale) ** 2)) * (k**2) / np.sqrt(1.0 + (k / large_scale) ** (2.0 * slope + 4.0)) ) / k amp = float(np.sqrt(float(e))) # Self-conjugate modes must be real to keep real-valued fields. if iy == ip and ix == jp: phase = -1.0 if int(np.round(2.0 * rng.random())) else 1.0 else: phase = np.exp(1j * (rng.random() * 2.0 * np.pi)) costheta = kxv / k sintheta = kyv / k tux[iy, ix] = -amp * sintheta * phase tuy[iy, ix] = amp * costheta * phase if not (iy == ip and ix == jp): tux[ip, jp] = np.conj(tux[iy, ix]) tuy[ip, jp] = np.conj(tuy[iy, ix]) vx = np.fft.ifft2(tux).real * (n**2) vy = np.fft.ifft2(tuy).real * (n**2) ds = from_arrays(x2d, y2d, vx, vy, mask=np.ones((n, n), dtype=float), frame=frame) ds["x"].attrs["units"] = "au" ds["y"].attrs["units"] = "au" ds["u"].attrs["units"] = "au" ds["v"].attrs["units"] = "au" ds.attrs["ysign"] = "Y axis downward" ds.attrs["name"] = "randvec" ds.attrs["setname"] = "" ds.attrs["history"] = [f"randvec({n},{nf},{slope},{nc},{nl})"] fields.append(ds) return xr.concat(fields, dim="t")