Source code for ocmw.calval.xtrackReader

# -*- coding: utf-8 -*-

"""
 Data loader for XTRACK netCDF4 dat files

 Version: 0.0
 Author: C.P. Old

 History:
 28 January 2020 - Initial code creation and testing

"""

# ----------------------------------------------------------------------------
#   IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
# Non-Standard Python Dependencies
import numpy as np
from netCDF4 import Dataset
import utm
from ocmw.core.in_poly_funcs import cn_PnPoly 
# Non-Standard Python Dependencies
# Other Dependencies

# ----------------------------------------------------------------------------
#   GLOBAL VARIABLES
# ----------------------------------------------------------------------------


# ----------------------------------------------------------------------------
#   CLASS DEFINITIONS
# ----------------------------------------------------------------------------


#--------------------------------------------------------------------------
# FUNCTION DEFINITIONS
#--------------------------------------------------------------------------

[docs] def isInDomain(east,north,domain): """ Test whether a point is inside the model domain extent """ indx, = np.where((east>domain[0])&(east<domain[1])&(north>domain[2])&(north<domain[3])) return indx
[docs] def isInPolygon(east,north,polygon): """ Test whether a point is inside the model domain polygon """ points = np.asarray([east[:],north[:]]).transpose() inpoly = [cn_PnPoly(pt,polygon) for pt in points] indx, = np.where(inpoly) return indx
[docs] def latlon2utm(lat,lon,zone=30,region='N'): """ Convert lat/lon to easting/northing """ pos = utm.from_latlon(lat[:],lon[:],zone,region) east = pos[0] north = pos[1] return [east,north]
[docs] def readConstitName(constit): """ Get tidal constituent name """ indx = np.where(constit.mask==False) name = str(np.asarray(constit)[indx],"UTF-8") return name
[docs] def extractXTrackValues(xtvars,index): """ Extract the XTrack data values """ harm = [] for h in np.arange(xtvars['ncnst']): constit = xtvars['constit'][h] name = readConstitName(constit) harm = np.append(harm,name) ampl = xtvars['amp'].data[index] phase = xtvars['pha'].data[index] mserr = xtvars['mse'].data[index] return harm,ampl,phase,mserr
[docs] def loadVars(ncfile): """ Load the XTrack data from netCDF into a data dictionary. """ nc = Dataset(ncfile,'r') dict = {} # dimensions dict['ncnst'] = nc.dimensions['constituent'].size dict['nmlen'] = nc.dimensions['namelength'].size dict['nrecs'] = nc.dimensions['records'].size # variables dict['constit'] = nc.variables['constituentname'][:] dict['lat'] = nc.variables['lat'][:] dict['lon'] = nc.variables['lon'][:] dict['amp'] = nc.variables['amplitude'][:] dict['pha'] = nc.variables['phase_lag'][:] dict['mse'] = nc.variables['mean_square_error'][:] dict['bce'] = nc.variables['bg_contamination_error'][:] dict['smp'] = nc.variables['sample'][:] nc.close() return dict
[docs] def getPointsInDomain(xtvars,domain): """ Identify the XTrack record points that lie within the model domain extent """ [east,north] = latlon2utm(xtvars['lat'],xtvars['lon']) indx = isInDomain(east,north,domain) if np.asarray(indx).size > 0: points = np.asarray([east[indx],north[indx]]) else: points = np.asarray([]) return points,indx
[docs] def getPointsInPolygon(xtvars,polygon): """ Identify the XTrack record points that lie within the model domain polygon """ [east,north] = latlon2utm(xtvars['lat'],xtvars['lon']) indx = isInPolygon(east,north,polygon) if np.asarray(indx).size > 0: points = np.asarray([east[indx],north[indx]]) else: points = np.asarray([]) return points,indx
#--------------------------------------------------------------------------