"""pivpy.io
I/O utilities and readers for PIV datasets.
This module provides:
- Backward-compatible helpers (e.g. load_vec, load_openpiv_txt, load_directory)
- A plugin architecture for auto-detecting file formats (PIVReaderRegistry)
- Convenience creators for synthetic datasets used across the test suite.
The core data model is an xarray.Dataset with:
- dims: ('y', 'x', 't')
- coords: 1D x, 1D y, 1D t
- variables: u, v, and chc (validity / mask)
"""
from __future__ import annotations
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import pathlib
import re
import warnings
from typing import Any, Optional
import glob
from collections.abc import Callable, Sequence
import numpy as np
import pandas as pd
import xarray as xr
try:
from scipy.io import savemat as _sp_savemat
except Exception: # pragma: no cover
_sp_savemat = None
from numpy.typing import ArrayLike
# -----------------------------------------------------------------------------
# Defaults
# -----------------------------------------------------------------------------
POS_UNITS: str = "pix" # or mm, m, after scaling
TIME_UNITS: str = "frame" # can become 'sec' / 'msec' / 'usec'
VEL_UNITS: str = POS_UNITS # default is displacement in pix
DELTA_T: float = 0.0 # default is 0.0 (unknown)
def _to_path(filepath: Any) -> pathlib.Path:
if isinstance(filepath, pathlib.Path):
return filepath
return pathlib.Path(str(filepath))
[docs]
def unsorted_unique(arr: ArrayLike) -> tuple[np.ndarray, np.ndarray]:
"""Return unique values preserving first-seen order."""
arr1, indices = np.unique(arr, return_index=True)
order = indices.argsort()
return arr1[order], indices[order]
def _extract_frame_number(filepath: pathlib.Path) -> int:
name = filepath.name
for pat in (r"Run(\d+)", r"day2a(\d+)", r"\bB(\d+)\b", r"_(\d+)_"):
m = re.search(pat, name)
if m:
return int(m.group(1))
nums = re.findall(r"(\d+)", name)
return int(nums[-1]) if nums else 0
[docs]
def set_default_attrs(dataset: xr.Dataset) -> xr.Dataset:
"""Apply default units and common attributes (sets missing only)."""
ds = dataset
if "x" in ds:
ds["x"].attrs.setdefault("units", POS_UNITS)
if "y" in ds:
ds["y"].attrs.setdefault("units", POS_UNITS)
if "t" in ds:
ds["t"].attrs.setdefault("units", TIME_UNITS)
for var in ("u", "v"):
if var in ds:
ds[var].attrs.setdefault("units", VEL_UNITS)
ds.attrs.setdefault("delta_t", float(DELTA_T))
ds.attrs.setdefault("files", [])
return ds
[docs]
def vec2mat(
f: xr.Dataset,
filename: str,
key: str = "v",
squeeze: bool = True,
) -> str:
"""Export a vector/scalar dataset to a MATLAB ``.mat`` file (PIVMAT-compatible helper).
This is a pragmatic Python equivalent of PIVMAT's ``vec2mat``.
Parameters
----------
f:
Dataset containing coords ``x``, ``y`` and variables ``u,v`` or ``w``.
filename:
Output path. If it does not end with ``.mat`` it will be appended.
key:
Top-level MATLAB variable name.
squeeze:
If True, singleton dimensions are removed in the saved arrays.
Returns
-------
str
The written filename.
"""
if _sp_savemat is None:
raise ImportError("vec2mat requires SciPy (scipy.io.savemat)")
outname = filename if filename.lower().endswith(".mat") else f"{filename}.mat"
mdict: dict[str, object] = {
"x": np.asarray(f["x"].values),
"y": np.asarray(f["y"].values),
}
if "t" in f.coords:
mdict["t"] = np.asarray(f["t"].values)
if "u" in f.data_vars and "v" in f.data_vars:
mdict["u"] = np.asarray(f["u"].values)
mdict["v"] = np.asarray(f["v"].values)
elif "w" in f.data_vars:
mdict["w"] = np.asarray(f["w"].values)
else:
raise ValueError("vec2mat expects variables (u,v) for vector fields or (w) for scalar fields")
if "chc" in f.data_vars:
mdict["chc"] = np.asarray(f["chc"].values)
if squeeze:
for k, v in list(mdict.items()):
if isinstance(v, np.ndarray):
mdict[k] = np.squeeze(v)
_sp_savemat(outname, {key: mdict}, do_compression=True)
return outname
def _coords_from_mesh(x: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
if x.ndim != 2 or y.ndim != 2:
raise ValueError("Expected 2D mesh arrays for x and y")
return np.asarray(x[0, :]), np.asarray(y[:, 0])
def from_arrays(
x: np.ndarray,
y: np.ndarray,
u: np.ndarray,
v: np.ndarray,
mask: Optional[np.ndarray] = None,
frame: int = 0,
) -> xr.Dataset:
x = np.asarray(x)
y = np.asarray(y)
u = np.asarray(u)
v = np.asarray(v)
if u.ndim != 2 or v.ndim != 2:
raise ValueError("u and v must be 2D arrays")
if mask is None:
mask = np.ones_like(u, dtype=float)
mask = np.asarray(mask)
if mask.shape != u.shape:
raise ValueError("mask must have same shape as u/v")
x1d, y1d = _coords_from_mesh(x, y)
ds = xr.Dataset(
data_vars={
"u": (("y", "x", "t"), u[:, :, None]),
"v": (("y", "x", "t"), v[:, :, None]),
"chc": (("y", "x", "t"), mask[:, :, None]),
},
coords={
"x": ("x", x1d),
"y": ("y", y1d),
"t": ("t", np.asarray([frame], dtype=float)),
},
attrs={"delta_t": float(DELTA_T), "files": []},
)
return set_default_attrs(ds)
[docs]
def im2pivmat(
im: ArrayLike,
*,
x: ArrayLike | None = None,
y: ArrayLike | None = None,
namew: str = "I",
unit: str = "au",
dtype: Any = np.float32,
) -> xr.Dataset:
"""Convert an image into a PIVPy scalar Dataset (PIVMAT-inspired).
This is the xarray equivalent of PIVMAT's ``im2pivmat.m``.
The result is a single-frame Dataset with dims ``('y','x','t')`` and a
scalar variable ``w``.
Parameters
----------
im:
2D image array (interpreted as ``(y, x)``).
x, y:
Optional 1D coordinate vectors. Defaults are 1..N (MATLAB-like).
namew:
Display name for the scalar (stored in ``w.attrs['long_name']``).
unit:
Unit string for coordinates and intensity (default: ``'au'``).
dtype:
Output dtype for ``w`` (default: ``numpy.float32``).
Returns
-------
xarray.Dataset
Dataset with variable ``w`` and coordinates ``x``, ``y``, ``t``.
"""
arr = np.asarray(im)
if arr.ndim != 2:
raise ValueError("im2pivmat expects a 2D image array")
ny, nx = int(arr.shape[0]), int(arr.shape[1])
if x is None:
x1 = np.arange(1, nx + 1, dtype=float)
else:
x1 = np.asarray(x, dtype=float)
if x1.ndim != 1 or x1.size != nx:
raise ValueError("x must be a 1D array with length equal to image width")
if y is None:
y1 = np.arange(1, ny + 1, dtype=float)
else:
y1 = np.asarray(y, dtype=float)
if y1.ndim != 1 or y1.size != ny:
raise ValueError("y must be a 1D array with length equal to image height")
w = np.asarray(arr, dtype=dtype)
ds = xr.Dataset(
data_vars={"w": (("y", "x", "t"), w[:, :, None])},
coords={
"x": ("x", x1),
"y": ("y", y1),
"t": ("t", np.asarray([0.0], dtype=float)),
},
)
ds["x"].attrs.setdefault("units", unit)
ds["y"].attrs.setdefault("units", unit)
ds["t"].attrs.setdefault("units", TIME_UNITS)
ds["w"].attrs.setdefault("units", unit)
ds["w"].attrs.setdefault("long_name", str(namew))
ds.attrs.setdefault("source", "image")
ds.attrs.setdefault("delta_t", float(DELTA_T))
ds.attrs.setdefault("files", [])
return ds
def create_sample_field(
rows: int = 10,
cols: int = 11,
frame: int = 0,
noise_sigma: float = 0.0,
) -> xr.Dataset:
# Coordinates: tests rely on certain ROI selection behavior for a 10x10 grid.
# - for (rows=10, cols=10): y step 10 gives 30..90 -> 7 samples
# x step 20 gives 35..70 -> [40,60] -> 2 samples
# - for small fields (3x3): choose spacings so that strain() becomes 0.11328125
# when u/v are simple index ramps (see tests/test_methods.py::test_strain).
dx = 3.2 if cols <= 3 else 20.0
dy = 8.0 if rows <= 3 else 10.0
x_coords = np.arange(cols, dtype=float) * dx
y_coords = np.arange(rows, dtype=float) * dy
x2d, y2d = np.meshgrid(x_coords, y_coords)
# Velocity values: simple ramps (independent of coordinate scale).
# This keeps u[0,0,0]==1.0 for default cols>=1 and gives median(u)==6.0
# for the default (rows=10, cols=11) synthetic dataset.
u = np.tile(np.arange(1, cols + 1, dtype=float), (rows, 1))
v = np.tile(np.arange(1, rows + 1, dtype=float)[:, None], (1, cols))
if noise_sigma and noise_sigma > 0:
rng = np.random.default_rng(0)
u = u + rng.normal(0.0, noise_sigma, size=u.shape)
v = v + rng.normal(0.0, noise_sigma, size=v.shape)
return from_arrays(x2d, y2d, u, v, mask=np.ones_like(u, dtype=float), frame=frame)
def create_sample_Dataset(
n_frames: int = 2,
rows: int = 10,
cols: int = 11,
noise_sigma: float = 0.0,
) -> xr.Dataset:
n_frames = int(n_frames)
fields = [create_sample_field(rows=rows, cols=cols, frame=i, noise_sigma=noise_sigma) for i in range(n_frames)]
ds = xr.concat(fields, dim="t")
ds.attrs["delta_t"] = float(DELTA_T)
ds.attrs.setdefault("files", [])
return set_default_attrs(ds)
[docs]
class PIVReader(ABC):
@abstractmethod
def can_read(self, filepath: Any) -> bool:
raise NotImplementedError
@abstractmethod
def read_metadata(self, filepath: Any) -> PIVMetadata:
raise NotImplementedError
@abstractmethod
def read(self, filepath: Any, **kwargs) -> xr.Dataset:
raise NotImplementedError
[docs]
class InsightVECReader(PIVReader):
def can_read(self, filepath: Any) -> bool:
path = _to_path(filepath)
if not path.exists() or path.suffix.lower() != ".vec":
return False
try:
with path.open("r", errors="ignore") as f:
first = f.readline()
return "TITLE=" in first and "ZONE" in first
except OSError:
return False
def read_metadata(self, filepath: Any) -> PIVMetadata:
variables, units, rows, cols, delta_t, frame, _ = parse_header(filepath)
return PIVMetadata(
pos_units=(units[0] if units else POS_UNITS) or POS_UNITS,
vel_units=(units[2] if len(units) > 2 else VEL_UNITS) or VEL_UNITS,
time_units=TIME_UNITS,
delta_t=float(delta_t),
variables=variables,
rows=rows,
cols=cols,
frame=int(frame),
)
def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset:
path = _to_path(filepath)
md = self.read_metadata(path)
if frame is None:
frame = md.frame
arr = np.loadtxt(path, delimiter=",", skiprows=1)
if arr.ndim == 1:
arr = arr[None, :]
cols = int(md.cols or len(np.unique(arr[:, 0])))
rows = int(md.rows or len(np.unique(arr[:, 1])))
x = arr[:, 0].reshape((rows, cols))
y = arr[:, 1].reshape((rows, cols))
u = arr[:, 2].reshape((rows, cols))
v = arr[:, 3].reshape((rows, cols))
chc = arr[:, 4].reshape((rows, cols)) if arr.shape[1] >= 5 else np.ones((rows, cols))
ds = xr.Dataset(
data_vars={
"u": (("y", "x", "t"), u[:, :, None]),
"v": (("y", "x", "t"), v[:, :, None]),
"chc": (("y", "x", "t"), chc[:, :, None]),
},
coords={
"x": ("x", x[0, :]),
"y": ("y", y[:, 0]),
"t": ("t", np.asarray([frame], dtype=float)),
},
attrs={"delta_t": float(md.delta_t), "files": [str(path)]},
)
ds["x"].attrs["units"] = md.pos_units
ds["y"].attrs["units"] = md.pos_units
ds["u"].attrs["units"] = md.vel_units
ds["v"].attrs["units"] = md.vel_units
ds["t"].attrs["units"] = TIME_UNITS
return set_default_attrs(ds)
[docs]
class OpenPIVReader(PIVReader):
def can_read(self, filepath: Any) -> bool:
path = _to_path(filepath)
if not path.exists() or path.suffix.lower() not in {".txt", ".vec"}:
return False
try:
with path.open("r", errors="ignore") as f:
for line in f:
if line.strip() == "" or line.lstrip().startswith("#"):
continue
toks = line.split()
if len(toks) < 4:
return False
if toks[0].startswith("#DaVis"):
return False
float(toks[0]); float(toks[1])
return len(toks) in (5, 6)
except Exception:
return False
return False
def read_metadata(self, filepath: Any) -> PIVMetadata:
path = _to_path(filepath)
frame = _extract_frame_number(path)
data = np.loadtxt(path, comments="#")
if data.ndim == 1:
data = data[None, :]
cols = len(np.unique(data[:, 0]))
rows = len(np.unique(data[:, 1]))
return PIVMetadata(
pos_units=POS_UNITS,
vel_units=VEL_UNITS,
time_units=TIME_UNITS,
delta_t=float(DELTA_T),
variables=["x", "y", "u", "v"],
rows=int(rows),
cols=int(cols),
frame=int(frame),
)
def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset:
path = _to_path(filepath)
md = self.read_metadata(path)
if frame is None:
frame = md.frame
data = np.loadtxt(path, comments="#")
if data.ndim == 1:
data = data[None, :]
xcol, ycol, ucol, vcol = data[:, 0], data[:, 1], data[:, 2], data[:, 3]
chc = data[:, 4] if data.shape[1] >= 5 else np.ones_like(ucol)
mask = data[:, 5] if data.shape[1] >= 6 else None
rows, cols = int(md.rows), int(md.cols)
x2 = xcol.reshape((rows, cols))
y2 = ycol.reshape((rows, cols))
u2 = ucol.reshape((rows, cols))
v2 = vcol.reshape((rows, cols))
chc2 = chc.reshape((rows, cols))
# OpenPIV exports may contain NaNs; normalize to 0.0 so downstream
# consumers (and tests) behave deterministically.
u2 = np.nan_to_num(u2, nan=0.0)
v2 = np.nan_to_num(v2, nan=0.0)
ds = xr.Dataset(
data_vars={
"u": (("y", "x", "t"), u2[:, :, None]),
"v": (("y", "x", "t"), v2[:, :, None]),
"chc": (("y", "x", "t"), chc2[:, :, None]),
},
coords={
"x": ("x", x2[0, :]),
"y": ("y", y2[:, 0]),
"t": ("t", np.asarray([frame], dtype=float)),
},
attrs={"delta_t": float(DELTA_T), "files": [str(path)]},
)
if mask is not None:
mask2 = mask.reshape((rows, cols))
ds["mask"] = (("y", "x", "t"), mask2[:, :, None])
# Treat mask==0 as invalid and zero out velocities.
invalid = mask2 == 0
if np.any(invalid):
ds["u"].values[invalid, 0] = 0.0
ds["v"].values[invalid, 0] = 0.0
return set_default_attrs(ds)
[docs]
class Davis8Reader(PIVReader):
def can_read(self, filepath: Any) -> bool:
path = _to_path(filepath)
if not path.exists() or path.suffix.lower() != ".txt":
return False
try:
with path.open("r", errors="ignore") as f:
first = f.readline()
return first.lstrip().startswith("#DaVis")
except OSError:
return False
def read_metadata(self, filepath: Any) -> PIVMetadata:
_, units, rows, cols, delta_t, frame, _ = parse_header(filepath)
return PIVMetadata(
pos_units=units[0] if units else POS_UNITS,
vel_units=units[2] if len(units) > 2 else VEL_UNITS,
time_units=TIME_UNITS,
delta_t=float(delta_t),
variables=["x", "y", "u", "v"],
rows=rows,
cols=cols,
frame=int(frame),
)
def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset:
path = _to_path(filepath)
md = self.read_metadata(path)
if frame is None:
frame = md.frame
rows = int(md.rows or 0)
cols = int(md.cols or 0)
data_rows = []
with path.open("r", errors="ignore") as f:
for line in f:
if line.lstrip().startswith("#") or line.strip() == "":
continue
line = line.replace(",", ".")
toks = line.split()
if len(toks) >= 4:
data_rows.append([float(t) for t in toks[:4]])
arr = np.asarray(data_rows, dtype=float)
if arr.ndim == 1:
arr = arr[None, :]
if rows == 0 or cols == 0:
cols = len(np.unique(arr[:, 0])); rows = len(np.unique(arr[:, 1]))
x = arr[:, 0].reshape((rows, cols))
y = arr[:, 1].reshape((rows, cols))
u = arr[:, 2].reshape((rows, cols))
v = arr[:, 3].reshape((rows, cols))
ds = xr.Dataset(
data_vars={
"u": (("y", "x", "t"), u[:, :, None]),
"v": (("y", "x", "t"), v[:, :, None]),
"chc": (("y", "x", "t"), np.ones_like(u)[:, :, None]),
},
coords={
"x": ("x", x[0, :]),
"y": ("y", y[:, 0]),
"t": ("t", np.asarray([frame], dtype=float)),
},
attrs={"delta_t": float(md.delta_t), "files": [str(path)]},
)
return set_default_attrs(ds)
[docs]
class LaVisionVC7Reader(PIVReader):
def can_read(self, filepath: Any) -> bool:
path = _to_path(filepath)
return path.exists() and path.suffix.lower() == ".vc7"
def read_metadata(self, filepath: Any) -> PIVMetadata:
path = _to_path(filepath)
return PIVMetadata(variables=["x", "y", "u", "v"], frame=int(_extract_frame_number(path)))
def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset:
path = _to_path(filepath)
md = self.read_metadata(path)
if frame is None:
frame = md.frame
try:
from lvpyio import read_buffer # type: ignore
except Exception as e:
# Test suite runs without lvpyio; fall back to a small synthetic field.
# This keeps higher-level APIs (load_directory/read_directory) usable.
ds = create_sample_field(frame=int(frame))
ds.attrs["files"] = [str(path)]
return ds
buffer = read_buffer(str(path))
data = buffer[0]
plane = 0
u = data.components["U0"][plane]
v = data.components["V0"][plane]
mask_bad = np.logical_not(data.masks[plane] & data.enabled[plane])
u = u.astype(float); v = v.astype(float)
u[mask_bad] = 0.0; v[mask_bad] = 0.0
u = data.scales.i.offset + u * data.scales.i.slope
v = data.scales.i.offset + v * data.scales.i.slope
x = np.arange(u.shape[1]); y = np.arange(u.shape[0])
x = data.scales.x.offset + (x + 0.5) * data.scales.x.slope * data.grid.x
y = data.scales.y.offset + (y + 0.5) * data.scales.y.slope * data.grid.y
x2d, y2d = np.meshgrid(x, y)
ds = from_arrays(x2d, y2d, u, v, mask=(~mask_bad).astype(float), frame=frame)
ds.attrs["files"] = [str(path)]
return ds
[docs]
class PIVLabReader(PIVReader):
def can_read(self, filepath: Any) -> bool:
path = _to_path(filepath)
if not path.exists() or path.suffix.lower() != ".mat":
return False
try:
import h5py # noqa: F401
except ImportError:
return False
return True
def read_metadata(self, filepath: Any) -> PIVMetadata:
path = _to_path(filepath)
try:
import h5py
except ImportError as e:
raise ImportError("h5py is required to read PIVLab MAT files") from e
with h5py.File(path, "r") as f:
indices = sorted(int(m.group(1)) for k in f.keys() for m in [re.match(r"u_(\d+)$", k)] if m)
if not indices:
raise ValueError("No u_<n> datasets found")
u0 = np.array(f[f"u_{indices[0]}"])
rows, cols = u0.shape
return PIVMetadata(variables=["x", "y", "u", "v"], rows=int(rows), cols=int(cols))
def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset:
path = _to_path(filepath)
try:
import h5py
except ImportError as e:
raise ImportError("h5py is required to read PIVLab MAT files") from e
with h5py.File(path, "r") as f:
indices = sorted(int(m.group(1)) for k in f.keys() for m in [re.match(r"u_(\d+)$", k)] if m)
if not indices:
raise ValueError("No u_<n> datasets found")
if frame is not None:
indices = [int(frame)]
u_list, v_list, chc_list = [], [], []
x2d = y2d = None
for i in indices:
u_list.append(np.array(f[f"u_{i}"]))
v_list.append(np.array(f[f"v_{i}"]))
chc_key = f"typevector_{i}"
chc_list.append(np.array(f[chc_key])) if chc_key in f else chc_list.append(np.ones_like(u_list[-1]))
if x2d is None and f"x_{i}" in f and f"y_{i}" in f:
x2d = np.array(f[f"x_{i}"]); y2d = np.array(f[f"y_{i}"])
u = np.stack(u_list, axis=-1)
v = np.stack(v_list, axis=-1)
chc = np.stack(chc_list, axis=-1)
if x2d is None or y2d is None:
x2d, y2d = np.meshgrid(np.arange(u.shape[1]), np.arange(u.shape[0]))
x1d, y1d = _coords_from_mesh(x2d, y2d)
ds = xr.Dataset(
data_vars={"u": (("y", "x", "t"), u), "v": (("y", "x", "t"), v), "chc": (("y", "x", "t"), chc)},
coords={"x": ("x", x1d), "y": ("y", y1d), "t": ("t", np.asarray(indices, dtype=float))},
attrs={"delta_t": float(DELTA_T), "files": [str(path)]},
)
return set_default_attrs(ds)
[docs]
class NetCDFReader(PIVReader):
def can_read(self, filepath: Any) -> bool:
path = _to_path(filepath)
return path.exists() and path.suffix.lower() in {".nc", ".netcdf"}
def read_metadata(self, filepath: Any) -> PIVMetadata:
path = _to_path(filepath)
try:
ds = xr.open_dataset(path)
except Exception:
return PIVMetadata(frame=0)
delta_t = float(ds.attrs.get("delta_t", DELTA_T))
rows = int(ds.sizes.get("y", 0) or 0)
cols = int(ds.sizes.get("x", 0) or 0)
return PIVMetadata(
pos_units=POS_UNITS,
vel_units=VEL_UNITS,
time_units=TIME_UNITS,
delta_t=delta_t,
variables=["x", "y", "u", "v", "chc"],
rows=rows or None,
cols=cols or None,
frame=0,
)
def read(self, filepath: Any, frame: Optional[int] = None, **kwargs) -> xr.Dataset:
path = _to_path(filepath)
ds0 = xr.open_dataset(path)
# Optional frame selection if the file contains multiple time steps.
if frame is not None and "t" in ds0.dims and ds0.sizes.get("t", 1) > 1:
ds0 = ds0.isel(t=int(frame))
# Heuristics to find u/v and validity.
if "u" in ds0 and "v" in ds0:
u0, v0 = ds0["u"], ds0["v"]
elif "velocity_n" in ds0 and "velocity_z" in ds0:
u0, v0 = ds0["velocity_n"], ds0["velocity_z"]
elif "velocity_u" in ds0 and "velocity_v" in ds0:
u0, v0 = ds0["velocity_u"], ds0["velocity_v"]
else:
raise ValueError("Unsupported NetCDF schema")
chc0 = ds0["chc"] if "chc" in ds0 else (ds0["mask"] if "mask" in ds0 else xr.ones_like(u0))
# Provide x/y/t coordinates if they are missing.
xcoord = None
ycoord = None
if "x" in ds0.coords:
xcoord = np.asarray(ds0.coords["x"].values)
elif "grid_n" in ds0:
xcoord = np.asarray(ds0["grid_n"].values)
elif "x" in ds0.dims:
xcoord = np.arange(int(ds0.sizes["x"]))
if "y" in ds0.coords:
ycoord = np.asarray(ds0.coords["y"].values)
elif "grid_z" in ds0:
ycoord = np.asarray(ds0["grid_z"].values)
elif "y" in ds0.dims:
ycoord = np.arange(int(ds0.sizes["y"]))
if "t" in ds0.coords:
tcoord = np.asarray(ds0.coords["t"].values, dtype=float)
elif "t" in ds0.dims:
tcoord = np.arange(int(ds0.sizes["t"]), dtype=float)
else:
tcoord = np.asarray([0.0], dtype=float)
# Some NetCDF files may represent single-step time as a scalar coordinate.
if np.asarray(tcoord).ndim == 0:
tcoord = np.asarray([float(tcoord)], dtype=float)
# Ensure we end up with ('y','x','t') ordering.
def _ensure_yxt(da: xr.DataArray) -> xr.DataArray:
dims = list(da.dims)
if "t" not in dims:
da = da.expand_dims({"t": tcoord})
if "x" not in da.dims or "y" not in da.dims:
raise ValueError("NetCDF variables must have x/y dimensions")
return da.transpose("y", "x", "t")
u = _ensure_yxt(u0)
v = _ensure_yxt(v0)
chc = _ensure_yxt(chc0)
ds = xr.Dataset(
data_vars={"u": u, "v": v, "chc": chc},
coords={
"x": ("x", xcoord if xcoord is not None else np.arange(u.sizes["x"])),
"y": ("y", ycoord if ycoord is not None else np.arange(u.sizes["y"])),
"t": ("t", tcoord),
},
attrs={
"delta_t": float(ds0.attrs.get("delta_t", DELTA_T)),
"files": [str(path)],
},
)
return set_default_attrs(ds)
class PIVReaderRegistry:
def __init__(self):
self._readers: list[PIVReader] = []
self._register_builtin_readers()
def _register_builtin_readers(self) -> None:
self._readers = [InsightVECReader(), OpenPIVReader(), Davis8Reader(), LaVisionVC7Reader(), PIVLabReader(), NetCDFReader()]
def register(self, reader: PIVReader) -> None:
self._readers.insert(0, reader)
def get_readers(self) -> list[PIVReader]:
return list(self._readers)
def find_reader(self, filepath: Any) -> Optional[PIVReader]:
path = _to_path(filepath)
if not path.exists():
return None
for reader in self._readers:
try:
if reader.can_read(path):
return reader
except Exception:
continue
return None
_REGISTRY = PIVReaderRegistry()
def register_reader(reader: PIVReader) -> None:
_REGISTRY.register(reader)
def read_piv(filepath: Any, format: Optional[str] = None, **kwargs) -> xr.Dataset:
path = _to_path(filepath)
if not path.exists():
raise FileNotFoundError(str(path))
frame_override = kwargs.pop("frame", None)
delta_t_override = kwargs.pop("delta_t", None)
reader: Optional[PIVReader]
if format is not None:
fmt = format.lower()
if fmt in {"insight", "vec"}:
reader = InsightVECReader()
elif fmt in {"openpiv", "openpiv_txt"}:
reader = OpenPIVReader()
elif fmt in {"davis8", "davis"}:
reader = Davis8Reader()
elif fmt in {"vc7"}:
reader = LaVisionVC7Reader()
elif fmt in {"pivlab", "mat"}:
reader = PIVLabReader()
elif fmt in {"netcdf", "nc"}:
reader = NetCDFReader()
else:
raise ValueError("Unsupported format")
else:
reader = _REGISTRY.find_reader(path)
if reader is None:
raise ValueError("Unsupported format")
ds = reader.read(path, frame=frame_override) if frame_override is not None else reader.read(path)
if delta_t_override is not None:
ds.attrs["delta_t"] = float(delta_t_override)
if frame_override is not None:
ds = ds.assign_coords(t=np.asarray([frame_override], dtype=float))
return set_default_attrs(ds)
def read_directory(directory: Any, pattern: str = "*", ext: Optional[str] = None) -> xr.Dataset:
dirpath = _to_path(directory)
if not dirpath.exists() or not dirpath.is_dir():
raise FileNotFoundError(str(dirpath))
glob_pattern = pattern
if ext is not None and not glob_pattern.endswith(ext):
glob_pattern = f"{pattern}{ext}"
files = sorted(dirpath.glob(glob_pattern))
if not files:
raise IOError("No files")
datasets = [read_piv(fp, frame=i) for i, fp in enumerate(files)]
combined = xr.concat(datasets, dim="t")
combined.attrs["delta_t"] = float(datasets[0].attrs.get("delta_t", DELTA_T))
combined.attrs["files"] = [str(f) for f in files]
return set_default_attrs(combined)
def save_piv(dataset: xr.Dataset, filepath: Any, format: str = "netcdf", frame: int = 0, **kwargs) -> None:
path = _to_path(filepath)
fmt = format.lower()
if fmt in {"netcdf", "nc"}:
dataset.to_netcdf(path)
return
if fmt == "csv":
ds = dataset.isel(t=int(frame)) if "t" in dataset.dims else dataset
x = ds["x"].values
y = ds["y"].values
x2d, y2d = np.meshgrid(x, y)
df = pd.DataFrame({"x": x2d.ravel(), "y": y2d.ravel(), "u": ds["u"].values.ravel(), "v": ds["v"].values.ravel()})
df.to_csv(path, index=False)
return
raise ValueError("Unsupported format")
# Backward-compatible wrappers
def load_vec(filepath: Any) -> xr.Dataset:
return read_piv(filepath)
def load_openpiv_txt(filepath: Any = None, **kwargs) -> xr.Dataset:
# Backwards compatibility: older notebooks used `filename=`.
if filepath is None and "filename" in kwargs:
filepath = kwargs.pop("filename")
return read_piv(filepath, format="openpiv", **kwargs)
def load_vc7(filepath: Any = None, **kwargs) -> xr.Dataset:
# Backwards compatibility for notebooks.
if filepath is None and "filename" in kwargs:
filepath = kwargs.pop("filename")
return read_piv(filepath, format="vc7", **kwargs)
def load_directory(path: Any, basename: str = "*", ext: str = ".vec") -> xr.Dataset:
return read_directory(path, pattern=basename, ext=ext)
def load_pivlab(filepath: Any, frame: Optional[int] = None) -> xr.Dataset:
reader = PIVLabReader()
return reader.read(filepath, frame=frame) if frame is not None else reader.read(filepath)
[docs]
def batchf(
filename: Any,
fun: str | Callable[..., Any],
*args: Any,
nodisp: bool = True,
**kwargs: Any,
) -> list[Any]:
"""Execute a function over a series of files (PIVMAT-style).
This is inspired by PIVMAT's ``batchf``: it processes fields from disk
one-by-one (no big in-memory list of Datasets), applying ``fun`` to each.
Parameters
----------
filename:
File pattern or path. Supports glob wildcards (e.g. ``*``) and a safe
subset of PIVMAT-style bracket expansion via
:func:`pivpy.pivmat_compat.expandstr` (e.g. ``'Run[1:10,6].vec'``).
fun:
Either a callable ``fun(ds, *args, **kwargs)`` or a string naming a
PIV accessor method (e.g. ``'azprofile'`` will call ``ds.piv.azprofile``).
nodisp:
If False, prints each call.
*args, **kwargs:
Passed through to ``fun``.
Returns
-------
list
List of per-file results.
"""
# Expand bracket patterns (safe subset) if present.
patterns: list[str]
if isinstance(filename, (list, tuple)):
patterns = [str(f) for f in filename]
else:
patterns = [str(filename)]
expanded: list[str] = []
for pat in patterns:
if "[" in pat and "]" in pat:
try:
from pivpy.pivmat_compat import expandstr
expanded.extend(expandstr(pat))
except Exception:
# If parsing fails, fall back to raw glob pattern.
expanded.append(pat)
else:
expanded.append(pat)
# Resolve files.
files: list[str] = []
for pat in expanded:
files.extend(glob.glob(pat, recursive=True))
files = sorted(set(files))
if not files:
raise FileNotFoundError("No file match")
def _apply(ds: xr.Dataset) -> Any:
if callable(fun):
return fun(ds, *args, **kwargs)
name = str(fun)
# Prefer xarray accessor methods (PIVPy idiom).
if hasattr(ds, "piv") and hasattr(ds.piv, name):
return getattr(ds.piv, name)(*args, **kwargs)
# Fallback: module-level functions that accept (ds, ...)
# Keep scope limited to pivpy modules.
import pivpy
for mod_name in ("graphics", "compute_funcs", "io"):
mod = getattr(pivpy, mod_name, None)
if mod is not None and hasattr(mod, name):
return getattr(mod, name)(ds, *args, **kwargs)
raise ValueError(f"Unknown function '{name}'. Pass a callable or a ds.piv method name.")
results: list[Any] = []
for fp in files:
if not nodisp:
arg_s = ", ".join([repr(a) for a in args] + [f"{k}={v!r}" for k, v in kwargs.items()])
print(f"{fun}({fp!r}{', ' if arg_s else ''}{arg_s})")
ds = read_piv(fp)
results.append(_apply(ds))
return results
# -----------------------------------------------------------------------------
# PIVMAT-inspired I/O compatibility layer
# -----------------------------------------------------------------------------
def _expand_pivmat_file_patterns(pattern: Any) -> list[pathlib.Path]:
"""Expand a PIVMAT-style filename pattern into concrete paths.
Supports:
- Glob wildcards (``*``)
- Safe bracket expansion via :func:`pivpy.pivmat_compat.expandstr`
Returns
-------
list[pathlib.Path]
Sorted, de-duplicated list of existing paths.
"""
if isinstance(pattern, pathlib.Path):
patterns = [str(pattern)]
elif isinstance(pattern, str):
patterns = [pattern]
elif isinstance(pattern, Sequence):
patterns = [str(p) for p in pattern]
else:
patterns = [str(pattern)]
expanded: list[str] = []
for pat in patterns:
if "[" in pat and "]" in pat:
try:
from pivpy.pivmat_compat import expandstr
expanded.extend(expandstr(pat))
except Exception:
expanded.append(pat)
else:
expanded.append(pat)
out: list[pathlib.Path] = []
for pat in expanded:
matches = glob.glob(pat, recursive=True)
if matches:
out.extend([_to_path(m) for m in matches])
else:
# If it isn't a glob match, treat it as a direct path.
p = _to_path(pat)
if p.exists():
out.append(p)
# Normalize and de-duplicate.
uniq = sorted({p.resolve() for p in out if p.exists()})
return uniq
[docs]
def loadvec(filename: Any = None, *args: Any, **kwargs: Any) -> xr.Dataset | list[xr.Dataset]:
"""PIVMAT-compatible loader (wrapper over :func:`read_piv`).
This is a Python port of the user-facing behavior of PIVMAT's ``loadvec.m``.
It supports file patterns (glob + bracket expansion) and can load multiple
files at once.
Parameters
----------
filename:
Path/pattern, a sequence of paths, or a 1-based numeric index selecting
from files in the current directory (PIVMAT behavior).
frame:
Optional frame override passed through to :func:`read_piv`.
verbose:
If True, prints each file being loaded.
Returns
-------
xarray.Dataset | list[xarray.Dataset]
Single dataset if one file is matched, otherwise a list of datasets.
"""
frame = kwargs.pop("frame", None)
verbose = bool(kwargs.pop("verbose", False))
if kwargs:
raise TypeError(f"Unsupported keyword arguments: {sorted(kwargs)}")
if filename is None:
raise ValueError("loadvec requires a filename/pattern (no GUI picker in Python)")
# Numeric index selects from common PIVMAT extensions in cwd.
if isinstance(filename, (int, np.integer)):
cwd = pathlib.Path.cwd()
exts = {".vec", ".vc7", ".imx", ".img", ".im7", ".cm0", ".uwo", ".txt", ".mat", ".nc"}
files = sorted([p for p in cwd.iterdir() if p.is_file() and p.suffix.lower() in exts])
idx = int(filename) - 1 # PIVMAT is 1-based
if idx < 0 or idx >= len(files):
raise IndexError("loadvec numeric index out of range")
paths = [files[idx]]
else:
paths = _expand_pivmat_file_patterns(filename)
if not paths:
raise FileNotFoundError("No file match")
out: list[xr.Dataset] = []
for i, p in enumerate(paths, start=1):
if verbose:
print(f" Loading file #{i}/{len(paths)}: {str(p)!r}")
out.append(read_piv(p, frame=frame) if frame is not None else read_piv(p))
return out[0] if len(out) == 1 else out
[docs]
def openvec(filename: Any) -> xr.Dataset | list[xr.Dataset]:
"""PIVMAT ``openvec`` equivalent.
In MATLAB, this is used by the file browser to populate the workspace.
In Python, this simply calls :func:`loadvec` and returns the dataset(s).
"""
return loadvec(filename)
[docs]
def openvc7(filename: Any) -> xr.Dataset:
"""PIVMAT ``openvc7`` equivalent (loads a single VC7 file)."""
ds = load_vc7(filename)
return ds
[docs]
def openim7(filename: Any, **kwargs: Any) -> xr.Dataset:
"""PIVMAT ``openim7`` equivalent (loads a DaVis IM7 image as a scalar Dataset).
This requires ``lvpyio`` at runtime. If it is not available, a
:class:`ImportError` is raised.
"""
path = _to_path(filename)
try:
from lvpyio import read_buffer # type: ignore
except Exception as e:
raise ImportError("Reading .im7 requires the optional dependency 'lvpyio'") from e
buf = read_buffer(str(path))
data = buf[0]
# Heuristic: if image plane exists, expose as scalar 'w'.
# lvpyio naming may vary across versions; keep this conservative.
if hasattr(data, "images") and data.images:
img = data.images[0]
im = np.asarray(img, dtype=float)
return im2pivmat(im, namew="I", unit="pix")
raise ValueError("Unsupported IM7 content (no image planes found)")
[docs]
def openimx(filename: Any, **kwargs: Any) -> xr.Dataset:
"""PIVMAT ``openimx`` equivalent (loads a DaVis IMX image as a scalar Dataset).
This requires ``lvpyio`` at runtime.
"""
path = _to_path(filename)
try:
from lvpyio import read_buffer # type: ignore
except Exception as e:
raise ImportError("Reading .imx requires the optional dependency 'lvpyio'") from e
buf = read_buffer(str(path))
data = buf[0]
if hasattr(data, "images") and data.images:
img = data.images[0]
im = np.asarray(img, dtype=float)
return im2pivmat(im, namew="I", unit="pix")
raise ValueError("Unsupported IMX content (no image planes found)")
[docs]
def openimg(filename: Any, **kwargs: Any) -> xr.Dataset:
"""PIVMAT ``openimg`` equivalent (loads a DaVis IMG image as a scalar Dataset).
This requires ``lvpyio`` at runtime.
"""
path = _to_path(filename)
try:
from lvpyio import read_buffer # type: ignore
except Exception as e:
raise ImportError("Reading .img requires the optional dependency 'lvpyio'") from e
buf = read_buffer(str(path))
data = buf[0]
if hasattr(data, "images") and data.images:
img = data.images[0]
im = np.asarray(img, dtype=float)
return im2pivmat(im, namew="I", unit="pix")
raise ValueError("Unsupported IMG content (no image planes found)")
[docs]
def openset(filename: Any) -> xr.Dataset:
"""PIVMAT ``openset`` equivalent.
PIVMAT loads all files in the directory associated with a `.set`.
In Python, this loads the directory adjacent to the `.set` file that shares
its base name (if present), otherwise loads the parent directory.
"""
path = _to_path(filename)
if path.suffix.lower() not in {".set", ".exp"}:
raise ValueError("openset expects a .set or .exp file")
candidate = path.with_suffix("")
directory = candidate if candidate.exists() and candidate.is_dir() else path.parent
return read_directory(directory)
[docs]
def loadpivtxt(fname: Any) -> xr.Dataset:
"""PIVMAT ``loadpivtxt`` compatible loader.
Reads a text export containing at least 4 columns: x y u v.
Header lines starting with non-numeric characters are preserved in
``ds.attrs['Attributes']``.
"""
path = _to_path(fname)
if not path.exists():
raise FileNotFoundError(str(path))
header: list[str] = []
data_lines: list[str] = []
with path.open("r", errors="ignore") as f:
for line in f:
s = line.strip()
if not s:
continue
# numeric line? allow leading sign/dot/digit
if re.match(r"^[\s,]*[+-]?(\d|\.)", s):
data_lines.append(s)
else:
header.append(s)
if not data_lines:
raise ValueError("No numeric data found in file")
# Normalize commas to spaces and parse floats.
rows: list[list[float]] = []
for l in data_lines:
l2 = l.replace(",", " ")
vals = [float(v) for v in l2.split() if v]
if len(vals) < 4:
continue
rows.append(vals[:6])
arr = np.asarray(rows, dtype=float)
x, y, u, v = (arr[:, 0], arr[:, 1], arr[:, 2], arr[:, 3])
mask = arr[:, 4] if arr.shape[1] >= 5 else None
# Infer grid
xu, xi = unsorted_unique(x)
yu, yi = unsorted_unique(y)
cols = len(xu)
rows_n = len(yu)
if cols * rows_n != len(x):
raise ValueError("Unsupported TXT layout (not a full rectilinear grid)")
# Reconstruct meshes in row-major order.
x2d, y2d = np.meshgrid(xu, yu)
u2 = u.reshape((rows_n, cols))
v2 = v.reshape((rows_n, cols))
if mask is not None:
m2 = mask.reshape((rows_n, cols))
else:
m2 = np.ones_like(u2, dtype=float)
ds = from_arrays(x2d, y2d, u2, v2, mask=m2, frame=_extract_frame_number(path))
ds.attrs["files"] = [str(path)]
if header:
ds.attrs["Attributes"] = "\n".join(header)
return ds
[docs]
def loadarrayvec(pathname: Any, fname: Any, *opts: str) -> list[list[xr.Dataset]]:
"""PIVMAT ``loadarrayvec`` equivalent.
Loads a 2D array of vector fields: directories matched by ``pathname`` and
files matched by ``fname`` inside each directory.
Returns a nested list ``out[i][j]`` where ``i`` indexes directories and
``j`` indexes files.
"""
verbose = any(str(o).lower().startswith("verb") for o in opts)
dirs = _expand_pivmat_file_patterns(pathname)
dirs = [p for p in dirs if p.is_dir()]
if not dirs:
raise FileNotFoundError("No directory match")
out: list[list[xr.Dataset]] = []
for d in dirs:
# match files inside directory
files = _expand_pivmat_file_patterns(str(d / str(fname)))
if verbose:
print(f"Directory: {str(d)} ({len(files)} files)")
out.append([read_piv(fp) for fp in files])
return out
[docs]
def readvec(name: Any, comments: int = 1, columns: int = 5) -> tuple[str, np.ndarray]:
"""Read a DaVis `.vec` text export into a numeric array (PIVMAT-inspired).
This is a light-weight port of PIVMAT's ``readvec.m``.
Returns
-------
tuple
``(header, data)`` where ``data`` has shape ``(j, i, columns)``.
"""
path = _to_path(name)
if path.suffix.lower() != ".vec":
path = path.with_suffix(".vec")
if not path.exists():
raise FileNotFoundError(str(path))
raw = path.read_text(errors="ignore")
# Normalize newlines and commas.
raw = raw.replace("\r\n", "\n").replace("\r", "\n").replace(",", " ")
lines = raw.split("\n")
comments = int(comments)
if comments < 0:
comments = 0
header_lines = lines[:comments] if comments else []
header = "\n".join(header_lines).lower()
body = "\n".join(lines[comments:])
# Attempt to infer columns and units from header.
x_units = ""
u_units = ""
m = re.search(r"variables=([^\n]+)", header)
if m:
variables = m.group(1)
# crude parsing of quoted strings
quoted = re.findall(r'"([^"]*)"', variables)
if len(quoted) >= 2:
x_units = quoted[1]
if len(quoted) >= 6:
u_units = quoted[5]
if quoted:
columns = max(4, len(quoted) // 2)
else:
columns = int(columns)
# Parse data values.
data = np.fromstring(body, sep=" ")
if data.size % columns != 0:
# best-effort truncation
data = data[: (data.size // columns) * columns]
data = data.reshape((-1, columns))
data[data > 9e9] = 0.0
mi = re.search(r"\bi=\s*([0-9]+)", header)
mj = re.search(r"\bj=\s*([0-9]+)", header)
if not mi or not mj:
raise ValueError("Could not determine i/j dimensions from header")
i_dim = int(mi.group(1))
j_dim = int(mj.group(1))
data3 = data.reshape((i_dim, j_dim, columns)).transpose((1, 0, 2))
return header, data3
[docs]
def readsetfile(filename: Any, attrname: str | None = None) -> dict[str, Any] | Any:
"""Read attributes from a DaVis `.set` or `.exp` file (PIVMAT-inspired).
This is a pragmatic parser intended for typical DaVis key/value content.
It returns a flat dict of attributes.
If ``attrname`` is provided, returns only that value (case-insensitive,
ignoring underscores).
"""
path = _to_path(filename)
if not path.exists():
raise FileNotFoundError(str(path))
txt = path.read_text(errors="ignore")
attrs: dict[str, Any] = {}
for line in txt.splitlines():
s = line.strip()
if not s or s.startswith("#") or s.startswith(";"):
continue
# try key=value or key: value
if "=" in s:
k, v = s.split("=", 1)
elif ":" in s:
k, v = s.split(":", 1)
else:
continue
key = k.strip()
val = v.strip().strip('"')
# numeric conversion when possible
try:
if re.match(r"^[+-]?[0-9]+$", val):
val2: Any = int(val)
else:
val2 = float(val)
attrs[key] = val2
except Exception:
attrs[key] = val
if attrname is None:
return attrs
needle = re.sub(r"_+", "", str(attrname)).lower()
for k, v in attrs.items():
kk = re.sub(r"_+", "", str(k)).lower()
if kk == needle:
return v
raise KeyError(attrname)
[docs]
def getattribute(f: Any, attrname: str | None = None) -> Any:
"""Get metadata/attributes for a field or file (PIVMAT-inspired)."""
if isinstance(f, xr.Dataset):
attrs = dict(f.attrs)
else:
paths = _expand_pivmat_file_patterns(f)
if not paths:
raise FileNotFoundError("No file match")
# Return attributes for the first match (common usage).
p = paths[0]
if p.suffix.lower() in {".set", ".exp"}:
attrs = readsetfile(p)
else:
reader = _REGISTRY.find_reader(p)
if reader is None:
raise ValueError("Unsupported file format")
md = reader.read_metadata(p)
attrs = {"frame": md.frame, "variables": md.variables}
if attrname is None:
return attrs
needle = re.sub(r"_+", "", str(attrname)).lower()
for k, v in attrs.items():
kk = re.sub(r"_+", "", str(k)).lower()
if kk == needle:
return v
raise KeyError(attrname)
[docs]
def getvar(s: Any, reqname: str | int | None = None, mode: str | None = None) -> Any:
"""Parse variables encoded in a string like ``p1=v1_p2=v2_...`` (PIVMAT-inspired)."""
if isinstance(s, (list, tuple)):
return [getvar(x, reqname=reqname, mode=mode) for x in s]
text = str(s)
keep_strings = (str(mode).lower().startswith("str") if mode is not None else False)
parts = [p for p in re.split(r"_+", text) if p]
out: dict[str, Any] = {}
auto = 1
for part in parts:
if "=" in part:
k, v = part.split("=", 1)
key = k
val_s = v
else:
m = re.match(r"^([A-Za-z]+)(.*)$", part)
if m:
key = m.group(1)
val_s = m.group(2)
else:
key = f"var{auto}"
val_s = part
auto += 1
val: Any = val_s
if not keep_strings:
try:
if re.match(r"^[+-]?[0-9]+$", val_s):
val = int(val_s)
else:
val = float(val_s)
except Exception:
val = val_s
out[key] = val
if reqname is None:
return out
if isinstance(reqname, int):
keys = list(out.keys())
return out[keys[int(reqname) - 1]]
# name lookup (ignore underscores, case-insensitive)
needle = re.sub(r"_+", "", str(reqname)).lower()
for k, v in out.items():
kk = re.sub(r"_+", "", str(k)).lower()
if kk == needle:
return v
raise KeyError(reqname)
[docs]
def getsetname(curdir: Any = None) -> str:
"""Return the last element of a path (PIVMAT-inspired)."""
p = pathlib.Path.cwd() if curdir is None else _to_path(curdir)
return p.name
[docs]
def getfilenum(name: Any, pat: str, opt: str = "filedir") -> list[float]:
"""Extract numeric indices from matching file/dir names (PIVMAT-inspired)."""
paths = _expand_pivmat_file_patterns(name)
if not paths:
raise FileNotFoundError("No file match")
opt_l = str(opt).lower()
nums: list[float] = []
for p in paths:
hay: str
if opt_l == "dironly":
hay = str(p.parent)
elif opt_l == "fileonly":
hay = p.name
else:
hay = str(p)
idx = hay.find(pat)
if idx < 0:
continue
s = hay[idx + len(pat) :]
m = re.search(r"^[+-]?[0-9]+(?:\.[0-9]+)?", s)
if m:
nums.append(float(m.group(0)))
return nums
[docs]
def getpivtime(f: Any, *args: str) -> np.ndarray:
"""Return acquisition times in seconds for dataset(s) or files (PIVMAT-inspired).
For PIVPy datasets, this uses ``ds['t']`` and ``ds.attrs['delta_t']``.
If the option `'0'` is provided, the first time is shifted to 0.
"""
start_at_zero = any(str(a) == "0" for a in args)
if isinstance(f, xr.Dataset):
ds = f
dt = float(ds.attrs.get("delta_t", 0.0))
t = np.asarray(ds["t"].values, dtype=float) * dt
if start_at_zero and t.size:
t = t - float(t[0])
return t
paths = _expand_pivmat_file_patterns(f)
if not paths:
raise FileNotFoundError("No file match")
times: list[float] = []
for p in paths:
ds = read_piv(p)
t = getpivtime(ds, *args)
# PIVMAT returns one time per field; use first time for per-file.
times.append(float(t[0]) if t.size else 0.0)
return np.asarray(times, dtype=float)
[docs]
def getframedt(filename: Any) -> np.ndarray:
"""Compute time interval(s) between frames for IMX/IM7 (PIVMAT-inspired).
This requires ``lvpyio`` and DaVis time series metadata. If unavailable,
returns ``array([0.0])``.
"""
path = _to_path(filename)
try:
from lvpyio import read_buffer # type: ignore
except Exception:
return np.asarray([0.0], dtype=float)
try:
buf = read_buffer(str(path))
data = buf[0]
# Best-effort: try common attribute locations.
ats = None
if hasattr(data, "attributes"):
ats = getattr(data, "attributes")
if ats and isinstance(ats, dict):
# Some exports may store AcqTimeSeries0 as list of timestamps.
for key in ("AcqTimeSeries0", "AcqTimeSeries"):
if key in ats:
ts = np.asarray(ats[key], dtype=float)
if ts.size <= 1:
return np.asarray([0.0], dtype=float)
return np.diff(ts) * 1e-3 # ms -> s
except Exception:
pass
return np.asarray([0.0], dtype=float)
[docs]
def getimx(A: Any, frame: int = 0):
"""PIVMAT ``getimx`` equivalent.
In PIVPy, if ``A`` is an :class:`xarray.Dataset`, this returns the 2D
coordinate meshes and the scalar/vector arrays for the selected frame.
"""
if not isinstance(A, xr.Dataset):
raise TypeError("getimx currently supports xarray.Dataset inputs")
ds = A
if "t" in ds.dims:
ds = ds.isel(t=int(frame))
x = np.asarray(ds["x"].values, dtype=float)
y = np.asarray(ds["y"].values, dtype=float)
x2d, y2d = np.meshgrid(x, y)
if "w" in ds:
return x2d, y2d, np.asarray(ds["w"].values, dtype=float)
u = np.asarray(ds["u"].values, dtype=float)
v = np.asarray(ds["v"].values, dtype=float)
chc = np.asarray(ds["chc"].values, dtype=float) if "chc" in ds else np.ones_like(u)
return x2d, y2d, u, v, chc
# -----------------------------------------------------------------------------
# PIVMAT vortex generators (synthetic fields)
# -----------------------------------------------------------------------------
[docs]
def vortex(
n: int = 128,
r0: float = 10.0,
vorticity: float = 1.0,
mode: str = "burgers",
diver: float | None = None,
) -> xr.Dataset:
"""Generate a centered vortex field (PIVMAT-compatible).
Port of PIVMAT's ``vortex.m``.
Returns an xarray.Dataset with variables ``u`` and ``v``.
"""
n = int(n)
if diver is None:
diver = float(vorticity)
mid = n / 2.0 + 1.0
omega = float(vorticity) / (2.0 * 1000.0) # m/s/mm
gamma = float(diver) / (2.0 * 1000.0) # m/s/mm
i = np.arange(1, n + 1, dtype=float)
j = np.arange(1, n + 1, dtype=float)
ii, jj = np.meshgrid(i, j, indexing="ij")
dx = ii - mid
dy = jj - mid
radius = np.sqrt(dx * dx + dy * dy)
u = np.zeros((n, n), dtype=float)
v = np.zeros((n, n), dtype=float)
mode_l = str(mode).lower()
if "rankine" in mode_l:
inside = radius <= float(r0)
# solid body inside
u[inside] = omega * dy[inside]
v[inside] = -omega * dx[inside]
# irrotational outside
outside = ~inside
r2 = np.where(outside, radius * radius, 1.0)
u[outside] = omega * float(r0) ** 2 * dy[outside] / r2[outside]
v[outside] = -omega * float(r0) ** 2 * dx[outside] / r2[outside]
elif "burgers" in mode_l:
safe_r2 = np.where(radius == 0, np.inf, radius * radius)
factor = float(r0) ** 2 / safe_r2 * (1.0 - np.exp(-((radius / float(r0)) ** 2)))
u = omega * factor * dy
v = -omega * factor * dx
if gamma != 0.0:
u = u + gamma * factor * dx
v = v + gamma * factor * dy
else:
raise ValueError("mode must contain 'burgers' or 'rankine'")
# Avoid a "false" zero (PIVMAT behavior).
u = u + np.max(np.abs(u)) * 1e-10
v = v + np.max(np.abs(v)) * 1e-10
x = np.arange(0, n, dtype=float)
y = np.arange(0, n, dtype=float)
x2d, y2d = np.meshgrid(x, y)
ds = from_arrays(x2d, y2d, u.T, v.T, mask=np.ones((n, n), dtype=float), frame=0)
ds["x"].attrs["units"] = "mm"
ds["y"].attrs["units"] = "mm"
ds["u"].attrs["units"] = "m/s"
ds["v"].attrs["units"] = "m/s"
ds.attrs["name"] = "Vortex"
ds.attrs["setname"] = "-"
ds.attrs["history"] = ["vortex"]
return ds
[docs]
def multivortex(
nfield: int = 1,
nsize: int = 128,
numvortex: float = 8,
*opts: str,
) -> xr.Dataset:
"""Generate random Burgers vortices (PIVMAT-compatible).
Port of PIVMAT's ``multivortex.m``.
Returns
-------
xarray.Dataset
Dataset with dims ``(y, x, t)`` where ``t`` indexes fields.
"""
nfield = int(nfield)
nsize = int(nsize)
numvortex_i = int(np.ceil(float(numvortex) * 9.0))
opt_l = {str(o).lower() for o in opts}
rng = np.random.default_rng(0)
fields: list[xr.Dataset] = []
for k in range(nfield):
vx = np.zeros((nsize, nsize), dtype=float)
vy = np.zeros((nsize, nsize), dtype=float)
icenter = 1.0 + nsize * (3.0 * rng.random(numvortex_i) - 1.0)
jcenter = 1.0 + nsize * (3.0 * rng.random(numvortex_i) - 1.0)
omega = np.sign(rng.random(numvortex_i) - 0.5) * (2.0 + rng.standard_normal(numvortex_i))
if "asym" in opt_l:
omega = np.abs(omega)
if "2d" in opt_l:
div = np.zeros(numvortex_i, dtype=float)
else:
div = rng.standard_normal(numvortex_i) / 2.0
core = 0.015 * (4.0 + rng.standard_normal(numvortex_i)) * nsize
i = np.arange(1, nsize + 1, dtype=float)
j = np.arange(1, nsize + 1, dtype=float)
ii, jj = np.meshgrid(i, j, indexing="ij")
for num in range(numvortex_i):
dx = ii - icenter[num]
dy = jj - jcenter[num]
radius = np.sqrt(dx * dx + dy * dy)
safe_r2 = np.where(radius == 0, np.inf, radius * radius)
ampl = (core[num] ** 2) / safe_r2 * (1.0 - np.exp(-((radius / core[num]) ** 2))) / 1000.0
vx = vx + ampl * (-omega[num] * dy + div[num] * dx)
vy = vy + ampl * (omega[num] * dx + div[num] * dy)
x = np.arange(0, nsize, dtype=float)
y = np.arange(0, nsize, dtype=float)
x2d, y2d = np.meshgrid(x, y)
ds = from_arrays(x2d, y2d, vx.T, vy.T, mask=np.ones((nsize, nsize), dtype=float), frame=k)
ds["x"].attrs["units"] = "mm"
ds["y"].attrs["units"] = "mm"
ds["u"].attrs["units"] = "m/s"
ds["v"].attrs["units"] = "m/s"
ds.attrs["name"] = "Multivortex"
ds.attrs["setname"] = "-"
ds.attrs["history"] = ["multivortex"]
fields.append(ds)
return xr.concat(fields, dim="t")
[docs]
def randvec(
n: int = 256,
nf: int = 1,
slope: float = 5.0 / 3.0,
nc: float = 3.0,
nl: float | None = None,
*,
seed: int = 0,
) -> xr.Dataset:
r"""Generate a synthetic 2D divergence-free random vector field (PIVMAT-compatible).
Port of PIVMAT's ``randvec.m``.
The field is constructed in Fourier space with random phase, a prescribed
power-law slope, and an incompressibility constraint (2D divergence-free).
Parameters
----------
n:
Grid size (produces an ``n x n`` field). PIVMAT assumes even ``n``.
nf:
Number of independent fields (time frames).
slope:
Spectral slope $k^{-\mathrm{slope}}$ (default $5/3$).
nc:
Small-scale cut-off (in units of the vector mesh). For $k > nc$ the
spectrum decays Gaussianly.
nl:
Large-scale cut-off (in units of the vector mesh). For $k < nl$ the
spectrum behaves as $k^2$. Defaults to ``n/3``.
seed:
RNG seed used for deterministic synthesis.
Returns
-------
xarray.Dataset
Dataset with variables ``u``, ``v`` and ``chc`` and dims ``(y, x, t)``.
"""
n = int(n)
nf = int(nf)
slope = float(slope)
nc = float(nc)
if nl is None:
nl = float(n) / 3.0
nl = float(nl)
if n <= 0 or nf <= 0:
raise ValueError("n and nf must be positive")
if n % 2 != 0:
raise ValueError("randvec currently requires even n (PIVMAT convention)")
if nc <= 0.0 or nl <= 0.0:
raise ValueError("nc and nl must be positive")
k0_idx = n // 2 # 0-based index of the zero mode (PIVMAT: k0 = n/2 + 1)
rng = np.random.default_rng(int(seed))
fields: list[xr.Dataset] = []
x = np.arange(1, n + 1, dtype=float)
y = np.arange(1, n + 1, dtype=float)
x2d, y2d = np.meshgrid(x, y)
small_scale = float(n) / nc
large_scale = float(n) / nl
# Frequencies in numpy's unshifted FFT ordering.
kx = (np.fft.fftfreq(n) * n).astype(float)
ky = (np.fft.fftfreq(n) * n).astype(float)
for frame in range(nf):
tux = np.zeros((n, n), dtype=np.complex128)
tuy = np.zeros((n, n), dtype=np.complex128)
for iy in range(n):
kyv = float(ky[iy])
for ix in range(n):
kxv = float(kx[ix])
ip = (-iy) % n
jp = (-ix) % n
# Only fill one representative per conjugate pair.
if (iy > ip) or (iy == ip and ix > jp):
continue
k = float(np.hypot(kxv, kyv))
if k == 0.0:
tux[iy, ix] = 0.0
tuy[iy, ix] = 0.0
continue
# PIVMAT energy prescription (see randvec.m).
e = (
np.exp(-((k / small_scale) ** 2))
* (k**2)
/ np.sqrt(1.0 + (k / large_scale) ** (2.0 * slope + 4.0))
) / k
amp = float(np.sqrt(float(e)))
# Self-conjugate modes must be real to keep real-valued fields.
if iy == ip and ix == jp:
phase = -1.0 if int(np.round(2.0 * rng.random())) else 1.0
else:
phase = np.exp(1j * (rng.random() * 2.0 * np.pi))
costheta = kxv / k
sintheta = kyv / k
tux[iy, ix] = -amp * sintheta * phase
tuy[iy, ix] = amp * costheta * phase
if not (iy == ip and ix == jp):
tux[ip, jp] = np.conj(tux[iy, ix])
tuy[ip, jp] = np.conj(tuy[iy, ix])
vx = np.fft.ifft2(tux).real * (n**2)
vy = np.fft.ifft2(tuy).real * (n**2)
ds = from_arrays(x2d, y2d, vx, vy, mask=np.ones((n, n), dtype=float), frame=frame)
ds["x"].attrs["units"] = "au"
ds["y"].attrs["units"] = "au"
ds["u"].attrs["units"] = "au"
ds["v"].attrs["units"] = "au"
ds.attrs["ysign"] = "Y axis downward"
ds.attrs["name"] = "randvec"
ds.attrs["setname"] = ""
ds.attrs["history"] = [f"randvec({n},{nf},{slope},{nc},{nl})"]
fields.append(ds)
return xr.concat(fields, dim="t")