Source code for IceAdvect.advect

#!/usr/bin/env python3
"""
advect.py
Written by Tyler Sutterley (02/2026)
Routines for advecting ice parcels using velocity estimates

PYTHON DEPENDENCIES:
    numpy: Scientific Computing Tools For Python
        https://numpy.org
        https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
    xarray: N-D labeled arrays and datasets in Python
        https://docs.xarray.dev/en/stable/

UPDATE HISTORY:
    Updated 02/2026: adjust logging levels for advection steps
    Written 01/2026
"""

from __future__ import annotations

import copy
import logging
import numpy as np
import xarray as xr
import timescale.time


# default epoch for time conversions
__epoch__ = timescale.time._j2000_epoch


[docs] class Advect: """ Data class for advecting ice parcels using velocity estimates Attributes ---------- ds: xarray.DataTree xarray DataTree of velocity data x: np.ndarray x-coordinates y: np.ndarray y-coordinates t: np.ndarray time coordinates x0: np.ndarray or NoneType, default None Final x-coordinate after advection y0: np.ndarray or NoneType, default None Final y-coordinate after advection t0: np.ndarray or float, default 0.0 Ending time for advection ds: xarray.Dataset xarray Dataset of velocity data integrator: str Advection function - ``'euler'`` - ``'RK4'`` - ``'RKF45'`` method: str, default 'linear' Interpolation method for velocities - ``'linear'``, ``'nearest'``: xarray interpolation method time_units: str, default 'seconds' Units for input time coordinates fill_value: float or NoneType, default np.nan invalid value for output data """ np.seterr(invalid="ignore") def __init__(self, ds, **kwargs): # set default keyword arguments kwargs.setdefault("x", None) kwargs.setdefault("y", None) kwargs.setdefault("t", None) kwargs.setdefault("t0", 0.0) kwargs.setdefault("integrator", "RK4") kwargs.setdefault("method", "linear") kwargs.setdefault("time_units", "seconds since 2018-01-01T00:00:00") # parse time units epoch, to_sec = timescale.time.parse_date_string(kwargs["time_units"]) # set default class attributes self.x = copy.deepcopy(kwargs["x"]) self.y = copy.deepcopy(kwargs["y"]) # convert times to deltatime in seconds since J2000 self._time = timescale.from_deltatime( to_sec * np.array(kwargs["t"], dtype="f8"), epoch=epoch ) self.t = self._time.to_deltatime(epoch=__epoch__, scale=86400.0) self._time0 = timescale.from_deltatime( to_sec * np.array(kwargs["t0"], dtype="f8"), epoch=epoch ) self.t0 = self._time0.to_deltatime(epoch=__epoch__, scale=86400.0) self.velocity = ds self.integrator = copy.deepcopy(kwargs["integrator"]) self.method = copy.deepcopy(kwargs["method"])
[docs] def run(self, **kwargs): """ Runs the advection of parcels using specified parameters Returns ------- x0: np.ndarray Final x-coordinate after advection y0: np.ndarray Final y-coordinate after advection """ # advect the parcels self.translate(**kwargs) # return final coordinates return self.x0, self.y0
[docs] def interp( self, x: np.ndarray, y: np.ndarray, t: float | np.ndarray = 0.0, **kwargs, ): """ Interpolates velocity data to specified coordinates Parameters ---------- x: np.ndarray x-coordinates y: np.ndarray y-coordinates t: float or np.ndarray, default 0.0 time coordinates kwargs: dict keyword arguments for xarray interpolation """ # create xarray Dataset for interpolated velocities ds = xr.Dataset() # interpolate to specified coordinates if "t" in self.velocity: # clip time to within range of velocity dataset clipped = np.clip( copy.deepcopy(t), np.min(self.velocity.t.values), np.max(self.velocity.t.values), ) # interpolate to x, y, and t coordinates for v in ("U", "V"): ds[v] = self.velocity[v].interp(x=x, y=y, t=clipped, **kwargs) else: # interpolate to x and y coordinates for v in ("U", "V"): ds[v] = self.velocity[v].interp(x=x, y=y, **kwargs) # return the dataset return ds
# PURPOSE: translate a parcel between two times using an advection function
[docs] def translate(self, **kwargs): """ Translates a parcel using an advection function Parameters ---------- integrator: str Advection function - ``'euler'`` - ``'RK4'`` - ``'RKF45'`` method: str Interpolation method from xarray - ``'linear'``: linear interpolation for regular grids - ``'nearest'``: nearest-neighbor interpolation step: int or float, default 86400 Temporal step size for advection (in seconds) N: int or NoneType, default None Number of integration steps Default is determined based on the temporal step size t0: float, default 0.0 Ending time for advection """ # set default keyword arguments kwargs.setdefault("integrator", self.integrator) kwargs.setdefault("method", self.method) kwargs.setdefault("step", 86400) kwargs.setdefault("N", None) kwargs.setdefault("t0", self.t0) # update advection class attributes if kwargs["integrator"] != self.integrator: self.integrator = copy.deepcopy(kwargs["integrator"]) if kwargs["method"] != self.method: self.method = copy.deepcopy(kwargs["method"]) if kwargs["t0"] != self.t0: self.t0 = copy.deepcopy(kwargs["t0"]) # advect the parcel every step # (using closest number of iterations) step = np.float64(kwargs["step"]) # set or calculate the number of steps to advect the dataset if kwargs["N"] is not None: n_steps = copy.deepcopy(kwargs["N"]) elif np.min(self.t0) < np.min(self.t): # maximum number of steps to advect backwards in time n_steps = np.abs(np.max(self.t) - np.min(self.t0)) / step elif np.max(self.t0) > np.max(self.t): # maximum number of steps to advect forward in time n_steps = np.abs(np.max(self.t0) - np.min(self.t)) / step elif (np.ndim(self.t0) == 0) or (np.ndim(self.t) == 0): # maximum number of steps between the two datasets n_steps = np.max(np.abs(self.t0 - self.t)) / step else: # average number of steps between the two datasets n_steps = np.abs(np.mean(self.t0) - np.mean(self.t)) / step # check input advection functions kwargs.update({"N": np.int64(n_steps)}) logging.info(f"Advecting {kwargs['N']} steps") if self.integrator == "euler": # euler: Explicit Euler method return self.euler(**kwargs) elif self.integrator == "RK4": # RK4: Fourth-order Runge-Kutta method return self.RK4(**kwargs) elif self.integrator == "RKF45": # RKF45: adaptive Runge-Kutta-Fehlberg 4(5) method return self.RKF45(**kwargs) else: raise ValueError("Invalid advection function")
# PURPOSE: Advects parcels using an Explicit Euler integration
[docs] def euler(self, **kwargs): """ Advects parcels using an Explicit Euler integration Parameters ---------- N: int, default 1 Number of integration steps """ # set default keyword options kwargs.setdefault("N", 1) # translate parcel from t to t0 at time step dt = (self.t0 - self.t) / np.float64(kwargs["N"]) self.x0 = copy.deepcopy(self.x) self.y0 = copy.deepcopy(self.y) # keep track of time for 3-dimensional interpolations t = copy.deepcopy(self.t) for i in range(kwargs["N"]): logging.debug(f"Euler step {i + 1} of {kwargs['N']}") ds = self.interp(x=self.x0, y=self.y0, t=t) # add displacements to x0 and y0 self.x0 += ds.U.values * dt self.y0 += ds.V.values * dt # add to time t += dt # return the translated coordinates return self
# PURPOSE: Advects parcels using a fourth-order Runge-Kutta integration
[docs] def RK4(self, **kwargs): """ Advects parcels using a fourth-order Runge-Kutta integration Parameters ---------- N: int, default 1 Number of integration steps """ # set default keyword options kwargs.setdefault("N", 1) # translate parcel from t to t0 at time step dt = np.squeeze(self.t0 - self.t) / np.float64(kwargs["N"]) self.x0 = copy.deepcopy(self.x) self.y0 = copy.deepcopy(self.y) # keep track of time for 3-dimensional interpolations t = copy.deepcopy(self.t) for i in range(kwargs["N"]): logging.debug(f"RK4 step {i + 1} of {kwargs['N']}") ds1 = self.interp(x=self.x0, y=self.y0, t=t) x2 = self.x0 + 0.5 * ds1.U.values * dt y2 = self.y0 + 0.5 * ds1.V.values * dt ds2 = self.interp(x=x2, y=y2, t=t) x3 = self.x0 + 0.5 * ds2.U.values * dt y3 = self.y0 + 0.5 * ds2.V.values * dt ds3 = self.interp(x=x3, y=y3, t=t) x4 = self.x0 + ds3.U.values * dt y4 = self.y0 + ds3.V.values * dt ds4 = self.interp(x=x4, y=y4, t=t) # add displacements to x0 and y0 self.x0 += ( dt * ( ds1.U.values + 2.0 * ds2.U.values + 2.0 * ds3.U.values + ds4.U.values ) / 6.0 ) self.y0 += ( dt * ( ds1.V.values + 2.0 * ds2.V.values + 2.0 * ds3.V.values + ds4.V.values ) / 6.0 ) # add to time t += dt # return the translated coordinates return self
# PURPOSE: Advects parcels using a Runge-Kutta-Fehlberg integration
[docs] def RKF45(self, **kwargs): """ Advects parcels using a Runge-Kutta-Fehlberg 4(5) integration Parameters ---------- N: int, default 1 Number of integration steps """ # set default keyword options kwargs.setdefault("N", 1) # coefficients in Butcher tableau for Runge-Kutta-Fehlberg 4(5) method b4 = [ 25.0 / 216.0, 0.0, 1408.0 / 2565.0, 2197.0 / 4104.0, -1.0 / 5.0, 0.0, ] b5 = [ 16.0 / 135.0, 0.0, 6656.0 / 12825.0, 28561.0 / 56430.0, -9.0 / 50.0, 2.0 / 55.0, ] # using an adaptive step size: # iterate solution until the difference is less than the tolerance # difference between the 4th and 5th order solutions sigma = np.inf # tolerance for solutions tolerance = 5e-2 # multiply scale by factors of 2 until iteration reaches tolerance level scale = 1 self.x0 = copy.deepcopy(self.x) self.y0 = copy.deepcopy(self.y) # while the difference (sigma) is greater than the tolerance while (sigma > tolerance) or np.isnan(sigma): # translate parcel from t to t0 at time step dt = (self.t0 - self.t) / np.float64(scale * kwargs["N"]) X4OA = copy.deepcopy(self.x) Y4OA = copy.deepcopy(self.y) X5OA = copy.deepcopy(self.x) Y5OA = copy.deepcopy(self.y) # keep track of time for 3-dimensional interpolations t = copy.deepcopy(self.t) for i in range(scale * kwargs["N"]): logging.debug(f"RKF45 step {i + 1} of {scale * kwargs['N']}") # calculate fourth order accurate solutions u4, v4 = self.RFK45_interp(X4OA, Y4OA, dt, t=t) # add displacements to X40A and Y40A X4OA += dt * np.dot(b4, u4) Y4OA += dt * np.dot(b4, v4) # calculate fifth order accurate solutions u5, v5 = self.RFK45_interp(X5OA, Y5OA, dt, t=t) # add displacements to X50A and Y50A X5OA += dt * np.dot(b5, u5) Y5OA += dt * np.dot(b5, v5) # add to time t += dt # calculate difference between 4th and 5th order accurate solutions variance = (X5OA - X4OA) ** 2 + (Y5OA - Y4OA) ** 2 sigma = np.sqrt(np.nanmean(variance)) logging.info(f"RKF45 sigma: {sigma:.4f} (tolerance: {tolerance})") # if sigma is less than the tolerance: save x and y coordinates # else: multiply scale by factors of 2 and re-run iteration if (sigma <= tolerance) or np.isnan(sigma): self.x0 = copy.deepcopy(X4OA) self.y0 = copy.deepcopy(Y4OA) else: scale *= 2 # return the translated coordinates return self
# PURPOSE: calculates X and Y velocities for Runge-Kutta-Fehlberg 4(5) method
[docs] def RFK45_interp( self, xi: np.ndarray, yi: np.ndarray, dt: np.ndarray, **kwargs ): """ Calculates X and Y velocities for Runge-Kutta-Fehlberg 4(5) method Parameters ---------- xi: np.ndarray x-coordinates yi: np.ndarray y-coordinates dt: np.ndarray integration time step size t: np.ndarray or NoneType, default None time coordinates """ kwargs.setdefault("t", None) # Butcher tableau for Runge-Kutta-Fehlberg 4(5) method A = np.array( [ [1.0 / 4.0, 0.0, 0.0, 0.0, 0.0], [3.0 / 32.0, 9.0 / 32.0, 0.0, 0.0, 0.0], [1932.0 / 2197.0, -7200.0 / 2197.0, 7296.0 / 2197.0, 0.0, 0.0], [439.0 / 216.0, -8.0, 3680.0 / 513.0, -845.0 / 4104.0, 0.0], [ -8.0 / 27.0, 2.0, -3544.0 / 2565.0, 1859.0 / 4104.0, -11.0 / 40.0, ], ] ) # calculate velocities and parameters for iteration ds1 = self.interp(x=xi, y=yi, t=kwargs["t"]) x2 = xi + A[0, 0] * ds1.U.values * dt y2 = yi + A[0, 0] * ds1.V.values * dt ds2 = self.interp(x=x2, y=y2, t=kwargs["t"]) x3 = xi + (A[1, 0] * ds1.U.values + A[1, 1] * ds2.U.values) * dt y3 = yi + (A[1, 0] * ds1.V.values + A[1, 1] * ds2.V.values) * dt ds3 = self.interp(x=x3, y=y3, t=kwargs["t"]) x4 = ( xi + ( A[2, 0] * ds1.U.values + A[2, 1] * ds2.U.values + A[2, 2] * ds3.U.values ) * dt ) y4 = ( yi + ( A[2, 0] * ds1.V.values + A[2, 1] * ds2.V.values + A[2, 2] * ds3.V.values ) * dt ) ds4 = self.interp(x=x4, y=y4, t=kwargs["t"]) x5 = ( xi + ( A[3, 0] * ds1.U.values + A[3, 1] * ds2.U.values + A[3, 2] * ds3.U.values + A[3, 3] * ds4.U.values ) * dt ) y5 = ( yi + ( A[3, 0] * ds1.V.values + A[3, 1] * ds2.V.values + A[3, 2] * ds3.V.values + A[3, 3] * ds4.V.values ) * dt ) ds5 = self.interp(x=x5, y=y5, t=kwargs["t"]) x6 = ( xi + ( A[4, 0] * ds1.U.values + A[4, 1] * ds2.U.values + A[4, 2] * ds3.U.values + A[4, 3] * ds4.U.values + A[4, 4] * ds5.U.values ) * dt ) y6 = ( yi + ( A[4, 0] * ds1.V.values + A[4, 1] * ds2.V.values + A[4, 2] * ds3.V.values + A[4, 3] * ds4.V.values + A[4, 4] * ds5.V.values ) * dt ) ds6 = self.interp(x=x6, y=y6, t=kwargs["t"]) U = np.array( [ ds1.U.values, ds2.U.values, ds3.U.values, ds4.U.values, ds5.U.values, ds6.U.values, ] ) V = np.array( [ ds1.V.values, ds2.V.values, ds3.V.values, ds4.V.values, ds5.V.values, ds6.V.values, ] ) return (U, V)
@property def distance(self): """ Calculates displacement between the start and end coordinates Returns ------- dist: np.ndarray Eulerian distance between start and end points """ try: dist = np.sqrt((self.x0 - self.x) ** 2 + (self.y0 - self.y) ** 2) except Exception as exc: return None else: return dist def __getitem__(self, key): return getattr(self, key) def __setitem__(self, key, value): setattr(self, key, value)