# -*- coding: utf-8 -*-
"""
Class structures and functions for loading observation data stored in
netCDF formats.
:Dependencies [External]: os, numpy, datetime
:Dependencies [Internal]: waveval.DataFormats, waveval.TimeFuncs, waveval.Geometry
"""
# ----------------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
import os
# Non-Standard Python Dependencies
import numpy as np
# Local Module Dependencies
from waveval.DataFormats import ncdata, netcdfGeneric
from waveval.TimeFuncs import num2date, date2num
from waveval.TimeFuncs import stdTimeUnits
from waveval.Geometry import spatialCoverage
# Other Dependencies
# ----------------------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------------------
# ==================== OceanSites Moored Buoy Data ======================
class wavebuoy(netcdfGeneric):
    """
    Class structure for reading the CMEMS formatted wavebuoy netCDF files.

    On construction the following attributes are set, each only when the
    matching global attribute or variable is listed for the file:
    ``platform``, ``time_start``, ``time_end``, ``lat_min``, ``lat_max``,
    ``lat``, ``lon_min``, ``lon_max``, ``lon`` and ``sample_period``.
    """

    def __init__(self, fileName):
        """
        Initialise the generic reader and cache platform metadata.

        :param fileName: path of the netCDF file to read.
        """
        netcdfGeneric.__init__(self, fileName)
        if self.file['name'] != '':
            attribs = self.file['attribs']
            wanted = ('platform_code', 'time_coverage_start',
                      'time_coverage_end')
            if any(name in attribs for name in wanted):
                # One handle, closed on exit, instead of one unclosed
                # dataset per attribute.
                with ncdata(fileName) as nc:
                    if 'platform_code' in attribs:
                        self.platform = nc.platform_code
                    if 'time_coverage_start' in attribs:
                        tstr = nc.time_coverage_start
                        self.time_start = (tstr.replace('T', ' ')
                                               .replace('Z', ''))
                    if 'time_coverage_end' in attribs:
                        tstr = nc.time_coverage_end
                        self.time_end = (tstr.replace('T', ' ')
                                             .replace('Z', ''))
            if 'LATITUDE' in self.file['vars']:
                # Read the variable once; every getVar call reopens the file.
                lats = self.getVar('LATITUDE')
                self.lat_min = np.nanmin(lats)
                self.lat_max = np.nanmax(lats)
                # Median position truncated to 5 decimal places.
                self.lat = int(np.nanmedian(lats)*100000.0)/100000.0
            if 'LONGITUDE' in self.file['vars']:
                lons = self.getVar('LONGITUDE')
                self.lon_min = np.nanmin(lons)
                self.lon_max = np.nanmax(lons)
                self.lon = int(np.nanmedian(lons)*100000.0)/100000.0
            if 'TIME' in self.file['vars']:
                t = self.getVar('TIME')
                # Masked times come back as NaN, so use the NaN-aware median.
                # The factor of 24 assumes stdTimeUnits is in days
                # -- TODO confirm.
                p = np.nanmedian(np.diff(t))*24.0
                # Round to a whole number of seconds, expressed in hours.
                self.sample_period = round(p*3600.0)/3600.0

    def getVar(self, varName):
        """
        Read a variable from the file with missing values set to NaN.

        ``TIME`` is converted from the file's own time units to
        ``stdTimeUnits``. For other variables, masked entries and entries
        equal to the ``_FillValue`` attribute are replaced by NaN.

        :param varName: name of the variable to read.
        :returns: the variable data, or ``None`` (after printing a warning)
            when *varName* is not in the file.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None
        with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
            if varName == 'TIME':
                timeUnits = self.getVarAttr('TIME', 'units')
                time, mask = get_nc_var(nc, 'TIME')
                var = date2num(num2date(time, timeUnits), stdTimeUnits)
                # mask is None for unmasked reads; indexing with None would
                # address (and blank) the whole array.
                if mask is not None:
                    var[mask] = np.nan
            else:
                var, mask = get_nc_var(nc, varName)
                if mask is not None:
                    var[mask] = np.nan
                if self.hasAttr(varName, '_FillValue'):
                    fv = self.getVarAttr(varName, '_FillValue')
                    # np.nan: the np.NaN alias was removed in NumPy 2.0.
                    var[var == fv] = np.nan
        return var

    def getQCVar(self, varName):
        """
        Read a variable together with a quality-control mask.

        For ``TIME`` the mask is True where ``TIME_QC`` is masked or where
        the time value occurs more than once in the file. For any other
        variable, entries that are masked or whose ``<varName>_QC`` flag is
        not 1 are set to NaN and the mask marks the NaN entries.

        :param varName: name of the variable to read.
        :returns: tuple ``(var, mask)``; ``(None, None)`` (after printing a
            warning) when *varName* is not in the file.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None, None
        if varName == 'TIME':
            timeUnits = self.getVarAttr('TIME', 'units')
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                times, _ = get_nc_var(nc, 'TIME')
                tqc, tqc_mask = get_nc_var(nc, 'TIME_QC')
            var = date2num(num2date(times, timeUnits), stdTimeUnits)
            _, indx, cnts = np.unique(times,
                                      return_index=True,
                                      return_counts=True)
            # Start fully masked, then unmask the times that occur once.
            umask = np.ones(tqc.shape, dtype=bool)
            umask[indx[cnts == 1]] = False
            # An unmasked QC read yields no mask at all.
            mask = umask if tqc_mask is None else (tqc_mask | umask)
        else:
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                var, v_mask = get_nc_var(nc, varName)
                vqc, _ = get_nc_var(nc, varName+'_QC')
            if v_mask is not None:
                var[v_mask] = np.nan
            var[vqc != 1] = np.nan
            mask = np.isnan(var)
        return var, mask

    def getQCTimeseries(self, varName, varIndex):
        """
        Return the quality-controlled time series of one variable column.

        :param varName: name of the variable to read.
        :param varIndex: index along the second axis of the variable.
        :returns: tuple ``(times, values)`` holding only the samples where
            both the time and the value pass quality control.
        """
        t, tmsk = self.getQCVar('TIME')
        v, vmsk = self.getQCVar(varName)
        keep = ~(tmsk | vmsk[:, varIndex])
        # Boolean indexing returns new arrays, so callers own the results.
        times = t[keep]
        values = v[:, varIndex][keep]
        return times, values

    def gapFillTimeseries(self, times, values):
        """
        Placeholder for gap filling; currently returns its inputs unchanged.

        :param times: sample times.
        :param values: sample values.
        :returns: tuple ``(times, values)``.
        """
        return times, values
# ==================== EMEC Moored Buoy Data SeaDataNet ======================
class emecwb(netcdfGeneric):
    """
    Class structure for reading the EMEC formatted wavebuoy netCDF files.

    On construction ``platform`` is always set (``'unknown'`` when the file
    has no ``SDN_STATION`` variable); ``lat_min``, ``lat_max``, ``lat``,
    ``lon_min``, ``lon_max``, ``lon``, ``sample_period``, ``time_start`` and
    ``time_end`` are set when the matching variable is listed for the file.
    """

    # Explicit format for the coverage strings; datetime.datetime.strftime
    # cannot be called without one.
    _TIME_FMT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, fileName):
        """
        Initialise the generic reader and cache platform metadata.

        :param fileName: path of the netCDF file to read.
        """
        netcdfGeneric.__init__(self, fileName)
        if self.file['name'] != '':
            if 'SDN_STATION' in self.file['vars']:
                self.platform = str(self.getVar('SDN_STATION'))
            else:
                self.platform = 'unknown'
            if 'LATITUDE' in self.file['vars']:
                # Read the variable once; every getVar call reopens the file.
                lats = self.getVar('LATITUDE')
                self.lat_min = np.nanmin(lats)
                self.lat_max = np.nanmax(lats)
                # Median position truncated to 5 decimal places.
                self.lat = int(np.nanmedian(lats)*100000.0)/100000.0
            if 'LONGITUDE' in self.file['vars']:
                lons = self.getVar('LONGITUDE')
                self.lon_min = np.nanmin(lons)
                self.lon_max = np.nanmax(lons)
                self.lon = int(np.nanmedian(lons)*100000.0)/100000.0
            if 'TIME' in self.file['vars']:
                t = self.getVar('TIME').squeeze()
                # Masked times come back as NaN, so use the NaN-aware median.
                # The factor of 24 assumes stdTimeUnits is in days
                # -- TODO confirm.
                p = np.nanmedian(np.diff(t))*24.0
                # Round to a whole number of seconds, expressed in hours.
                self.sample_period = round(p*3600.0)/3600.0
                first = num2date(t[0], stdTimeUnits)
                last = num2date(t[-1], stdTimeUnits)
                self.time_start = first.strftime(self._TIME_FMT)
                self.time_end = last.strftime(self._TIME_FMT)

    def getVar(self, varName):
        """
        Read a variable from the file with missing values set to NaN.

        ``TIME`` is converted from the file's own time units to
        ``stdTimeUnits``. For other variables, masked entries and entries
        equal to the ``_FillValue`` attribute are replaced by NaN, and
        ``|S1`` character arrays are joined and decoded as UTF-8 text. The
        result is passed through ``np.squeeze``.

        :param varName: name of the variable to read.
        :returns: the variable data, or ``None`` (after printing a warning)
            when *varName* is not in the file.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None
        with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
            if varName == 'TIME':
                timeUnits = self.getVarAttr('TIME', 'units')
                time, mask = get_nc_var(nc, 'TIME')
                var = date2num(num2date(time, timeUnits), stdTimeUnits)
                # mask is None for unmasked reads; indexing with None would
                # address (and blank) the whole array.
                if mask is not None:
                    var[mask] = np.nan
            else:
                var, mask = get_nc_var(nc, varName)
                if mask is not None:
                    var[mask] = np.nan
                if self.hasAttr(varName, '_FillValue'):
                    fv = self.getVarAttr(varName, '_FillValue')
                    # np.nan: the np.NaN alias was removed in NumPy 2.0.
                    var[var == fv] = np.nan
                if var.dtype == '|S1':
                    # Character array -> single text string.
                    var = (b''.join(list(var))).decode('utf-8')
        return np.squeeze(var)

    def getQCVar(self, varName):
        """
        Read a variable together with a quality-control mask.

        For ``TIME`` the mask is True where ``TIME_SEADATANET_QC`` is masked
        or where the time value occurs more than once in the file. For any
        other variable, entries that are masked or whose
        ``<varName>_SEADATANET_QC`` flag is greater than 49 are set to NaN
        and the mask marks the NaN entries. Both outputs are squeezed.

        :param varName: name of the variable to read.
        :returns: tuple ``(var, mask)``; ``(None, None)`` (after printing a
            warning) when *varName* is not in the file.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None, None
        if varName == 'TIME':
            timeUnits = self.getVarAttr('TIME', 'units')
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                times, _ = get_nc_var(nc, 'TIME')
                tqc, tqc_mask = get_nc_var(nc, 'TIME_SEADATANET_QC')
            var = date2num(num2date(times, timeUnits), stdTimeUnits)
            _, indx, cnts = np.unique(times,
                                      return_index=True,
                                      return_counts=True)
            # Start fully masked, then unmask the times that occur once.
            umask = np.ones(tqc.shape, dtype=bool)
            umask[indx[cnts == 1]] = False
            # An unmasked QC read yields no mask at all.
            mask = umask if tqc_mask is None else (tqc_mask | umask)
        else:
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                var, v_mask = get_nc_var(nc, varName)
                vqc, _ = get_nc_var(nc, varName+'_SEADATANET_QC')
            if v_mask is not None:
                var[v_mask] = np.nan
            var[vqc > 49] = np.nan
            mask = np.isnan(var)
        return np.squeeze(var), np.squeeze(mask)

    def getQCTimeseries(self, varName, varIndex):
        """
        Return the quality-controlled time series of a variable.

        :param varName: name of the variable to read.
        :param varIndex: not used by this class; the whole (squeezed)
            variable is returned.
        :returns: tuple ``(times, values)`` holding only the samples where
            both the time and the value pass quality control.
        """
        t, tmsk = self.getQCVar('TIME')
        v, vmsk = self.getQCVar(varName)
        keep = ~(tmsk | vmsk)
        # Boolean indexing returns new arrays, so callers own the results.
        times = t[keep]
        values = v[keep]
        return times, values

    def gapFillTimeseries(self, times, values):
        """
        Placeholder for gap filling; currently returns its inputs unchanged.

        :param times: sample times.
        :param values: sample values.
        :returns: tuple ``(times, values)``.
        """
        return times, values
# ====================== OceanSites Drifter Data ======================
class drifter(netcdfGeneric):
    """
    Class structure for reading the CMEMS formatted drifter netCDF files.

    On construction the following attributes are set, each only when the
    matching global attribute or variable is listed for the file:
    ``platform``, ``time_start``, ``time_end``, ``lat_min``, ``lat_max``,
    ``lat``, ``lon_min``, ``lon_max``, ``lon`` and ``sample_period``.
    """

    def __init__(self, fileName):
        """
        Initialise the generic reader and cache platform metadata.

        :param fileName: path of the netCDF file to read.
        """
        netcdfGeneric.__init__(self, fileName)
        if self.file['name'] != '':
            attribs = self.file['attribs']
            wanted = ('platform_code', 'time_coverage_start',
                      'time_coverage_end')
            if any(name in attribs for name in wanted):
                # One handle, closed on exit, instead of one unclosed
                # dataset per attribute.
                with ncdata(fileName) as nc:
                    if 'platform_code' in attribs:
                        self.platform = nc.platform_code
                    if 'time_coverage_start' in attribs:
                        tstr = nc.time_coverage_start
                        self.time_start = (tstr.replace('T', ' ')
                                               .replace('Z', ''))
                    if 'time_coverage_end' in attribs:
                        tstr = nc.time_coverage_end
                        self.time_end = (tstr.replace('T', ' ')
                                             .replace('Z', ''))
            if 'LATITUDE' in self.file['vars']:
                # Read the variable once; every getVar call reopens the file.
                lats = self.getVar('LATITUDE')
                self.lat_min = np.nanmin(lats)
                self.lat_max = np.nanmax(lats)
                # Median position truncated to 5 decimal places.
                self.lat = int(np.nanmedian(lats)*100000.0)/100000.0
            if 'LONGITUDE' in self.file['vars']:
                lons = self.getVar('LONGITUDE')
                self.lon_min = np.nanmin(lons)
                self.lon_max = np.nanmax(lons)
                self.lon = int(np.nanmedian(lons)*100000.0)/100000.0
            if 'TIME' in self.file['vars']:
                t = self.getVar('TIME')
                # NaN-aware median, consistent with the lat/lon handling.
                # The factor of 24 assumes stdTimeUnits is in days
                # -- TODO confirm.
                p = np.nanmedian(np.diff(t))*24.0
                # Round to a whole number of seconds, expressed in hours.
                self.sample_period = round(p*3600.0)/3600.0

    def getVar(self, varName):
        """
        Read a variable from the file as a plain (unmasked) array.

        ``TIME`` is converted from the file's own time units to
        ``stdTimeUnits``. For other variables, entries equal to the
        ``_FillValue`` attribute are replaced by NaN.

        :param varName: name of the variable to read.
        :returns: the variable data, or ``None`` (after printing a warning)
            when *varName* is not in the file.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None
        with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
            # Read from disk once, then drop any mask so that the fill
            # values underneath it can be turned into NaN below.
            raw = nc.variables[varName][:]
            if np.ma.isMaskedArray(raw):
                raw = np.ma.getdata(raw)
            if varName == 'TIME':
                timeUnits = self.getVarAttr('TIME', 'units')
                var = date2num(num2date(raw, timeUnits), stdTimeUnits)
            else:
                var = raw
                if self.hasAttr(varName, '_FillValue'):
                    fv = self.getVarAttr(varName, '_FillValue')
                    # np.nan: the np.NaN alias was removed in NumPy 2.0.
                    var[var == fv] = np.nan
        return var

    def getTrackCoverage(self):
        """
        Return the latitude/longitude bounding box of the drifter track.

        :returns: a ``spatialCoverage`` whose ``lat_range`` and ``lon_range``
            are filled with the NaN-ignoring min/max of ``LATITUDE`` and
            ``LONGITUDE``; left as constructed when either is missing.
        """
        coverage = spatialCoverage()
        if self.hasVar('LATITUDE') and self.hasVar('LONGITUDE'):
            # Each getVar call reopens the file, so read each once.
            lats = self.getVar('LATITUDE')
            lons = self.getVar('LONGITUDE')
            coverage.lat_range[0] = np.nanmin(lats)
            coverage.lat_range[1] = np.nanmax(lats)
            coverage.lon_range[0] = np.nanmin(lons)
            coverage.lon_range[1] = np.nanmax(lons)
        return coverage

    def crossesDomain(self, domain: spatialCoverage):
        """
        Placeholder: always returns ``False``; *domain* is not inspected.

        :param domain: spatial coverage to test the track against.
        :returns: ``False``.
        """
        # this belongs in a separate match module...
        crosses = False
        return crosses
# ----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# ----------------------------------------------------------------------------
def is_masked(data):
    """
    Return True when *data* is a numpy masked array.

    Subclasses of ``np.ma.MaskedArray`` count as masked arrays.

    :param data: object to test.
    :returns: ``True`` for masked arrays, ``False`` for anything else.
    """
    return isinstance(data, np.ma.MaskedArray)
def get_nc_var(ncHnd, varName):
    """
    Read a whole variable from an open netCDF handle as data plus mask.

    Automatic masking is switched off on the handle when the variable's
    ``valid_min`` or ``valid_max`` attribute is a single blank character,
    and switched on when those attributes cannot be inspected.

    :param ncHnd: open netCDF dataset handle.
    :param varName: name of the variable to read.
    :returns: tuple ``(data, mask)``. For a masked read, *data* is the
        underlying array and *mask* is a boolean array of the same shape;
        for an unmasked read, *data* is the array itself and *mask* is
        ``None``.
    """
    try:
        nc_var = ncHnd[varName]
        if (nc_var.valid_min == ' ') or (nc_var.valid_max == ' '):
            ncHnd.set_auto_mask(False)
    except Exception:
        # Attributes missing or not comparable: fall back to masked reads.
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # propagate.
        ncHnd.set_auto_mask(True)
    var = ncHnd.variables[varName][:].copy()
    if np.ma.isMaskedArray(var):
        data = np.ma.getdata(var)
        # getmaskarray expands "no mask" into a full-shape array of False.
        mask = np.ma.getmaskarray(var)
    else:
        data = var
        mask = None
    return data, mask