Source code for pyfvcom2.interpolation

"""Interpolation functions"""

import numpy as np
from abc import ABC, abstractmethod
from scipy import interpolate
from scipy.spatial import Delaunay
from scipy.interpolate import LinearNDInterpolator, NearestNDInterpolator
from matplotlib.tri import Triangulation, LinearTriInterpolator
from typing import Optional

from .cmems_reader import CMEMSReader, default_fvcom_to_cmems_var_names
from .coordinates import sigma_to_z_coords, pol2cart, cart2pol
from .exceptions import PyFVCOM2ValueError
from .fvcom_reader import FVCOMReader
from .interpolation_coordinates import InterpolationCoordinates
from .tide_reader import HarmonicsData

__all__ = ["InterpolationCoordinates", "Interpolator", "CMEMSInterpolator", "FVCOMInterpolator", "TPXOInterpolator"]


[docs] class Interpolator(ABC): """Abstract base class for interpolation operations.""" def __init__(self): pass
[docs] @abstractmethod def interpolate(self, coordinates: InterpolationCoordinates, fvcom_var_name: str) -> np.ndarray: """Perform interpolation operation. This method must be implemented by subclasses to define the specific interpolation behavior. Args: coordinates (InterpolationCoordinates): Coordinates on the FVCOM grid. fvcom_var_name (str): Name of the FVCOM variable to interpolate. """ pass
[docs] class CMEMSInterpolator(Interpolator): """ CMEMS interpolator class Args: cmems_reader (CMEMSReader): An instance of CMEMSReader with loaded data. fvcom_name_map (dict): A mapping of variable names between FVCOM and CMEMS. The keys are FVCOM variable names and the values are CMEMS variable names. """ def __init__(self, cmems_reader: CMEMSReader, fvcom_to_cmems_var_names: Optional[dict] = None): super().__init__() self.cmems_reader = cmems_reader if fvcom_to_cmems_var_names is None: self.fvcom_to_cmems_var_names = default_fvcom_to_cmems_var_names else: self.fvcom_to_cmems_var_names = fvcom_to_cmems_var_names
[docs] def interpolate(self, coordinates: InterpolationCoordinates, fvcom_var_name: str) -> np.ndarray: """Perform interpolation operation for CMEMS data. Args: coordinates (InterpolationCoordinates): Space and time coordinates for the FVCOM grid; i.e., these are the times and locations where we want interpolated data. fvcom_var_name (str): Name of the FVCOM variable that we want interpolated data for. This will be matched to the corresponding CMEMS variable name using the provided mapping. Returns: np.ndarray: Interpolated variable on the FVCOM grid. For 2D this will be (time, points), and for 3D this will be (time, depth, points), where points may be either nodes or elements depending on the variable. """ cmems_var_name = self.fvcom_to_cmems_var_names.get(fvcom_var_name) if cmems_var_name is None: raise PyFVCOM2ValueError( f"No CMEMS variable mapping found for FVCOM variable '{fvcom_var_name}'. " f"Available mappings: {self.fvcom_to_cmems_var_names}" ) print(f"Interpolating CMEMS {cmems_var_name} to FVCOM grid.") # Calculate the number of dimensions of the CMEMS variable var_ndims = self.cmems_reader.get_var_ndims(cmems_var_name) # If a 2D spatial variable (time, lat, lon) if var_ndims == 3: return self._interpolate_2d(coordinates, cmems_var_name) # If a 3D spatial variable (time, depth, lat, lon) elif var_ndims == 4: return self._interpolate_3d(coordinates, cmems_var_name)
def _interpolate_2d(self, coordinates: InterpolationCoordinates, cmems_var_name: str) -> np.ndarray: """Interpolate a 2D CMEMS variable onto the FVCOM grid. Args: coordinates (InterpolationCoordinates): Space and time coordinates for the FVCOM grid. cmems_var_name (str): Name of the CMEMS variable to interpolate. Returns: np.ndarray: Interpolated variable on the FVCOM grid. """ # Determine time indices from the coordinates, which provides either a single # date/time as a datetime object or a list of datetime objects try: # Try to get length - if it fails, it's a single datetime object len(coordinates.dates) dates = coordinates.dates except TypeError: # Single datetime object, wrap in list dates = [coordinates.dates] # Determine the number of dates and points n_dates = len(dates) n_points = len(coordinates.x1) # Initialise array to hold interpolated data interpolated_data = np.empty((n_dates, n_points), dtype=np.float32) # Loop over each time index to perform interpolation for d_idx, target_date in enumerate(dates): print(f"Interpolating CMEMS {cmems_var_name} to FVCOM grid for date: {target_date}.") # Get the filled 2D CMEMS variable data on the regular grid, with # linear interpolation in time between bracketing CMEMS time steps. t0, t1, alpha = self.cmems_reader._get_bracketing_times(target_date) var_filled_0 = self.cmems_reader.get_filled_2D_var(cmems_var_name, t0) if alpha == 0.0: var_filled = var_filled_0 else: var_filled_1 = self.cmems_reader.get_filled_2D_var(cmems_var_name, t1) var_filled = (1.0 - alpha) * var_filled_0 + alpha * var_filled_1 # Interpolate from the regular grid onto the FVCOM points interp = interpolate.RegularGridInterpolator( (self.cmems_reader.lons, self.cmems_reader.lats), var_filled.T ) interpolated_data[d_idx, :] = interp((coordinates.x1, coordinates.x2)) return interpolated_data def _interpolate_3d(self, coordinates: InterpolationCoordinates, cmems_var_name: str) -> np.ndarray: """Interpolate a 3D CMEMS variable onto the FVCOM grid. Args: coordinates (InterpolationCoordinates): Space and time coordinates for the FVCOM grid. cmems_var_name (str): Name of the CMEMS variable to interpolate. Returns: np.ndarray: Interpolated variable on the FVCOM grid. """ # Determine time indices from the coordinates, which provides either a single # date/time as a datetime object or a list of datetime objects try: # Try to get length - if it fails, it's a single datetime object len(coordinates.dates) dates = coordinates.dates except TypeError: # Single datetime object, wrap in list dates = [coordinates.dates] # Determine the number of dates and points n_dates = len(dates) n_depths = coordinates.x3.shape[0] n_points = coordinates.x1.shape[0] # Loop over each time index to perform interpolation interpolated_data = np.empty((n_dates, n_depths, n_points), dtype=np.float32) for d_idx, target_date in enumerate(dates): print(f"Interpolating CMEMS {cmems_var_name} to FVCOM grid for date: {target_date}.") # Get the filled 3D CMEMS variable data, with linear interpolation # in time between bracketing CMEMS time steps. t0, t1, alpha = self.cmems_reader._get_bracketing_times(target_date) var_filled_0 = self.cmems_reader.get_filled_3D_var(cmems_var_name, t0) if alpha == 0.0: var_filled = var_filled_0 else: var_filled_1 = self.cmems_reader.get_filled_3D_var(cmems_var_name, t1) var_filled = (1.0 - alpha) * var_filled_0 + alpha * var_filled_1 # First, interpolate onto the horizontal grid for each depth level var_on_fvcom_horizontal_grid = np.empty( (self.cmems_reader.n_depths, n_points), dtype=var_filled.dtype ) for depth_index in range(self.cmems_reader.n_depths): layer_data = var_filled[depth_index, :, :] interp = interpolate.RegularGridInterpolator( (self.cmems_reader.lons, self.cmems_reader.lats), layer_data.T ) var_on_fvcom_horizontal_grid[depth_index, :] = interp((coordinates.x1, coordinates.x2)) # Next, interpolate onto the FVCOM vertical sigma layers for each horizontal point var_on_fvcom_grid = np.empty((n_depths, n_points), dtype=var_filled.dtype) for i in range(n_points): var_profile = var_on_fvcom_horizontal_grid[:, i] target_depths = coordinates.x3[:, i] interp = interpolate.interp1d( self.cmems_reader.depth_levels, var_profile, kind="linear", bounds_error=False, fill_value="extrapolate", ) var_on_fvcom_grid[:, i] = interp(target_depths) interpolated_data[d_idx, :, :] = var_on_fvcom_grid return interpolated_data
[docs] class FVCOMInterpolator(Interpolator): """ FVCOM interpolator class There are several methods that could be used for interpolating data: 1. Nearest-neighbor interpolation 2. Linear interpolation from a delaunay triangulation 3. Radial basis function interpolation These different methods have different trade-offs in terms of speed and accuracy, with RBFs being the most accurate but also the slowest. For now, we only support linear interpolation using scipy's LinearTriInterpolator. However, this could be extended in future to support other methods. Further notes: - In nested FVCOM grids, the elements are identical in the overlap zone, meaning there is no need to interpolate from one grid to another. Instead, one can simply extract the data from the parent grid at the locations of the child grid nodes/elements. - For node-based variables (e.g., zeta, h), interpolation is performed using the grid nodes. We use the input grid's connectivity (triangles) to build a triangulation for the nodes (i.e., the nv array). - For element-based variables (e.g., u, v), we must construct a new triangulation based on the element centroids (xc, yc). - We should allow for extrapolation of values outside the convex hull of the triangulation, which may be necessary when moving from a coarse grid to a finer grid. This is done by testing for NaN values in the interpolated output and filling them using nearest-neighbor interpolation. A warning is issued when this occurs. - Once data has been interpolated onto all horizontal surfaces of the input grid, we then interpolate vertically onto the supplied depth levels. - Options to parallelise the interpolation step are included. Args: fvcom_reader (FVCOMReader): An instance of FVCOMReader with loaded data. """ def __init__(self, fvcom_reader: FVCOMReader): super().__init__() self.fvcom_reader = fvcom_reader # These are created through lazy initialisation. Note for # element based data, form a new triangulation based on # element centroids. self._triangulation_nodes_cartesian = None self._triangulation_nodes_geographic = None self._triangulation_elements_cartesian = None self._triangulation_elements_geographic = None @property def triangulation_nodes_cartesian(self): """Triangulation for FVCOM grid nodes in cartesian coordinates.""" if self._triangulation_nodes_cartesian is not None: return self._triangulation_nodes_cartesian self._triangulation_nodes_cartesian = Triangulation( self.fvcom_reader.x_nodes, self.fvcom_reader.y_nodes, triangles=self.fvcom_reader.grid.triangles) return self._triangulation_nodes_cartesian @property def triangulation_nodes_geographic(self): """Triangulation for FVCOM grid nodes in geographic coordinates.""" if self._triangulation_nodes_geographic is not None: return self._triangulation_nodes_geographic self._triangulation_nodes_geographic = Triangulation( self.fvcom_reader.lon_nodes, self.fvcom_reader.lat_nodes, triangles=self.fvcom_reader.grid.triangles) return self._triangulation_nodes_geographic @property def triangulation_elements_cartesian(self): """Triangulation for FVCOM grid elements in cartesian coordinates.""" if self._triangulation_elements_cartesian is not None: return self._triangulation_elements_cartesian self._triangulation_elements_cartesian = Triangulation( self.fvcom_reader.x_elements, self.fvcom_reader.y_elements) return self._triangulation_elements_cartesian @property def triangulation_elements_geographic(self): """Triangulation for FVCOM grid elements in geographic coordinates.""" if self._triangulation_elements_geographic is not None: return self._triangulation_elements_geographic self._triangulation_elements_geographic = Triangulation( self.fvcom_reader.lon_elements, self.fvcom_reader.lat_elements) return self._triangulation_elements_geographic
[docs] def interpolate(self, coordinates: InterpolationCoordinates, fvcom_var_name: str, extrapolate_horizontally: Optional[bool] = False, extrapolate_down: Optional[bool] = False, extrapolate_up: Optional[bool] = True, apply_land_sea_mask: Optional[bool] = True, land_sea_mask: Optional[np.ndarray] = None) -> np.ndarray: """Perform interpolation operation for FVCOM data. The land_sea_mask is only applied when extrapolate is False. If not provided and extrapolate is False, it is automatically generated from the supplied coordinates object through a call to `self.generate_land_sea_mask`. The argument is included to save on computation time in cases where in `interpolate` is called multiple times with the same coordinates, e.g., when interpolating multiple variables onto the same grid. Args: coordinates (InterpolationCoordinates): Coordinates on the FVCOM grid. fvcom_var_name (str): Name of the FVCOM variable to interpolate. extrapolate_horizontally (Optional[bool]): Whether to allow extrapolation outside the horizontal grid. extrapolate_down (Optional[bool]): Whether to allow extrapolation below the bottom grid. extrapolate_up (Optional[bool]): Whether to allow extrapolation above the surface grid. apply_land_sea_mask (Optional[bool]): Whether to apply the land-sea mask during interpolation. land_sea_mask (Optional[np.ndarray]): Optional land-sea mask to apply during interpolation. Returns: np.ndarray: Interpolated variable on the FVCOM grid. """ # Get variable dimensions and their names var_dims = self.fvcom_reader.get_var_dimensions(fvcom_var_name) # Check if time dimension is present has_time = 'time' in var_dims # Set has_depth flag has_depth = self.fvcom_reader.get_vertical_position(fvcom_var_name) is not None # If not extrapolating, generate land-sea mask is not provided if apply_land_sea_mask and land_sea_mask is None: land_sea_mask = ~self.generate_land_sea_mask(coordinates) # Route to appropriate interpolation method based on dimensions if not has_time and not has_depth: # Case 1: 2D time independent (e.g., bathymetry: [node] or [nele]) var = self._interpolate_2d_static(coordinates, fvcom_var_name, extrapolate_horizontally=extrapolate_horizontally) if apply_land_sea_mask: # Apply land-sea mask to interpolated variable var[land_sea_mask] = np.nan return var elif has_time and not has_depth: # Case 2: 2D time dependent (e.g., zeta: [time, node]) var = self._interpolate_2d_time_dependent(coordinates, fvcom_var_name, extrapolate_horizontally=extrapolate_horizontally) if apply_land_sea_mask: # Apply land-sea mask to interpolated variable var[:, land_sea_mask] = np.nan return var elif has_time and has_depth: # Case 3: 3D time dependent (e.g., temp: [time, siglay, node]) var = self._interpolate_3d_time_dependent(coordinates, fvcom_var_name, extrapolate_horizontally=extrapolate_horizontally, extrapolate_down=extrapolate_down, extrapolate_up=extrapolate_up) if apply_land_sea_mask: # Apply land-sea mask to interpolated variable var[:, :, land_sea_mask] = np.nan return var else: raise PyFVCOM2ValueError( f"Unsupported variable dimensions for {fvcom_var_name}: {var_dims}" )
def _interpolate_2d_static(self, coordinates: InterpolationCoordinates, fvcom_var_name: str, extrapolate_horizontally: Optional[bool] = False) -> np.ndarray: """Interpolate a 2D time-independent FVCOM variable. Args: coordinates (InterpolationCoordinates): Target coordinates. fvcom_var_name (str): Name of the FVCOM variable. extrapolate_horizontally (Optional[bool]): Whether to allow extrapolation outside the grid. Returns: np.ndarray: Interpolated variable data with shape (n_points,). """ n_points = len(coordinates.x1) # Initialise array to hold interpolated data interpolated_data = np.empty((n_points), dtype=np.float32) # FVCOM data for interpolation data = self.fvcom_reader.get_var(fvcom_var_name) # Create interpolator var_is_node_based = self.fvcom_reader.var_is_node_based(fvcom_var_name) interpolator = self._get_linear_interpolator_for_variable(var_is_node_based, coordinates.horizontal_coordinate_system, data) # Interpolate interpolated_data[:] = interpolator(coordinates.x1, coordinates.x2) # Fill NaNs indicating out-of-bounds points using nearest-neighbor interpolation if extrapolate_horizontally: target_points = np.column_stack((coordinates.x1, coordinates.x2)) self._fill_nans_nearest_neighbor(interpolated_data, target_points, var_is_node_based, data) return interpolated_data def _interpolate_2d_time_dependent(self, coordinates: InterpolationCoordinates, fvcom_var_name: str, extrapolate_horizontally: Optional[bool] = False) -> np.ndarray: """Interpolate a 2D time-dependent FVCOM variable. Args: coordinates (InterpolationCoordinates): Target coordinates. fvcom_var_name (str): Name of the FVCOM variable. extrapolate_horizontally (Optional[bool]): Whether to allow extrapolation outside the grid. Returns: np.ndarray: Interpolated variable data with shape (n_times, n_points). """ try: len(coordinates.dates) dates = coordinates.dates except TypeError: # Single datetime object, wrap in list dates = [coordinates.dates] # Determine the number of dates and points n_dates = len(dates) n_points = len(coordinates.x1) # Initialise array to hold interpolated data interpolated_data = np.empty((n_dates, n_points), dtype=np.float32) # Is the variable node or element based? var_is_node_based = self.fvcom_reader.var_is_node_based(fvcom_var_name) # Loop over each time index to perform interpolation for d_idx, target_date in enumerate(dates): print(f"Interpolating FVCOM {fvcom_var_name} to FVCOM grid for date: {target_date}.") # FVCOM data for this time step data = self.fvcom_reader.get_var(fvcom_var_name, target_date) # Create interpolator interpolator = self._get_linear_interpolator_for_variable(var_is_node_based, coordinates.horizontal_coordinate_system, data) interpolated_data[d_idx, :] = interpolator(coordinates.x1, coordinates.x2) # Fill NaNs indicating out-of-bounds points using nearest-neighbor interpolation if extrapolate_horizontally: target_points = np.column_stack((coordinates.x1, coordinates.x2)) self._fill_nans_nearest_neighbor(interpolated_data[d_idx, :], target_points, var_is_node_based, data, context_msg=f"at time {target_date}") return interpolated_data def _interpolate_3d_time_dependent(self, coordinates: InterpolationCoordinates, fvcom_var_name: str, extrapolate_horizontally: Optional[bool] = False, extrapolate_down: Optional[bool] = False, extrapolate_up: Optional[bool] = True) -> np.ndarray: """Interpolate a 3D time-dependent FVCOM variable. Args: coordinates (InterpolationCoordinates): Target coordinates. fvcom_var_name (str): Name of the FVCOM variable. extrapolate_horizontally (Optional[bool]): Whether to allow horizontal extrapolation outside the grid. extrapolate_down (Optional[bool]): Whether to allow extrapolation below the grid. extrapolate_up (Optional[bool]): Whether to allow extrapolation above the grid. NB Useful if highest depth point is a layer center and thus does not reach all the way to the surface. Defaults to True for this reason. Returns: np.ndarray: Interpolated variable data with shape (n_times, n_depths, n_points). """ try: len(coordinates.dates) dates = coordinates.dates except TypeError: dates = [coordinates.dates] # Determine the number of dates, depths and points we will interpolate to n_dates = len(dates) n_depths = coordinates.x3.shape[0] n_points = coordinates.x1.shape[0] # The number of depth levels in the FVCOM data (variable dependent, as can be # defined at layer centers or layer interfaces) n_depths_fvcom = self.fvcom_reader.get_n_depth_levels(fvcom_var_name) # Is the variable node or element based? var_is_node_based = self.fvcom_reader.var_is_node_based(fvcom_var_name) # Target points for interpolation target_points = np.column_stack((coordinates.x1, coordinates.x2)) # Initialise array to hold interpolated data interpolated_var = np.empty((n_dates, n_depths, n_points), dtype=np.float32) # Loop over each time index to perform interpolation for d_idx, target_date in enumerate(dates): print(f"Interpolating FVCOM {fvcom_var_name} for date: {target_date}.") # FVCOM variable for this time point fvcom_var = self.fvcom_reader.get_var(fvcom_var_name, target_date) # Read the correct FVCOM depth coordinates if coordinates.vertical_coordinate_system == 'z': # Interpolation to be done in z coordinates. Depths are spatially # variable in FVCOM, so we need to interpolate these to the target # horizontal grid before performing vertical interpolation. fvcom_depths = self.fvcom_reader.get_time_dep_z_levels( fvcom_var_name, target_datetime=target_date, relative_to_free_surface=True ) else: # sigma coordinates # Interpolation to be done in sigma coordinates. Typically, sigma coordinates # do not vary horizontally, but we allow for it here. They do not vary in time. fvcom_depths = self.fvcom_reader.get_sigma_levels(fvcom_var_name) # Array holding variable data interpolated onto the new horizontal grid at each # FVCOM depth level var_on_target_horizontal_grid = np.empty( (n_depths_fvcom, n_points), dtype=fvcom_var.dtype ) # Array holding depth data interpolated onto the new horizontal grid at each # FVCOM depth level (only used for vertical interpolation) depth_on_target_horizontal_grid = np.empty( (n_depths_fvcom, n_points), dtype=fvcom_depths.dtype ) # Loop over all FVCOM depth levels for i in range(n_depths_fvcom): # Horizontal interpolation of depth levels for this time step and depth level. depth_interp = self._get_linear_interpolator_for_variable(var_is_node_based, coordinates.horizontal_coordinate_system, fvcom_depths[i,:]) depth_on_target_horizontal_grid[i, :] = depth_interp(coordinates.x1, coordinates.x2) if extrapolate_horizontally: self._fill_nans_nearest_neighbor(depth_on_target_horizontal_grid[i, :], target_points, var_is_node_based, fvcom_depths[i, :]) # Horizontal interpolation of variable for this time step and depth level. var_interp = self._get_linear_interpolator_for_variable(var_is_node_based, coordinates.horizontal_coordinate_system, fvcom_var[i, :]) var_on_target_horizontal_grid[i, :] = var_interp(coordinates.x1, coordinates.x2) if extrapolate_horizontally: self._fill_nans_nearest_neighbor(var_on_target_horizontal_grid[i, :], target_points, var_is_node_based, fvcom_var[i, :], context_msg=f"at depth index {i} and time {target_date}") # Next, interpolate onto depth levels of each vertical point var_on_target_grid = np.empty((n_depths, n_points), dtype=fvcom_var.dtype) for i in range(n_points): # Set fill_value per point based on extrapolate_up and extrapolate_down. fill_value = (np.nan if not extrapolate_down else var_on_target_horizontal_grid[0, i], np.nan if not extrapolate_up else var_on_target_horizontal_grid[-1, i]) interp = interpolate.interp1d( depth_on_target_horizontal_grid[:, i], var_on_target_horizontal_grid[:, i], kind="linear", bounds_error=False, fill_value=fill_value ) target_depths = coordinates.x3[:, i] var_on_target_grid[:, i] = interp(target_depths) interpolated_var[d_idx, :, :] = var_on_target_grid return interpolated_var def _get_linear_interpolator_for_variable(self, var_is_node_based: bool, horizontal_coordinate_system: str, data: np.ndarray) -> LinearTriInterpolator: """Get the appropriate linear interpolator based on whether the variable is node or element based. Args: var_is_node_based (bool): True if the variable is node based, False if element based. horizontal_coordinate_system (str): The coordinate system of the data ('geographic' or 'cartesian'). data (np.ndarray): Data array for the variable. Returns: LinearTriInterpolator: The interpolator object. """ # Determine if variable is node or element based if var_is_node_based: # Node based variable if horizontal_coordinate_system == "geographic": triangulation = self.triangulation_nodes_geographic else: triangulation = self.triangulation_nodes_cartesian else: # Element based variable if horizontal_coordinate_system == "geographic": triangulation = self.triangulation_elements_geographic else: triangulation = self.triangulation_elements_cartesian # Create interpolator interpolator = LinearTriInterpolator( triangulation, data ) return interpolator def _fill_nans_nearest_neighbor(self, interpolated_data: np.ndarray, target_points: np.ndarray, var_is_node_based: bool, source_data: np.ndarray, warn: bool = False, context_msg: str = "") -> None: """Check for NaN values in interpolated data and fill using nearest-neighbor interpolation. Args: interpolated_data: 1D array of interpolated values (modified in-place). target_points: 2D array of target point coordinates, shape (n_points, 2). var_is_node_based: Whether the variable is node-based. source_data: Source data array for building the nearest-neighbor interpolator. warn: Whether to print a warning message when NaN values are found. context_msg: Additional context appended to the warning (e.g. "at time 2024-01-01"). """ nan_mask = np.isnan(interpolated_data) if not np.any(nan_mask): return nan_indices = np.where(nan_mask)[0] if warn: context = f" {context_msg}" if context_msg else "" print( f"Warning: Out-of-bounds interpolation detected for {len(nan_indices)} points{context}.\n" f"Points outside FVCOM grid coverage: " f"{[(coord[0], coord[1]) for coord in target_points[nan_indices][:1]]}" ) if len(nan_indices) > 1: print(f" ... and {len(nan_indices) - 1} more points") print("These will be filled using nearest-neighbor interpolation.") nn_interpolator = self._get_nn_interpolator_for_variable(var_is_node_based, source_data) interpolated_data[nan_indices] = nn_interpolator(target_points[nan_indices]) def _get_nn_interpolator_for_variable(self, var_is_node_based: bool, data: np.ndarray) -> NearestNDInterpolator: """Get the appropriate nearest-neighbor interpolator Args: var_is_node_based (bool): True if the variable is node based, False if element based. data (np.ndarray): Data array for the variable. Returns: NearestNDInterpolator: The nearest-neighbor interpolator object. """ # Determine if variable is node or element based if var_is_node_based: # Node based variable x = self.fvcom_reader.x_nodes y = self.fvcom_reader.y_nodes else: # Element based variable x = self.fvcom_reader.x_elements y = self.fvcom_reader.y_elements # Create nearest-neighbor interpolator interpolator = interpolate.NearestNDInterpolator( np.column_stack((x, y)), data ) return interpolator
[docs] def generate_land_sea_mask(self, coordinates: InterpolationCoordinates) -> np.ndarray: """Generate a land-sea mask for the given interpolation coordinates The mask indicates which points are within the convex hull of the FVCOM grid and which are not, and thus which are sea points and which are land points. Args: coordinates (InterpolationCoordinates): The interpolation coordinates for which to generate the mask. Returns: np.ndarray: A boolean array of shape (n_points,) where True indicates a sea point and False indicates a land point. """ # Use the triangulation of the grid nodes to determine which points are within the convex hull of the grid if coordinates.horizontal_coordinate_system == "geographic": triangulation = self.triangulation_nodes_geographic else: triangulation = self.triangulation_nodes_cartesian mask = triangulation.get_trifinder()(coordinates.x1, coordinates.x2) != -1 return mask
[docs] class TPXOInterpolator(Interpolator): """TPXO tidal harmonics interpolator. Interpolates TPXO tidal harmonic amplitudes and phases from the regular TPXO grid onto target positions (e.g., FVCOM open boundary nodes or elements). Phase interpolation is handled by decomposing amplitude/phase into two cartesian components before interpolation, then converting back. This avoids artifacts from the 360-to-0 degree phase wrapping discontinuity. Longitude convention mismatches between the TPXO grid (often 0-360) and the target positions (which may use -180 to 180) are detected and corrected automatically. Args: harmonics_data: Pre-loaded harmonics data from a TPXOHarmonicsReader or TPXOComplexHarmonicsReader. interp_method: Interpolation method passed to scipy's RegularGridInterpolator. Defaults to 'linear'. """ def __init__(self, harmonics_data: HarmonicsData, interp_method: str = 'linear'): super().__init__() self.harmonics_data = harmonics_data self.interp_method = interp_method
[docs] def interpolate(self, coordinates: InterpolationCoordinates, fvcom_var_name: str = None) -> HarmonicsData: """Interpolate TPXO harmonics onto target positions. Args: coordinates: Target positions for interpolation. Uses x1 (longitude) and x2 (latitude). fvcom_var_name: Not used for TPXO interpolation. Included for compatibility with the Interpolator interface. Returns: HarmonicsData with amplitudes and phases interpolated to the target positions, each shaped (n_points, n_constituents). """ target_lon = coordinates.x1 target_lat = coordinates.x2 harmonics_lon = np.array(self.harmonics_data.longitude, copy=True) harmonics_lat = np.array(self.harmonics_data.latitude, copy=True) amplitudes = np.asarray(self.harmonics_data.amplitudes) phases = np.asarray(self.harmonics_data.phases) # RegularGridInterpolator requires 1D coordinate arrays if harmonics_lon.ndim != 1 or harmonics_lat.ndim != 1: harmonics_lon = np.unique(harmonics_lon) harmonics_lat = np.unique(harmonics_lat) # Convert amplitude/phase to cartesian components to avoid # phase wrapping artifacts during interpolation. The components # are given the names "harmonics_component_1" and "harmonics_component_2". harmonics_component_1, harmonics_component_2 = pol2cart(amplitudes, phases, degrees=True) # Align longitude conventions between source and target grids harmonics_lon, harmonics_component_1, harmonics_component_2 = self._align_longitudes( target_lon, harmonics_lon, harmonics_component_1, harmonics_component_2 ) # Interpolate each constituent onto the target positions n_constituents = amplitudes.shape[0] n_points = len(target_lon) interp_component_1 = np.empty((n_constituents, n_points)) interp_component_2 = np.empty((n_constituents, n_points)) for i in range(n_constituents): comp_1_interpolator = interpolate.RegularGridInterpolator( (harmonics_lon, harmonics_lat), harmonics_component_1[i], method=self.interp_method, bounds_error=False, fill_value=None ) comp_2_interpolator = interpolate.RegularGridInterpolator( (harmonics_lon, harmonics_lat), harmonics_component_2[i], method=self.interp_method, bounds_error=False, fill_value=None ) interp_component_1[i] = comp_1_interpolator((target_lon, target_lat)) interp_component_2[i] = comp_2_interpolator((target_lon, target_lat)) # Convert back to amplitude and phase interp_amplitudes, interp_phases = cart2pol(interp_component_1, interp_component_2, degrees=True) # Transpose to (n_points, n_constituents) to match predict_tide expectations return HarmonicsData( longitude=target_lon, latitude=target_lat, amplitudes=interp_amplitudes.T, phases=interp_phases.T, constituents=self.harmonics_data.constituents, )
@staticmethod def _align_longitudes(target_lon, harmonics_lon, harmonics_component_1, harmonics_component_2): """Align harmonics longitude convention with target positions. Detects whether the harmonics and target use different longitude conventions (0-360 vs -180 to 180) and shifts the harmonics data accordingly. The longitude array is then sorted to ensure it is monotonically increasing, as required by RegularGridInterpolator. Args: target_lon: Target longitude positions. harmonics_lon: Harmonics grid longitudes (1D). harmonics_component_1: First cartesian component, shape (n_const, n_lon, n_lat). harmonics_component_2: Second cartesian component, shape (n_const, n_lon, n_lat). Returns: Tuple of (harmonics_lon, harmonics_component_1, harmonics_component_2) with aligned and sorted longitudes. """ if np.any(harmonics_lon > 180) and np.any(target_lon < 0): harmonics_lon = np.where( harmonics_lon > 180, harmonics_lon - 360, harmonics_lon ) elif np.any(harmonics_lon < 0) and np.any(target_lon > 180): harmonics_lon = np.where( harmonics_lon < 0, harmonics_lon + 360, harmonics_lon ) # Sort so longitudes are monotonically increasing sort_idx = np.argsort(harmonics_lon) harmonics_lon = harmonics_lon[sort_idx] harmonics_component_1 = harmonics_component_1[:, sort_idx, :] harmonics_component_2 = harmonics_component_2[:, sort_idx, :] return harmonics_lon, harmonics_component_1, harmonics_component_2