Source code for ocmw.dataproc.processADCPData

# -*- coding: utf-8 -*-
"""
Tools for converting ADCP data into OCMW standard format for OCMW post-processing.

Example scripts for ADCP data conversion provided.

Chris Old
IES, School of Engineering, University of Edinburgh
Oct 2023

"""

# ----------------------------------------------------------------------------
#   IMPORTS
# ----------------------------------------------------------------------------

# Standard Python Dependencies
import os
# Non-Standard Python Dependencies
import numpy as np
from netCDF4 import Dataset as ncdata
import utm
# Local Module Dependencies
from ocmw.core import timeFuncs as tf
from ocmw.core import physics as phy
from ocmw.core.fileTools import getListOfFiles, generateGlobalAttributes
from ocmw.dataman.dataReaders import netcdfGeneric
from .ocmw_extract import ocmwExtn, checkOCMWExtn
from .ocmw_extract import load_ocmw_file, save_ocmw_file
# Other Dependencies


# ----------------------------------------------------------------------------
#   GLOBAL VARIABLES
# ----------------------------------------------------------------------------
secsPerDay = 24.0*60.0*60.0

# ----------------------------------------------------------------------------
#   CLASS DEFINITIONS
# ----------------------------------------------------------------------------
[docs] class adcp_obj(netcdfGeneric): """ Class structure for reading the OCMW netCDF4 formatted ADCP data files. """ def __init__(self, fileName): netcdfGeneric.__init__(self, fileName) if self.file['name'] != '': if 'instrument_type' in self.file['attribs']: self.instrument_type = ncdata(fileName).instrument_type if 'instrument_freq' in self.file['attribs']: self.instrument_freq = ncdata(fileName).instrument_freq if 'instrument_serial_no' in self.file['attribs']: self.instrument_serial_no = ncdata(fileName).instrument_serial_no if 'site' in self.file['attribs']: self.site = ncdata(fileName).site if 'deploy_id' in self.file['attribs']: self.deployment = ncdata(fileName).deploy_id if 'geospatial_latitude' in self.file['attribs']: self.latitude = ncdata(fileName).geospatial_latitude if 'geospatial_longitude' in self.file['attribs']: self.longitude = ncdata(fileName).geospatial_longitude if 'geospatial_easting' in self.file['attribs']: self.easting = ncdata(fileName).geospatial_easting if 'geospatial_northing' in self.file['attribs']: self.northing = ncdata(fileName).geospatial_northing if 'times' in self.file['vars']: t = self.getVar('times') if 'epoch_str' in self.listVarAttrs('times'): t_epoch = tf.dateNumFromDateStr(self.getVarAttr('times','epoch_str')) else: t_epoch = 0.0 self.time_start = tf.dateStrFromDateNum(t[0]+t_epoch,tf.decTimeFmt) self.time_end = tf.dateStrFromDateNum(t[-1]+t_epoch,tf.decTimeFmt) self.times = t+t_epoch if 'bin_dist' in self.file['vars']: self.bin_dist = self.getVar('bin_dist')*1.0 if 'bin_size' in self.file['vars']: self.bin_size = self.getVar('bin_size')*1.0 if 'num_bins' in self.file['vars']: self.num_bins = self.getVar('num_bins')*1 if 'head_hab' in self.file['vars']: self.head_hab = self.getVar('head_hab')*1.0 if 'sample_rate'in self.file['vars']: self.sample_rate = self.getVar('sample_rate')*1.0 self.sample_period = 1.0/self.sample_rate else: if 'times' in self.file['vars']: t = self.getVar('times') self.sample_period = np.nanmedian(np.diff(t))*24.0*60.0*60.0 self.sample_rate = 1.0/self.sample_period else: self.sample_period = None self.sample_rate = None
[docs] def getMetaData(self): path, file = os.path.split(self.file['name']) meta = [self.instrument_type, self.instrument_freq, path, file, self.easting, self.northing, self.time_start, self.time_end, self.sample_rate, self.sample_period] return meta
[docs] def getDepth(self,d0=0.0): p = self.getVar('pressure') lat,lon = utm.to_latlon(self.easting,self.northing,30,'N') depth = phy.pressure2depth(d0,lat,p) return depth
[docs] def genDepthMask(self,d0=0.0): d = self.getDepth(d0) dcell = ((d-self.bin_dist)//self.bin_size).astype(int) mask = np.ones((len(self.times),self.num_bins))*np.nan for cell in np.arange(self.num_bins): indx = np.where(dcell > cell)[0] mask[indx,cell] = 1.0 self.depth_mask = mask return
[docs] def applyDepthMask(self,var,d0=0.0): if not hasattr(self,'depth_mask'): self.genDepthMask(d0) var_qc = var*self.depth_mask return var_qc
[docs] def applyQCFlags(self,var): var_qc = var return var_qc
[docs] def getTimeDateNum(self): t = self.getVar('times') if 'epoch_str' in self.listVarAttrs('times'): t_epoch = tf.dateNumFromDateStr(self.getVarAttr('times','epoch_str')) else: t_epoch = 0.0 t_dn = t+t_epoch return t_dn
[docs] def getSubsampleTBounds(self,period): t0 = np.floor(self.times[0]) n_samples = int(np.floor(secsPerDay/period)) TBounds = np.zeros((n_samples,2),dtype=float) for isample in np.arange(n_samples): TBounds[isample,0] = t0+(isample*period)/secsPerDay TBounds[isample,1] = t0+((isample+1)*period)/secsPerDay return TBounds
[docs] def subSampleTime(self,period): bnds = self.getSubsampleTBounds(period) t = self.times n_samples = len(bnds) t_sub = np.empty((n_samples,),dtype=float) t_sub[:] = None for ismp in np.arange(n_samples): indices = np.where((t>bnds[ismp,0]) & (t<=bnds[ismp,1]))[0] if indices.size > 0: t_sub[ismp] = np.nanmean(t[indices]) else: t_sub[ismp] = np.nanmean(bnds[ismp,:]) return t_sub
[docs] def getWindowTBounds(self,period,window): if window > period: window = period PBnds = self.getSubsampleTBounds(period) WBnds = PBnds.copy() if window < period: dt = (period - window)/2.0 else: dt = 0.0 if dt > 0.0: WBnds[:,0] = WBnds[:,0] + dt/secsPerDay WBnds[:,1] = WBnds[:,1] - dt/secsPerDay return WBnds
[docs] def average1DVar(self,varStr,period,window): bnds = self.getWindowTBounds(period,window) t = self.times n_samples = len(bnds) V_sub = np.empty((n_samples,),dtype=float) V_sub[:] = None V = self.getVar(varStr) for ismp in np.arange(n_samples): indices = np.where((t>bnds[ismp,0]) & (t<=bnds[ismp,1]))[0] if indices.size > 0: V_sub[ismp] = np.nanmean(V[indices]) return V_sub
[docs] def averageVel(self,Vel,Bnds): t = self.times n_samples = len(Bnds) v_sub = np.empty((n_samples,self.num_bins),dtype=float) v_sub[:] = None for ismp in np.arange(n_samples): indices = np.where((t>Bnds[ismp,0]) & (t<=Bnds[ismp,1]))[0] if indices.size > 0: v_sub[ismp,:] = np.nanmean(Vel[indices,:],axis=0) return v_sub
[docs] def subSampleVel(self,period,window,velType=['ENU'],d0=0.0,apply_QC=False): WBnds = self.getWindowTBounds(period,window) if 'ENU' in velType: U = self.getVar('vel_geo_east') U = self.applyDepthMask(U,d0) if apply_QC: U = self.applyQCFlags(U) V = self.getVar('vel_geo_north') V = self.applyDepthMask(V,d0) if apply_QC: V = self.applyQCFlags(V) W = self.getVar('vel_geo_vert') W = self.applyDepthMask(W,d0) if apply_QC: W = self.applyQCFlags(W) U_sub = self.averageVel(U,WBnds) V_sub = self.averageVel(V,WBnds) W_sub = self.averageVel(W,WBnds) return U_sub, V_sub, W_sub
# ---------------------------------------------------------------------------- # FUNCTION DEFINITIONS # ----------------------------------------------------------------------------
[docs] def saveADCP2ocmw(outfname,glbAttr,Loc,CRS,Data2D,Data3D): """ Save extracted ADCP data to standard OCMW internal format file. """ data = {} data['globalAttributes'] = glbAttr data['dataLoc'] = Loc data['crs'] = CRS data['timeUnits'] = 'days since '+Data2D['TS_epoch'] data['epoch'] = Data2D['TS_epoch'] data['times'] = Data2D['TS_t'] data['pressure'] = Data2D['TS_p'] data['depth'] = Data2D['TS_d'] data['height'] = Data2D['TS_h'] data['z'] = Data3D['TS_z'] data['depAvg_U'] = Data2D['TS_U'] data['depAvg_V'] = Data2D['TS_V'] data['vel_east'] = Data3D['TS_U'] data['vel_north'] = Data3D['TS_V'] data['vel_up'] = Data3D['TS_W'] #Check output file extension fpath,fname = checkOCMWExtn(outfname) save_ocmw_file(fpath,fname,data) return data
[docs] def adcp2ocmw(datapath,filename,period,window,d0=0.0): """ Convert ADCP extracted data to standard OCWM internal format and save to file. """ ncfile = datapath+'/'+filename adcp = adcp_obj(ncfile) lat = utm.to_latlon(adcp.easting, adcp.northing, 30, 'N')[0] crs = adcp.getGlobalAttr('geospatial_crs') times = adcp.subSampleTime(period) epochStr = adcp.getVarAttr('times','epoch_str') epochDN = tf.dateNumFromDateStr(epochStr) times = times - epochDN U, V, W = adcp.subSampleVel(period,window) z = adcp.head_hab + adcp.bin_dist + np.arange(adcp.num_bins)*adcp.bin_size z = np.dot(np.ones((V.shape[0],1)),z.reshape((1,len(z)))) press = adcp.average1DVar('pressure', period, window) depth = phy.pressure2depth(d0, lat, press) ''' p = np.dot(depth.reshape((len(depth),1)),np.ones((1,adcp.num_bins))) zmask = np.ones(p.shape) zmask[z>p] = np.nan ''' titleStr = "Conversion of "+filename+" to OCMW internal format" glbAttr = generateGlobalAttributes(titleStr) Loc = [adcp.easting,adcp.northing] Data2D = {} Data2D['TS_epoch'] = adcp.getVarAttr('times','epoch_str') Data2D['TS_t'] = times Data2D['TS_p'] = press Data2D['TS_d'] = depth Data2D['TS_h'] = depth-np.nanmean(depth) Data2D['TS_U'] = np.nanmean(U,1) #*zmask,1) Data2D['TS_V'] = np.nanmean(V,1) #*zmask,1) Data3D = {} Data3D['TS_z'] = z Data3D['TS_U'] = U #*zmask Data3D['TS_V'] = V #*zmask Data3D['TS_W'] = W #*zmask outpath = datapath+'/EXTRACT' if not os.path.isdir(outpath): os.mkdir(outpath) fn, fext = os.path.splitext(filename) outfname = outpath+'/'+fn+ocmwExtn data = saveADCP2ocmw(outfname,glbAttr,Loc,crs,Data2D,Data3D) return data
[docs] def getValidADCPDataIndices(depth): indices = np.arange(len(depth)) thresh = 2.5*(np.nanmax(depth)-np.nanmedian(depth)) peaks = np.where(np.abs(np.diff(depth)) > thresh)[0] if len(peaks) == 4: indices = indices[peaks[0]+3:peaks[-1]-2] elif len(peaks) == 2: if peaks[0] < 0.5*len(depth): indices = indices[peaks[0]+3:] else: indices = indices[:peaks[-1]-2] return indices
[docs] def merge_adcp_daily_files(datapath,srchstr,outfile): """ Merge individual OCWM format daily extracts into a single file for analysis. """ files = getListOfFiles(datapath,srchstr+'*'+ocmwExtn) mrgdata = {} for i,f in enumerate(files): print(f) d = load_ocmw_file(datapath,f) if i == 0: titleStr = 'Merged ADCP data - '+datapath.split('/')[-2]+' deployment' mrgdata['globalAttributes'] = generateGlobalAttributes(titleStr) mrgdata['dataLoc'] = d['dataLoc'] mrgdata['crs'] = d['crs'] mrgdata['timeUnits'] = d['timeUnits'] mrgdata['epoch'] = d['epoch'] times = d['times'][:] press = d['pressure'][:] depth = d['depth'][:] height = d['height'][:] depAvg_U = d['depAvg_U'][:] depAvg_V = d['depAvg_V'][:] z = d['z'][:,:] vel_east = d['vel_east'][:,:] vel_north = d['vel_north'][:,:] vel_up = d['vel_up'][:,:] else: times = np.hstack([times,d['times'][:]]) press = np.hstack([press,d['pressure'][:]]) depth = np.hstack([depth,d['depth'][:]]) height = np.hstack([height,d['height'][:]]) depAvg_U = np.hstack([depAvg_U,d['depAvg_U'][:]]) depAvg_V = np.hstack([depAvg_V,d['depAvg_V'][:]]) z = np.vstack([z,d['z'][:,:]]) vel_east = np.vstack([vel_east,d['vel_east'][:,:]]) vel_north = np.vstack([vel_north,d['vel_north'][:,:]]) vel_up = np.vstack([vel_up,d['vel_up'][:,:]]) valid = getValidADCPDataIndices(depth) mrgdata['times'] = times[valid] mrgdata['pressure'] = press[valid] mrgdata['depth'] = depth[valid] mrgdata['height'] = height[valid] mrgdata['depAvg_U'] = depAvg_U[valid] mrgdata['depAvg_V'] = depAvg_V[valid] mrgdata['z'] = z[valid,:] mrgdata['vel_east'] = vel_east[valid,:] mrgdata['vel_north'] = vel_north[valid,:] mrgdata['vel_up'] = vel_up[valid,:] #Check output file extension fpath,fname = checkOCMWExtn(datapath+'/'+outfile) save_ocmw_file(fpath,fname,mrgdata) return mrgdata
# ---------------------------------------------------------------------------- # MAIN PROCESS # ---------------------------------------------------------------------------- #-------------------------------------------------------------------------- # INTERFACE #--------------------------------------------------------------------------