Source code for pynsitu.geo

import warnings

import numpy as np
import numpy.testing as npt
import xarray as xr
import pandas as pd

try:
    import pyproj

    crs_wgs84 = pyproj.CRS("EPSG:4326")
except:
    print("Warning: could not import pyproj")

import matplotlib.pyplot as plt

try:
    from bokeh.io import output_notebook, show
    from bokeh.layouts import gridplot
    from bokeh.models import HoverTool, CustomJSHover
    from bokeh.models import CrosshairTool
    from bokeh.plotting import figure
    import hvplot.pandas
except:
    print("Warning: could not import bokeh")
    CustomJSHover = lambda *args, **kargs: None

# ------------------------------ parameters ------------------------------------

g = 9.81
deg2rad = np.pi / 180.0
rad2deg = 180.0 / np.pi
deg2m = 111319
earth_radius = 6378.0

omega_earth = 2.0 * np.pi / 86164.0905

# ------------------------------ Coriolis --------------------------------------


[docs] def coriolis(lat, signed=True): """returns Coriolis frequency in rad/s""" if signed: return 2.0 * omega_earth * np.sin(lat * deg2rad) else: return 2.0 * omega_earth * np.sin(np.abs(lat) * deg2rad)
def dfdy(lat, units="1/s/m"): df = 2.0 * omega_earth * np.cos(lat * deg2rad) * deg2rad / deg2m if units == "cpd/100km": df = df * 86400 / 2.0 / np.pi * 100 * 1e3 return df # ----------------------------- projections, distances -----------------------
[docs] class projection(object): """wrapper around pyproj to easily convert to local cartesian coordinates""" def __init__(self, lon_ref, lat_ref): self.proj = pyproj.Proj( proj="aeqd", lat_0=lat_ref, lon_0=lon_ref, datum="WGS84", units="m", )
[docs] def lonlat2xy(self, lon, lat): """transforms lon, lat to x,y coordinates""" return self.proj.transform(lon, lat)
[docs] def xy2lonlat(self, x, y): """transforms x,y to lon, lat coordinates""" _inv_dir = pyproj.enums.TransformDirection.INVERSE return self.proj.transform(x, y, direction=_inv_dir)
[docs] def azimuth_distance(lon0, lat0, lon1, lat1, ellps="WGS84"): """compute azimuths and distances between two points Returns ------- az12 (deg), az21 (deg), dist (meters) """ g = pyproj.Geod(ellps=ellps) # az12,az21,dist return g.inv(lon0, lat0, lon1, lat1)
# ----------------------------- lon/lat formatters ----------------------------
[docs] def dec2degmin(dec): """decimal degrees to degrees and minutes""" sign = np.sign(dec) adeg = int(abs(dec)) min = (abs(dec) - adeg) * 60.0 return [int(sign * adeg), min]
[docs] def degmin2dec(deg, min): """converts lon or lat in deg, min to decimal""" return deg + np.sign(deg) * min / 60.0
## bokeh formatters lon_hover_formatter = CustomJSHover( code=""" var D = value; var deg = Math.abs(Math.trunc(D)); if (D<0){ var dir = "W"; } else { var dir = "E"; } var min = (Math.abs(D)-deg)*60.0; return deg + dir + " " + min.toFixed(3) """ ) lat_hover_formatter = CustomJSHover( code=""" var D = value; var deg = Math.abs(Math.trunc(D)); if (D<0){ var dir = "S"; } else { var dir = "N"; } var min = (Math.abs(D)-deg)*60.0; return deg + dir + " " + min.toFixed(3) """ ) # ----------------------------- geo extension, common part ------------------- class GeoAccessor: def __init__(self, _obj): self._lon, self._lat = self._validate(_obj) self._obj = _obj self._reset_geo() def _reset_geo(self): """reset all variables related to geo""" self._geo_proj_ref = None self._geo_proj = None def set_projection_reference(self, ref, reset=True): """set projection reference point, (lon, lat) tuple""" if reset: self._reset_geo() self._geo_proj_ref = ref @property def projection_reference(self): """define a reference projection if none is available""" if self._geo_proj_ref is None: # return the geographic center point lat, lon = self._obj[self._lat], self._obj[self._lon] lon_ref, lat_ref = lon.median(), lat.median() assert not np.isnan(lat_ref) and not np.isnan( lon_ref ), "lat, lon data do not contain any valid data" self._geo_proj_ref = (lon_ref, lat_ref) return self._geo_proj_ref @property def projection(self): if self._geo_proj is None: lonc, latc = self.projection_reference self._geo_proj = projection(lonc, latc) return self._geo_proj # ----------------------------- pandas geo extension --------------------------
[docs] @pd.api.extensions.register_dataframe_accessor("geo") class PdGeoAccessor(GeoAccessor): """Pandas DataFrame accessor in order to process geographical data""" # @staticmethod def _validate(self, obj): """verify there is a column latitude and a column longitude""" lon, lat = None, None lat_potential = ["lat", "latitude"] lon_potential = ["lon", "long", "longitude"] for c in list(obj.columns): if c.lower() in lat_potential: lat = c elif c.lower() in lon_potential: lon = c if not lat or not lon: raise AttributeError( "Did not find latitude and longitude columns. Case insentive options are: " + "/".join(lat_potential) + " , " + "/".join(lon_potential) ) else: return lon, lat
[docs] def project(self, overwrite=True): """add (x,y) projection to object""" d = self._obj if "x" not in d.columns or "y" not in d.columns or overwrite: d.loc[:, "x"], d.loc[:, "y"] = self.projection.lonlat2xy( d.loc[:, self._lon], d.loc[:, self._lat], )
[docs] def compute_lonlat(self): """update longitude and latitude from projected coordinates""" d = self._obj assert ("x" in d.columns) and ( "y" in d.columns ), "x/y coordinates must be available" d[self._lon], d[self._lat] = self.projection.xy2lonlat(d["x"], d["y"])
## --- time series and/or campaign related material
[docs] def trim(self, d): """given a deployment item, trim data""" time = self._obj.index df = self._obj.loc[(time >= d.start.time) & (time <= d.end.time)] return df
[docs] def apply_xy(self, fun, **kwargs): """apply a function that requires working with projected coordinates x/y""" # ensures projection exists self.project() # apply function df = fun(self._obj, **kwargs) # update lon/lat df.loc[:, self._lon], df.loc[:, self._lat] = self.projection.xy2lonlat( df["x"], df["y"] ) return df
[docs] def resample( self, rule, interpolate=False, # inplace=True, **kwargs, ): """temporal resampling https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.resample.html Parameters ---------- rule: DateOffset, Timedelta or str Passed to pandas.DataFrame.resample, examples: - '10T': 10 minutes - '10S': 10 seconds inplace: boolean, optional turn inplace resampling on, default is False interpolate: boolean, optional turn on interpolation for upsampling kwargs: passed to resample """ def _resample(df): df = df.resample(rule, **kwargs).mean() if interpolate: df = df.interpolate(method="linear") return df return self.apply_xy(_resample)
## --- velocity/acceleration
[docs] def compute_velocities( self, time="index", distance="geoid", centered=True, keep_dt=False, fill_startend=True, names=None, inplace=False, # need to return something to give to apply_xy ): """compute velocity Parameters ---------- time: str, optional Column name. Default is "index", i.e. considers the index distance: str, optional Method to compute distances. Default is geoid ("WGS84" with pyproj). Uses projected fields otherwise ("x", "y") centered: boolean, optional Centers velocity calculation temporally (True by default). keep_dt: boolean, optional Keeps time intervals (False by default). fill_startend : boolean, optional fill dataframe start and end (Nan values due to the derivation/centering method) (True by default). names : tuple, optional Contains columns names for eastern, northen and norm velocities ("velocity_east", "velocity_north", "velocity" by default inplace : boolean, optional if True add velocities to dataset, if False return only a dataframe with time, id (for identification) and computed velocities """ if "x" not in self._obj.columns or "y" not in self._obj.columns: self.project() df = compute_velocities( self._obj, time, names=names, centered=centered, fill_startend=fill_startend, lon_key=self._lon, lat_key=self._lat, distance=distance, keep_dt=keep_dt, inplace=inplace, ) if not inplace: return df
[docs] def compute_dt( self, time="index", fill_startend=True, inplace=False, # need to return something to give to apply_xy ): """compute dt Parameters ---------- time: str, optional Column name. Default is "index", i.e. considers the index fill_startend : boolean, optional fill dataframe start and end (Nan values due to the derivation/centering method) (True by default). inplace : boolean, optional if True add dt to dataset, if False return only a dataframe with time, id (for identification) and computed velocities """ df = compute_dt( self._obj, time, fill_startend=fill_startend, inplace=inplace, ) if not inplace: return df
[docs] def compute_accelerations( self, from_=( "velocities", "velocity_east", "velocity_north", ), names=None, centered_velocity=True, time="index", keep_dt=False, fill_startend=True, inplace=False, ): """compute acceleration from velocities or position Parameters ---------- df : dataframe, dataframe containing trajectories from_ : tuple of str, optional (key, east_name, north_name) if key = 'velocities', compute accelaration from velocities if key = 'lonlat', compute acceleration from lonlat time series if key = 'xy', compute acceleration from xy time series names : tuple, optional Contains columns names for eastern, northen and norm acceleration ("acceleration_east", "acceleration_north", "acceleration") by default centered_velocities : boolean, optional True if the velocities is centered temporally (True by default) time: str, optional Column name. Default is "index", i.e. considers the index keep_dt: boolean Keeps time intervals (False by default). fill_startend : boolean fill dataframe start and end (Nan values due to the derivation/centering method) (True by default). inplace : boolean if True add acceleration to dataset, if False return only a dataframe with time, id (for identification) and computed acceleration """ df = compute_accelerations( self._obj, from_, names, centered_velocity, time, keep_dt, fill_startend, inplace, ) if not inplace: return df
# --- transect
[docs] def compute_transect(self, ds, vmin=None, dt_max=None): """Average data along a transect of step ds Parameters ---------- ds: float transect spacing in meters vmin: float, optional ship minimum speed, used to compute a maximum search time for each transect cell dt_max: pd.Timedelta, optional maximum search time for each transect cell """ # compute velocities, thereby ensures projection exists df = self.compute_velocities() # init transect time and position t = df.index[0] x = df.x[0] y = df.y[0] if vmin is not None: dt_max = pd.Timedelta(ds / vmin, unit="seconds") T, D = [], [] while t: t, x, y, d = _step_trajectory(df, t, x, y, ds, dt_max) if t: T.append(t) D.append(d) df = pd.concat(D, axis=1).T df.loc[:, "time"] = T # compute and add along-transect coordinate dx = df.x.diff().fillna(0) dy = df.y.diff().fillna(0) s = np.sqrt(dx**2 + dy**2).cumsum() df.loc[:, "s"] = s return df.set_index("s")
# ---- plotting
[docs] def plot_bokeh( self, deployments=None, rule=None, mindec=True, velocity=False, acceleration=False, ): """Plot time series: longitude, latitude, velocities, acceleration Parameters ---------- deployments: dict-like, pynsitu.events.Deployments for instance, optional Deployments rule: str, optional resampling rule mindec: boolean Plot longitude and latitude as minute/decimals """ if rule is not None: df = self.resample(rule) else: df = self._obj if (velocity and "velocity" not in df.columns) or ( acceleration and "acceleration" not in df.columns ): df = df.geo.compute_velocities() df = df.geo.compute_accelerations() if mindec: _lon_tooltip = "@" + self._lon + "{custom}" _lat_tooltip = "@" + self._lat + "{custom}" _lon_formatter = lon_hover_formatter _lat_formatter = lat_hover_formatter # ll_formater = FuncTickFormatter(code=""" # return Math.floor(tick) + " + " + (tick % 1).toFixed(2) # """) else: _lon_tooltip = "@{" + self._lon + "}{0.4f}" _lat_tooltip = "@{" + self._lat + "}{0.4f}" _lon_formatter = "printf" _lat_formatter = "printf" output_notebook() figkwargs = dict( tools="pan,wheel_zoom,box_zoom,reset,help", width=350, height=300, x_axis_type="datetime", ) crosshair = CrosshairTool(dimensions="both") # line specs lw, c = 3, "black" from .events import Deployment if deployments is not None: if isinstance(deployments, Deployment): deployments = deployments.to_deployments() def _add_start_end(s, ymin, ymax=None): """add deployments start and end as colored vertical bars""" # _y = y.iloc[y.index.get_loc(_d.start.time), method='nearest')] if ymax is None: ymax = ymin.max() elif not isinstance(ymax, float): ymax = ymax.max() if not isinstance(ymin, float): ymin = ymin.min() if deployments is not None: for label, d in deployments.items(): s.line( x=[d.start.time, d.start.time], y=[ymin, ymax], color="cadetblue", line_width=2, ) s.line( x=[d.end.time, d.end.time], y=[ymin, ymax], color="salmon", line_width=2, ) # create a new plot and add a renderer s1 = figure(title="longitude", **figkwargs) s1.line("time", self._lon, source=df, line_width=lw, color=c) s1.add_tools( HoverTool( tooltips=[ ("Time", "@time{%F %T}"), ("longitude", _lon_tooltip), ], # formatters={ "@time": "datetime", "@" + self._lon: _lon_formatter, }, #'printf' mode="vline", ) ) _add_start_end(s1, df[self._lon]) s1.add_tools(crosshair) S = [s1] # s2 = figure(title="latitude", x_range=s1.x_range, **figkwargs) s2.line("time", self._lat, source=df, line_width=lw, color=c) s2.add_tools( HoverTool( tooltips=[ ("Time", "@time{%F %T}"), ("latitude", _lat_tooltip), ], formatters={ "@time": "datetime", "@" + self._lat: _lat_formatter, }, mode="vline", ) ) _add_start_end(s2, df[self._lat]) s2.add_tools(crosshair) S.append(s2) # if velocity: s3 = figure(title="speed", x_range=s1.x_range, **figkwargs) s3.line( "time", "velocity", source=df, line_width=lw, color=c, legend_label="velocity", ) s3.line( "time", "velocity_east", source=df, line_width=lw, color="orange", legend_label="velocity_east", ) s3.line( "time", "velocity_north", source=df, line_width=lw, color="blue", legend_label="velocity_north", ) s3.add_tools( HoverTool( tooltips=[ ("Time", "@time{%F %T}"), ("Velocity", "@{velocity}{0.3f} m/s"), ], formatters={ "@time": "datetime", "@velocity": "printf", }, mode="vline", ) ) _add_start_end(s3, -np.abs(df["velocity"]), np.abs(df["velocity"])) s3.add_tools(crosshair) S.append(s3) if acceleration: s4 = figure(title="acceleration", x_range=s1.x_range, **figkwargs) s4.line("time", "acceleration", source=df, line_width=lw, color=c) s4.line( "time", "acceleration_east", source=df, line_width=lw, color="orange", legend_label="acceleration_east", ) s4.line( "time", "acceleration_north", source=df, line_width=lw, color="blue", legend_label="acceleration_north", ) s4.add_tools( HoverTool( tooltips=[ ("Time", "@time{%F %T}"), ("Acceleration", "@{acceleration}{0.2e} m/s^2"), ], formatters={ "@time": "datetime", "@acceleration": "printf", }, mode="vline", ) ) _add_start_end(s4, -df["acceleration"], ymax=df["acceleration"]) s4.add_tools(crosshair) S.append(s4) # p = gridplot(S, ncols=2) show(p)
[docs] def plot_on_map(self, rule=None, coords="geo", **kwargs): """Produce map with trajectory on map Requires geoviews Parameters ---------- rule: str, optional resampling rule coords: str, optional Controls coordinates: - "xy": x/y space - "geo": geographical coordinates (lon/lat) **kwargs: passed to hvplot """ dkwargs = dict(hover_cols=["time"], width=600, height=400) if coords == "geo": coords = dict(x=self._lon, y=self._lat, geo=True) dkwargs["tiles"] = "CartoLight" elif coords == "xy": self.project() coords = dict(x="x", y="y") dkwargs = dict(**coords, **dkwargs) dkwargs.update(**kwargs) if rule is not None: df = self.resample(rule) else: df = self._obj return df.hvplot.points(**dkwargs), coords
def _step_trajectory(df, t, x, y, ds, dt_max): """compute next position along transect""" # select temporally df = df.loc[(df.index > t)].copy() if dt_max: df = df.loc[(df.index < t + dt_max)] # select spatially df.loc[:, "s"] = np.sqrt((df.loc[:, "x"] - x) ** 2 + (df.loc[:, "y"] - y) ** 2) df = df.loc[(df.loc[:, "s"] > ds / 2) & (df.loc[:, "s"] < 1.5 * ds)] if df.empty: return None, None, None, None t = df.reset_index().loc[:, "time"].mean() dfm = df.mean() x, y = dfm["x"], dfm["y"] return t, x, y, dfm
[docs] def compute_accelerations( df, from_=( "velocities", "velocity_east", "velocity_north", ), names=None, centered_velocity=True, time="index", keep_dt=False, fill_startend=True, inplace=False, ): """compute acceleration from velocities or position Parameters ---------- df : dataframe, dataframe containing trajectories from_ : tuple of str, (key, east_name, north_name) if key = 'velocities', compute accelaration from velocities if key = 'lonlat', compute acceleration from lonlat time series if key = 'xy', compute acceleration from xy time series if key = 'xy_spectral' compute from velocities via spectral method if key = 'velocities_spectral' compute from velocities via spectral method names : tuple, optional Contains columns names for eastern, northen and norm acceleration ("acceleration_east", "acceleration_north", "acceleration") by default centered_velocities : boolean True if the velocities is centered temporally (True by default) time: str, optional Column name. Default is "index", i.e. considers the index keep_dt: boolean Keeps time intervals (False by default). fill_startend : boolean fill dataframe start and end (Nan values due to the derivation/centering method) (True by default). inplace : boolean if True add acceleration to dataset, if False return only a dataframe with time, id (for identification) and computed acceleration """ assert from_[1] in df.columns and from_[2] in df.columns, ( from_[1] + " and/or " + from_[2] + " not in the dataframe, check names" ) if names is None: names = ("acceleration_east", "acceleration_north", "acceleration") # drop duplicates if not inplace: df = df[~df.index.duplicated(keep="first")] else: if df.index.duplicated(keep="first").any(): df.reset_index(inplace=True) df.drop_duplicates(subset="time", keep="first", inplace=True) df.set_index("time", inplace=True) # df.drop_duplicates(keep="first", inplace=True)# drop all nan lines, does not consider the index time -> pb if not inplace: df = df.copy() if not centered_velocity: warnings.warn( "Acceleration computation with non centered velocity", UserWarning ) # dt if time == "index": t = df.index.to_series() assert t.is_monotonic_increasing, "dataframe must be sorted along time" dt = t.diff() / pd.Timedelta("1s") dt.index = df.index # necessary? if "dt" in df.columns: npt.assert_allclose( dt[1:], # ignores first point, which is ultimately padded below df["dt"][1:], atol=1e-2, err_msg="dt already in present and different than derived one", ) keep_dt = True df.loc[:, "dt"] = dt else: assert df[time].is_monotonic_increasing, "dataframe must be sorted along time" t = df[time] dt = t.diff() / pd.Timedelta("1s") df.loc[:, "dt"] = dt # is_uniform = df["dt"].dropna().unique().size == 1 # compute acc from velocities assert from_[0] in [ "lonlat", "xy", "velocities", "xy_spectral", "velocities_spectral", ], "from_ should be 'lonlat', 'xy', 'velocities', 'xy_spectral', 'velocities_spectral'" if from_[0] == "velocities": if centered_velocity: w = dt / (dt + dt.shift(-1)) ae = df.loc[:, from_[1]].diff() / dt an = df.loc[:, from_[2]].diff() / dt normv = np.sqrt(df.loc[:, from_[1]] ** 2 + df.loc[:, from_[2]] ** 2) a = normv.diff() / dt df.loc[:, names[0]] = ae + (ae.shift(-1) - ae) * w df.loc[:, names[1]] = an + (an.shift(-1) - an) * w df.loc[:, names[2]] = a + (a.shift(-1) - a) * w else: dt_acc = (dt.shift(-1) + dt) * 0.5 normv = np.sqrt(df[from_[1]] ** 2 + df[from_[2]] ** 2) df.loc[:, names[0]] = (df[from_[1]].shift(-1) - df[from_[1]]) / dt_acc df.loc[:, names[1]] = (df[from_[2]].shift(-1) - df[from_[2]]) / dt_acc df.loc[:, names[2]] = (normv.shift(-1) - normv) / dt_acc # compute acc from positions in lonlat elif from_[0] == "lonlat": df_v = compute_velocities( df, from_[1], from_[2], time, None, False, False, ) dt_acc = (dt.shift(-1) + dt) * 0.5 normv = np.sqrt(df_v["velocity_east"] ** 2 + df_v["velocity_north"] ** 2) df.loc[:, names[0]] = ( df_v["velocity_east"].shift(-1) - df_v["velocity_east"] ) / dt_acc df.loc[:, names[1]] = ( df_v["velocity_north"].shift(-1) - df_v["velocity_north"] ) / dt_acc df.loc[:, names[2]] = (normv.shift(-1) - normv) / dt_acc # compute acc from positions in xy elif from_[0] == "xy": # leverage local projection, less accurate away from central point normX = np.sqrt(df[from_[1]] ** 2 + df[from_[2]] ** 2) dxdt = df[from_[1]].diff() / df["dt"] # u_i = x_i - x_{i-1} dydt = df[from_[2]].diff() / df["dt"] # v_i = y_i - y_{i-1} dXdt = normX.diff() / df["dt"] dt_acc = (dt.shift(-1) + dt) * 0.5 df.loc[:, names[0]] = (dxdt.shift(-1) - dxdt) / dt_acc df.loc[:, names[1]] = (dydt.shift(-1) - dydt) / dt_acc df.loc[:, names[2]] = (dXdt.shift(-1) - dXdt) / dt_acc elif from_[0] == "xy_spectral": normX = np.sqrt(df[from_[1]] ** 2 + df[from_[2]] ** 2) df.loc[:, names[0]] = spectral_diff(df[from_[1]], df["dt"][1:], 2) df.loc[:, names[1]] = spectral_diff(df[from_[2]], df["dt"][1:], 2) df.loc[:, names[2]] = spectral_diff(normX, df["dt"][1:], 2) elif from_[0] == "velocities_spectral": normv = np.sqrt(df[from_[1]] ** 2 + df[from_[2]] ** 2) df.loc[:, names[0]] = spectral_diff(df[from_[1]], df["dt"][1:], 1) df.loc[:, names[1]] = spectral_diff(df[from_[2]], df["dt"][1:], 1) df.loc[:, names[2]] = spectral_diff(normv, df["dt"][1:], 2) else: assert ( False ), "from_ should be 'lonlat', 'xy', 'velocities', 'xy_spectral', 'velocities_spectral'" # update acceleration norm (modified to be done for each method, including sign) # df.loc[:, names[2]] = np.sqrt(df[names[0]] ** 2 + df[names[1]] ** 2) if not keep_dt: del df["dt"] # fill end values if fill_startend: if inplace: df.bfill(inplace=True) df.ffill(inplace=True) else: df = df.bfill().ffill() if not inplace: return df
[docs] def compute_velocities( df, time, names, centered, fill_startend, distance, lon_key="lon", lat_key="lat", keep_dt=False, inplace=False, ): """core method to compute velocity from a dataframe Parameters ---------- df : dataframe, dataframe containing trajectories lon_key: str longitude column name in dataframe lat_key: str latitude column name in dataframe time: str Column name corresponding to time. Can be "index", in which case the index is used names : tuple Contains columns names for eastern, northen and norm velocities ("velocity_east", "velocity_north", "velocity" by default centered: boolean Centers velocity calculation temporally fill_startend : boolean fill dataframe start and end (Nan values due to the derivation/centering method) (True by default) distance: str Method to compute distances: - "geoid" is based geodetic distance and bearing ("WGS84" with pyproj) - "spectral" is a spectral estimation (requires uniform time sampling) - "xy" is from "x" and "y" columns (projected fields) keep_dt: boolean, optional Keeps time intervals (False by default). inplace : boolean, optional if True add velocities to dataset, if False return only a dataframe with time, id (for identification) and computed velocities. """ if distance == "geoid": assert lon_key in df.columns and lat_key in df.columns, ( lon_key + " and/or " + lat_key + " not in the dataframe, check names" ) if names is None: names = ("velocity_east", "velocity_north", "velocity") # drop duplicates if not inplace: df = df[~df.index.duplicated(keep="first")] else: if df.index.duplicated(keep="first").any(): df.reset_index(inplace=True) df.drop_duplicates(subset="time", keep="first", inplace=True) df.set_index("time", inplace=True) if not centered: warnings.warn("Velocity computation is not centered", UserWarning) # dt_i = t_i - t_{i-1} if time == "index": t = df.index.to_series() dt = t.diff() / pd.Timedelta("1s") dt.index = df.index # necessary? df.loc[:, "dt"] = dt else: t = df[time] dt = t.diff() / pd.Timedelta("1s") df.loc[:, "dt"] = dt # is_uniform = df["dt"].dropna().unique().size == 1 assert distance in [ "geoid", "spectral", "xy", ], "distance must be one of 'geoid', 'spectral', 'xy'" if distance == "geoid": from pyproj import Geod g = Geod(ellps="WGS84") # see: # https://pyproj4.github.io/pyproj/stable/api/geod.html#pyproj.Geod.inv # https://proj.org/usage/ellipsoids.html lon, lat = df[lon_key], df[lat_key] az12, az21, dist = g.inv(lon.shift(1), lat.shift(1), lon, lat) # need to convert into dx and dy dxdt = pd.Series(dist * np.sin(az12 * deg2rad), index=df.index) / df["dt"] dydt = pd.Series(dist * np.cos(az12 * deg2rad), index=df.index) / df["dt"] elif distance == "spectral": assert ( df.dt.iloc[2:] == df.dt.iloc[1] ).all(), "time must be regularly sampled to apply spectral method" dxdt = spectral_diff(df["x"], df["dt"][1:], 1) dydt = spectral_diff(df["y"], df["dt"][1:], 1) # skips first dt which is in general NaN centered = False elif distance == "xy": # leverage local projection, less accurate away from central point dxdt = df["x"].diff() / df["dt"] # u_i = x_i - x_{i-1} dydt = df["y"].diff() / df["dt"] # v_i = y_i - y_{i-1} if centered: w = dt / (dt + dt.shift(-1)) df.loc[:, names[0]] = dxdt + (dxdt.shift(-1) - dxdt) * w df.loc[:, names[1]] = dydt + (dydt.shift(-1) - dydt) * w # boundaries, impose constant acceleration i0, i1 = df.index[[0, -1]] # print( dxdt[1] - dt[1] / dt[2] * (dxdt[2] - dxdt[1]), dxdt[1], dxdt[2], dt[1] / dt[2] ) df.loc[i0, names[0]] = dxdt[1] - dt[1] / dt[2] * (dxdt[2] - dxdt[1]) df.loc[i0, names[1]] = dydt[1] - dt[1] / dt[2] * (dydt[2] - dydt[1]) df.loc[i1, names[0]] = dxdt[-2] + dt[-1] / dt[-2] * (dxdt[-2] - dxdt[-3]) df.loc[i1, names[1]] = dydt[-2] + dt[-1] / dt[-2] * (dydt[-2] - dydt[-3]) else: df.loc[:, names[0]] = dxdt df.loc[:, names[1]] = dydt # boundaries ? df.loc[:, names[2]] = np.sqrt(df.loc[:, names[0]] ** 2 + df.loc[:, names[1]] ** 2) if not keep_dt: del df["dt"] # fill end values if fill_startend: if inplace: df.bfill(inplace=True) df.ffill(inplace=True) else: df = df.bfill().ffill() if not inplace: return df
[docs] def compute_dt( df, time, fill_startend=True, inplace=False, ): """core method to compute dt from a dataframe Parameters ---------- df : dataframe, dataframe containing trajectories time: str Column name corresponding to time. Can be "index", in which case the index is used fill_startend : boolean fill dataframe start and end (Nan values due to the derivation/centering method) (True by default) inplace : boolean, optional if True add dt to dataset, if False return only a dataframe with time, id (for identification) and computed dt. """ # drop duplicates if not inplace: df = df[~df.index.duplicated(keep="first")] else: if df.index.duplicated(keep="first").any(): df.reset_index(inplace=True) df.drop_duplicates(subset="time", keep="first", inplace=True) df.set_index("time", inplace=True) # dt_i = t_i - t_{i-1} if time == "index": t = df.index.to_series() dt = t.diff() / pd.Timedelta("1s") dt.index = df.index # necessary? df.loc[:, "dt"] = dt else: t = df[time] dt = t.diff() / pd.Timedelta("1s") df.loc[:, "dt"] = dt # is_uniform = df["dt"].dropna().unique().size == 1 # fill end values if fill_startend: if inplace: df.bfill(inplace=True) df.ffill(inplace=True) else: df = df.bfill().ffill() if not inplace: return df
[docs] def spectral_diff(x, dt, order, dx0=0.0, time=None): """Differentiate (order=1, 2) or integrate (order=-1) spectrally a pd.Series presumed uniform Parameters ---------- x: pd.Series time series to differentiate/integrate dt: array-like time intervals used to estimate the time step and verify timeline is uniform order: int order of differentiation: 1, 2, -1 (integration) dx0: float initial values for integrations (order=-1) time: np.array array of datetimes """ if time == None: time = x.index.values from scipy.fftpack import diff # from scipy.signal import detrend _dt = np.unique(dt) assert len(_dt) == 1, "timeseries need to be uniform for spectral differentiation" assert ( not x.isnull().any() ), "position data must not contain NaNs for spectral differentation" # make signal periodic if order > 0: _t = (time - time[0]) / pd.Timedelta("1s") fit = np.polyfit(_t, x, 2) x = x - (fit[2] + fit[1] * _t + fit[0] * _t**2) # detrend # x_detrended = detrend(x) # x_trend = x - x_detrended # x = x_detrended npad = x.size // 2 xp = np.pad(x, npad, mode="reflect") # apply diff dx = diff(xp, order=order, period=xp.size) / _dt**order # adjust output if order == -1: # trend needs to be added back to the integrated signal dx += np.cumsum(dx * 0 + xp.mean()) * _dt dx = dx[npad : npad + x.size] if dx0 is not None and order == -1: # initial value is adjusted dx = dx - dx[0] + dx0 if order == 1: dx = dx + fit[1] + 2 * fit[0] * _t elif order == 2: dx = dx + fit[0] return pd.Series(dx, index=x.index)
# ----------------------------- xarray accessor --------------------------------
[docs] @xr.register_dataset_accessor("geo") class XrGeoAccessor(GeoAccessor): """Xarray Dataset accessor in order to process geographical data""" # @staticmethod def _validate(self, obj): """verify there are latitude and longitude variables""" lon, lat = None, None lat_potential = ["lat", "latitude"] lon_potential = ["lon", "longitude"] for c in list(obj.variables): if c.lower() in lat_potential: lat = c elif c.lower() in lon_potential: lon = c if not lat or not lon: raise AttributeError( "Did not find latitude and longitude variables. Case insentive options are: " + "/".join(lat_potential) + " , " + "/".join(lon_potential) ) else: return lon, lat
[docs] def project(self, overwrite=True, **kwargs): """add (x,y) projection to object""" d = self._obj if "x" not in d.variables or "y" not in d.variables or overwrite: _x, _y = self.projection.lonlat2xy(d[self._lon], d[self._lat]) dims = d[self._lon].dims d["x"], d["y"] = (dims, _x), (dims, _y) else: print("project silently fails")
[docs] def compute_lonlat(self, x=None, y=None, **kwargs): """update longitude and latitude from projected coordinates""" d = self._obj if x is None and y is None: x, y = d["x"], d["y"] lon, lat = self.projection.xy2lonlat(x, y) return (x.dims, lon), (x.dims, lat)
# time series related code # speed ...