Source code for pyfvcom2.fvcom_reader

"""FVCOM data reader for PyFVCOM2"""

__all__ = ["FVCOMReader"]

from datetime import datetime, timedelta
import numpy as np
from netCDF4 import Dataset
from typing import Union, List, Optional
from cftime import num2pydate

from .interpolation_coordinates import InterpolationCoordinates
from .coordinates import sigma_to_z_coords
from .grid import Grid, nodes2elems
from .mesh_reader import MeshData
from .sigma import SigmaData
from .date_utils import round_time
from .exceptions import PyFVCOM2ValueError


[docs] class FVCOMReader: """A class to read FVCOM model output and restart files. This class provides methods to read FVCOM netCDF files and extract relevant data. Attributes: filepath (str): Path to the FVCOM netCDF file. dataset (xarray.Dataset): The loaded FVCOM dataset. """ # Class variable for time conversion DAYS_PER_MILLISECOND = 1.0 / (1000.0 * 60.0 * 60.0 * 24.0) def __init__(self, file_paths: Union[str, List[str]]): """Initialize the FVCOMReader with the path to the netCDF file. Args: file_paths (str, list): Path to the FVCOM netCDF file. """ # Handle single file path or list of file paths if isinstance(file_paths, str): self.file_paths = [file_paths] else: self.file_paths = file_paths # Load only the first file initially for metadata and time-independent data print(f'Accessing FVCOM metadata from: {self.file_paths[0]}') self._metadata_dataset = Dataset(self.file_paths[0]) self._grid = None # Lazy initialization self._z_level_cache = {} # Cache for time-dependent z levels # Build the time index mapping for multiple files self._build_time_index_mapping() def _build_time_index_mapping(self): """Build a mapping from datetime to (file_path, local_time_index)""" self._time_to_file_map = {} self._all_dates = [] for file_path in self.file_paths: with Dataset(file_path) as ds: times = self._read_times(ds) for local_idx, time_val in enumerate(times): self._time_to_file_map[time_val] = (file_path, local_idx) self._all_dates.append(time_val) # Sometime FVCOM dates are repeated across files; ensure uniqueness self._all_dates = list(set(self._all_dates)) # Sort dates for efficient searching self._all_dates.sort() def _read_times(self, dataset: Dataset) -> List[datetime]: """Read time variable from the dataset and convert to datetime objects. Args: dataset (Dataset): The netCDF dataset. Returns: List[datetime]: List of datetime objects corresponding to the time variable. """ time_raw = (dataset.variables['Itime'][:] + dataset.variables['Itime2'][:] * self.DAYS_PER_MILLISECOND) units = dataset.variables['Itime'].units datetime_raw = num2pydate(time_raw[:], units=units) return round_time(datetime_raw) def _load_dataset_for_datetime(self, target_datetime, tolerance=None): """Load the appropriate dataset for a given datetime Args: target_datetime: The target datetime to find data for tolerance: Maximum allowed time difference (as timedelta). If None, uses default bounds checking. """ # Convert datetime to numpy datetime64 if needed if isinstance(target_datetime, datetime): target_datetime = np.datetime64(target_datetime) # Check bounds first if len(self._all_dates) == 0: raise PyFVCOM2ValueError("No dates available in the dataset(s)") start_date = self._all_dates[0] end_date = self._all_dates[-1] # Check if target is exactly in our time mapping if target_datetime in self._time_to_file_map: required_file_path, local_time_index = self._time_to_file_map[target_datetime] else: # Check if target is within reasonable bounds if target_datetime < start_date or target_datetime > end_date: raise PyFVCOM2ValueError( f"Target datetime {target_datetime} is outside the available data range " f"[{start_date} to {end_date}]" ) # Find closest time within the valid range time_diffs = [abs(dt - target_datetime) for dt in self._all_dates] closest_idx = time_diffs.index(min(time_diffs)) closest_time = self._all_dates[closest_idx] # Optional tolerance check if tolerance is not None: min_diff = min(time_diffs) if min_diff > np.timedelta64(tolerance): raise PyFVCOM2ValueError( f"Closest available time ({closest_time}) is {min_diff} away from target " f"({target_datetime}), which exceeds tolerance ({tolerance})" ) required_file_path, local_time_index = self._time_to_file_map[closest_time] dataset = Dataset(required_file_path) return dataset, local_time_index @property def grid(self) -> Grid: """Get the Grid object (constructed lazily). Returns: Grid: The grid object containing mesh structure and open boundaries. """ if self._grid is None: mesh_data = self._extract_mesh_data() sigma_data = self._extract_sigma_data() self._grid = Grid(mesh_data, sigma_data, "geographic") return self._grid @property def n_nodes(self): """Get the number of nodes in the FVCOM grid.""" return self.grid.n_nodes @property def n_elements(self): """Get the number of elements in the FVCOM grid.""" return self.grid.n_elements @property def n_sigma_layers(self): """Get the number of sigma layers in the FVCOM grid.""" return self.grid.n_sigma_layers @property def n_sigma_levels(self): """Get the number of sigma levels in the FVCOM grid.""" return self.grid.n_sigma_levels @property def lon_nodes(self): """Get the longitude values from the dataset.""" return self.grid.lon_nodes @property def lat_nodes(self): """Get the latitude values from the dataset.""" return self.grid.lat_nodes @property def lon_elements(self): """Get the longitude values of element centroids.""" return self.grid.lon_elements @property def lat_elements(self): """Get the latitude values of element centroids.""" return self.grid.lat_elements @property def x_nodes(self): """Get the Cartesian x-coordinates at nodes.""" return self.grid.x_nodes @property def y_nodes(self): """Get the Cartesian y-coordinates at nodes.""" return self.grid.y_nodes @property def x_elements(self): """Get the Cartesian x-coordinates of element centroids.""" return self.grid.x_elements @property def y_elements(self): """Get the Cartesian y-coordinates of element centroids.""" return self.grid.y_elements @property def sigma_layers_nodes(self): """Get the sigma layer values at nodes (n_layers, n_nodes).""" return self.grid.sigma_layers_nodes @property def sigma_layers_elements(self): """Get the sigma layer values at element centroids (n_layers, n_elements).""" return self.grid.sigma_layers_elements @property def sigma_levels_nodes(self): """Get the sigma level values at nodes (n_levels, n_nodes).""" return self.grid.sigma_levels_nodes @property def sigma_levels_elements(self): """Get the sigma level values at element centroids (n_levels, n_elements).""" return self.grid.sigma_levels_elements @property def bathy_nodes(self): """Get the bathymetry values at nodes""" return self.grid.bathy_nodes @property def bathy_elements(self): """Get the bathymetry values at element centroids""" return self.grid.bathy_elements @property def dates(self) -> List[datetime]: """Get the available dates in the dataset(s). Returns: List[datetime]: List of available datetime objects. """ return self._all_dates
[docs] def get_var_dimensions(self, var_name: str) -> tuple: """Get the dimensions of a variable. Args: var_name (str): The name of the variable to check. Returns: tuple: The dimensions of the variable. """ return self._metadata_dataset.variables[var_name].dimensions
[docs] def var_is_node_based(self, var_name: str) -> bool: """Check if a variable is node-based. Args: var_name (str): The name of the variable to check. Returns: bool: True if the variable is node-based, False if element-based. """ var_dims = self._metadata_dataset.variables[var_name].dimensions if 'node' in var_dims: return True elif 'nele' in var_dims: return False else: raise PyFVCOM2ValueError( f"Variable {var_name} is neither node-based nor element-based." )
[docs] def get_vertical_position(self, var_name: str) -> str: """Get the sigma coordinate type for a variable. Args: var_name (str): The name of the variable. Returns: str: 'layer_centre' if the variable uses sigma layers, 'layer_interface' if it uses sigma levels. """ var_dims = self._metadata_dataset.variables[var_name].dimensions if 'siglay' in var_dims: return 'layer_centre' elif 'siglev' in var_dims: return 'layer_interface' else: return None
[docs] def get_var(self, var_name: str, target_datetime: Optional[datetime]=None, tolerance: Optional[timedelta]=None) -> np.ndarray: """Get the data for a given variable at a specific time. Args: var_name (str): The name of the variable to retrieve. target_datetime (datetime): The target datetime for which to retrieve the data. Returns: np.ndarray: The data array for the specified variable at the given time. """ if target_datetime is None: # Check if time is a dimension of the variable var_dims = self._metadata_dataset.variables[var_name].dimensions if 'time' not in var_dims: return self._return_grid_variable_data(var_name) else: raise PyFVCOM2ValueError( f"Variable {var_name} is time-dependent; please provide a target_datetime." ) dataset, local_time_index = self._load_dataset_for_datetime(target_datetime, tolerance=tolerance) # Make sure variable exists in the dataset if var_name not in dataset.variables: raise PyFVCOM2ValueError( f"Variable {var_name} not found in dataset for time {target_datetime}." ) # 2D spatial variable; no depth indexing needed var = dataset.variables[var_name][local_time_index, :] if np.ma.is_masked(var): print( f"Warning: {var_name} contains masked data at time {target_datetime}. " "Masked values will be filled with default fill value." ) return np.ma.getdata(var)
[docs] def get_sigma_levels(self, var_name: str) -> np.ndarray: """Get the sigma levels/layers for the variable based on whether it is node or element based. Args: var_name (str): The name of the variable. Returns: np.ndarray: Sigma levels array. """ var_is_node_based = self.var_is_node_based(var_name) vertical_position = self.get_vertical_position(var_name) if var_is_node_based: if vertical_position == 'layer_centre': return self.grid.sigma_layers_nodes else: return self.grid.sigma_levels_nodes else: if vertical_position == 'layer_centre': return self.grid.sigma_layers_elements else: return self.grid.sigma_levels_elements
[docs] def get_time_dep_z_levels(self, var_name: str, target_datetime: datetime=None, tolerance: Optional[timedelta]=None, zeta_var_name: str='zeta', relative_to_free_surface: bool=False) -> np.ndarray: """Get time-dependent z levels/layers for the variable based on whether it is node or element based. Args: var_name (str): The name of the variable. target_datetime (datetime): The target datetime for which to retrieve the z levels. tolerance (timedelta, optional): The tolerance for matching the target datetime. zeta_var_name (str, optional): The name of the zeta (ssh) variable. Defaults to 'zeta'. relative_to_free_surface (bool, optional): Whether to return z levels relative to the free surface or the zero geoid. Defaults to False (relative to zero geoid). Returns: np.ndarray: Sigma levels array. """ var_is_node_based = self.var_is_node_based(var_name) vertical_position = self.get_vertical_position(var_name) # Check cache — stores (z_geoid, zeta) tuples so both reference # frames can be served from a single cached computation. Here: # - z_geoid is the depth of z levels relative to the zero geoid # - zeta is the free surface elevation at the target time cache_key = (var_is_node_based, vertical_position, target_datetime) if cache_key in self._z_level_cache: z_geoid, zeta_cached = self._z_level_cache[cache_key] if relative_to_free_surface: return z_geoid - zeta_cached return z_geoid zeta = self.get_var(zeta_var_name, target_datetime=target_datetime, tolerance=tolerance) if var_is_node_based: full_depth = self.grid.bathy_nodes if vertical_position == 'layer_centre': z_geoid = sigma_to_z_coords(self.sigma_layers_nodes, zeta, full_depth) else: z_geoid = sigma_to_z_coords(self.sigma_levels_nodes, zeta, full_depth) is_wet = self.get_var('wet_nodes', target_datetime=target_datetime, tolerance=tolerance) else: zeta = nodes2elems(zeta, self.grid.triangles) full_depth = self.grid.bathy_elements if vertical_position == 'layer_centre': z_geoid = sigma_to_z_coords(self.sigma_layers_elements, zeta, full_depth) else: z_geoid = sigma_to_z_coords(self.sigma_levels_elements, zeta, full_depth) is_wet = self.get_var('wet_cells', target_datetime=target_datetime, tolerance=tolerance) # Mask dry cells dry_cells = np.asarray(is_wet==0, dtype=bool) z_geoid[:, dry_cells] = np.nan self._z_level_cache[cache_key] = (z_geoid, zeta) if relative_to_free_surface: return z_geoid - zeta return z_geoid
[docs] def get_n_depth_levels(self, var_name: str) -> int: """Get number of z levels/layers Args: var_name (str): The name of the variable. Returns: int: Number of sigma levels/layers. """ vertical_position = self.get_vertical_position(var_name) if vertical_position == 'layer_centre': return self.grid.n_sigma_layers else: return self.grid.n_sigma_levels
[docs] def get_interpolation_coordinates(self, horizontal_position: str, vertical_position: str, horizontal_coordinate_system: Optional[str] = "geographic", vertical_coordinate_system: Optional[str] = "z", dates: Optional[np.ndarray] = None) -> InterpolationCoordinates: """Get interpolation coordinates for a specific grid position. Wrapper for Grid.get_interpolation_coordinates. Args: horizontal_position: Whether coordinates are at mesh nodes or element centres ('node' or 'element'). vertical_position: Whether depth coordinates are at layer centres or layer interfaces ('layer_centre' or 'layer_interface'). horizontal_coordinate_system: The coordinate system ("geographic" or "cartesian") for the interpolation coordinates. vertical_coordinate_system: The vertical coordinate system ("z" or "sigma") for the interpolation coordinates. dates: Optional array of datetime objects for the interpolation coordinates. Returns: InterpolationCoordinates: The interpolation coordinates for the specified grid position. """ return self.grid.get_interpolation_coordinates(horizontal_position, vertical_position=vertical_position, horizontal_coordinate_system=horizontal_coordinate_system, vertical_coordinate_system=vertical_coordinate_system, dates=dates)
def _extract_mesh_data(self) -> MeshData: """Extract mesh data from FVCOM output file. Returns: MeshData: Mesh data object compatible with Grid construction. """ # Extract basic mesh components nodes = np.arange(1, self._metadata_dataset.dimensions['node'].size+1) # TBC zero based indexing kept here. triangles = self._metadata_dataset.variables['nv'][:].T - 1 # Convert to 0-based indexing, transpose to (n_elem, 3) x1 = self._metadata_dataset.variables['lon'][:] x2 = self._metadata_dataset.variables['lat'][:] x3 = self._return_grid_variable_data('h')[:] open_bdy_node_lists = None bdy_types = None return MeshData(triangles, nodes, x1, x2, x3, bdy_types, open_bdy_node_lists) def _extract_sigma_data(self) -> SigmaData: """Extract sigma coordinate data from FVCOM output file. Returns: SigmaData: Sigma data object compatible with Grid construction. """ # Generate "dummy" sigma configuration sigma_config = { 'sigtype': 'dummy', # Assume generalised coordinates 'sigma_power': np.nan, 'sigma_theta': np.nan, 'sigma_b': np.nan } # Extract sigma levels at nodes. Transpose from (levels, nodes) to (nodes, levels) sigma_levels = self._return_grid_variable_data("siglev").T return SigmaData(sigma_config, sigma_levels) def _return_grid_variable_data(self, var_name): """Return the data for a given variable name. Warn if the variable contains masked data for any reason. Args: var_name (str): The name of the variable to retrieve. Returns: np.ndarray: The data array for the specified variable. """ if np.ma.is_masked(self._metadata_dataset.variables[var_name][:]): print( f"Warning: {var_name} contains masked data. " "Masked values will be filled with default fill value." ) return np.ma.getdata(self._metadata_dataset.variables[var_name][:])