Source code for pyfvcom2.tide

from pyfvcom2.exceptions import PyFVCOM2ValueError
import pandas
import numpy as np
import sys
import time
from typing import Optional
from netCDF4 import date2num
from datetime import datetime, timedelta
from utide import solve as utide_solve, reconstruct, ut_constants
from utide.utilities import Bunch
import multiprocessing

from .interpolation_coordinates import InterpolationCoordinates
from .tide_reader import HarmonicsData
from .fvcom_writer import FVCOMWriter
from .fvcom_reader import FVCOMReader


# Modified Julian Day zero point
MJD_ZERO_POINT = "1858-11-17"

# Default tidal constituents: eight primaries + three shallow-water.
DEFAULT_CONSTIT = ('M2', 'S2', 'N2', 'K2', 'K1', 'O1', 'P1', 'Q1', 'M4', 'MS4', 'MN4')


# ── Private progress-reporting helpers ────────────────────────────────────────

def _fmt_time(seconds: float) -> str:
    """Format a duration in seconds as a compact human-readable string."""
    s = int(seconds)
    if s < 60:
        return f'{s}s'
    m, s = divmod(s, 60)
    if m < 60:
        return f'{m}m {s:02d}s'
    h, m = divmod(m, 60)
    return f'{h}h {m:02d}m'


def _print_progress(label: str, done: int, total: int, t_start: float,
                    _state: list) -> None:
    """Write a rate-limited progress line to stdout.

    Parameters
    ----------
    label : str
        Short description shown on the left (e.g. ``'zeta'`` or ``'u [σ 3/20]'``).
    done : int
        Number of positions completed so far.
    total : int
        Total number of positions to process.
    t_start : float
        Timestamp (from :func:`time.perf_counter`) when processing started.
    _state : list
        Single-element mutable list ``[last_print_time]`` used to rate-limit
        output to at most once per second.  Pass ``[0.0]`` on first call.
    """
    now = time.perf_counter()
    finished = done >= total
    if not finished and (now - _state[0]) < 1.0:
        return
    _state[0] = now

    elapsed = now - t_start
    pct = 100.0 * done / total
    w = len(str(total))   # field width for counts
    if done > 0:
        eta_str = _fmt_time(elapsed * (total - done) / done)
    else:
        eta_str = '?'

    line = (f'  {label:<20s}: {done:{w}d}/{total}  ({pct:5.1f}%)'
            f'  elapsed {_fmt_time(elapsed):>7s}  ETA {eta_str:>7s}')
    if finished:
        print(f'\r{line}', flush=True)
    else:
        print(f'\r{line}', end='', flush=True)


# ─────────────────────────────────────────────────────────────────────────────

[docs] class TideManager: """Manages tidal harmonics interpolation and prediction. Orchestrates the pipeline from TPXO tidal harmonic data through interpolation onto target positions to tidal time series prediction via UTide. Typical usage: 1. Create a TideManager with the desired constituents. 2. Register TPXOInterpolator instances for each variable (zeta, u, v) via add_interpolator. 3. Pass the TideManager to NestManager.add_tidal_data(), which calls predict() for each variable with the appropriate target positions. Args: constituents: List of tidal constituent names (e.g. ['M2', 'S2']). parallel: Whether to run UTide predictions in parallel. Default True. pool_size: Number of parallel processes. Default None (use all CPUs). """ def __init__(self, constituents: list[str], parallel: bool = True, pool_size: Optional[int] = None): self._constituents = constituents self._parallel = parallel self._pool_size = pool_size self._interpolators = {} @property def constituents(self) -> list[str]: """List of tidal constituent names.""" return self._constituents
[docs] def add_interpolator(self, variable: str, interpolator) -> None: """Register a TPXOInterpolator for a tidal variable. Args: variable: Tidal variable name ('zeta', 'u', or 'v'). interpolator: TPXOInterpolator loaded with harmonics data for this variable. """ self._interpolators[variable] = interpolator
[docs] def predict(self, variable: str, datetimes: np.ndarray, longitudes: np.ndarray, latitudes: np.ndarray) -> np.ndarray: """Interpolate harmonics onto target positions and predict tides. Args: variable: Variable name ('zeta', 'u', or 'v'). datetimes: Array of datetime objects for prediction times. longitudes: Target longitude positions. latitudes: Target latitude positions. Returns: Predicted tidal time series, shape (n_times, n_points). """ if variable not in self._interpolators: raise PyFVCOM2ValueError( f"No interpolator registered for '{variable}'. " f"Call add_interpolator first." ) interpolated = self._interpolate_harmonics(variable, longitudes, latitudes) results = predict_tide( datetimes, self._constituents, interpolated.amplitudes, interpolated.phases, latitudes, self._parallel, self._pool_size ) return np.asarray(results).T # (n_times, n_points)
def _interpolate_harmonics(self, variable: str, longitudes: np.ndarray, latitudes: np.ndarray) -> HarmonicsData: """Interpolate harmonics onto target positions. Args: variable: Variable name. longitudes: Target longitudes. latitudes: Target latitudes. Returns: HarmonicsData with amplitudes and phases interpolated to targets. """ interpolator = self._interpolators[variable] # TPXOInterpolator.interpolate only uses x1 (lon) and x2 (lat) coords = InterpolationCoordinates( dates=np.array([]), x3=np.empty((0, len(longitudes))), x2=latitudes, x1=longitudes, horizontal_coordinate_system='geographic', vertical_coordinate_system='z', ) return interpolator.interpolate(coords)
[docs] def predict_tide( datetimes: np.ndarray, constituents: list[str], amplitudes: np.ndarray, phases: np.ndarray, latitudes: np.ndarray, parallel: bool = True, pool_size: Optional[int] = None, ): """ Reconstruct tidal variations in zeta, u, v etc at the provided datetimes and latitudes using the provided tidal constituent amplitudes and phases. Args: datetimes : np.ndarray Array of datetime objects for prediction times. interval : float Time interval between datetimes in days. constituents : list[str] List of tidal constituent names to read. amplitudes : np.ndarray Amplitude of the relevant constituents shaped [nlocs, nconst]. phases : np.ndarray Array of the phase of the relevant constituents shaped [nlocs, nconst]. latitudes : np.ndarray Latitudes of the positions to predict. parallel : bool, optional Whether to run the predictions in parallel using multiprocessing. Default is True. pool_size : int, optional Number of parallel processes to use. If 1, runs serially. Default is 1. Returns: results : list[np.ndarray] List of predicted zeta time series arrays for each location. """ const_indices = np.asarray( [ut_constants["const"]["name"].tolist().index(i) for i in constituents] ) frq = ut_constants["const"]["freq"][const_indices] # Extend the provided datetimes by +/- 1 day to avoid edge effects in UTide (TBC). Maintain an # appropriate time interval for the predictions. extended_datetimes = extend_datetime_array(datetimes, extension_length_in_days=1.0) # Convert extended datetimes to datenums extended_times = date2num(extended_datetimes, units=f"days since {MJD_ZERO_POINT} 00:00:00") # Take reference time to be the median time in the input times ref_time = np.median(extended_times) coef = Bunch(name=constituents, mean=0, slope=0) coef["aux"] = Bunch(reftime=ref_time, lind=const_indices, frq=frq) coef["aux"]["opt"] = Bunch( twodim=False, nodsatlint=False, nodsatnone=False, gwchlint=False, gwchnone=False, notrend=True, prefilt=[], nodiagn=True, ) # Prepare the time data for predicting the time series. # UTide needs netCDF date2num times. args = [ (latitudes[i], extended_times, coef, amplitudes[i], phases[i]) for i in range(len(latitudes)) ] if parallel is False or pool_size == 1: results = [] for arg in args: results.append(reconstruct_wrapper(arg)) else: pool = multiprocessing.Pool(pool_size) results = pool.map(reconstruct_wrapper, args) pool.close() # Remove the extended time predictions to return only those for the original datetimes mask = (extended_datetimes >= datetimes[0]) & (extended_datetimes <= datetimes[-1]) results = [r[mask] for r in results] return results
[docs] def reconstruct_wrapper(args: tuple) -> np.ndarray: """ For the given time and coefficients (in coef) reconstruct the tidal elevation or current component time series at the given latitude. Args: args : tuple Tuple of (lats, times, coef, amplitudes, phases) where: - lats: Latitude of the position to predict. - times: Array of datenums (days since MJD zero point). - coef: UTide coefficients Bunch. - amplitudes: Amplitude of the relevant constituents shaped [nconst]. - phases: Phase of the relevant constituents shaped [nconst]. Returns: zeta : np.ndarray Time series of surface elevations. Notes ----- Uses utide.reconstruct() for the predicted tide. Accepts a single tuple argument for compatibility with multiprocessing.Pool.map. """ lats, times, coef, amplitudes, phases = args coef["aux"]["lat"] = lats coef["A"] = amplitudes coef["g"] = phases coef["A_ci"] = np.zeros(amplitudes.shape) coef["g_ci"] = np.zeros(phases.shape) pred = reconstruct(times, coef, epoch=f"{MJD_ZERO_POINT}") zeta = pred["h"] return zeta
[docs] def extend_datetime_array(datetimes: np.ndarray, extension_length_in_days: float=1.0) -> np.ndarray: """ Extend a datetime array by a given number of days at both ends. TODO This was in the original PyFVCOM, although in a different form. Unsure whether it is actually needed for UTide. A simpler approach would be to just add one day at each end, but this might create unevenly spaced time arrays. Would this matter? Need to test it. Args: datetimes (np.ndarray): Original array of datetime objects. extension_length_in_days (float): Number of days to extend at both ends. Returns: np.ndarray: Extended array of datetime objects. """ # Calculate the interval between original datetimes. Check the interval is consistent across the array. if len(datetimes) < 2: raise ValueError("datetimes array must contain at least two elements to determine the interval.") # Does the time interval need to be consistent across the array? Copying what was done in PyFVCOM. intervals = np.diff(datetimes) interval = intervals[0] if not np.all(intervals == interval): raise PyFVCOM2ValueError("Inconsistent time intervals found.") # Use the extension length to create new start and end datetimes extended_datetimes_start = datetimes[0] - timedelta(days=extension_length_in_days) extended_datetimes_end = datetimes[-1] + timedelta(days=extension_length_in_days) # Create the extended datetime array with the same interval extended_datetimes = pandas.date_range(extended_datetimes_start, extended_datetimes_end, freq=timedelta(days=interval.days, seconds=interval.seconds)) return extended_datetimes.to_pydatetime()
[docs] class HarmonicAnalysisWriter(FVCOMWriter): """Write tidal harmonic analysis results to a NetCDF file. Subclasses FVCOMWriter to provide methods for writing the grid, sigma coordinates, constituent names, harmonic amplitudes/phases, and optionally raw or predicted time series. Typical usage:: dims = { 'node': nx, 'nele': ne, 'siglay': nz, 'siglev': nzlev, 'three': 3, 'nconsts': len(consts), 'NameStrLen': 4, 'DateStrLen': 26, } global_atts = {'title': 'FVCOM harmonic analysis', ...} with HarmonicAnalysisWriter('harmonics.nc', dims, global_atts) as nc: nc.add_grid(lon, lat, lonc, latc, h, h_center, nv, siglay, siglev, consts) nc.write_fvcom_time(times) # only needed when writing time series nc.add_harmonics('zeta', z_amp, z_phase) nc.add_harmonics('ua', ua_amp, ua_phase) nc.add_harmonics('u', u_amp, u_phase) Notes ----- Dimensions must be passed to __init__ before calling add_grid() or add_harmonics(). The 'time' dimension should be set to 0 (unlimited) when writing raw or predicted time series, and omitted otherwise. The ncopts compression settings default to zlib level 7, matching the original PyFVCOM HarmonicOutput behaviour. """ # Depth-resolved variables that require a siglay dimension. _SIGLAY_VARS = {'u', 'v'} # Variables located at nodes (vs. elements). _NODE_VARS = {'zeta'} # Default compression options. _NCOPTS = {'zlib': True, 'complevel': 7}
[docs] def add_grid(self, lon, lat, lonc, latc, h, h_center, nv, siglay, siglev, consts): """Write grid coordinates, bathymetry, connectivity and constituent names. Parameters ---------- lon, lat : array-like Node longitudes and latitudes. lonc, latc : array-like Element centre longitudes and latitudes. h : array-like Node bathymetry (m, positive down). h_center : array-like Element centre bathymetry (m, positive down). nv : array-like Node-to-element connectivity table, shape (3, nele). siglay : array-like Sigma layer coordinates, shape (siglay, node). siglev : array-like Sigma level coordinates, shape (siglev, node). consts : list of str Tidal constituent names, padded to 4 characters (e.g. 'M2 '). """ ncopts = self._NCOPTS self.add_variable('lon', lon, ['node'], attributes={'units': 'degrees_east', 'long_name': 'nodal longitude', 'standard_name': 'longitude'}, ncopts=ncopts) self.add_variable('lat', lat, ['node'], attributes={'units': 'degrees_north', 'long_name': 'nodal latitude', 'standard_name': 'latitude'}, ncopts=ncopts) self.add_variable('lonc', lonc, ['nele'], attributes={'units': 'degrees_east', 'long_name': 'element centre longitude', 'standard_name': 'longitude'}, ncopts=ncopts) self.add_variable('latc', latc, ['nele'], attributes={'units': 'degrees_north', 'long_name': 'element centre latitude', 'standard_name': 'latitude'}, ncopts=ncopts) self.add_variable('h', h, ['node'], attributes={'long_name': 'Bathymetry', 'standard_name': 'sea_floor_depth_below_geoid', 'units': 'm', 'positive': 'down'}, ncopts=ncopts) self.add_variable('h_center', h_center, ['nele'], attributes={'long_name': 'Bathymetry', 'standard_name': 'sea_floor_depth_below_geoid', 'units': 'm', 'positive': 'down', 'grid_location': 'center'}, ncopts=ncopts) self.add_variable('nv', nv, ['three', 'nele'], attributes={'long_name': 'nodes surrounding element'}, format='i', ncopts=ncopts) self.add_variable('siglay', siglay, ['siglay', 'node'], attributes={'long_name': 'Sigma Layers', 'standard_name': 'ocean_sigma/general_coordinate', 'positive': 'up', 'valid_min': -1.0, 'valid_max': 0.0, 'formula_terms': 'sigma: siglay eta: zeta depth: h'}, ncopts=ncopts) self.add_variable('siglev', siglev, ['siglev', 'node'], attributes={'long_name': 'Sigma Levels', 'standard_name': 'ocean_sigma/general_coordinate', 'positive': 'up', 'valid_min': -1.0, 'valid_max': 0.0, 'formula_terms': 'sigma: siglev eta: zeta depth: h'}, ncopts=ncopts) # Constituent names: one variable per field type (z, u, v). for prefix in ('z', 'u', 'v'): self.add_variable(f'{prefix}_const_names', consts, ['nconsts', 'NameStrLen'], attributes={'long_name': f'Tidal constituent names for {prefix}'}, format='c', ncopts=ncopts)
[docs] def add_harmonics(self, variable, amplitude, phase): """Write harmonic amplitude and phase for a given variable. Parameters ---------- variable : str One of 'zeta', 'ua', 'va', 'u', 'v'. amplitude : array-like Harmonic amplitudes (m or m/s). Shape is (nconsts, node) for 'zeta', (nconsts, nele) for depth-averaged velocity, or (nconsts, siglay, nele) for depth-resolved velocity. phase : array-like Harmonic phases (degrees), same shape as amplitude. """ ncopts = self._NCOPTS if variable == 'zeta': dims = ['nconsts', 'node'] coords = 'lon lat nconsts' elif variable in ('ua', 'va'): dims = ['nconsts', 'nele'] coords = 'nconsts lonc latc' elif variable in ('u', 'v'): dims = ['nconsts', 'siglay', 'nele'] coords = 'nconsts lonc latc' else: raise PyFVCOM2ValueError( f"Unsupported variable '{variable}'. " "Expected one of 'zeta', 'ua', 'va', 'u', 'v'." ) short = variable self.add_variable(f'{short}_amp', amplitude, dims, attributes={'long_name': f'Tidal harmonic amplitude of {short}', 'standard_name': f'{short}_amplitude', 'units': 'm' if variable == 'zeta' else 'm s-1', 'coordinates': coords}, ncopts=ncopts) self.add_variable(f'{short}_phase', phase, dims, attributes={'long_name': f'Tidal harmonic phase of {short}', 'standard_name': f'{short}_phase', 'units': 'degrees', 'coordinates': coords}, ncopts=ncopts)
[docs] def add_raw(self, variable, data): """Write the raw (input) time series used in the harmonic analysis. Requires a 'time' dimension to have been created and write_fvcom_time() to have been called first. Parameters ---------- variable : str One of 'zeta', 'ua', 'va', 'u', 'v'. data : array-like Time series data. Shape is (time, node) for 'zeta', (time, nele) for depth-averaged, or (time, siglay, nele) for depth-resolved. """ self._write_timeseries(variable, data, suffix='raw')
[docs] def add_predicted(self, variable, data): """Write a predicted time series reconstructed from the harmonics. Requires a 'time' dimension to have been created and write_fvcom_time() to have been called first. Parameters ---------- variable : str One of 'zeta', 'ua', 'va', 'u', 'v'. data : array-like Predicted time series. Same shape conventions as add_raw(). """ self._write_timeseries(variable, data, suffix='pred')
def _write_timeseries(self, variable, data, suffix): """Common implementation for add_raw() and add_predicted().""" ncopts = self._NCOPTS label = 'Modelled' if suffix == 'raw' else 'Predicted' if variable == 'zeta': dims = ['time', 'node'] coords = 'time lat lon' long_name = f'{label} surface elevation' elif variable in ('ua', 'va'): dims = ['time', 'nele'] coords = 'time latc lonc' direction = 'Eastward' if variable == 'ua' else 'Northward' long_name = f'{label} {direction} depth-averaged velocity' elif variable in ('u', 'v'): dims = ['time', 'siglay', 'nele'] coords = 'time latc lonc' direction = 'Eastward' if variable == 'u' else 'Northward' long_name = f'{label} {direction} velocity' else: raise PyFVCOM2ValueError( f"Unsupported variable '{variable}'. " "Expected one of 'zeta', 'ua', 'va', 'u', 'v'." ) units = 'm' if variable == 'zeta' else 'm s-1' self.add_variable(f'{variable}_{suffix}', data, dims, attributes={'long_name': long_name, 'units': units, 'coordinates': coords}, ncopts=ncopts)
def _solve_one(args: tuple): """Solve UTide harmonics for a single spatial position. Defined at module level for :mod:`multiprocessing` pickle compatibility. Each argument is packed into a single tuple by :func:`analyse_harmonics`. Returns ------- tuple or None ``(phase, amplitude, predicted_or_None)`` on success; ``None`` when the position has a NaN latitude (output row is left as NaN). """ import warnings times, u, lat, constit, epoch, verbose, predict = args if np.isnan(lat): return None with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, module='utide') res = utide_solve(t=times, u=u, lat=lat, method='ols', constit=constit, epoch=epoch, verbose=verbose) c_order = [res['name'].tolist().index(cc) for cc in constit] g = res['g'][c_order] # phase A = res['A'][c_order] # amplitude h = None if predict: with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, module='utide') rec = reconstruct(t=times, coef=res, epoch=epoch, verbose=verbose) h = rec['h'] return (g, A, h)
[docs] def analyse_harmonics( times: np.ndarray, elevations: np.ndarray, latitudes: np.ndarray, predict: bool = False, constit: tuple = DEFAULT_CONSTIT, pool_size: int = 1, label: str = '', **kwargs, ) -> np.ndarray: """Run UTide harmonic analysis on a set of time series. Parameters ---------- times : np.ndarray Modified Julian Day times, shape (ntimes,). elevations : np.ndarray Time series data, shape (npositions, ntimes). latitudes : np.ndarray Latitude for each position, shape (npositions,). Positions with a NaN latitude are skipped and left as NaN in the output. predict : bool, optional If True, also return a reconstructed predicted time series alongside the harmonics. Defaults to False. constit : tuple, optional Tidal constituent names to include in the analysis. Defaults to DEFAULT_CONSTIT. pool_size : int, optional Number of worker processes to use. ``1`` (default) runs a simple serial loop. Values greater than 1 dispatch positions across a :class:`multiprocessing.Pool` via ``imap``, producing near-linear speedup on multi-core machines. Use ``multiprocessing.cpu_count()`` to utilise all available cores. Tasks are yielded lazily so that memory use scales with *pool_size*, not with *npositions* — important for long records (hourly or sub-hourly over months). label : str, optional If non-empty, a live progress line is printed to stdout showing completed positions, elapsed time, and estimated time remaining. :class:`HarmonicsAnalyser` sets this automatically when ``show_progress=True``; it can also be set directly when calling :func:`analyse_harmonics` standalone. Defaults to ``''`` (no output). **kwargs Additional keyword arguments forwarded to :func:`utide.solve`. The ``verbose`` key is also forwarded to :func:`utide.reconstruct` when *predict* is True. Returns ------- harmonics : np.ndarray Shape ``(npositions, 2, len(constit))``. Index 0 along the second axis is phase (degrees), index 1 is amplitude (m or m/s). Positions with a NaN latitude are left as NaN. predicted : np.ndarray Only returned when *predict* is True. Shape ``(npositions, ntimes)``. Positions with a NaN latitude are left as NaN. Notes ----- UTide returns constituents in an order that may differ from *constit*. The output arrays are always re-sorted to match the order given in *constit*. """ from multiprocessing import Pool import warnings npositions = len(latitudes) harmonics = np.full((npositions, 2, len(constit)), np.nan) if predict: predicted = np.full((npositions, len(times)), np.nan) # Extract verbose for reconstruct; utide.solve also accepts it via **kwargs. verbose = kwargs.get('verbose', False) # Progress tracking: _state[0] holds the timestamp of the last print. _state = [0.0] t_start = time.perf_counter() if pool_size == 1: # ── Serial path ─────────────────────────────────────────────────────── for i, (timeseries, lat) in enumerate(zip(elevations, latitudes)): if label: _print_progress(label, i, npositions, t_start, _state) if np.isnan(lat): continue with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, module='utide') res = utide_solve(t=times, u=timeseries, lat=lat, method='ols', constit=constit, epoch=MJD_ZERO_POINT, **kwargs) # UTide returns constituents in a different order; re-sort to match constit. c_order = [res['name'].tolist().index(cc) for cc in constit] harmonics[i, 0, :] = res['g'][c_order] # phase harmonics[i, 1, :] = res['A'][c_order] # amplitude if predict: with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning, module='utide') reconstructed = reconstruct(t=times, coef=res, epoch=MJD_ZERO_POINT, verbose=verbose) predicted[i, :] = reconstructed['h'] if label: _print_progress(label, npositions, npositions, t_start, _state) else: # ── Parallel path ───────────────────────────────────────────────────── # Tasks are yielded lazily so only pool_size * chunksize time-series # arrays are held in memory at any one time — important for long # records (hourly/sub-hourly over months). def _task_gen(): for u, lat in zip(elevations, latitudes): yield (times, u, lat, constit, MJD_ZERO_POINT, verbose, predict) # chunksize batches tasks per IPC round-trip: small enough to keep # memory bounded, large enough to amortise round-trip overhead. chunksize = max(1, pool_size * 4) with Pool(processes=pool_size) as pool: for i, result in enumerate( pool.imap(_solve_one, _task_gen(), chunksize=chunksize) ): if label: _print_progress(label, i, npositions, t_start, _state) if result is None: continue # NaN latitude — leave output row as NaN g, A, h = result harmonics[i, 0, :] = g harmonics[i, 1, :] = A if predict and h is not None: predicted[i, :] = h if label: _print_progress(label, npositions, npositions, t_start, _state) if predict: return harmonics, predicted return harmonics
[docs] class HarmonicsAnalyser: """Orchestrate tidal harmonic analysis from FVCOM output. Loads data from a :class:`~pyfvcom2.fvcom_reader.FVCOMReader`, runs UTide harmonic analysis for each requested variable, and writes results to a :class:`HarmonicAnalysisWriter` NetCDF file. Parameters ---------- reader : FVCOMReader Open reader for the FVCOM model output to analyse. output_file : str Path to the NetCDF output file to create. constit : tuple, optional Tidal constituent names. Defaults to :data:`DEFAULT_CONSTIT`. predict : bool, optional If True, also write a predicted (reconstructed) time series for each variable. Defaults to False. dump_raw : bool, optional If True, also write the raw (input) time series for each variable. Defaults to False. node_indices : array-like of int, optional Integer indices into the full node array selecting the subset of nodes to analyse. If None (default), all nodes are analysed. Useful for restricting the analysis to a spatial region of interest and avoiding the cost of analysing the full mesh. Typically constructed as:: mask = (lon >= lon_min) & (lon <= lon_max) & (lat >= lat_min) & (lat <= lat_max) node_indices = np.where(mask)[0] element_indices : array-like of int, optional Integer indices into the full element array selecting the subset of elements to analyse. If None (default), all elements are analysed. Constructed analogously to *node_indices* using element-centred coordinates (``reader.lon_elements``, ``reader.lat_elements``). verbose : bool, optional If True, print UTide's per-solve progress messages (``"solve: matrix prep … solution … done."``). Defaults to False. pool_size : int, optional Number of worker processes for parallel analysis. ``1`` (default) runs serially. Set to the number of physical cores (e.g. ``multiprocessing.cpu_count()``) for maximum throughput. Passed directly to :func:`analyse_harmonics`. show_progress : bool, optional If True (default), print a live progress line to stdout for each variable (and each sigma layer for 3-D variables) showing the count of completed positions, elapsed time, and estimated time remaining. Set to False to suppress all progress output. Examples -------- :: reader = FVCOMReader(['output_0001.nc', 'output_0002.nc']) analyser = HarmonicsAnalyser(reader, 'harmonics.nc', predict=True) analyser.run(['zeta', 'u', 'v']) """ def __init__( self, reader: FVCOMReader, output_file: str, constit: tuple = DEFAULT_CONSTIT, predict: bool = False, dump_raw: bool = False, node_indices: np.ndarray = None, element_indices: np.ndarray = None, verbose: bool = False, pool_size: int = 1, show_progress: bool = True, ): self._reader = reader self._output_file = output_file self._constit = constit self._predict = predict self._dump_raw = dump_raw self._node_indices = node_indices self._element_indices = element_indices self._verbose = verbose self._pool_size = pool_size self._show_progress = show_progress # Pre-compute MJD times once — used by analyse_harmonics and write_fvcom_time. self._times = date2num( reader.dates, units=f'days since {MJD_ZERO_POINT} 00:00:00' ) # Fixed-width (4-char) constituent name lists for netCDF character array. self._cnames = [list(f'{c:4s}') for c in constit]
[docs] def run(self, variables: list = None): """Run harmonic analysis for the specified variables. Opens the output file, writes the grid and (when applicable) time, then runs the analysis variable by variable. Parameters ---------- variables : list of str, optional FVCOM variable names to analyse. Valid values are ``'zeta'``, ``'u'``, ``'v'``, ``'ua'``, ``'va'``. Defaults to all five. """ if variables is None: variables = ['zeta', 'u', 'v', 'ua', 'va'] reader = self._reader node_idx = self._node_indices elem_idx = self._element_indices n_nodes = reader.n_nodes if node_idx is None else len(node_idx) n_ele = reader.n_elements if elem_idx is None else len(elem_idx) lon = reader.lon_nodes if node_idx is None else reader.lon_nodes[node_idx] lat = reader.lat_nodes if node_idx is None else reader.lat_nodes[node_idx] lonc = reader.lon_elements if elem_idx is None else reader.lon_elements[elem_idx] latc = reader.lat_elements if elem_idx is None else reader.lat_elements[elem_idx] h = reader.bathy_nodes if node_idx is None else reader.bathy_nodes[node_idx] h_ctr = reader.bathy_elements if elem_idx is None else reader.bathy_elements[elem_idx] siglay = reader.sigma_layers_nodes if node_idx is None else reader.sigma_layers_nodes[:, node_idx] siglev = reader.sigma_levels_nodes if node_idx is None else reader.sigma_levels_nodes[:, node_idx] # nv is only meaningful for a full-mesh run (subset connectivity # would require re-indexing which is not attempted here). if node_idx is None: nv = (reader.grid.triangles + 1).T else: # Placeholder: store a zero array; plotting via scatter not tripcolor. nv = np.zeros((3, n_ele), dtype=int) dims = { 'node': n_nodes, 'nele': n_ele, 'siglay': reader.n_sigma_layers, 'siglev': reader.n_sigma_levels, 'three': 3, 'nconsts': len(self._constit), 'NameStrLen': 4, } if self._predict or self._dump_raw: dims['time'] = 0 # unlimited dims['DateStrLen'] = 26 # ISO-8601 string length for write_fvcom_time global_atts = { 'title': 'FVCOM tidal harmonic analysis', 'history': 'Created by pyfvcom2 HarmonicsAnalyser', } with HarmonicAnalysisWriter(self._output_file, dims, global_atts) as ncout: ncout.add_grid( lon=lon, lat=lat, lonc=lonc, latc=latc, h=h, h_center=h_ctr, nv=nv, siglay=siglay, siglev=siglev, consts=self._cnames, ) if self._predict or self._dump_raw: ncout.write_fvcom_time(reader.dates) for var in variables: self._process_variable(var, ncout)
def _process_variable(self, var: str, ncout: HarmonicAnalysisWriter): """Load, analyse, and write harmonics for one variable.""" reader = self._reader is_node = reader.var_is_node_based(var) has_depth = reader.get_vertical_position(var) == 'layer_centre' indices = self._node_indices if is_node else self._element_indices lat_all = reader.lat_nodes if is_node else reader.lat_elements latitudes = lat_all if indices is None else lat_all[indices] npositions = len(latitudes) ntimes = len(reader.dates) nconsts = len(self._constit) if has_depth: nz = reader.n_sigma_layers amp = np.full((nconsts, nz, npositions), np.nan) phase = np.full((nconsts, nz, npositions), np.nan) if self._predict: pred = np.full((ntimes, nz, npositions), np.nan) if self._dump_raw: raw = np.full((ntimes, nz, npositions), np.nan) for zlev in range(nz): # data shape: (ntimes, npositions) data = self._load_full_timeseries(var, zlev=zlev) if self._dump_raw: raw[:, zlev, :] = data label = f'{var} [\u03c3 {zlev + 1}/{nz}]' if self._show_progress else '' # analyse_harmonics expects elevations as (npositions, ntimes) result = analyse_harmonics( self._times, data.T, latitudes, predict=self._predict, constit=self._constit, pool_size=self._pool_size, label=label, verbose=self._verbose, ) if self._predict: harmonics, predicted = result pred[:, zlev, :] = predicted.T # (ntimes, npositions) else: harmonics = result amp[:, zlev, :] = harmonics[:, 1, :].T # (nconsts, npositions) phase[:, zlev, :] = harmonics[:, 0, :].T else: data = self._load_full_timeseries(var) # (ntimes, npositions) if self._dump_raw: raw = data.copy() label = var if self._show_progress else '' result = analyse_harmonics( self._times, data.T, latitudes, predict=self._predict, constit=self._constit, pool_size=self._pool_size, label=label, verbose=self._verbose, ) if self._predict: harmonics, predicted = result pred = predicted.T # (ntimes, npositions) else: harmonics = result amp = harmonics[:, 1, :].T # (nconsts, npositions) phase = harmonics[:, 0, :].T ncout.add_harmonics(var, amp, phase) if self._dump_raw: ncout.add_raw(var, raw) if self._predict: ncout.add_predicted(var, pred) def _load_full_timeseries(self, var: str, zlev: int = None) -> np.ndarray: """Load the complete time series for one variable. Reads each underlying NetCDF file in a single bulk slice rather than one timestep at a time, which is substantially faster for variables with many time steps. Parameters ---------- var : str FVCOM variable name. zlev : int, optional Sigma layer index (0-based). Required for depth-resolved variables; omit for 2-D variables such as ``zeta``, ``ua``, ``va``. Returns ------- data : np.ndarray Shape ``(ntimes, npositions)``, where npositions reflects any spatial subset set on the analyser. """ from netCDF4 import Dataset as _Dataset reader = self._reader is_node = reader.var_is_node_based(var) spatial_idx = self._node_indices if is_node else self._element_indices npositions = (reader.n_nodes if is_node else reader.n_elements) \ if spatial_idx is None else len(spatial_idx) ntimes = len(reader.dates) data = np.full((ntimes, npositions), np.nan) # Build a per-file list of (global_time_indices, local_time_indices) # using the reader's internal time→file mapping. file_slices: dict[str, list] = {} for global_t, dt in enumerate(reader.dates): fp, local_t = reader._time_to_file_map[dt] file_slices.setdefault(fp, []).append((global_t, local_t)) for fp, index_pairs in file_slices.items(): global_idxs = [p[0] for p in index_pairs] local_idxs = [p[1] for p in index_pairs] with _Dataset(fp) as ds: raw = ds.variables[var][local_idxs, ...] # (n_t_local, ...) if np.ma.is_masked(raw): raw = np.ma.getdata(raw) if zlev is not None: # 3-D variable: (n_t_local, nz, npositions) slice_ = raw[:, zlev, :] if spatial_idx is None else raw[:, zlev, :][:, spatial_idx] else: # 2-D variable: (n_t_local, npositions) slice_ = raw if spatial_idx is None else raw[:, spatial_idx] data[global_idxs, :] = slice_ return data