# -*- coding: utf-8 -*-
"""
Class objects for manipulating OSU tidal atlas data
Chris Old
IES, School of Engineering, University of Edinburgh
Dec 2023
"""
# ----------------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
import os
import struct
import array
# Non-Standard Python Dependencies
import numpy as np
# Local Module Dependencies
# Other Dependencies
# ----------------------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------------------
[docs]
class osuDataObj():
"""
OSU data object class
"""
def __init__(self):
self.n = 0
self.m = 0
self.nc = 0
self.latLims = []
self.lonLims = []
self.constit = []
self.data = None
[docs]
class osuGridObj():
"""
OSU grid object class
"""
def __init__(self):
self.n = 0
self.m = 0
self.latLims = []
self.lonLims = []
self.dt = None
self.nob = 0
self.iob = []
self.hz = None
self.mz = None
[docs]
class osuTidesObj():
"""
OSU Tide Atlas object class
"""
def __init__(self, atlasFile):
self.path = os.path.split(atlasFile.replace('\\','/'))[0]
with open(atlasFile,'r') as fha:
self.hfile = fha.readline().replace('\\','/').strip()
self.ufile = fha.readline().replace('\\','/').strip()
self.gfile = fha.readline().replace('\\','/').strip()
self.elev = osuDataObj()
self.flow = osuDataObj()
self.grid = osuGridObj()
#self.elev.n = 0
[docs]
def loadHAtlas(self):
"""
Load OSU elevation file
"""
# Open elevation file
file = self.path+'/'+self.hfile
if not os.path.isfile(file):
print('Elevation Read: File does not exist - '+file)
return
fh = open(file,mode='rb')
# Read header
headLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
header = fh.read(headLen)
headChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not headLen == headChk:
print('Elevation Read: Failed checksum test - header')
fh.close()
return
self.elev.n = int.from_bytes(header[0:4],byteorder='big', signed=False)
self.elev.m = int.from_bytes(header[4:8],byteorder='big', signed=False)
self.elev.nc = int.from_bytes(header[8:12],byteorder='big', signed=False)
self.elev.latLims = struct.unpack('!2f',header[12:20])
self.elev.lonLims = struct.unpack('!2f',header[20:28])
#self.elev.constit=[]
for ic in np.arange(self.elev.nc):
nbytes = 4
i1 = 28+(ic*nbytes)
i2 = i1+nbytes
self.elev.constit.append(header[i1:i2].decode("utf-8"))
# Read elevation data
self.elev.data = np.zeros((2,self.elev.n,self.elev.m,self.elev.nc))
for ic in np.arange(self.elev.nc):
darray = array.array('f')
dataLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
darray.frombytes(fh.read(dataLen))
dataChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not dataLen == dataChk:
print('Elelvation Read: Failed checksum test - hdata')
fh.close()
return
darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.elev.m,self.elev.n,2)
self.elev.data[0,:,:,ic] = np.squeeze(darray[:,:,0]).transpose()
self.elev.data[1,:,:,ic] = np.squeeze(darray[:,:,1]).transpose()
fh.close()
return
[docs]
def loadUAtlas(self):
"""
Load OSU velocity file
"""
file = self.path+'/'+self.ufile
if not os.path.isfile(file):
print('Velocity Read: File does not exist - '+file)
return
fh = open(file,mode='rb')
# Read header
headLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
header = fh.read(headLen)
headChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not headLen == headChk:
print('Velocity Read: Failed checksum test - header')
fh.close()
return
self.flow.n = int.from_bytes(header[0:4],byteorder='big', signed=False)
self.flow.m = int.from_bytes(header[4:8],byteorder='big', signed=False)
self.flow.nc = int.from_bytes(header[8:12],byteorder='big', signed=False)
self.flow.latLims = struct.unpack('!2f',header[12:20])
self.flow.lonLims = struct.unpack('!2f',header[20:28])
#self.flow.constit=[]
for ic in np.arange(self.flow.nc):
nbytes = 4
i1 = 28+(ic*nbytes)
i2 = i1+nbytes
self.flow.constit.append(header[i1:i2].decode("utf-8"))
# Read velocity data
self.flow.data = np.zeros((4,self.flow.n,self.flow.m,self.flow.nc))
for ic in np.arange(self.flow.nc):
darray = array.array('f')
dataLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
darray.frombytes(fh.read(dataLen))
dataChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not dataLen == dataChk:
print('Velocity Read: Failed checksum test - udata')
fh.close()
return
darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.flow.m,self.flow.n,4)
self.flow.data[0,:,:,ic] = np.squeeze(darray[:,:,0]).transpose()
self.flow.data[1,:,:,ic] = np.squeeze(darray[:,:,1]).transpose()
self.flow.data[2,:,:,ic] = np.squeeze(darray[:,:,2]).transpose()
self.flow.data[3,:,:,ic] = np.squeeze(darray[:,:,3]).transpose()
fh.close()
return
[docs]
def loadGrid(self):
"""
Load OSU grid file
"""
# Open grid file
file = self.path+'/'+self.gfile
if not os.path.isfile(file):
print('Grid Read: File does not exist - '+file)
return
fh = open(file,mode='rb')
# Read header
headLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
header = fh.read(headLen)
headChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not headLen == headChk:
print('Header Read: Failed checksum test')
fh.close()
return
self.grid.n = int.from_bytes(header[0:4],byteorder='big', signed=False)
self.grid.m = int.from_bytes(header[4:8],byteorder='big', signed=False)
self.grid.latLims = struct.unpack('!2f',header[8:16])
self.grid.lonLims = struct.unpack('!2f',header[16:24])
self.grid.dt = struct.unpack('!1f',header[24:28])
self.grid.nob = int.from_bytes(header[28:32],byteorder='big', signed=False)
# Read open boundary nodel locations
if self.grid.nob == 0:
iobLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
iob = int.from_bytes(fh.read(4),byteorder='big', signed=False)
iobChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
self.grid.iob = []
else:
darray = array.array('i')
iobLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
darray.frombytes(fh.read(iobLen))
iobChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not iobLen == iobChk:
print('Grid Read: Failed checksum test - iob')
fh.close()
return
darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.grid.nob,2)
self.grid.iob = darray.transpose()
# Read bathymetry data
darray = array.array('f')
hzLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
darray.frombytes(fh.read(hzLen))
hzChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not hzLen == hzChk:
print('Grid Read: Failed checksum test - hz')
fh.close()
return
darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.grid.m,self.grid.n)
self.grid.hz = darray.transpose()
# Read land mask
darray = array.array('i')
mzLen = int.from_bytes(fh.read(4),byteorder='big', signed=False)
darray.frombytes(fh.read(mzLen))
mzChk = int.from_bytes(fh.read(4),byteorder='big', signed=False)
if not mzLen == mzChk:
print('Grid Read: Failed checksum test - mz')
fh.close()
return
darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.grid.m,self.grid.n)
self.grid.mz = darray.transpose()
fh.close()
return
[docs]
def loadAtlas(self):
"""
Load atlas data from OSU atlas files
"""
print(' Loading elevation data...')
self.loadHAtlas()
print(' Loading velocity data...')
self.loadUAtlas()
print(' Loading grid data...')
self.loadGrid()
print(' Atlas loaded.')
return
[docs]
def setElev(self,n,m,nc,lat_lims,lon_lims,constit,data):
"""
Populate atlas elevation object
"""
self.elev.n = n
self.elev.m = m
self.elev.nc = nc
self.elev.latLims = lat_lims
self.elev.lonLims = lon_lims
self.elev.constit = constit
self.elev.data = data
return
[docs]
def setFlow(self,n,m,nc,lat_lims,lon_lims,constit,data):
"""
Populate atlas velocity object
"""
self.flow.n = n
self.flow.m = m
self.flow.nc = nc
self.flow.latLims = lat_lims
self.flow.lonLims = lon_lims
self.flow.constit = constit
self.flow.data = data
return
[docs]
def setGrid(self,n,m,lat_lims,lon_lims,dt,nob,iob,hz,mz):
"""
Populate atlas grid object
"""
self.grid.n = n
self.grid.m = m
self.grid.latLims = lat_lims
self.grid.lonLims = lon_lims
self.grid.dt = dt
self.grid.nob = nob
self.grid.iob = iob
self.grid.hz = hz
self.grid.mz = mz
return
[docs]
def getConstitIndices(self,constit,subcons):
indx = []
for con in subcons:
if con in constit:
indx.append(constit.index(con))
return indx
[docs]
def model2atlas(self,model,lat_lims,lon_lims,subcon=[]):
"""
Extract atlas subset from OSU model atlas
"""
# Get atlas dimensions and constituents
mlonLims = model.grid.lonLims
mlatLims = model.grid.latLims
n = model.grid.n
m = model.grid.m
dx = (mlonLims[1]-mlonLims[0])/n
dy = (mlatLims[1]-mlatLims[0])/m
lon = np.arange(mlonLims[0]+dx/2,mlonLims[1]-dx/2,dx)
lat = np.arange(mlatLims[0]+dy/2,mlatLims[1]-dy/2,dy)
ii=np.where((lon>=lon_lims[0]) & (lon<=lon_lims[1]))[0]
jj=np.where((lat>=lat_lims[0]) & (lat<=lat_lims[1]))[0]
th_lim = np.zeros((2,))
ph_lim = np.zeros((2,))
th_lim[0]=lat[jj[1]]-1/60
th_lim[1]=lat[jj[-1]]+1/60
if (lon_lims[0]<0) & (lon_lims[1]<0):
lon_lims=lon_lims+360.0
ph_lim[1]=lon[ii[-1]]+1/60
if lon_lims[0]>0:
ph_lim[0]=lon[ii[0]]-1/60
else: # passing through 0
ii1=np.where(lon>lon_lims[0]+360)[0]
ii=np.concatenate((ii1,ii))
ph_lim[0]=lon[ii1[0]]-1/60-360
nloc=len(ii)
mloc=len(jj)
if len(subcon) == 0:
ncloc = model.elev.nc
cons = np.asarray(model.elev.constit)[:].tolist()
cindx = np.arange(len(cons)).tolist()
else:
ncloc = len(subcon)
modcon = np.asarray(model.elev.constit)[:].tolist()
cindx = self.getConstitIndices(modcon,subcon)
cons = np.asarray(model.elev.constit)[cindx].tolist()
# Get atlas data
hdata = np.zeros((2,nloc,mloc,ncloc))
hdata[:,:,:,:] = ((model.elev.data[:,ii,:,:])[:,:,jj,:])[:,:,:,cindx]
udata = np.zeros((4,nloc,mloc,ncloc))
udata[:,:,:,:] = ((model.flow.data[:,ii,:,:])[:,:,jj,:])[:,:,:,cindx]
hz = np.zeros((nloc,mloc))
hz[:,:] = (model.grid.hz[ii,:])[:,jj]
mz = np.zeros((nloc,mloc))
mz[:,:] = (model.grid.mz[ii,:])[:,jj]
iob = []
dt = model.grid.dt
self.setElev(nloc,mloc,ncloc,th_lim,ph_lim,cons,hdata)
self.setFlow(nloc,mloc,ncloc,th_lim,ph_lim,cons,udata)
nob = 0
iob = []
self.setGrid(nloc,mloc,th_lim,ph_lim,dt,nob,iob,hz,mz)
return
[docs]
def writeHAtlas(self):
"""
Write OSU Elevation file
"""
if not os.path.isdir(self.path):
print('Atlas Write: Directory does not exist - '+self.path)
return
# Open new elevation file for writing
hfile = self.path+'/'+self.hfile
fh = open(hfile,mode='wb')
elev = self.elev
# Write file header
headLen = np.uint32(4*(7+elev.nc)).byteswap(inplace=False)
fh.write(headLen)
n = np.uint32(elev.n).byteswap(inplace=False)
fh.write(n)
m = np.uint32(elev.m).byteswap(inplace=False)
fh.write(m)
nc = np.uint32(elev.nc).byteswap(inplace=False)
fh.write(nc)
latLims = np.single(elev.latLims).byteswap(inplace=False)
fh.write(latLims)
lonLims = np.single(elev.lonLims).byteswap(inplace=False)
fh.write(lonLims)
for ic in np.arange(elev.nc):
fh.write(elev.constit[ic].encode())
fh.write(headLen)
# Write elevation data
recLen = np.uint32(8*elev.n*elev.m).byteswap(inplace=False)
for ic in np.arange(elev.nc):
fh.write(recLen)
hf = np.squeeze(elev.data[:,:,:,ic])
hf = hf.transpose().flatten()
hf = np.single(hf).byteswap(inplace=False)
fh.write(hf)
fh.write(recLen)
# Close new file
fh.close()
return
[docs]
def writeUAtlas(self):
"""
Write OSU Velocity file
"""
if not os.path.isdir(self.path):
print('Atlas Write: Directory does not exist - '+self.path)
return
# Open new velocity file for writing
ufile = self.path+'/'+self.ufile
fh = open(ufile,mode='wb')
flow = self.flow
# Write file header
headLen = np.uint32(4*(7+flow.nc)).byteswap(inplace=False)
fh.write(headLen)
n = np.uint32(flow.n).byteswap(inplace=False)
fh.write(n)
m = np.uint32(flow.m).byteswap(inplace=False)
fh.write(m)
nc = np.uint32(flow.nc).byteswap(inplace=False)
fh.write(nc)
latLims = np.single(flow.latLims).byteswap(inplace=False)
fh.write(latLims)
lonLims = np.single(flow.lonLims).byteswap(inplace=False)
fh.write(lonLims)
for ic in np.arange(flow.nc):
fh.write(flow.constit[ic].encode())
fh.write(headLen)
# Write velocity data
recLen = np.uint32(2*8*flow.n*flow.m).byteswap(inplace=False)
for ic in np.arange(flow.nc):
fh.write(recLen)
uv = np.squeeze(flow.data[:,:,:,ic])
uv = uv.transpose().flatten()
uv = np.single(uv).byteswap(inplace=False)
fh.write(uv)
fh.write(recLen)
# Close new file
fh.close()
return
[docs]
def writeGrid(self):
"""
Write OSU Grid file
"""
if not os.path.isdir(self.path):
print('Atlas Write: Directory does not exist - '+self.path)
return
# Open new grid file for writing
gfile = self.path+'/'+self.gfile
fh = open(gfile,mode='wb')
grid = self.grid
# Write file header
headLen = np.uint32(32).byteswap(inplace=False)
fh.write(headLen)
n = np.uint32(grid.n).byteswap(inplace=False)
fh.write(n)
m = np.uint32(grid.m).byteswap(inplace=False)
fh.write(m)
latLims = np.single(grid.latLims).byteswap(inplace=False)
fh.write(latLims)
lonLims = np.single(grid.lonLims).byteswap(inplace=False)
fh.write(lonLims)
dt = np.single(grid.dt).byteswap(inplace=False)
fh.write(dt)
nob = np.uint32(grid.nob).byteswap(inplace=False)
fh.write(nob)
fh.write(headLen)
# Write open boundary nodes
if grid.nob == 0:
recLen = np.uint32(4).byteswap(inplace=False)
fh.write(recLen)
fh.write(np.uint32(0))
fh.write(recLen)
else:
recLen = np.uint32(8*grid.nob).byteswap(inplace=False)
fh.write(recLen)
iob = np.uint32(grid.iob).byteswap(inplace=False)
fh.write(iob)
fh.write(recLen)
# Write bathymetry data
recLen = np.uint32(4*grid.n*grid.m).byteswap(inplace=False)
fh.write(recLen)
hz = grid.hz.copy()
hz = hz.transpose().flatten()
hz = np.single(hz).byteswap(inplace=False)
fh.write(hz)
fh.write(recLen)
# Write land mask data
fh.write(recLen)
mz = grid.mz.copy()
mz = mz.transpose().flatten()
mz = np.uint32(mz).byteswap(inplace=False)
fh.write(mz)
fh.write(recLen)
# Close new file
fh.close()
return
[docs]
def writeAtlasFiles(self):
"""
Write OSU Atlas binary files
"""
if not os.path.isdir(self.path):
print('Atlas Write: Directory does not exist - '+self.path)
return
print(' Writing elevation data...')
self.writeHAtlas()
print(' Writing velocity data...')
self.writeUAtlas()
print(' Writing grid data...')
self.writeGrid()
print(' Atlas written.')
return
# ----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# MAIN PROCESS
# ----------------------------------------------------------------------------
#--------------------------------------------------------------------------
# INTERFACE
#--------------------------------------------------------------------------