# -*- coding: utf-8 -*-
"""
Functions for applying tidal analysis to timeseries of tidal flow parameters.
"""
#Harmonic reduction of tidally forced variables
#
#Chris Old
#IES, School of Engineering, University of Edinburgh
#Aug 2023
# ----------------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
# Non-Standard Python Dependencies
import numpy as np
import scipy.io as sio
from scipy.signal import argrelextrema
from utide import solve, reconstruct
# Local Module Dependencies
from .fileTools import generateGlobalAttributes
from .spectral import nextpow2, signal_from_fft, fft
from .timeFuncs import dateStrFromDateNum
from .timeFuncs import dateNumFromDateStr, dateNumFromDatetime
from .timeFuncs import datetimeFromNum
# Other Dependencies
# ----------------------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------------------
#--------------------------------------------------------------------------
# FUNCTION DEFINITIONS
#--------------------------------------------------------------------------
[docs]
def harmonicReduction(param,dt,lat=55.8,constit='auto',meth="ols",ci='MC',fullOutput=False,verbose=False):
"""
Apply harmonic reduction to a timeseries of data using tidal harmonic constitutents.
"""
# Need to force user to pass a latitude value - currently set for SoI
coef = solve(dt,param,lat=lat,constit=constit,method=meth,conf_int=ci,verbose=verbose)
if fullOutput:
mserror = coef['A_ci']
return coef, mserror
else:
return coef
[docs]
def saveTide(param,paramStr,dt,coef,outFile,descriptStr=''):
"""
Save harmonic reduction parameters to a \*.mat file.
"""
dnum = [dateNumFromDatetime(t) for t in dt]
glbAttr = generateGlobalAttributes(descriptStr)
dict = {}
dict['globalAttributes'] = glbAttr
dict['paramName'] = paramStr
dict['times'] = dnum
dict['param'] = param
dict['recon'] = reconstruct(np.asarray(dt),coef,verbose=False)
sio.savemat(outFile,dict)
return
[docs]
def signalDecomposition(dnum, val, lat=55.8, constit='auto',cutoff_period:float=25.0):
"""
Decompose a timeseries of data into tidal, non-tidal low frequency, and non-tidal high frequency components.
"""
#-----------------------------------------------------------------------------
# Apply tidal harmonic reduction
#-----------------------------------------------------------------------------
dt = []
for t in dnum:
dt.append(datetimeFromNum(t))
h_res = np.mean(val)
h = val-h_res
coef = solve(np.asarray(dt),h,lat=lat,constit=constit,method="ols",verbose=False)
pred = reconstruct(np.asarray(dt),coef,verbose=False)
#-----------------------------------------------------------------------------
# Spectral Decomposition Analysis
#-----------------------------------------------------------------------------
L = len(dnum)
t_days = dnum-np.float64(np.floor(dnum[0]))
T = np.floor(np.median((np.diff(dnum))*24.0*60.0*60.0)*1000.0)/1000.0
Fs = 1.0/T
pw2 = nextpow2(L)
n = np.power(2.0,pw2)
f = Fs*np.arange(n)/n
ts = np.arange(L)*T
cf = 1.0/(cutoff_period*60.0*60.0)
h_tide = pred['h']
h_dyn = h-h_tide
X = np.zeros([np.int64(n),])
X[0:L] = h_dyn
spec = fft(X)
lfindx = np.where(f<cf)[0] # Low freq indices
if cutoff_period*60.0*60.0 > T:
h_dyn_lf = signal_from_fft(ts,f,lfindx,spec)/(n/2) # Low freq signal
else:
h_dyn_lf = signal_from_fft(ts,f,lfindx,spec)/n # Low freq signal
h_dyn_hf = h_dyn-h_dyn_lf # High freq signal
datadict = {}
datadict['dnum'] = dnum
datadict['t_days'] = t_days
datadict['dtime'] = dt
datadict['val'] = val
datadict['val_mean'] = h_res
datadict['tidal'] = pred['h']
datadict['nontidal'] = h_dyn
datadict['nontidal_lowfreq'] = h_dyn_lf
datadict['nontidal_highfreq'] = h_dyn_hf
datadict['model'] = coef
return datadict
#---------------------------------------------------------------------------
# Tidal prediction plots and tide tables
#---------------------------------------------------------------------------
[docs]
def get_slack_water_times(dnum,vmag):
"""
Determine slack water times from a timeseries of tidal velocities.
"""
indx = argrelextrema(vmag,np.less)[0]
slack_times = [dateStrFromDateNum(dnum[i]) for i in indx]
return slack_times
[docs]
def get_tidal_range(dnum,elev):
"""
Determine tidal range data from a timeseries of tidal surface elevations.
"""
# Need to implement a filter to remove small local secondary tides
ilow = argrelextrema(elev,np.less)[0]
ihigh = argrelextrema(elev,np.greater)[0]
nlow = len(ilow)
nhigh = len(ihigh)
n = min(nlow,nhigh)
if dnum[ilow[0]]<dnum[ihigh[0]]:
lhrange = elev[ihigh[0:n]]-elev[ilow[0:n]]
hlrange = elev[ihigh[0:n-1]]-elev[ilow[1:n]]
else:
hlrange = elev[ihigh[0:n]]-elev[ilow[0:n]]
lhrange = elev[ihigh[0:n-1]]-elev[ilow[1:n]]
if dnum[ilow].size > 1:
lowTimes = [dateStrFromDateNum(dnum[i]) for i in ilow]
else:
lowTimes = [dateStrFromDateNum(dnum[ilow][0])]
if dnum[ihigh].size > 1:
highTimes = [dateStrFromDateNum(dnum[i]) for i in ihigh]
else:
highTimes = dateStrFromDateNum(dnum[ihigh][0])
return lhrange,hlrange,lowTimes,highTimes,ilow,ihigh
[docs]
def print_tide_times(lowTimes,highTimes,lowElev,highElev):
"""
Display tide times and tidal ranges based on output from get_tidal_range.
"""
n = min(np.asarray(lowTimes).size,np.asarray(highTimes).size)
if not type(lowTimes) is list:
lowTimes = [lowTimes]
if not type(highTimes) is list:
highTimes = [highTimes]
print(''.join(' ' for i in range(10)) +
'Low Tide'+
''.join(' ' for i in range(31)) +
'High Tide')
if dateNumFromDateStr(lowTimes[0])<dateNumFromDateStr(highTimes[0]):
for i in range(n):
print("{:+.1f}".format(lowElev[i])+'m ' +
lowTimes[i].split(' ')[1]+' UTC'+
''.join(' ' for i in range(10)) +
"{:+.1f}".format(highElev[i])+'m ' +
highTimes[i].split(' ')[1]+' UTC')
if len(lowTimes)>n:
print("{:+.1f}".format(lowElev[n])+'m ' +
lowTimes[n].split(' ')[1]+' UTC')
else:
print(''.join(' ' for i in range(47)) +
"{:+.1f}".format(highElev[0])+'m ' +
highTimes[0].split(' ')[1]+' UTC')
if len(lowTimes)<len(highTimes):
for i in range(n):
print("{:+.1f}".format(lowElev[i])+'m ' +
lowTimes[i].split(' ')[1]+' UTC'+
''.join(' ' for i in range(10)) +
"{:+.1f}".format(highElev[i+1])+'m ' +
highTimes[i+1].split(' ')[1]+' UTC')
else:
for i in range(n-1):
print("{:+.1f}".format(lowElev[i])+'m ' +
lowTimes[i].split(' ')[1]+' UTC'+
''.join(' ' for i in range(10)) +
"{:+.1f}".format(highElev[i+1])+'m ' +
highTimes[i+1].split(' ')[1]+' UTC')
print("{:+.1f}".format(lowElev[n-1])+'m ' +
lowTimes[n-1].split(' ')[1]+' UTC')
print()
return
#--------------------------------------------------------------------------
# MAIN PROCESS
#--------------------------------------------------------------------------
#--------------------------------------------------------------------------
# INTERFACE
#--------------------------------------------------------------------------