Source code for pynsitu.drifters

import warnings
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype

from numpy.linalg import inv
from scipy.linalg import solve
from scipy.sparse import diags
from scipy.special import erf

from pynsitu.geo import GeoAccessor, compute_velocities, compute_accelerations

from numba import njit, guvectorize, int32, float64, prange

#################################################################################
# ------------------------ DRIFTER DATA CLEANING --------------------------------
#################################################################################


[docs] def despike_isolated(df, acceleration_threshold, acc_key=None, verbose=False): """Drops isolated anomalous positions (spikes) in a position time series. Anomalous positions are first detected if acceleration exceed the provided threshold. Detected values are masked if they are combined with an adequate pattern of acceleration sign reversals, e.g. +-+ or -+- Speed acceleration should have been computed with the pynsitu.geo.GeoAccessor, e.g.: df.geo.compute_velocities(centered=False, acceleration=True) Parameters ---------- df: `pandas.DataFrame` Input dataframe, must contain an `acceleration` column acceleration_threshold: float Threshold used to detect anomalous values acc_key: tuple, optional Keys/labels/column identifiers for x/y/absolute value of acceleration verbose: boolean Outputs number of anomalous values detected Default is True Returns ------- df: `pandas.DataFrame` Output dataframe with spikes removed. """ if acc_key is None: acc_key = "acceleration_east", "acceleration_north", "acceleration" assert acc_key[2] in df.columns, ( "'acceleration' should be a column. You may need to leverage the " + "geo accessor first (pynsitu.geo.GeoAccessor) with " + "`df.geo.compute_velocities(acceleration=True)``" ) # first pass: anomalous large acceleration values spikes = df[df[acc_key[2]] > acceleration_threshold] # second pass: seach for adequate sign reversals validated_single_spikes = [] for t in spikes.index: C = [] # check for a double sign reversal of acceleration for acc in acc_key[:1]: if t > df.index[0] and t < df.index[-1]: am = df.loc[:t, acc].iloc[-2] a = spikes.loc[t, acc] ap = df.loc[t:, acc].iloc[1] # check if am and ap have opposite sign to a C.append(am * a < 0 and ap * a < 0) if len(C) > 0 and any(C): validated_single_spikes.append(t) if verbose: print( f"{len(validated_single_spikes)} single spikes dropped out of {spikes.index.size}" + f" potential ones (acceleration threshold)" ) # drops single spikes df = df.drop(validated_single_spikes) return df
[docs] def despike_all(df, acceleration_threshold, acc_key=None, verbose=False): """Drops isolated anomalous positions (spikes) in a position time series. Anomalous positions are first detected if acceleration exceed the provided threshold. Speed acceleration should have been computed with the pynsitu.geo.GeoAccessor, e.g.: df.geo.compute_velocities(centered=False, acceleration=True) Parameters ---------- df: `pandas.DataFrame` Input dataframe, must contain an `acceleration` column acceleration_threshold: float Threshold used to detect anomalous values acc_key: tuple, optional Keys/labels/column identifiers for x/y/absolute value of acceleration verbose: boolean Outputs number of anomalous values detected Default is True Returns ------- df: `pandas.DataFrame` Output dataframe with spikes removed. """ if acc_key is None: acc_key = "acceleration_east", "acceleration_north", "acceleration" assert acc_key[2] in df.columns, ( "'acceleration' should be a column. You may need to leverage the " + "geo accessor first (pynsitu.geo.GeoAccessor) with " + "`df.geo.compute_velocities(acceleration=True)``" ) return df[ (abs(df[acc_key[0]]) < acceleration_threshold) | (abs(df[acc_key[1]]) < acceleration_threshold) ]
[docs] def despike_pm(df, acceleration_threshold, pm=1, acc_key=None, verbose=False): """Drops anomalous positions (spikes) in a position time series. Anomalous positions are first detected if acceleration exceed the provided threshold. The pm points before and after are also removed Speed acceleration should have been computed with the pynsitu.geo.GeoAccessor, e.g.: df.geo.compute_velocities(centered=False, acceleration=True) Parameters ---------- df: `pandas.DataFrame` Input dataframe, must contain an `acceleration` column acceleration_threshold: float Threshold used to detect anomalous values pm : int number of point before and after to remove acc_key: tuple, optional Keys/labels/column identifiers for x/y/absolute value of acceleration verbose: boolean Outputs number of anomalous values detected Default is True Returns ------- df: `pandas.DataFrame` Output dataframe with spikes removed. """ if acc_key is None: acc_key = "acceleration_east", "acceleration_north", "acceleration" assert acc_key[2] in df.columns, ( "'acceleration' should be a column. You may need to leverage the " + "geo accessor first (pynsitu.geo.GeoAccessor) with " + "`df.geo.compute_velocities(acceleration=True)``" ) df_ = df.copy() df_ = df_.reset_index() spikes = df_[ (abs(df_[acc_key[1]]) > acceleration_threshold) | (abs(df_[acc_key[1]]) > acceleration_threshold) ].index.values spikes = np.unique(np.concatenate([spikes + pm, spikes - pm, spikes])) spikes = df.iloc[spikes].index return df.drop(spikes)
######################################################## # -----------FIND AND FILL WITH NAN BIG GAPS------------#
[docs] def nan_in_gap(df, df_gap, dtmax, inplace=False): """Fill gaps bigger than dtmax with nan Parameters ---------- df : dataframe on which we want to put the gap df_gap : original dataframe dtmax: float max gap length in seconds inplace : boolean """ df = df.reset_index() if not inplace: df = df.copy() time_start, time_end = find_gap(df_gap, dtmax) for i in range(len(time_start)): test1 = df.time > time_start[i] test2 = df.time < time_end[i] test = np.logical_not(test1 & test2) df = df.where(test) if not inplace: return df.set_index("time")
######################################################## # -----------COMPUTATION OF ACC (SAME FOR ALL)------------# def compute_acc(df_out, geo, spectral_diff, method, velocities_key, accelerations_key): if spectral_diff: if geo: if method == "lowess" or method == "variational": df_out.geo.compute_accelerations( from_=("xy_spectral", "x", "y"), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, ) if method == "spydell": df_out.geo.compute_accelerations( from_=("velocities_spectral", velocities_key[0], velocities_key[1]), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, ) # should still recompute for non-geo datasets else: if method == "lowess" or method == "variational": compute_accelerations( df_out, from_=("xy_spectral", "x", "y"), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, keep_dt=False, ) if method == "spydell": compute_accelerations( df_out, from_=("velocities_spectral", velocities_key[0], velocities_key[1]), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, keep_dt=True, ) else: if geo: if method == "lowess" or method == "variational": df_out.geo.compute_accelerations( from_=("xy", "x", "y"), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, ) if method == "spydell": df_out.geo.compute_accelerations( from_=("velocities", velocities_key[0], velocities_key[1]), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, ) # should still recompute for non-geo datasets else: if method == "lowess" or method == "variational": compute_accelerations( df_out, from_=("xy", "x", "y"), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, keep_dt=False, ) if method == "spydell": compute_accelerations( df_out, from_=("velocities", velocities_key[0], velocities_key[1]), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, keep_dt=True, ) ########################################### # -----------VARIATIONNAL METHOD------------#
[docs] def variational_smooth( df, t_target, acc_cut, position_error, acceleration_amplitude, acceleration_T, time_chunk=2, velocities_key=("velocity_east", "velocity_north", "velocity"), accelerations_key=("acceleration_east", "acceleration_north", "acceleration"), import_columns=["id"], spectral_diff=True, geo=True, ): """Smooth and resample a drifter position time series The smoothing balances positions information according to the specified position error and the smoothness of the output time series by specifying a typical acceleration amplitude and decorrelation timescale (assuming exponential decorrelation). The output trajectory `x` minimizes: || I(x) - x_obs ||^2 / e_x^2 + (D2 x)^T R^{-1} (D2 x) where e_x is the position error, `I` the time interpolation operator, `R` the acceleration autocorrelation, `D2` the second order derivative. Closest reference (but no temporal autocorrelation of acceleration considered): Yaremchuk and Coelho 2015. Filtering Drifter Trajectories Sampled at Submesoscale, Resolution. IEEE Journal of Oceanic Engineering Parameters ---------- df: `pandas.DataFrame` Input drifter time series, must contain projected positions (`x` and `y`) t_target: `pandas.core.indexes.datetimes.DatetimeIndex` Output time series, as typically given by pd.date_range Note that the problem seems ill-posed in the downsampling case ... need to be fixed acc_cut : float, acceleration spike cut position_error: float Position error in meters acceleration_amplitude: float Acceleration typical amplitude acceleration_T: float Acceleration decorrelation timescale in seconds time_chunk: int/float, optional Maximum time chunk (in days) to process at once. Data is processed by chunks and patched together. velocities_key : (,,) of str, ex : ('velocity_east','velocity_north', 'velocity') or ('u','v', 'U') etc accelerations_key : (,,) of str, ex : ('acceleration_east','acceleration_north', 'acceleration') or ('ax','ay', 'Axy') or ('au','av', 'Auv') etc spectral_diff : boolean computing velocities and accelaration with spectral diff or not import_columns : list of str list of df constant columns we want to import (ex: id, platform) geo: boolean, optional if geo obj with projection Return : interpolated dataframe with x, y, u, v, ax-ay computed from xy, au-av computed from u-v, +norms, +import_columns with index time """ df = df.copy() # store projection to align with dataframes produced if geo: if "lonc" in df: proj_ref = (df.lonc.values[0], df.latc.values[0]) if "lonc" not in import_columns: import_columns += ["lonc", "latc"] else: proj_ref = df.geo.projection_reference # index = time if df.index.name != "time": if df.index.name == None: df = df.set_index("time") else: df = df.reset_index().set_index("time") # assert x, y in dataframe if "x" not in df or "y" not in df: assert False, "positions must be labelled as 'x' and 'y'" if geo: if "lon" not in df or "lat" not in df: assert False, "longitude, latitude must be labelled as 'lon' and 'lat'" if accelerations_key[2] not in df: assert False, "'acceleration' should be provided" # despike acceleration try: # df = despike_isolated(df, acc_cut, accelerations_key)# spike are made of not only one points # df = despike_all(df, acc_cut, accelerations_key)# spike before and after often are also not ok df = despike_pm(df, acc_cut, pm=1, acc_key=accelerations_key) except: assert False, "pb despike" # select only x, y if "id" not in import_columns: import_columns += ["id"] var = ["x", "y"] + import_columns # if geo: # var += ["lon", "lat"] df = df[var] # t_target if isinstance(t_target, str): t_target = pd.date_range( df.index.min().ceil(t_target), df.index.max(), freq=t_target ) else: # enforce t_target type t_target = pd.DatetimeIndex(t_target) # Time series length in days T = (t_target[-1] - t_target[0]) / pd.Timedelta("1d") if not time_chunk or T < time_chunk * 1.1: df_out = _variational_smooth_one( df, t_target, position_error, acceleration_amplitude, acceleration_T, ) else: print(f"Chunking dataframe into {time_chunk} days chunks") # divide target timeline into chunks D = _divide_into_time_chunks(t_target, time_chunk, overlap=0.3) # split computation delta = pd.Timedelta("3h") R = [] for time in D: df_chunk = df.loc[ (df.index > time[0] - delta) & (df.index < time[-1] + delta) ] df_chunk_smooth = _variational_smooth_one( df_chunk, time, position_error, acceleration_amplitude, acceleration_T, ) R.append(df_chunk_smooth) # brute concatenation: introduce strong discontinuities in velocity/acceleration # df_out = pd.concat(R) # removes duplicated times # df_out = df_out.loc[~df_out.index.duplicated(keep="first")] ## patch timeseries together, not that simple ... col_float = [c for c in df.columns if np.issubdtype(df[c].dtype, float)] # note: is_numeric_dtype(df[c].dtype) lets int pass through i = 0 while i < len(R) - 1: if i == 0: df_left = R[i] else: df_left = df_right df_right = R[i + 1] delta = df_left.index[-1] - df_right.index[0] t_mid = df_right.index[0] + delta * 0.5 # bring time series on a common timeline index = df_left.index.union(df_right.index) # df_left = df_left.reindex(index, method=None) df_right = df_right.reindex(index, method=None) # build weights w = (1 - erf((df_left.index.to_series() - t_mid) * 5 / delta)) * 0.5 # note: the width in the error function needs to be much smaller than delta. # Discontinuities visible on acceleration are visible otherwise # A factor 5 is chosen here df_left = df_left.fillna(df_right) df_right = df_right.fillna(df_left) # patch values with float dtypes for c in col_float: df_right.loc[:, c] = df_left.loc[:, c] * w + df_right.loc[:, c] * ( 1 - w ) i += 1 df_out = df_right # fix non float dtypes # for c in df_out.columns: # df_out[c] = df_out[c].astype(df[c].dtype) # fill na df_out = df_out.bfill().ffill() # compute velocity if spectral_diff: dist = "spectral" else: dist = "xy" if geo: # initiate lon, lat (needed to compute_acc, even if it is computed from x, y) df_out["lon"] = df.lonc.mean() df_out["lat"] = df.latc.mean() df_out.geo.compute_velocities( names=velocities_key, distance=dist, inplace=True, fill_startend=True, ) else: compute_velocities( df_out, "index", names=velocities_key, distance=dist, inplace=True, centered=True, fill_startend=True, ) # compute acceleration compute_acc( df_out, geo, spectral_diff, "variational", velocities_key, accelerations_key ) # update lon/lat if geo: # first reset reference from df df_out.geo.set_projection_reference(proj_ref) # inplace df_out.geo.compute_lonlat() # inplace df_out["X"] = np.sqrt(df_out["x"] ** 2 + df_out["y"] ** 2) # df_out[accelerations_key[2]] = np.sqrt( # df_out[accelerations_key[0]] ** 2 + df_out[accelerations_key[1]] ** 2 # ) # import columns/info ex: id or time if import_columns: for column in import_columns: try: df_out[column] = df[column].iloc[0] except: assert False, df.columns return df_out
def _variational_smooth_one( df, t_target, position_error, acceleration_amplitude, acceleration_T, ): """core processing for variational_smooth, process one time window""" # init final structure df_out = df.reindex(df.index.union(t_target), method="nearest").reindex(t_target) # providing "nearest" above is essential to preserve type (on int64 data typically) # override with interpolation for float data col_float = [c for c in df.columns if np.issubdtype(df[c], float)] for c in col_float: df_out[c] = ( df[c] .reindex(df.index.union(t_target)) .interpolate("time") .reindex(t_target) ) df_out.index.name = "time" # exponential acceleration autocorrelation R = lambda dt: acceleration_amplitude**2 * np.exp(-np.abs(dt / acceleration_T)) # get operators L, I = _get_smoothing_operators(t_target, df.index, position_error, R) # x df_out["x"] = solve(L, I.T.dot(df["x"].values)) # y df_out["y"] = solve(L, I.T.dot(df["y"].values)) return df_out def _divide_into_time_chunks(time, T, overlap=0.1): """Divide a dataframe into chunks of duration T (in days) Parameters ---------- time: pd.DatetimeIndex Timeseries T: float Size of time chunks in days """ Td = pd.Timedelta("1d") * T # assumes time is the index t_first = time[0] t_last = time[-1] t = t_first D = [] while t < t_last: # try to keep a chunk of size T even last one if t + Td > t_last: tb = max(t_last - Td, t_first) start, end = tb, tb + Td else: start, end = t, t + Td D.append(time[(time >= start) & (time <= end)]) t = t + Td * (1 - overlap) return D def _get_smoothing_operators(t_target, t, position_error, acceleration_R): """Core operators in order to minimize: (Ix - x_obs)^2 / e_x^2 + (D2 x)^T R^{-1} (D2 x) where R is the acceleration autocorrelation function, assumed to follow """ # assumes t_target is uniform dt = t_target[1] - t_target[0] # build linear interpolator Nt = t_target.size I = np.zeros((t.size, Nt)) i_t = np.searchsorted( t_target, t ) # Find the indices into a sorted array `t_target` such that, if the corresponding elements in `t` were inserted before the indices, the order of `t_target` would be preserved i = np.where((i_t > 0) & (i_t < Nt))[ 0 ] # remove times before and after the target times j = i_t[i] # t[0] is between t_target[i_t[0]-1=j[0]-1] and t_target[i_t[0]=j[0]] w = (t[i] - t_target[j - 1]) / dt # weight =distance to neightboors I[i, j - 1] = w I[i, j] = 1 - w # second order derivative one_second = pd.Timedelta("1s") dt2 = (dt / one_second) ** 2 D2 = diags( [1 / dt2, -2 / dt2, 1 / dt2], [-1, 0, 1], shape=(Nt, Nt) ).toarray() # Nt*Nt with -2/dt2 at the diagonale, 1/dt2 just below and above # fix boundaries # D2[0, :] = 0 # D2[-1, :] = 0 # need to impose boundary conditions or else pulls acceleration towards 0 as it is # D2[0, [0, 1]] = [-1/dt2, 1/dt2] # not good: pull velocity towards 0 at edges # D2[-1, [-2, -1]] = [-1/dt2, 1/dt2] # not good: pull velocity towards 0 at edges # constant acceleration at boundaries (does not work ... weird): # D2[0, [0, 1, 2, 3]] = [-1 / dt2, 3 / dt2, -3 / dt2, 1 / dt2] # D2[-1, [-4, -3, -2, -1]] = [1 / dt2, -3 / dt2, 3 / dt2, -1 / dt2] # acceleration autocorrelation _t = t_target.values R = acceleration_R((_t[:, None] - _t[None, :]) / one_second) # apply constraint on laplacian only on inner points (should try to impose above boundary treatment instead) D2 = D2[1:-1, :] R = R[1:-1, 1:-1] # boundaries # R[0,:] = 0 # R[0,0] = R[1,1]*1000 # R[-1,:] = 0 # R[-1,-1] = R[-2,-2]*1000 # iR = inv(R) # assemble final operator L = I.T.dot(I) + D2.T.dot(iR.dot(D2)) * position_error**2 return L, I ########################################### # -----------EMPIRICAL METHOD------------# import warnings
[docs] def spydell_smooth( df, t_target, acc_cut=1e-3, nb_pt_mean=5, velocities_key=("velocity_east", "velocity_north", "velocity"), accelerations_key=("acceleration_east", "acceleration_north", "acceleration"), import_columns=["id"], spectral_diff=True, geo=True, ): """ Smooth and interpolated a trajectory with the method described in Spydell et al. 2021. Parameters: ----------- df : dataframe with raw trajectory, must contain 'time', 'x', 'y','u' and 'v' t_target: `pandas.core.indexes.datetimes.DatetimeIndex` or str Output time series, as typically given by pd.date_range or the delta time of the output time series as str In this case, t_target is then recomputed taking start-end the start end of the input trajectory and the given delta time acc_cut : float, acceleration spike cut value nb_pt_mean : odd int, number of points of wich is applied the box mean velocities_key : (,,) of str, ex : ('velocity_east','velocity_north', 'velocity') or ('u','v', 'U') etc accelerations_key : (,,) of str, ex : ('acceleration_east','acceleration_north', 'acceleration') or ('ax','ay', 'Axy') or ('au','av', 'Auv') etc import_columns : list of str, list of df constant columns we want to import (ex: id, platform) geo: boolean, optional if geo obj with projection Return : interpolated dataframe with x, y, u, v, ax-ay computed from xy, au-av computed from u-v, +norms, id, platform with index time """ # index = time if df.index.name != "time": if df.index.name == None: df = df.set_index("time") else: df = df.reset_index().set_index("time") # assert x, y in dataframe if "x" not in df or "y" not in df: assert False, "positions must be labelled as 'x' and 'y'" if velocities_key[0] not in df or velocities_key[1] not in df: assert False, f"velocities must be labelled as {velocities_key}" # store projection to align with dataframes produced if geo: if "lonc" in df: proj_ref = (df.lonc.values[0], df.latc.values[0]) if "lonc" not in import_columns: import_columns += ["lonc", "latc"] else: proj_ref = df.geo.projection_reference # t_target if isinstance(t_target, str): t_target = pd.date_range( df.index.min().ceil(t_target), df.index.max(), freq=t_target ) # xarray for easy interpolation ds = df.to_xarray()[["x", "y", velocities_key[0], velocities_key[1]]] # fill little gap # 3) linearly interpolate velocities ds = ds.interp(time=t_target, method="linear") reg_dt = t_target[1] - t_target[0] # 4) integrate velocities and find constant # ms_x, ms_y = (df.x**2).mean(), (df.y**2).mean() x_cum = ds[velocities_key[0]].cumsum("time") * reg_dt / pd.Timedelta("1s") y_cum = ds[velocities_key[1]].cumsum("time") * reg_dt / pd.Timedelta("1s") # def msx_difference(x_0) : # return abs(ms_x-((x_0+x_cum)**2).mean()) # def msy_difference(y_0) : # return abs(ms_y-((y_0+y_cum)**2).mean()) from scipy.optimize import minimize def msx_difference(x_0): return ((ds.x - x_0 - x_cum) ** 2).mean() def msy_difference(y_0): return ((ds.y - y_0 - y_cum) ** 2).mean() from scipy.optimize import minimize x_0 = minimize(msx_difference, ds.x[0]).x y_0 = minimize(msy_difference, ds.y[0]).x ds["x"] = x_0 + x_cum ds["y"] = y_0 + y_cum # 5) remove spike and interpolate ds[accelerations_key[0]] = ds[velocities_key[0]].differentiate( "time", datetime_unit="s" ) ds[accelerations_key[1]] = ds[velocities_key[1]].differentiate( "time", datetime_unit="s" ) x = ds.where(ds[accelerations_key[0]] < acc_cut).x y = ds.where(ds[accelerations_key[1]] < acc_cut).y print( f"nb of spike removed { np.isnan(x).sum('time').values} over {ds.dims['time']}" ) ds["x"] = x.interpolate_na("time") ds["y"] = y.interpolate_na("time") # 6) Box mean on nb_pt_mean if nb_pt_mean % 2 == 0: warnings.warn("nb_pt_mean should be odd, set to np_pt_window+1") nb_pt_mean += 1 if nb_pt_mean == 0: assert False, "np_pt_window=0" n = nb_pt_mean // 2 ds0 = 0 ds1 = ds for i in np.arange(-n, n + 1): ds0 += ds1.shift({"time": i}) ds1 = ds ds0 = ds0 / nb_pt_mean # test box mean assert ( ds0.isel(time=n) == ds.isel(time=slice(0, nb_pt_mean)).mean() ), "pb with mean over n points" ds0 = ds0.drop([accelerations_key[0], accelerations_key[1]]) # Build full dataframe df_out = ds0.to_dataframe() # import columns/info ex: id or time if "id" not in import_columns: import_columns += ["id"] if import_columns: for column in import_columns: df_out[column] = df[column][0] # fill na df_out = df_out.bfill().ffill() # compute acceleration if geo: # initiate lon, lat (needed to compute_acc, even if it is computed from x, y) df_out["lon"] = df.lonc.mean() df_out["lat"] = df.latc.mean() # update lon/lat if geo: # first reset reference from df df_out.geo.set_projection_reference(proj_ref) # inplace df_out.geo.compute_lonlat() # inplace df_out["X"] = np.sqrt(df_out["x"] ** 2 + df_out["y"] ** 2) compute_acc( df_out, geo, spectral_diff, "spydell", velocities_key, accelerations_key ) # df_out[accelerations_key[2]] = np.sqrt( # df_out[accelerations_key[0]] ** 2 + df_out[accelerations_key[1]] ** 2 # ) return df_out
########################################### # -----------w METHOD------------#
[docs] @njit def find_nearest_neighboors(time, t, i): """Find 3 remaining neighbouring points i is a starting value (closest point) """ nt = len(time) nb = 5 ib = [0 for _ in range(nb)] ib[0] = i # initiate direction for advance search (with problem at boundaries solved) if i == nt: # end-boundary case delta_plus = 0 else: delta_plus = 1 if i == 0: delta_minus = 0 # starting boundary case else: delta_minus = -1 # if not at boundaries delta_minus=-1 and delta_plus=1 counter = 1 while counter < nb: ib[counter], delta_minus, delta_plus = advance_search( nt, time, t, i, delta_plus, delta_minus ) counter += 1 return np.sort(np.array(ib))
# @guvectorize([(float64[:], float64[:])], '(n)->(n)') # @guvectorize(["void(float64[:], float64[:])"], '(n)->(n)') # guvectorize cannot be called from a jit method at the moment, see: https://github.com/numba/numba/issues/5720 # def I_func(v, res): @njit def I_func(v): I = np.zeros_like(v) for i in range(v.shape[0]): if v[i] > -1 or v[i] < 1: I[i] = v[i] else: I[i] = 0.0 return I @njit def solve_position_velocity(t_nb, x_nb, time_target): # solve for x and u : x + u*(t_nb-date_target) = x_nb t = t_nb - time_target t_nbs = np.sort(t_nb) dt = t_nbs[-1] - t_nbs[0] weights = 70 / 81 * (1 - np.abs(t / dt) ** 3) ** 3 * I_func(t / dt) w = np.sum(weights) wt = np.sum(weights * t) wt2 = np.sum(weights * t**2) A = np.array([[w, wt], [wt, wt2]]) # coef gradients b = np.array([np.sum(weights * x_nb), np.sum(weights * x_nb * t)]) out = np.linalg.solve(A, b) return out[0], out[1] @njit def solve_position_velocity_acceleration(t_nb, x_nb, time_target): # solve for x and u : x + u*(t_nb-date_target) = x_nb t_nbs = np.sort(t_nb) dt = t_nbs[-1] - t_nbs[0] # t = t_nb - time_target t = (t_nb - time_target) / dt weights = 70 / 81 * (1 - np.abs(t / dt) ** 3) ** 3 * I_func(t / dt) w = np.sum(weights) wt = np.sum(weights * t) wt2 = np.sum(weights * t**2) wt3 = np.sum(weights * t**3) wt4 = np.sum(weights * t**4) A = np.array( [[w, wt, wt2 / 2], [wt, wt2, wt3 / 2], [wt2, wt3, wt4 / 2]] ) # coef gradients b = np.array( [ # np.sum(weights * x_nb), # np.sum(weights * x_nb * t), # np.sum(weights * x_nb * t**2), np.sum(weights * (x_nb - x_nb[0])), np.sum(weights * (x_nb - x_nb[0]) * t), np.sum(weights * (x_nb - x_nb[0]) * t**2), ] ) # out = np.linalg.solve(A, b)# pb of multiple solution or no solution https://stackoverflow.com/questions/13795682/numpy-error-singular-matrix out = np.linalg.lstsq(A, b) # return out[0], out[1], out[2] # return out[0][0], out[0][1], out[0][2] # return out[0][0] + x_nb[0], out[0][1], out[0][2] return out[0][0] + x_nb[0], out[0][1] / dt, out[0][2] / dt**2 # @njit("UniTuple(float64[:], 2)(float64[:], float64[:], float64[:])")
[docs] @njit def lowess(time, x, time_target, nb=4, degree=2): """perform a lowess interpolation Parameters ---------- time: np.array time array, assumed to be sorted in time, should be floats x: np.array positions time_target: np.array target timeline nb : number of closest neighboors to consider degree : 2 or 3, of the polynomial """ nt = len(time_target) assert time_target[0] >= time[0], "time_target[0] is not within time span" assert time_target[-1] <= time[-1], "time_target[-1] is not within time span" # find closest values # d = np.abs(time[:,None] - time_target[None, :]) # nope (numba '0.56.3') d = np.abs(time.reshape(len(time), 1) - time_target.reshape(1, nt)) i_closest = np.argmin( d, axis=0 ) # the indice of the nearest time in the raw time series (time) for each time of the regular time series (time_target) x_out = np.full(nt, np.nan) u_out = np.full(nt, np.nan) a_out = np.full(nt, np.nan) for i in prange(nt): if degree == 2: nb = nb if degree == 3: nb = nb i_nb = np.arange(i_closest[i] - nb // 2, i_closest[i] + nb // 2 + 1) a = [i < 0 or i > len(time) - 1 for i in i_nb] # np.any not ok with numba if True in a: continue # start-end edge stay nan values # i_nb = find_nearest_neighbours(time, time_target[i], i_closest[i]) t_nb = time[i_nb] x_nb = x[i_nb] if degree == 2: try: x_out[i], u_out[i] = solve_position_velocity(t_nb, x_nb, time_target[i]) except: print( "WARNING : pb with solve_position_velocity, set to nan", "t_target =", time_target[i], ) x_out[i], u_out[i] = np.nan, np.nan elif degree == 3: try: x_out[i], u_out[i], a_out[i] = solve_position_velocity_acceleration( t_nb, x_nb, time_target[i] ) except: print( "WARNING : pb with solve_position_velocity, set to nan", "t_target =", time_target[i], ) x_out[i], u_out[i], a_out[i] = np.nan, np.nan, np.nan return x_out, u_out, a_out
[docs] def lowess_smooth( df, t_target, degree=2, iteration=3, nb=4, T_low_pass=None, cutoff_low_pass=None, velocities_key=("velocity_east", "velocity_north", "velocity"), accelerations_key=("acceleration_east", "acceleration_north", "acceleration"), import_columns=None, spectral_diff=False, geo=False, ): """perform a lowess interpolation with optional posteriori low pass filter Parameters ---------- df: dataframe, must contain x, y t_target: `pandas.core.indexes.datetimes.DatetimeIndex` or str Output time series, as typically given by pd.date_range or the delta time of the output time series as str In this case, t_target is then recomputed taking start-end the start end of the input trajectory and the given delta time degree : 2 or 3, degree of the polynomial for the lowess method iteration : number of time to apply LOWESS (interpolation on t_target at the last iteration) nb : number of closest neighboors to consider T_low_pass : float Filter length in days, if None (default), does not apply filter cutoff_low_pass : float low pass filter cutoff frequency in cpp velocities_key : (,,) of str, ex : ('velocity_east','velocity_north', 'velocity') or ('u','v', 'U') etc accelerations_key : (,,) of str, ex : ('acceleration_east','acceleration_north', 'acceleration') or ('ax','ay', 'Axy') or ('au','av', 'Auv') etc import_columns : list of str list of df constant columns we want to import (ex: id, platform) spectral_diff : boolean, if True use spectral differentiation instead of central differentiation geo: boolean, optional if geo obj with projection Return : dataframe with x, y, u, v, ax-ay computed from xy, au-av computed from u-v, and ae-an computed via lowess if degree = 3,+norms, id, platform """ # index = time if df.index.name != "time": if df.index.name == None: df = df.set_index("time") else: df = df.reset_index().set_index("time") # assert x, y in dataframe if "x" not in df or "y" not in df: assert False, "positions must be labelled as 'x' and 'y'" # store projection to align with dataframes produced if geo: if "lonc" in df: if "lonc" not in import_columns: import_columns += ["lonc", "latc"] proj_ref = (df.lonc.values[0], df.latc.values[0]) else: proj_ref = df.geo.projection_reference # time in seconds from first time df["date"] = (df.index - df.index.min()) / pd.Timedelta("1s") # t_target if isinstance(t_target, str): t_target = pd.date_range( df.index.min().ceil(t_target), df.index.max(), freq=t_target ) # t_ target in seconds from first time date_target = (t_target - t_target[0]) / pd.Timedelta("1s") x_out = df.x.values y_out = df.y.values time = df.date.values time_target = time # apply lowess for i in range(iteration): if i == iteration - 1: time_target = ( date_target.values ) # final interpolation on regular t_target time x_out, u_out, ax_out = lowess(time, x_out, time_target, nb=nb, degree=degree) y_out, v_out, ay_out = lowess(time, y_out, time_target, nb=nb, degree=degree) # dataframe if degree == 2: df_out = pd.DataFrame( { "x": x_out, "y": y_out, velocities_key[0]: u_out, velocities_key[1]: v_out, "time": t_target, } ) elif degree == 3: df_out = pd.DataFrame( { "x": x_out, "y": y_out, velocities_key[0]: u_out, velocities_key[1]: v_out, accelerations_key[0]: ax_out, accelerations_key[1]: ay_out, "time": t_target, } ) # df_out[accelerations_key[2]] = np.sqrt( # df_out[accelerations_key[0]] ** 2 + df_out[accelerations_key[1]] ** 2 # ) df_out = df_out.set_index("time") # fill na (misssing at least 2 values at the border, because lowess need 2 nearest neightbours at each sides df_out = df_out.bfill().ffill() # APPLY LOW PASS -> only on velocities + reintegrate positions if T_low_pass: df_out = low_pass_(df_out, T_low_pass, cutoff_low_pass, velocities_key) print(f"LOW-PASS : {cutoff_low_pass}cpd with {T_low_pass}days length") # import columns/info ex: id or time if "id" not in import_columns: import_columns += ["id"] if import_columns: for column in import_columns: df_out[column] = df[column][0] # compute acceleration if geo: # initiate lon, lat (needed to compute_acc, even if it is computed from x, y) df_out["lon"] = df.lonc.mean() df_out["lat"] = df.latc.mean() if degree != 3: compute_acc( df_out, geo, spectral_diff, "lowess", velocities_key, accelerations_key ) # update lon/lat if geo: # first reset reference from df df_out.geo.set_projection_reference(proj_ref) # inplace df_out.geo.compute_lonlat() # inplace df_out["X"] = np.sqrt(df_out["x"] ** 2 + df_out["y"] ** 2) df_out[velocities_key[2]] = np.sqrt( df_out[velocities_key[0]] ** 2 + df_out[velocities_key[1]] ** 2 ) # df_out[accelerations_key[2]] = np.sqrt( # df_out[accelerations_key[0]] ** 2 + df_out[accelerations_key[1]] ** 2 # ) return df_out
########################################### # -----------LOWPASS------------#
[docs] def posteriori_low_pass_xy( df, T=1, cutoff=13, velocities_key=("velocity_east", "velocity_north", "velocity"), accelerations_key=("acceleration_east", "acceleration_north", "acceleration"), import_columns=["id"], ): """Apply low pass filter to a smoothed trajectory a posteriori Parameters ---------- df: dataframe, must contain x, y T : float Filter length in days cutoff : float low pass filter cutoff frequency in cpp import_columns : list of str list of df constant columns we want to import (ex: id, platform) velocities_key : (,,) of str, ex : ('velocity_east','velocity_north', 'velocity') or ('u','v', 'U') etc accelerations_key : (,,) of str, ex : ('acceleration_east','acceleration_north', 'acceleration') or ('ax','ay', 'Axy') or ('au','av', 'Auv') etc Return : dataframe with x, y, u, v, ax-ay computed from xy, au-av computed from u-v, accelerations, +norms, id, platform """ from scipy.signal import filtfilt from scipy.integrate import cumulative_trapezoid from scipy.optimize import minimize # coefficients dt = df.dt.mean() / 3600 / 24 # in days from pynsitu.tseries import generate_filter taps = generate_filter(band="low", dt=dt, T=T, bandwidth=cutoff) dff = df[["x", "y"]] # apply filter dff["x"] = filtfilt(taps, 1, df.x.values) dff["y"] = filtfilt(taps, 1, df.y.values) # recompute velocities compute_velocities( dff, "index", names=velocities_key, distance="xy", inplace=True, centered=True, fill_startend=True, ) # recompute acceleration compute_accelerations( dff, from_=("xy", "x", "y"), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, keep_dt=False, ) # compute_accelerations( # dff, # from_=("velocities", velocities_key[0], velocities_key[1]), # names=("au", "av", "Auv"), # centered_velocity=True, # time="index", # fill_startend=True, # inplace=True, # keep_dt=True, # ) # import columns/info ex: id or time if "id" not in import_columns: import_columns += ["id"] if import_columns: for column in import_columns: dff[column] = df[column][0] dff["X"] = np.sqrt(dff["x"] ** 2 + dff["y"] ** 2) dff[velocities_key[2]] = np.sqrt( dff[velocities_key[0]] ** 2 + dff[velocities_key[1]] ** 2 ) return dff
[docs] def posteriori_low_pass_uv( df, T=20, cutoff=4, velocities_key=("velocity_east", "velocity_north", "velocity"), accelerations_key=("acceleration_east", "acceleration_north", "acceleration"), import_columns=["id"], ): """Apply low pass filter to a smoothed trajectory a posteriori Parameters ---------- df: dataframe, must contain x, y T : float Filter length in days cutoff : float low pass filter cutoff frequency in cpp import_columns : list of str list of df constant columns we want to import (ex: id, platform) Return : dataframe with x, y, u, v, ax-ay computed from xy, au-av computed from u-v, accelerations, +norms, id, platform """ from scipy.signal import filtfilt from scipy.integrate import cumulative_trapezoid from scipy.optimize import minimize # coefficients dt = df.dt.mean() / 3600 / 24 # in days from pynsitu.tseries import generate_filter taps = generate_filter(band="low", dt=dt, T=T, bandwidth=cutoff) dff = df[[velocities_key[0], velocities_key[1]]] # apply filter dff[velocities_key[0]] = filtfilt(taps, 1, df[velocities_key[0]].values) dff[velocities_key[1]] = filtfilt(taps, 1, df[velocities_key[1]].values) # recompute position # ms_x, ms_y = (df.x**2).mean(), (df.y**2).mean() x_cum = cumulative_trapezoid(dff.u, dx=df.dt.mean(), initial=0) y_cum = cumulative_trapezoid(dff.v, dx=df.dt.mean(), initial=0) def msx_difference(x_0): return ((df.x - x_0 - x_cum) ** 2).mean() def msy_difference(y_0): return ((df.y - y_0 - y_cum) ** 2).mean() x_0 = minimize(msx_difference, df.x[0]).x y_0 = minimize(msy_difference, df.y[0]).x dff["x"] = x_0 + x_cum dff["y"] = y_0 + y_cum # recompute acceleration # compute_accelerations( # dff, # from_=("xy", "x", "y"), # names=accelerations_key, # centered_velocity=True, # time="index", # fill_startend=True, # inplace=True, # keep_dt=False, # ) compute_accelerations( dff, from_=("velocities", velocities_key[0], velocities_key[1]), names=accelerations_key, centered_velocity=True, time="index", fill_startend=True, inplace=True, keep_dt=True, ) # import columns/info ex: id or time if "id" not in import_columns: import_columns += ["id"] if import_columns: for column in import_columns: dff[column] = df[column][0] dff["X"] = np.sqrt(dff["x"] ** 2 + dff["y"] ** 2) dff[velocities_key[2]] = np.sqrt( dff[velocities_key[0]] ** 2 + dff[velocities_key[1]] ** 2 ) return dff
# low_pass to integrate in a smmothing function
[docs] def low_pass_( df, T=1, cutoff=11.5, velocities_key=("velocity_east", "velocity_north", "velocity") ): """apply low pass filter on velocity and reintegrate x, y Parameters ---------- df: dataframe, must contain u, v cutoff : float, low pass filter cutoff frequency in cpp T : float Filter length in days velocities_key : (,,) of str, ex : ('velocity_east','velocity_north', 'velocity') or ('u','v', 'U') etc Return : dataframe with x, y, u, v """ from scipy.signal import filtfilt from scipy.integrate import cumulative_trapezoid from scipy.optimize import minimize # index = time if df.index.name != "time": if df.index.name == None: df = df.set_index("time") else: df = df.reset_index().set_index("time") # test regularly sampled dt = df.index[1] - df.index[0] assert (df.index[1:] - df.index[:-1]).mean() == dt, "Not regularly sampled" # coefficients from pynsitu.tseries import generate_filter taps = generate_filter( band="low", dt=dt / pd.Timedelta("1d"), T=T, bandwidth=cutoff ) dff = df[[velocities_key[0], velocities_key[1]]] # create dff # apply filter dff[velocities_key[0]] = filtfilt( taps, 1, df[velocities_key[0]].values, padlen=len(df[velocities_key[0]].values) - 1, ) # CHECK WITH AURELIEN dff[velocities_key[1]] = filtfilt( taps, 1, df[velocities_key[1]].values, padlen=len(df[velocities_key[1]].values) - 1, ) # recompute position # ms_x, ms_y = (df.x**2).mean(), (df.y**2).mean() x_cum = cumulative_trapezoid( dff[velocities_key[0]], dx=dt / pd.Timedelta("1s"), initial=0 ) y_cum = cumulative_trapezoid( dff[velocities_key[1]], dx=dt / pd.Timedelta("1s"), initial=0 ) def msx_difference(x_0): return ((df.x - x_0 - x_cum) ** 2).mean() def msy_difference(y_0): return ((df.y - y_0 - y_cum) ** 2).mean() x_0 = minimize(msx_difference, df.x[0]).x y_0 = minimize(msy_difference, df.y[0]).x dff["x"] = x_0 + x_cum dff["y"] = y_0 + y_cum return dff
########################################### # -----------APPLY INTERPOLATIONS METHODS------------#
[docs] def find_gap(df_gap, maxgap): """Find gaps (return time start, time end) bigger than dtmax in the dataset Parameters ---------- df_gap : original dataframe maxgap: float max gap length in seconds """ df_gap = df_gap.reset_index() time_end = df_gap[df_gap.dt > maxgap].time index_start = time_end.index.values - 1 time_start = df_gap.iloc[index_start].time return time_start.values, time_end.values
[docs] def divide_blocs(df, t_target, maxgap): """Cut out gaps bigger than maxgap and return blocs Parameters ---------- df : original dataframe t_target : pd.datetime index, interpolation times maxgap: float max gap length in seconds Returns ---------- DF : list of dataframe, blocs without gaps bigger than maxgap DF_target :list of pd.date_time_index, list of times out gaps bigger than maxgap DF_target_gap : list of pd.date_time_index, list of times in gaps bigger than maxgap """ dti = t_target[1] - t_target[0] # check that maxgap > t_target delta assert maxgap > dti / pd.Timedelta( "1s" ), "maxgap should be bigger than the t_target delta" # divide into blocs if gap bigger than maxgap DF = [] DF_target = [] DF_target_gap = [] df = df.reset_index() ts, te = find_gap(df, maxgap) tcut = np.sort([df.time.min()] + list(ts) + list(te) + [df.time.max()]) tcut0 = tcut tcut_to_remove = [] for i in range(0, len(tcut) - 1, 2): test1 = df.time >= tcut[i] test2 = df.time <= tcut[i + 1] test = test1 & test2 df_ = df[test] # if less than two points in the if ( len(df_) < 5 or (df_.time.max() - df_.time.min()) < dti * 6 ): # minimum of 5 values for matrice n in variational method not to be singular print( "WARNING: not enougth points between two gaps, include them in the gap" ) tcut_to_remove.append(tcut[i]) tcut_to_remove.append(tcut[i + 1]) continue else: DF.append(df_) tcut = [t for t in tcut if t not in tcut_to_remove] for i in range(0, len(tcut) - 1, 2): test = (t_target >= tcut[i]) & (t_target <= tcut[i + 1]) DF_target.append(t_target[test]) DF_target_gap.append(t_target[np.logical_not(test)]) return DF, DF_target, DF_target_gap
[docs] def gap_array(time, t_target): """Returns time distance between time in t_target and their nearest neightbor in time Parameters ---------- time : ndarray of datetime, t_target : ndarray of datetime, """ nt = len(t_target) assert t_target[0] >= time[0], "time_target[0] is not within time span" assert t_target[-1] <= time[-1], "time_target[-1] is not within time span" d = np.abs(time.reshape(len(time), 1) - t_target.reshape(1, nt)) i_closest = np.argmin(d, axis=0) t_closest = np.min(d, axis=0) return (t_target - time[i_closest]) / pd.Timedelta("1s")
[docs] def smooth( df, method, t_target, maxgap=4 * 3600, parameters=dict(), velocities_key=("velocity_east", "velocity_north", "velocity"), accelerations_key=("acceleration_east", "acceleration_north", "acceleration"), import_columns=["id"], spectral_diff=False, geo=True, ): """ Smooth and interpolated a trajectory Parameters: ----------- df : dataframe with raw trajectory, must contain 'time','x','y', 'u', 'v', 'dt' method : str smoothing method among : 'spydell', 'variational' or 'lowess' t_target: `pandas.core.indexes.datetimes.DatetimeIndex` or str Output time series, as typically given by pd.date_range or the delta time of the output time series as str In this case, t_target is then recomputed taking start-end the start end of the input trajectory and the given delta time maxgap : float, max gap tolerated in SECONDS parameters : dict, contains all parameters to give to method : - variational : dict(acc_cut =, position_error=, acceleration_amplitude=, acceleration_T=,time_chunk=) - lowess : dict(degree=) - spydell : dict(acc_cut =, nb_pt_mean=) velocities_key : (,,) of str, ex : ('velocity_east','velocity_north', 'velocity') or ('u','v', 'U') etc accelerations_key : (,,) of str, ex : ('acceleration_east','acceleration_north', 'acceleration') or ('ax','ay', 'Axy') or ('au','av', 'Auv') etc import_columns : list of str, list of df constant columns we want to import (ex: id, platform) geo: boolean, optional if geo obj with projection acc: boolean, optional compute acceleration Return : interpolated dataframe with x, y, u, v, ax-ay computed from xy, au-av computed from u-v, +norms, id, platform with index time """ # check maxgap assert maxgap > df.dt.min(), "maxgap smaller than the smallest dt" # index = time if df.index.name != "time": if df.index.name == None: df = df.set_index("time") else: df = df.reset_index().set_index("time") # check time index is sorted if not df.index.is_monotonic_increasing: df.sort_index(inplace=True) # t_target if isinstance(t_target, str): t_target = pd.date_range( df.index.min().ceil(t_target), df.index.max(), freq=t_target ) # add ceil so all drifters in smooth_all are on the same time grid else: # enforce t_target type t_target = pd.DatetimeIndex(t_target) # divide into blocs if gap bigger than maxgap DF, DF_target, DF_target_gap = divide_blocs(df, t_target, maxgap) print(f"Divided into {len(DF)} segments") # APPLY ON SEGMENTS if method == "variational" or method == "spydell": DF_out = [] for i in range(len(DF)): try: df_, t_target_ = DF[i], DF_target[i] except: continue # if DF_target is empty if method == "variational": param = [ "acc_cut", "position_error", "acceleration_amplitude", "acceleration_T", "time_chunk", ] assert np.all( [p in param for p in parameters] ), f"parameters keys must be in {param}" # try : df_out = variational_smooth( df_, t_target_, **parameters, velocities_key=velocities_key, accelerations_key=accelerations_key, import_columns=import_columns, spectral_diff=spectral_diff, geo=geo, ) # except : # assert False, (df_, t_target_, len(df_), len(t_target_)) elif method == "spydell": param = ["acc_cut", "nb_pt_mean"] assert np.all( [p in param for p in parameters] ), f"parameters keys must be in {param}" try: df_out = spydell_smooth( df_, t_target_, **parameters, velocities_key=velocities_key, accelerations_key=accelerations_key, import_columns=import_columns, spectral_diff=spectral_diff, geo=geo, ) except: assert False, (df_, t_target_, len(df_), len(t_target_)) DF_out.append(df_out) dfo = pd.concat(DF_out) dfo = dfo.reindex(t_target).interpolate( method="slinear", limit_area="inside" ) # LINEAR INTERPOLATION IN GAPS # APPLY ON the whole trajectory (linear interpolation in gaps already done by LOWESS) elif method == "lowess": param = ["degree", "iteration", "nb", "T_low_pass", "cutoff_low_pass"] assert np.all( [p in param for p in parameters] ), f"parameters keys must be in {param}" try: dfo = lowess_smooth( df, t_target, **parameters, velocities_key=velocities_key, accelerations_key=accelerations_key, import_columns=import_columns, spectral_diff=spectral_diff, geo=geo, ) except: assert False, (df, t_target) else: assert False, 'method must be "variational", "lowess", or "spydell"' # gap_mask : t_target_values = pd.DatetimeIndex( pd.concat([pd.Series(dti) for dti in DF_target]) ).sort_values() gapmask = pd.DataFrame(index=t_target_values) gapmask["gap_mask"] = 1 # 1 where a value is computed gapmask = gapmask.reindex( t_target, fill_value=0 ) # 0 in gaps = linearly interpolated data dfo = pd.concat([dfo, gapmask], axis=1) # gap value : time distance to the nearest neightbors in seconds dfo["gaps"] = gap_array(df.index.values, t_target.values) # import columns/info ex: id or time if "id" not in import_columns: import_columns += ["id"] if import_columns: for column in import_columns: dfo[column] = df[column][0] return dfo
[docs] def smooth_all( df, method, t_target, maxgap=4 * 3600, parameters=dict(), velocities_key=("velocity_east", "velocity_north", "velocity"), accelerations_key=("acceleration_east", "acceleration_north", "acceleration"), import_columns=["id"], spectral_diff=True, geo=True, ): """ Smooth and interpolated all trajectories Parameters: ----------- df : dataframe with raw trajectory, must contain 'time', 'velocity_east', 'velocity_north' method : str smoothing method among : 'spydell', 'variational' or 'lowess' t_target: `pandas.core.indexes.datetimes.DatetimeIndex` or str Output time series, as typically given by pd.date_range or the delta time of the output time series as str In this case, t_target is then recomputed taking start-end the start end of the input trajectory and the given delta time maxgap : float, max gap tolerated in SECONDS parameters : dict, contains all parameters to give to method : - variational : dict(acc_cut =, position_error=, acceleration_amplitude=, acceleration_T=,time_chunk=) - lowess : dict(degree=) - spydell : dict(acc_cut =, nb_pt_mean=) velocities_key : (,,) of str, ex : ('velocity_east','velocity_north', 'velocity') or ('u','v', 'U') etc accelerations_key : (,,) of str, ex : ('acceleration_east','acceleration_north', 'acceleration') or ('ax','ay', 'Axy') or ('au','av', 'Auv') etc import_columns : list of str, list of df constant columns we want to import (ex: id, platform) geo: boolean, optional if geo obj with projection acc: boolean, optional compute acceleration Return : interpolated dataframe with x, y, u, v, ax-ay computed from xy, au-av computed from u-v, +norms, id, platform with index time """ dfa = df.groupby("id").apply( smooth, method, t_target, maxgap, parameters, velocities_key, accelerations_key, import_columns, spectral_diff, geo, ) dfa = ( dfa.reset_index(level="id", drop=True) .reset_index() .rename(columns={"index": "time"}) .set_index("time") ) return dfa
################################################################################# # ------------------------ time window processing -------------------------------
[docs] def time_window_processing( df, myfun, T, overlap=0.5, id_label="id", dt=None, limit=None, geo=None, xy=None, **myfun_kwargs, ): """Break each drifter time series into time windows and process each windows myfun signature must be myfun(df, **kwargs) and it must return a pandas Series Drop duplicates if a `date` column is present Parameters ---------- df: Dataframe This dataframe represents a drifter time series myfun Method that will be applied to each window T: float, pd.Timedelta Length of the time windows, must be in the same dtype and units than column "time" overlap: float Amount of overlap between temporal windows. Should be between 0 and 1. Default is 0.5 id_label: str, optional Label used to identify drifters dt: float, str Conform time series to some time step, if string must conform to rule option of pandas resample method geo: boolean Turns on geographic processing of spatial coordinates xy: tuple specify x, y spatial coordinates if not geographic **myfun_kwargs Keyword arguments for myfun """ from pandas.api.types import is_datetime64_any_dtype as is_datetime if hasattr(df, id_label): dr_id = df[id_label].unique()[0] elif df.index.name == id_label: dr_id = df.index.unique()[0] elif hasattr(df, "name"): # when mapped after groupby dr_id = df.name else: assert False, "Cannot find float id" # # dim_x, dim_y, geo = guess_spatial_dims(df) if geo != None: # old, used to go through 3 vectors # df = compute_vector(df, lon_key=dim_x, lat_key=dim_y) # new, leverage GeoAccessor df.geo.project() proj = df.geo.projection if df.index.name == "time": df = df.reset_index() # drop duplicated values - requires a date column if "date" in df.columns: df = df.drop_duplicates(subset="date") # p = p.where(p.time.diff() != 0).dropna() # duplicates - old df = df.sort_values("time") t_is_date = is_datetime(df["time"]) if isinstance(T, str): T = pd.Timedelta(T) # temporal resampling to fill gaps if dt != None: if isinstance(dt, float): # enforce regular sampling tmin, tmax = df.index[0], df.index[-1] tmax = tmin + int((tmax - tmin) / dt) * dt regular_time = np.arange(tmin, tmax, dt) df = df.reindex(regular_time).interpolate(limit=limit) elif isinstance(dt, str): # df = df.set_index("date").resample(dt).pad().reset_index() c = None if t_is_date: c = "time" elif "date" in df.columns and is_datetime(df["date"]): c = "date" assert ( c is not None ), "dt is str but no `time` nor `date` columns are datetime-like" # df = df.set_index(c).resample(dt).interpolate(limit=limit) df = _resample(df.set_index(c), dt, limit=limit) # fill some NaNs # df[id_label] = df[id_label].interpolate() # if c == "date": # df["time"] = df["time"].interpolate() df = df.reset_index() # by default converts to days then dt = pd.Timedelta(dt) / pd.Timedelta("1d") if geo is not None: df.geo.compute_lonlat() # df = df.set_index("time") tmin, tmax = df.index[0], df.index[-1] # if geo is not None: xy = ["lon", "lat"] elif xy is not None: xy = list(xy) else: xy = [] out = None t = tmin while t + T < tmax: # _df = df.reset_index() _df = _df.loc[(_df.time >= t) & (_df.time < t + T)].set_index("time") # _df = df.loc[t : t + T] # not robust for floats if t_is_date: # iloc because pandas include the last date _df = _df.iloc[:-1, :] # compute average position if geo: # x, y = mean_position(_df, Lx=Lx) x, y = proj.xy2lonlat(_df["x"].mean(), _df["y"].mean()) else: x, y = _df[xy[0]].mean(), _df[xy[1]].mean() # apply myfun myfun_out = myfun(df, **myfun_kwargs) if out is None: size_out = myfun_out.index.size columns_out = xy + ["id"] + list(myfun_out.index) out = pd.DataFrame({c: [] for c in columns_out}) # combine with mean position and time if myfun_out.index.size == size_out: out.loc[t + T / 2.0] = [x, y] + [dr_id] + list(myfun_out) t += T * (1 - overlap) out.index = out.index.rename("time") return out
# kept for archives - not used anymore def _mean_position(df, Lx=None): """Compute the mean position of a dataframe !!! to be overhauled !!! Parameters: ----------- df: dafaframe dataframe containing position data Lx: float, optional Domain width for periodical domains """ # guess grid type dim_x, dim_y, geo = guess_spatial_dims(df) # lon = next((c for c in df.columns if "lon" in c.lower()), None) # lat = next((c for c in df.columns if "lat" in c.lower()), None) if geo: lon, lat = dim_x, dim_y if "v0" not in df: df = compute_vector(df, lon_key=lon, lat_key=lat) mean = compute_lonlat( df.mean(), dropv=True, lon_key=lon, lat_key=lat, ) return mean[lon], mean[lat] else: if Lx is not None: x = ( ( np.angle(np.exp(1j * (df[dim_x] * 2.0 * np.pi / L - np.pi)).mean()) + np.pi ) * Lx / 2.0 / np.pi ) else: x = df[dim_x].mean() y = df[dim_y].mean() return x, y def _resample(df, dt, **kwargs): """resample with object dtypes""" # .interpolate(limit=limit) # https://github.com/pandas-dev/pandas/issues/53631 # interpolate number columns df_numbers = df.select_dtypes(include=["number"]).resample(dt).interpolate(**kwargs) # and forward-fill non-number columns df_non_numbers = df.select_dtypes(exclude=["number"]).resample(dt).ffill() # combine the two df = pd.concat([df_numbers, df_non_numbers], axis=1) return df ################################################################################# # ------------------------ OPTIMIZE METHODS ------------------------------- optimized_parameters_lowess = dict( degree=2, iteration=2, T_low_pass=0.45, nb=4, cutoff_low_pass=8 ) optimized_parameters_var = dict( acc_cut=3e-4, position_error=70, acceleration_amplitude=7.5e-6, acceleration_T=0.15 * 86400, time_chunk=2, ) optimized_parameters_spydell = dict(nb_pt_mean=7, acc_cut=3e-4) maxgap = 3 * 3600