Source code for ocmw.design.osuTools

# -*- coding: utf-8 -*-
"""
Class objects for manipulating OSU tidal atlas data

Chris Old
IES, School of Engineering, University of Edinburgh
Dec 2023
"""

# ----------------------------------------------------------------------------
#   IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
import os
import struct
import array
# Non-Standard Python Dependencies
import numpy as np
# Local Module Dependencies
# Other Dependencies


# ----------------------------------------------------------------------------
#   GLOBAL VARIABLES
# ----------------------------------------------------------------------------


# ----------------------------------------------------------------------------
#   CLASS DEFINITIONS
# ----------------------------------------------------------------------------
[docs] class osuDataObj(): """ OSU data object class """ def __init__(self): self.n = 0 self.m = 0 self.nc = 0 self.latLims = [] self.lonLims = [] self.constit = [] self.data = None
[docs] class osuGridObj(): """ OSU grid object class """ def __init__(self): self.n = 0 self.m = 0 self.latLims = [] self.lonLims = [] self.dt = None self.nob = 0 self.iob = [] self.hz = None self.mz = None
[docs] class osuTidesObj(): """ OSU Tide Atlas object class """ def __init__(self, atlasFile): self.path = os.path.split(atlasFile.replace('\\','/'))[0] with open(atlasFile,'r') as fha: self.hfile = fha.readline().replace('\\','/').strip() self.ufile = fha.readline().replace('\\','/').strip() self.gfile = fha.readline().replace('\\','/').strip() self.elev = osuDataObj() self.flow = osuDataObj() self.grid = osuGridObj() #self.elev.n = 0
[docs] def loadHAtlas(self): """ Load OSU elevation file """ # Open elevation file file = self.path+'/'+self.hfile if not os.path.isfile(file): print('Elevation Read: File does not exist - '+file) return fh = open(file,mode='rb') # Read header headLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) header = fh.read(headLen) headChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not headLen == headChk: print('Elevation Read: Failed checksum test - header') fh.close() return self.elev.n = int.from_bytes(header[0:4],byteorder='big', signed=False) self.elev.m = int.from_bytes(header[4:8],byteorder='big', signed=False) self.elev.nc = int.from_bytes(header[8:12],byteorder='big', signed=False) self.elev.latLims = struct.unpack('!2f',header[12:20]) self.elev.lonLims = struct.unpack('!2f',header[20:28]) #self.elev.constit=[] for ic in np.arange(self.elev.nc): nbytes = 4 i1 = 28+(ic*nbytes) i2 = i1+nbytes self.elev.constit.append(header[i1:i2].decode("utf-8")) # Read elevation data self.elev.data = np.zeros((2,self.elev.n,self.elev.m,self.elev.nc)) for ic in np.arange(self.elev.nc): darray = array.array('f') dataLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) darray.frombytes(fh.read(dataLen)) dataChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not dataLen == dataChk: print('Elelvation Read: Failed checksum test - hdata') fh.close() return darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.elev.m,self.elev.n,2) self.elev.data[0,:,:,ic] = np.squeeze(darray[:,:,0]).transpose() self.elev.data[1,:,:,ic] = np.squeeze(darray[:,:,1]).transpose() fh.close() return
[docs] def loadUAtlas(self): """ Load OSU velocity file """ file = self.path+'/'+self.ufile if not os.path.isfile(file): print('Velocity Read: File does not exist - '+file) return fh = open(file,mode='rb') # Read header headLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) header = fh.read(headLen) headChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not headLen == headChk: print('Velocity Read: Failed checksum test - header') fh.close() return self.flow.n = int.from_bytes(header[0:4],byteorder='big', signed=False) self.flow.m = int.from_bytes(header[4:8],byteorder='big', signed=False) self.flow.nc = int.from_bytes(header[8:12],byteorder='big', signed=False) self.flow.latLims = struct.unpack('!2f',header[12:20]) self.flow.lonLims = struct.unpack('!2f',header[20:28]) #self.flow.constit=[] for ic in np.arange(self.flow.nc): nbytes = 4 i1 = 28+(ic*nbytes) i2 = i1+nbytes self.flow.constit.append(header[i1:i2].decode("utf-8")) # Read velocity data self.flow.data = np.zeros((4,self.flow.n,self.flow.m,self.flow.nc)) for ic in np.arange(self.flow.nc): darray = array.array('f') dataLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) darray.frombytes(fh.read(dataLen)) dataChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not dataLen == dataChk: print('Velocity Read: Failed checksum test - udata') fh.close() return darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.flow.m,self.flow.n,4) self.flow.data[0,:,:,ic] = np.squeeze(darray[:,:,0]).transpose() self.flow.data[1,:,:,ic] = np.squeeze(darray[:,:,1]).transpose() self.flow.data[2,:,:,ic] = np.squeeze(darray[:,:,2]).transpose() self.flow.data[3,:,:,ic] = np.squeeze(darray[:,:,3]).transpose() fh.close() return
[docs] def loadGrid(self): """ Load OSU grid file """ # Open grid file file = self.path+'/'+self.gfile if not os.path.isfile(file): print('Grid Read: File does not exist - '+file) return fh = open(file,mode='rb') # Read header headLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) header = fh.read(headLen) headChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not headLen == headChk: print('Header Read: Failed checksum test') fh.close() return self.grid.n = int.from_bytes(header[0:4],byteorder='big', signed=False) self.grid.m = int.from_bytes(header[4:8],byteorder='big', signed=False) self.grid.latLims = struct.unpack('!2f',header[8:16]) self.grid.lonLims = struct.unpack('!2f',header[16:24]) self.grid.dt = struct.unpack('!1f',header[24:28]) self.grid.nob = int.from_bytes(header[28:32],byteorder='big', signed=False) # Read open boundary nodel locations if self.grid.nob == 0: iobLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) iob = int.from_bytes(fh.read(4),byteorder='big', signed=False) iobChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) self.grid.iob = [] else: darray = array.array('i') iobLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) darray.frombytes(fh.read(iobLen)) iobChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not iobLen == iobChk: print('Grid Read: Failed checksum test - iob') fh.close() return darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.grid.nob,2) self.grid.iob = darray.transpose() # Read bathymetry data darray = array.array('f') hzLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) darray.frombytes(fh.read(hzLen)) hzChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not hzLen == hzChk: print('Grid Read: Failed checksum test - hz') fh.close() return darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.grid.m,self.grid.n) self.grid.hz = darray.transpose() # Read land mask darray = array.array('i') mzLen = int.from_bytes(fh.read(4),byteorder='big', signed=False) darray.frombytes(fh.read(mzLen)) mzChk = int.from_bytes(fh.read(4),byteorder='big', signed=False) if not mzLen == mzChk: print('Grid Read: Failed checksum test - mz') fh.close() return darray = (np.asarray(darray).byteswap(inplace=True)).reshape(self.grid.m,self.grid.n) self.grid.mz = darray.transpose() fh.close() return
[docs] def loadAtlas(self): """ Load atlas data from OSU atlas files """ print(' Loading elevation data...') self.loadHAtlas() print(' Loading velocity data...') self.loadUAtlas() print(' Loading grid data...') self.loadGrid() print(' Atlas loaded.') return
[docs] def setElev(self,n,m,nc,lat_lims,lon_lims,constit,data): """ Populate atlas elevation object """ self.elev.n = n self.elev.m = m self.elev.nc = nc self.elev.latLims = lat_lims self.elev.lonLims = lon_lims self.elev.constit = constit self.elev.data = data return
[docs] def setFlow(self,n,m,nc,lat_lims,lon_lims,constit,data): """ Populate atlas velocity object """ self.flow.n = n self.flow.m = m self.flow.nc = nc self.flow.latLims = lat_lims self.flow.lonLims = lon_lims self.flow.constit = constit self.flow.data = data return
[docs] def setGrid(self,n,m,lat_lims,lon_lims,dt,nob,iob,hz,mz): """ Populate atlas grid object """ self.grid.n = n self.grid.m = m self.grid.latLims = lat_lims self.grid.lonLims = lon_lims self.grid.dt = dt self.grid.nob = nob self.grid.iob = iob self.grid.hz = hz self.grid.mz = mz return
[docs] def getConstitIndices(self,constit,subcons): indx = [] for con in subcons: if con in constit: indx.append(constit.index(con)) return indx
[docs] def model2atlas(self,model,lat_lims,lon_lims,subcon=[]): """ Extract atlas subset from OSU model atlas """ # Get atlas dimensions and constituents mlonLims = model.grid.lonLims mlatLims = model.grid.latLims n = model.grid.n m = model.grid.m dx = (mlonLims[1]-mlonLims[0])/n dy = (mlatLims[1]-mlatLims[0])/m lon = np.arange(mlonLims[0]+dx/2,mlonLims[1]-dx/2,dx) lat = np.arange(mlatLims[0]+dy/2,mlatLims[1]-dy/2,dy) ii=np.where((lon>=lon_lims[0]) & (lon<=lon_lims[1]))[0] jj=np.where((lat>=lat_lims[0]) & (lat<=lat_lims[1]))[0] th_lim = np.zeros((2,)) ph_lim = np.zeros((2,)) th_lim[0]=lat[jj[1]]-1/60 th_lim[1]=lat[jj[-1]]+1/60 if (lon_lims[0]<0) & (lon_lims[1]<0): lon_lims=lon_lims+360.0 ph_lim[1]=lon[ii[-1]]+1/60 if lon_lims[0]>0: ph_lim[0]=lon[ii[0]]-1/60 else: # passing through 0 ii1=np.where(lon>lon_lims[0]+360)[0] ii=np.concatenate((ii1,ii)) ph_lim[0]=lon[ii1[0]]-1/60-360 nloc=len(ii) mloc=len(jj) if len(subcon) == 0: ncloc = model.elev.nc cons = np.asarray(model.elev.constit)[:].tolist() cindx = np.arange(len(cons)).tolist() else: ncloc = len(subcon) modcon = np.asarray(model.elev.constit)[:].tolist() cindx = self.getConstitIndices(modcon,subcon) cons = np.asarray(model.elev.constit)[cindx].tolist() # Get atlas data hdata = np.zeros((2,nloc,mloc,ncloc)) hdata[:,:,:,:] = ((model.elev.data[:,ii,:,:])[:,:,jj,:])[:,:,:,cindx] udata = np.zeros((4,nloc,mloc,ncloc)) udata[:,:,:,:] = ((model.flow.data[:,ii,:,:])[:,:,jj,:])[:,:,:,cindx] hz = np.zeros((nloc,mloc)) hz[:,:] = (model.grid.hz[ii,:])[:,jj] mz = np.zeros((nloc,mloc)) mz[:,:] = (model.grid.mz[ii,:])[:,jj] iob = [] dt = model.grid.dt self.setElev(nloc,mloc,ncloc,th_lim,ph_lim,cons,hdata) self.setFlow(nloc,mloc,ncloc,th_lim,ph_lim,cons,udata) nob = 0 iob = [] self.setGrid(nloc,mloc,th_lim,ph_lim,dt,nob,iob,hz,mz) return
[docs] def writeHAtlas(self): """ Write OSU Elevation file """ if not os.path.isdir(self.path): print('Atlas Write: Directory does not exist - '+self.path) return # Open new elevation file for writing hfile = self.path+'/'+self.hfile fh = open(hfile,mode='wb') elev = self.elev # Write file header headLen = np.uint32(4*(7+elev.nc)).byteswap(inplace=False) fh.write(headLen) n = np.uint32(elev.n).byteswap(inplace=False) fh.write(n) m = np.uint32(elev.m).byteswap(inplace=False) fh.write(m) nc = np.uint32(elev.nc).byteswap(inplace=False) fh.write(nc) latLims = np.single(elev.latLims).byteswap(inplace=False) fh.write(latLims) lonLims = np.single(elev.lonLims).byteswap(inplace=False) fh.write(lonLims) for ic in np.arange(elev.nc): fh.write(elev.constit[ic].encode()) fh.write(headLen) # Write elevation data recLen = np.uint32(8*elev.n*elev.m).byteswap(inplace=False) for ic in np.arange(elev.nc): fh.write(recLen) hf = np.squeeze(elev.data[:,:,:,ic]) hf = hf.transpose().flatten() hf = np.single(hf).byteswap(inplace=False) fh.write(hf) fh.write(recLen) # Close new file fh.close() return
[docs] def writeUAtlas(self): """ Write OSU Velocity file """ if not os.path.isdir(self.path): print('Atlas Write: Directory does not exist - '+self.path) return # Open new velocity file for writing ufile = self.path+'/'+self.ufile fh = open(ufile,mode='wb') flow = self.flow # Write file header headLen = np.uint32(4*(7+flow.nc)).byteswap(inplace=False) fh.write(headLen) n = np.uint32(flow.n).byteswap(inplace=False) fh.write(n) m = np.uint32(flow.m).byteswap(inplace=False) fh.write(m) nc = np.uint32(flow.nc).byteswap(inplace=False) fh.write(nc) latLims = np.single(flow.latLims).byteswap(inplace=False) fh.write(latLims) lonLims = np.single(flow.lonLims).byteswap(inplace=False) fh.write(lonLims) for ic in np.arange(flow.nc): fh.write(flow.constit[ic].encode()) fh.write(headLen) # Write velocity data recLen = np.uint32(2*8*flow.n*flow.m).byteswap(inplace=False) for ic in np.arange(flow.nc): fh.write(recLen) uv = np.squeeze(flow.data[:,:,:,ic]) uv = uv.transpose().flatten() uv = np.single(uv).byteswap(inplace=False) fh.write(uv) fh.write(recLen) # Close new file fh.close() return
[docs] def writeGrid(self): """ Write OSU Grid file """ if not os.path.isdir(self.path): print('Atlas Write: Directory does not exist - '+self.path) return # Open new grid file for writing gfile = self.path+'/'+self.gfile fh = open(gfile,mode='wb') grid = self.grid # Write file header headLen = np.uint32(32).byteswap(inplace=False) fh.write(headLen) n = np.uint32(grid.n).byteswap(inplace=False) fh.write(n) m = np.uint32(grid.m).byteswap(inplace=False) fh.write(m) latLims = np.single(grid.latLims).byteswap(inplace=False) fh.write(latLims) lonLims = np.single(grid.lonLims).byteswap(inplace=False) fh.write(lonLims) dt = np.single(grid.dt).byteswap(inplace=False) fh.write(dt) nob = np.uint32(grid.nob).byteswap(inplace=False) fh.write(nob) fh.write(headLen) # Write open boundary nodes if grid.nob == 0: recLen = np.uint32(4).byteswap(inplace=False) fh.write(recLen) fh.write(np.uint32(0)) fh.write(recLen) else: recLen = np.uint32(8*grid.nob).byteswap(inplace=False) fh.write(recLen) iob = np.uint32(grid.iob).byteswap(inplace=False) fh.write(iob) fh.write(recLen) # Write bathymetry data recLen = np.uint32(4*grid.n*grid.m).byteswap(inplace=False) fh.write(recLen) hz = grid.hz.copy() hz = hz.transpose().flatten() hz = np.single(hz).byteswap(inplace=False) fh.write(hz) fh.write(recLen) # Write land mask data fh.write(recLen) mz = grid.mz.copy() mz = mz.transpose().flatten() mz = np.uint32(mz).byteswap(inplace=False) fh.write(mz) fh.write(recLen) # Close new file fh.close() return
[docs] def writeAtlasFiles(self): """ Write OSU Atlas binary files """ if not os.path.isdir(self.path): print('Atlas Write: Directory does not exist - '+self.path) return print(' Writing elevation data...') self.writeHAtlas() print(' Writing velocity data...') self.writeUAtlas() print(' Writing grid data...') self.writeGrid() print(' Atlas written.') return
# ---------------------------------------------------------------------------- # FUNCTION DEFINITIONS # ---------------------------------------------------------------------------- # ---------------------------------------------------------------------------- # MAIN PROCESS # ---------------------------------------------------------------------------- #-------------------------------------------------------------------------- # INTERFACE #--------------------------------------------------------------------------