from __future__ import annotations
import os
import numpy as np
from datetime import datetime
from typing import Optional
from pyfvcom2.fvcom_writer import FVCOMWriter
from .version import full_version
from .interpolation_coordinates import InterpolationCoordinates
from .interpolation import Interpolator
from .weights_calculator import get_weights_calculator
from .grid import Grid, OpenBoundary
from .grid import find_connected_elements
from .coordinates import sigma_to_z_coords
from .ocean import zbar
from .exceptions import PyFVCOM2ValueError
from .tide import TideManager
__all__ = ["GridBand", "Nest", "NestManager"]
[docs]
class GridBand:
"""Grid band with adjoining nodes and elements and vertical sigma structure.
To be used when constructing an FVCOM nest.
"""
def __init__(self, nodes: np.ndarray, elements: np.ndarray,
node_weights: np.ndarray = None, element_weights: np.ndarray = None):
"""Initialize 3D GridBand.
Args:
nodes: Array of node coordinates.
elements: Array of element connectivity.
element_weights: Array of element weights.
node_weights: Array of node weights.
"""
self._nodes = nodes
self._elements = elements
self._node_weights = node_weights
self._element_weights = element_weights
@property
def nodes(self) -> np.ndarray:
"""Get node coordinates.
Returns:
Array of node coordinates.
"""
return self._nodes
@property
def elements(self) -> np.ndarray:
"""Get element connectivity.
Returns:
Array of element connectivity.
"""
return self._elements
@property
def element_weights(self) -> np.ndarray:
"""Get element weights.
Returns:
Array of element weights.
"""
return self._element_weights
@property
def node_weights(self) -> np.ndarray:
"""Get node weights.
Returns:
Array of node weights.
"""
return self._node_weights
[docs]
class Nest:
"""Representation of an FVCOM nest
A nest consists of an open boundary and one or more grid bands which extend away
from the open boundary into the interior of the domain.
Attributes:
open_boundary: OpenBoundary instance defining the nest's open boundary.
open_boundary_weights: Weights for the open boundary nodes.
grid_bands: List of GridBand instances defining the nest's grid bands.
"""
def __init__(self, open_boundary: OpenBoundary):
"""Initialize Nest with an OpenBoundary and an optional list of GridBands."""
self.open_boundary = open_boundary
self.open_boundary_weights = np.ones((open_boundary.nnodes), dtype=np.float32)
self.grid_bands = []
[docs]
def add_grid_band(self, grid_band: GridBand):
"""Add a GridBand to the nest.
Args:
grid_band: GridBand instance to add.
"""
self.grid_bands.append(grid_band)
[docs]
def get_grid_bands(self) -> list:
"""Get the list of GridBands in the nest.
Returns:
List of GridBand instances.
"""
return self.grid_bands
[docs]
class NestManager:
"""Manager for FVCOM nests
"""
def __init__(self, grid: Grid, num_grid_bands=1,
weights_calculation_method: str='linear'):
self._grid_ref = grid
# Method for calculating the weight of different grid bands
self._weights_calculator = get_weights_calculator(weights_calculation_method)
# Make nests
self.nests = []
self.make_nests(num_grid_bands)
# Initialise empty list of dates on which to generate forcing data
self._dates = []
# Initialise empty dict to hold forcing data
self._forcing_data = {}
# Initialise empty dict to hold tidal data
self._tidal_data = {}
[docs]
def set_dates(self, dates: list[datetime]) -> None:
""" Set the dates for the forcing data
Args:
dates: List of datetime objects.
"""
if not self._forcing_data:
print(f'Updating NestManager dates and purging old forcing data for the previous dates.')
self._forcing_data = {}
self._dates = dates
[docs]
def clear_nests(self) -> None:
""" Clear all nests from the manager """
self.nests = []
self.foricing_data = {}
[docs]
def make_nests(self, num_grid_bands: int) -> None:
""" Make nests for each open boundary in the grid
Args:
grid: Grid instance containing open boundaries.
num_grid_bands: Number of grid bands to create for each nest.
"""
if len(self.nests) > 0:
# Purge existing nests
print("Purging existing nests from NestManager")
self.clear_nests()
# Keep track of all nodes and elements used in the nest so far to avoid duplication
all_nodes = []
all_elements = []
# If there are no open boundaries, flag that no nests can be created
if len(self._grid_ref.open_boundaries) == 0:
print("Warning: No open boundaries found in the grid. No nests can be created.")
return
# Create new nest objects for each open boundary
for ob in self._grid_ref.open_boundaries:
nest = Nest(open_boundary=ob)
# Add new OB nodes to all nodes
new_nodes = np.setdiff1d(ob.node_indices, all_nodes).tolist()
all_nodes.extend(new_nodes)
# Set of nodes that are used to locate elements in the next grid band. When beginning
# to add grid bands, as we are here, we start with the open boundary nodes. As we successively
# add grid bands, we update this to be the nodes in the most recently added grid band.
reference_nodes = new_nodes
for i in range(num_grid_bands):
# First, find the elements that make up the grid band adjoining the current, inner
# most set of nodes in the nest. If no grid bands have been added yet, this will
# be the nodes that define the open boundary.
elements = find_connected_elements(reference_nodes, self._grid_ref.triangles)
# Only use unique elements that have not already been used in previous grid bands
unique_elements = np.setdiff1d(elements, all_elements).tolist()
# Get the nodes connected to the elements we've extracted.
nodes = np.unique(self._grid_ref.triangles[unique_elements, :])
# Remove ones we already have in the nest.
unique_nodes = np.setdiff1d(nodes, all_nodes).tolist()
# Calculate weights for the nodes and elements in this grid band
node_weights = self._weights_calculator.calculate_weights(
n=len(unique_nodes),
n_bands=num_grid_bands,
grid_band_index=i+1 # +1 as open boundary is band 0
)
element_weights = self._weights_calculator.calculate_weights(
n=len(unique_elements),
n_bands=num_grid_bands,
grid_band_index=i
)
# Create the grid band
grid_band = GridBand(unique_nodes, unique_elements, node_weights, element_weights)
nest.add_grid_band(grid_band)
# Update reference vars
reference_nodes = unique_nodes
# Keep track of all nodes and elements used in the nest so far
all_nodes.extend(unique_nodes)
all_elements.extend(unique_elements)
# Find elements that are fully enclosed by the outermost band's nodes
# but were missed by find_connected_elements. This happens when all
# three nodes of an element are new nodes introduced in the last
# iteration, so none of them served as reference nodes for that call.
# FVCOM implicitly includes these elements when building a nest from
# node IDs alone, so omitting them creates an inconsistency.
if num_grid_bands > 0 and len(nest.grid_bands) > 0:
outermost_nodes = reference_nodes
candidate_elements = np.flatnonzero(
np.all(np.isin(self._grid_ref.triangles, outermost_nodes), axis=1)
).tolist()
missing_elements = np.setdiff1d(candidate_elements, all_elements).tolist()
if missing_elements:
last_band = nest.grid_bands[-1]
last_band._elements = last_band._elements + missing_elements
if last_band._element_weights is not None:
missing_weights = self._weights_calculator.calculate_weights(
n=len(missing_elements),
n_bands=num_grid_bands,
grid_band_index=num_grid_bands - 1
)
last_band._element_weights = np.concatenate(
[last_band._element_weights, missing_weights]
)
all_elements.extend(missing_elements)
# Add nest
self.add_nest(nest)
[docs]
def add_nest(self, nest: Nest):
"""Add a Nest to the manager.
Args:
nest: Nest instance to add.
"""
self.nests.append(nest)
[docs]
def get_nests(self) -> list:
"""Get the list of Nests managed by the manager.
Returns:
List of Nest instances.
"""
return self.nests
[docs]
def get_all_nest_nodes(self) -> np.ndarray:
"""Get all unique node indices used in all nests.
Returns:
Array of unique node indices.
"""
all_nodes = []
for nest in self.nests:
# Add open boundary nodes
all_nodes.extend(nest.open_boundary.node_indices.tolist())
# Add nodes from each grid band
for band in nest.grid_bands:
all_nodes.extend(band.nodes)
# Return unique nodes as a numpy array
return np.array(all_nodes, dtype=np.int32)
[docs]
def get_all_nest_elements(self) -> np.ndarray:
"""Get all unique element indices used in all nests.
Returns:
Array of unique element indices.
"""
all_elements = []
for nest in self.nests:
# Add elements from each grid band
for band in nest.grid_bands:
all_elements.extend(band.elements)
# Return unique elements as a numpy array
return np.array(all_elements, dtype=np.int32)
[docs]
def get_all_node_weights(self) -> np.ndarray:
"""Get all node weights for all nests.
Returns:
Array of node weights.
"""
all_node_weights = []
for nest in self.nests:
# Add open boundary weights
all_node_weights.extend(nest.open_boundary_weights.tolist())
# Add weights from each grid band
for band in nest.grid_bands:
all_node_weights.extend(band.node_weights.tolist())
return np.array(all_node_weights, dtype=np.float32)
[docs]
def get_all_element_weights(self) -> np.ndarray:
"""Get all element weights for all nests.
Returns:
Array of element weights.
"""
all_element_weights = []
for nest in self.nests:
# Add weights from each grid band
for band in nest.grid_bands:
all_element_weights.extend(band.element_weights.tolist())
return np.array(all_element_weights, dtype=np.float32)
[docs]
def get_interpolation_coordinates(self, horizontal_position: str,
vertical_position: str,
horizontal_coordinate_system: Optional[str] = "geographic",
vertical_coordinate_system: Optional[str] = "z") -> InterpolationCoordinates:
"""Get interpolation coordinates for a specific grid position.
Args:
horizontal_position: Whether coordinates are at mesh nodes or element centres ('node' or 'element').
vertical_position: Whether depth coordinates are at layer centres or layer interfaces
('layer_centre' or 'layer_interface').
horizontal_coordinate_system: The coordinate system ("geographic" or "cartesian") for the interpolation coordinates.
vertical_coordinate_system: The vertical coordinate system ("z" or "sigma") for the interpolation coordinates.
Returns:
InterpolationCoordinates: The interpolation coordinates for the specified grid position.
"""
if horizontal_position not in ['node', 'element']:
raise PyFVCOM2ValueError("horizontal_position must be either 'node' or 'element'")
if vertical_position not in ['layer_centre', 'layer_interface']:
raise PyFVCOM2ValueError("vertical_position must be either 'layer_centre' or 'layer_interface'")
if horizontal_coordinate_system not in ["geographic", "cartesian"]:
raise PyFVCOM2ValueError("horizontal_coordinate_system must be either 'geographic' or 'cartesian'")
if vertical_coordinate_system not in ["z", "sigma"]:
raise PyFVCOM2ValueError("vertical_coordinate_system must be either 'z' or 'sigma'")
if horizontal_position == 'node':
indices = self.get_all_nest_nodes()
if horizontal_coordinate_system == "geographic":
x1 = self._grid_ref.lon_nodes[indices]
x2 = self._grid_ref.lat_nodes[indices]
else: # cartesian
x1 = self._grid_ref.x_nodes[indices]
x2 = self._grid_ref.y_nodes[indices]
if vertical_position == 'layer_interface':
if vertical_coordinate_system == 'z':
x3 = self._grid_ref.z_levels_static[indices, :].T
else: # 'sigma'
x3 = self._grid_ref.sigma_levels[indices, :].T
else: # 'layer_centre'
if vertical_coordinate_system == 'z':
x3 = self._grid_ref.z_layers_static[indices, :].T
else: # 'sigma'
x3 = self._grid_ref.sigma_layers[indices, :].T
else: # horizontal_position == 'element'
indices = self.get_all_nest_elements()
if horizontal_coordinate_system == "geographic":
x1 = self._grid_ref.lon_elements[indices]
x2 = self._grid_ref.lat_elements[indices]
else: # cartesian
x1 = self._grid_ref.x_elements[indices]
x2 = self._grid_ref.y_elements[indices]
if vertical_position == 'layer_interface':
if vertical_coordinate_system == 'z':
x3 = self._grid_ref.zc_levels_static[indices, :].T
else: # 'sigma'
x3 = self._grid_ref.sigmac_levels[indices, :].T
else: # 'layer_centre'
if vertical_coordinate_system == 'z':
x3 = self._grid_ref.zc_layers_static[indices, :].T
else: # 'sigma'
x3 = self._grid_ref.sigmac_layers[indices, :].T
return InterpolationCoordinates(self._dates, x3, x2, x1, horizontal_coordinate_system=horizontal_coordinate_system, vertical_coordinate_system=vertical_coordinate_system)
[docs]
def add_tidal_data(self, tide_manager: TideManager) -> None:
"""Add tidal forcing data using a TideManager.
Computes tidal predictions for zeta at nodes, and u, v at element
centres, for all nest positions at the dates set via set_dates.
The barotropic tidal velocities are also stored as ua and va.
Args:
tide_manager: TideManager with interpolators registered for
the required variables (zeta, u, v).
"""
if not self._dates:
raise PyFVCOM2ValueError(
"Dates must be set before adding tidal data. Call set_dates first."
)
datetimes = np.array(self._dates)
# Predict zeta at node positions
node_indices = self.get_all_nest_nodes()
node_lons = self._grid_ref.lon_nodes[node_indices]
node_lats = self._grid_ref.lat_nodes[node_indices]
self._tidal_data['zeta'] = tide_manager.predict(
'zeta', datetimes, node_lons, node_lats
)
# Predict u and v at element centre positions
element_indices = self.get_all_nest_elements()
element_lons = self._grid_ref.lon_elements[element_indices]
element_lats = self._grid_ref.lat_elements[element_indices]
for var in ['u', 'v']:
prediction = tide_manager.predict(
var, datetimes, element_lons, element_lats
)
self._tidal_data[var] = prediction
# Barotropic tidal velocity equals depth-averaged velocity
self._tidal_data[f'{var}a'] = prediction
def _get_adjusted_data(self, var_name: str, adjust_tides: Optional[list[str]]) -> np.ndarray:
"""Get forcing data, optionally adjusted by adding tidal predictions.
For 3D variables (time, depth, points), the 2D tidal prediction is
broadcast uniformly across all depth levels.
Args:
var_name: Variable name in _forcing_data.
adjust_tides: List of variable names to adjust, or None.
Returns:
Forcing data array, with tidal adjustment if requested.
"""
data = self._forcing_data[var_name]
if adjust_tides and var_name in adjust_tides:
if var_name not in self._tidal_data:
raise PyFVCOM2ValueError(
f"Tidal data for '{var_name}' not available. "
f"Call add_tidal_data first."
)
tidal = self._tidal_data[var_name]
if data.ndim == 3 and tidal.ndim == 2:
# Broadcast 2D barotropic tide across depth dimension
tidal = tidal[:, np.newaxis, :]
data = data + tidal
return data
[docs]
def get_tidal_data(self, variable: Optional[str] = None) -> dict | np.ndarray:
"""Get tidal prediction data.
Args:
variable: If given, return the array for that variable only.
If None, return the full dict of all tidal variables.
Returns:
Dictionary of variable name to array, or a single array when
*variable* is specified.
Raises:
PyFVCOM2ValueError: If no tidal data has been added, or the
requested variable is not available.
"""
if not self._tidal_data:
raise PyFVCOM2ValueError(
"No tidal data available. Call add_tidal_data first."
)
if variable is not None:
if variable not in self._tidal_data:
raise PyFVCOM2ValueError(
f"Tidal data for '{variable}' not available. "
f"Available variables: {list(self._tidal_data.keys())}"
)
return self._tidal_data[variable]
return dict(self._tidal_data)
[docs]
def get_forcing_data(self, variable: Optional[str] = None,
adjust_tides: bool = False) -> dict | np.ndarray:
"""Get forcing data, optionally with tidal adjustment applied.
Args:
variable: If given, return the array for that variable only.
If None, return a dict of all forcing variables.
adjust_tides: If True, add tidal predictions to the applicable
forcing variables (those for which tidal data exists).
Requires that add_tidal_data has been called first.
Returns:
Dictionary of variable name to array, or a single array when
*variable* is specified.
Raises:
PyFVCOM2ValueError: If no forcing data has been added, or the
requested variable is not available.
"""
if not self._forcing_data:
raise PyFVCOM2ValueError(
"No forcing data available. Call add_forcing_data first."
)
tide_vars = list(self._tidal_data.keys()) if adjust_tides else None
if variable is not None:
if variable not in self._forcing_data:
raise PyFVCOM2ValueError(
f"Forcing data for '{variable}' not available. "
f"Available variables: {list(self._forcing_data.keys())}"
)
return self._get_adjusted_data(variable, tide_vars)
return {
var: self._get_adjusted_data(var, tide_vars)
for var in self._forcing_data
}
[docs]
def add_forcing_data(self, interpolator: Interpolator, fvcom_var_name: str, horizontal_position: str) -> None:
""" Add forcing data for the nests
Args:
interpolator: Interpolator instance to use for interpolation.
fvcom_var_name: FVCOM name for the forcing variable.
horizontal_position: Whether coordinates are at mesh nodes or element centres ('node' or 'element').
"""
interpolation_coords = self.get_interpolation_coordinates(horizontal_position, 'layer_centre')
forcing_data = interpolator.interpolate(interpolation_coords, fvcom_var_name)
self._forcing_data[fvcom_var_name] = forcing_data
# For velocities, calculate the vertically averaged component
if fvcom_var_name in ['u', 'v']:
indices = self.get_all_nest_elements()
layer_thickness = (self._grid_ref.sigmac_levels.T[0:-1, indices] - self._grid_ref.sigmac_levels.T[1:, indices])
self._forcing_data[f'{fvcom_var_name}a'] = zbar(forcing_data, layer_thickness)
[docs]
def create_forcing_file(self, output_path: str, nest_type: int,
adjust_tides: Optional[list[str]] = None,
format='NETCDF4', **kwargs) -> None:
""" Write the nest forcing data to a NetCDF file
Args:
output_path: Path to the output NetCDF file.
nest_type: Type of model nesting.
adjust_tides: List of variable names to adjust by adding tidal
predictions (e.g. ['zeta', 'u', 'v', 'ua', 'va']). Requires
that add_tidal_data has been called first.
format: NetCDF format to use. Defaults to 'NETCDF4'.
**kwargs: Additional keyword arguments for writing the forcing file.
"""
ncfile = os.path.basename(output_path)
nodes = self.get_all_nest_nodes()
elements = self.get_all_nest_elements()
# Counters
n_dates = len(self._dates)
n_nodes = len(nodes)
n_elements = len(elements)
n_sigma_layers = self._grid_ref.n_sigma_layers
n_sigma_levels = self._grid_ref.n_sigma_levels
# Prepare the data.
hyw = np.zeros((n_dates, n_sigma_levels, n_nodes)) # Always zero, for now at least
# Set weights if bdy type is 3
if nest_type == 3:
node_weights = self.get_all_node_weights()
node_weights = np.tile(node_weights, [n_dates, 1])
element_weights = self.get_all_element_weights()
element_weights = np.tile(element_weights, [n_dates, 1])
else:
node_weights = None
element_weights = None
# Options for compression etc.
if 'ncopts' in kwargs:
ncopts = kwargs.pop('ncopts')
else:
ncopts = {}
# Define the global attributes
globals = {'type': 'FVCOM nestING TIME SERIES FILE',
'title': f'FVCOM nestING TYPE {nest_type} TIME SERIES data for open boundary',
'history': f'File created using PyFVCOM2 version {full_version}',
'filename': str(ncfile),
'Conventions': 'CF-1.0'}
# Dimensions
dims = {'nele': n_elements, 'node': n_nodes, 'time': 0,
'DateStrLen': 26, 'three': 3,
'siglay': n_sigma_layers, 'siglev': n_sigma_levels}
with FVCOMWriter(str(ncfile), dims, global_attributes=globals,
clobber=True, format=format, **kwargs) as nest_ncfile:
# Add standard times
# ------------------
nest_ncfile.write_fvcom_time(self._dates, ncopts=ncopts)
# Add space variables
# -------------------
atts = {'units': 'meters', 'long_name': 'nodal x-coordinate'}
nest_ncfile.add_variable('x', self._grid_ref.x_nodes[nodes], ['node'],
attributes=atts, ncopts=ncopts)
atts = {'units': 'meters', 'long_name': 'nodal y-coordinate'}
nest_ncfile.add_variable('y', self._grid_ref.y_nodes[nodes], ['node'],
attributes=atts, ncopts=ncopts)
atts = {'units': 'degrees_east', 'standard_name': 'longitude',
'long_name': 'nodal longitude'}
nest_ncfile.add_variable('lon', self._grid_ref.lon_nodes[nodes], ['node'],
attributes=atts, ncopts=ncopts)
atts = {'units': 'degrees_north', 'standard_name': 'latitude',
'long_name': 'nodal latitude'}
nest_ncfile.add_variable('lat', self._grid_ref.lat_nodes[nodes], ['node'],
attributes=atts, ncopts=ncopts)
atts = {'units': 'meters', 'long_name': 'zonal x-coordinate'}
nest_ncfile.add_variable('xc', self._grid_ref.x_elements[elements], ['nele'],
attributes=atts, ncopts=ncopts)
atts = {'units': 'meters', 'long_name': 'zonal y-coordinate'}
nest_ncfile.add_variable('yc', self._grid_ref.y_elements[elements], ['nele'],
attributes=atts, ncopts=ncopts)
atts = {'units': 'degrees_east', 'standard_name': 'longitude',
'long_name': 'zonal longitude'}
nest_ncfile.add_variable('lonc', self._grid_ref.lon_elements[elements],
['nele'], attributes=atts, ncopts=ncopts)
atts = {'units': 'degrees_north', 'standard_name': 'latitude',
'long_name': 'zonal latitude'}
nest_ncfile.add_variable('latc', self._grid_ref.lat_elements[elements],
['nele'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'nodes surrounding element'}
nv = self._grid_ref.triangles[elements, :].T + 1 # FVCOM uses 1-based indexing
nest_ncfile.add_variable('nv', nv,
['three', 'nele'], format='i4', attributes=atts,
ncopts=ncopts)
atts = {'long_name': 'Sigma Layers',
'standard_name': 'ocean_sigma/general_coordinate',
'positive': 'up',
'valid_min': -1.,
'valid_max': 0.,
'formula_terms': 'sigma: siglay eta: zeta depth: h'}
nest_ncfile.add_variable('siglay', self._grid_ref.sigma_layers[nodes, :].T,
['siglay', 'node'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Sigma Levels',
'standard_name': 'ocean_sigma/general_coordinate',
'positive': 'up',
'valid_min': -1.,
'valid_max': 0.,
'formula_terms': 'sigma: siglev eta: zeta depth: h'}
nest_ncfile.add_variable('siglev', self._grid_ref.sigma_levels[nodes, :].T,
['siglev', 'node'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Sigma Layers',
'standard_name': 'ocean_sigma/general_coordinate',
'positive': 'up',
'valid_min': -1.,
'valid_max': 0.,
'formula_terms': 'sigma: siglay_center eta: '
+ 'zeta_center depth: h_center'}
nest_ncfile.add_variable('siglay_center', self._grid_ref.sigmac_layers[
elements, :].T, ['siglay', 'nele'], attributes=atts,
ncopts=ncopts)
atts = {'long_name': 'Sigma Levels',
'standard_name': 'ocean_sigma/general_coordinate',
'positive': 'up',
'valid_min': -1.,
'valid_max': 0.,
'formula_terms': 'sigma: siglev_center eta: '
+ 'zeta_center depth: h_center'}
nest_ncfile.add_variable('siglev_center', self._grid_ref.sigmac_levels[
elements, :].T, ['siglev', 'nele'], attributes=atts,
ncopts=ncopts)
atts = {'long_name': 'Bathymetry',
'standard_name': 'sea_floor_depth_below_geoid',
'units': 'm',
'positive': 'down',
'grid': 'Bathymetry_mesh',
'coordinates': 'x y',
'type': 'data'}
nest_ncfile.add_variable('h', self._grid_ref.bathy_nodes[nodes], ['node'],
attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Bathymetry',
'standard_name': 'sea_floor_depth_below_geoid',
'units': 'm',
'positive': 'down',
'grid': 'grid1 grid3',
'coordinates': 'latc lonc',
'grid_location': 'center'}
nest_ncfile.add_variable('h_center', self._grid_ref.bathy_elements[elements],
['nele'], attributes=atts, ncopts=ncopts)
if nest_type == 3:
atts = {'long_name': 'Weights for nodes in relaxation zone',
'units': 'no units',
'grid': 'fvcom_grid',
'type': 'data'}
nest_ncfile.add_variable('weight_node', node_weights,
['time', 'node'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Weights for elements in relaxation zone',
'units': 'no units',
'grid': 'fvcom_grid',
'type': 'data'}
nest_ncfile.add_variable('weight_cell', element_weights,
['time', 'nele'], attributes=atts, ncopts=ncopts)
# Now all the data
# ----------------
atts = {'long_name': 'Water Surface Elevation',
'units': 'meters',
'positive': 'up',
'standard_name': 'sea_surface_height_above_geoid',
'grid': 'Bathymetry_Mesh',
'coordinates': 'time lat lon',
'type': 'data',
'location': 'node'}
nest_ncfile.add_variable('zeta', self._get_adjusted_data('zeta', adjust_tides),
['time', 'node'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Vertically Averaged x-velocity',
'units': 'meters s-1',
'grid': 'fvcom_grid',
'type': 'data'}
nest_ncfile.add_variable('ua', self._get_adjusted_data('ua', adjust_tides),
['time', 'nele'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Vertically Averaged y-velocity',
'units': 'meters s-1',
'grid': 'fvcom_grid',
'type': 'data'}
nest_ncfile.add_variable('va', self._get_adjusted_data('va', adjust_tides),
['time', 'nele'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Eastward Water Velocity',
'units': 'meters s-1',
'standard_name': 'eastward_sea_water_velocity',
'grid': 'fvcom_grid',
'coordinates': 'time siglay latc lonc',
'type': 'data',
'location': 'face'}
nest_ncfile.add_variable('u', self._get_adjusted_data('u', adjust_tides),
['time', 'siglay', 'nele'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Northward Water Velocity',
'units': 'meters s-1',
'standard_name': 'Northward_sea_water_velocity',
'grid': 'fvcom_grid',
'coordinates': 'time siglay latc lonc',
'type': 'data',
'location': 'face'}
nest_ncfile.add_variable('v', self._get_adjusted_data('v', adjust_tides),
['time', 'siglay', 'nele'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Temperature',
'standard_name': 'sea_water_temperature',
'units': 'degrees Celcius',
'grid': 'fvcom_grid',
'coordinates': 'time siglay lat lon',
'type': 'data',
'location': 'node'}
nest_ncfile.add_variable('temp', self._forcing_data['temp'],
['time', 'siglay', 'node'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'Salinity',
'standard_name': 'sea_water_salinity',
'units': '1e-3',
'grid': 'fvcom_grid',
'coordinates': 'time siglay lat lon',
'type': 'data',
'location': 'node'}
nest_ncfile.add_variable('salinity', self._forcing_data['salinity'],
['time', 'siglay', 'node'], attributes=atts, ncopts=ncopts)
atts = {'long_name': 'hydro static vertical velocity',
'units': 'meters s-1',
'grid': 'fvcom_grid',
'type': 'data',
'coordinates': 'time siglev lat lon'}
nest_ncfile.add_variable('hyw', hyw,
['time', 'siglev', 'node'], attributes=atts, ncopts=ncopts)
[docs]
def apply_ramp(self, ramp_length: float, initial_ts: Optional[list] = None,
ramp_type: str = 'cosine') -> None:
"""Apply a ramp to forcing (and optionally tidal) data.
Blends each variable smoothly from an initial value at t=0 to the
full forcing values, preventing a shock at model start-up.
Velocities (u, v, ua, va) and sea surface elevation (zeta) are always
ramped from zero. Temperature and salinity are only ramped when
initial_ts is provided.
Must be called after add_forcing_data (and add_tidal_data if tidal
data is present) and before create_forcing_file.
Args:
ramp_length: Spin-up duration or timescale in seconds.
For 'cosine' and 'linear', the ramp reaches full amplitude at
exactly t = ramp_length. For 'tanh', it is a timescale: the
ramp reaches ~76 % at t = ramp_length and ~99.5 % at
t = 3 * ramp_length (asymptotic, never strictly reaches 1).
initial_ts: [temp_initial, salinity_initial] scalar values to ramp
from for temperature and salinity. If None those variables are
not ramped.
ramp_type: Shape of the ramp function. One of:
- 'cosine' (default): half-cosine, C¹ continuous at both
t=0 and t=ramp_length. Reaches full amplitude at t=ramp_length.
- 'tanh': hyperbolic tangent, C∞ everywhere but asymptotic.
- 'linear': linear rise, C⁰ (kinked at t=0 and t=ramp_length).
"""
if not self._dates:
raise PyFVCOM2ValueError("Dates must be set before applying a ramp.")
if not self._forcing_data:
raise PyFVCOM2ValueError("Forcing data must be added before applying a ramp.")
t0 = self._dates[0]
t_sec = np.array([(t - t0).total_seconds() for t in self._dates])
ramp = self._build_ramp(t_sec, ramp_length, ramp_type)
zero_init_vars = [v for v in ('u', 'v', 'ua', 'va', 'zeta')
if v in self._forcing_data]
for var in zero_init_vars:
self._forcing_data[var] = self._apply_ramp_to_array(
self._forcing_data[var], ramp, 0.0
)
if var in self._tidal_data:
self._tidal_data[var] = self._apply_ramp_to_array(
self._tidal_data[var], ramp, 0.0
)
if initial_ts is not None:
for var, init_val in zip(('temp', 'salinity'), initial_ts):
if var in self._forcing_data:
self._forcing_data[var] = self._apply_ramp_to_array(
self._forcing_data[var], ramp, init_val
)
@staticmethod
def _build_ramp(t_sec: np.ndarray, ramp_length: float,
ramp_type: str) -> np.ndarray:
"""Build a ramp weight array from 0 to 1 over ramp_length seconds.
Args:
t_sec: Time elapsed from simulation start in seconds, shape (n_times,).
ramp_length: Duration or timescale of the ramp in seconds.
ramp_type: Shape of the ramp. One of 'cosine', 'tanh', or 'linear'.
Returns:
Ramp weight array in [0, 1], shape (n_times,).
Raises:
PyFVCOM2ValueError: If ramp_type is not recognised.
"""
if ramp_type == 'cosine':
return np.where(t_sec >= ramp_length,
1.0,
0.5 * (1 - np.cos(np.pi * t_sec / ramp_length)))
elif ramp_type == 'tanh':
return np.tanh(t_sec / ramp_length)
elif ramp_type == 'linear':
return np.minimum(t_sec / ramp_length, 1.0)
else:
raise PyFVCOM2ValueError(
f"Unknown ramp_type '{ramp_type}'. Must be 'cosine', 'tanh', or 'linear'."
)
@staticmethod
def _apply_ramp_to_array(data: np.ndarray, ramp: np.ndarray,
initial_val: float) -> np.ndarray:
"""Blend data from initial_val at t=0 to full values using ramp.
Handles 2-D (time, points) and 3-D (time, depth, points) arrays.
Args:
data: Forcing array with time as the first axis.
ramp: 1-D array of ramp weights in [0, 1], length == data.shape[0].
initial_val: Scalar initial value to ramp from.
Returns:
Ramped array with the same shape as data.
"""
if data.ndim == 2:
r = ramp[:, np.newaxis]
else:
r = ramp[:, np.newaxis, np.newaxis]
return initial_val * (1 - r) + data * r