Source code for ObsData

# -*- coding: utf-8 -*-
"""
Class structures and functions for loading observation data stored in 
netCDF formats.

:Dependencies [External]: os, numpy, datetime
:Dependencies [Internal]: waveval.DataFormats, waveval.TimeFuncs, waveval.Geometry

"""
# ----------------------------------------------------------------------------
#   IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
import os
# Non-Standard Python Dependencies
import numpy as np
# Local Module Dependencies
from waveval.DataFormats import ncdata, netcdfGeneric
from waveval.TimeFuncs import num2date, date2num
from waveval.TimeFuncs import stdTimeUnits
from waveval.Geometry import spatialCoverage
# Other Dependencies

# ----------------------------------------------------------------------------
#   GLOBAL VARIABLES
# ----------------------------------------------------------------------------


# ----------------------------------------------------------------------------
#   CLASS DEFINITIONS
# ----------------------------------------------------------------------------
# ==================== OceanSites Moored Buoy Data ======================
class wavebuoy(netcdfGeneric):
    """
    Class structure for reading the CMEMS formatted wavebuoy netCDF files.

    When the file was opened successfully (``self.file['name']`` is not
    empty) the constructor caches the following attributes, each only if the
    matching global attribute / variable exists in the file:

    * ``platform``: the ``platform_code`` global attribute.
    * ``time_start`` / ``time_end``: the ``time_coverage_start`` /
      ``time_coverage_end`` global attributes with 'T' replaced by a space
      and 'Z' removed.
    * ``lat_min``, ``lat_max``, ``lat`` (and the ``lon`` equivalents):
      NaN-aware minimum, maximum and median of LATITUDE / LONGITUDE, the
      median being truncated to 5 decimals.
    * ``sample_period``: median step of TIME multiplied by 24 and rounded
      to the nearest 1/3600 (presumably hours rounded to whole seconds,
      if ``stdTimeUnits`` is expressed in days -- TODO confirm).
    """

    def __init__(self, fileName):
        """
        Open ``fileName`` through ``netcdfGeneric`` and cache summary metadata.

        :param fileName: path of the netCDF file to read.
        """
        netcdfGeneric.__init__(self, fileName)
        if self.file['name'] == '':
            return

        attribs = self.file['attribs']
        variables = self.file['vars']

        # Read all global attributes through a single handle that is closed
        # afterwards (previously one handle per attribute was opened and
        # never closed).
        wanted = ('platform_code', 'time_coverage_start', 'time_coverage_end')
        if any(name in attribs for name in wanted):
            with ncdata(fileName) as nc:
                if 'platform_code' in attribs:
                    self.platform = nc.platform_code
                if 'time_coverage_start' in attribs:
                    tstr = nc.time_coverage_start
                    self.time_start = tstr.replace('T', ' ').replace('Z', '')
                if 'time_coverage_end' in attribs:
                    tstr = nc.time_coverage_end
                    self.time_end = tstr.replace('T', ' ').replace('Z', '')

        if 'LATITUDE' in variables:
            # One read from disk instead of three.
            lats = self.getVar('LATITUDE')
            self.lat_min = np.nanmin(lats)
            self.lat_max = np.nanmax(lats)
            lat = np.nanmedian(lats)
            # int() truncates toward zero, keeping 5 decimals.
            self.lat = int(lat*100000.0)/100000.0
        if 'LONGITUDE' in variables:
            lons = self.getVar('LONGITUDE')
            self.lon_min = np.nanmin(lons)
            self.lon_max = np.nanmax(lons)
            lon = np.nanmedian(lons)
            self.lon = int(lon*100000.0)/100000.0
        if 'TIME' in variables:
            t = self.getVar('TIME')
            # getVar() stores NaN for masked times; nanmedian keeps a few
            # missing stamps from turning the period into NaN (which would
            # make round() raise).
            p = np.nanmedian(np.diff(t))*24.0
            self.sample_period = round(p*3600.0)/3600.0

    def getMetaData(self):
        """
        Return the cached metadata as a list.

        :returns: ``[platform, path, file, lat, lon, time_start, time_end,
                  sample_period]`` where ``path`` and ``file`` are the
                  directory and base name of the opened file.
        :raises AttributeError: if one of the attributes was not set by the
                  constructor because the file lacks the matching entry.
        """
        path, fname = os.path.split(self.file['name'])
        meta = [self.platform, path, fname, self.lat, self.lon,
                self.time_start, self.time_end, self.sample_period]
        return meta

    def getVar(self, varName):
        """
        Read a variable from the file with missing values set to NaN.

        TIME is converted from the units stored in the file to
        ``stdTimeUnits``. For every other variable, entries equal to the
        variable's ``_FillValue`` attribute (when present) are also set to
        NaN.

        :param varName: name of the netCDF variable.
        :returns: the data array, or ``None`` (after printing a warning and
                  the list of available variables) if the variable does not
                  exist.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None

        with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
            if varName == 'TIME':
                timeUnits = self.getVarAttr('TIME', 'units')
                time, mask = get_nc_var(nc, 'TIME')
                var = date2num(num2date(time, timeUnits), stdTimeUnits)
                # get_nc_var() returns mask=None for unmasked data; indexing
                # with None would select (and blank) the whole array.
                if mask is not None:
                    var[mask] = np.nan
            else:
                var, mask = get_nc_var(nc, varName)
                if mask is not None:
                    var[mask] = np.nan
                if self.hasAttr(varName, '_FillValue'):
                    fv = self.getVarAttr(varName, '_FillValue')
                    # np.nan rather than np.NaN: the alias was removed in
                    # NumPy 2.0.
                    var[var == fv] = np.nan
        return var

    def getQCVar(self, varName):
        """
        Read a variable together with a quality-control mask.

        For TIME the mask is True where the ``TIME_QC`` variable is masked
        or where the time stamp occurs more than once in the file. For any
        other variable the values that are masked, or whose ``<varName>_QC``
        flag differs from 1, are set to NaN and the mask is True at those
        NaN positions.

        :param varName: name of the netCDF variable.
        :returns: ``(var, mask)``; ``(None, None)`` (after printing a
                  warning) if the variable does not exist.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None, None

        if varName == 'TIME':
            timeUnits = self.getVarAttr('TIME', 'units')
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                times, _ = get_nc_var(nc, 'TIME')
                tqc, tqc_mask = get_nc_var(nc, 'TIME_QC')
            var = date2num(num2date(times, timeUnits), stdTimeUnits)
            _, indx, cnts = np.unique(times, return_index=True,
                                      return_counts=True)
            # Start fully masked, then unmask only the stamps seen once.
            umask = np.ones(tqc.shape, dtype=bool)
            umask[indx[cnts == 1]] = False
            # tqc_mask is None when TIME_QC was read unmasked.
            mask = umask if tqc_mask is None else (tqc_mask | umask)
        else:
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                var, v_mask = get_nc_var(nc, varName)
                vqc, _ = get_nc_var(nc, varName+'_QC')
            if v_mask is not None:
                var[v_mask] = np.nan
            var[vqc != 1] = np.nan
            mask = np.zeros(vqc.shape, dtype=bool)
            mask[np.isnan(var)] = True
        return var, mask

    def getQCTimeseries(self, varName, varIndex):
        """
        Extract a quality-controlled time series for one column of a variable.

        :param varName: name of the netCDF variable.
        :param varIndex: index along the second axis of the variable.
        :returns: ``(times, values)`` restricted to the samples where neither
                  the TIME mask nor the variable mask is set.
        """
        t, tmsk = self.getQCVar('TIME')
        v, vmsk = self.getQCVar(varName)
        keep = ~(tmsk | vmsk[:, varIndex])
        times = t[keep].copy()
        values = v[:, varIndex].copy()[keep]
        return times, values

    def gapFillTimeseries(self, times, values):
        """
        Placeholder for gap filling: returns the inputs unchanged.

        :param times: time stamps of the series.
        :param values: values of the series.
        :returns: ``(times, values)`` exactly as passed in.
        """
        return times, values
# ==================== EMEC Moored Buoy Data SeaDataNet ======================
class emecwb(netcdfGeneric):
    """
    Class structure for reading the EMEC formatted wavebuoy netCDF files.

    When the file was opened successfully (``self.file['name']`` is not
    empty) the constructor caches:

    * ``platform``: string form of the ``SDN_STATION`` variable, or
      ``'unknown'`` when that variable is absent.
    * ``lat_min``, ``lat_max``, ``lat`` (and the ``lon`` equivalents):
      NaN-aware minimum, maximum and median of LATITUDE / LONGITUDE, the
      median being truncated to 5 decimals (only if the variable exists).
    * ``sample_period``, ``time_start``, ``time_end``: derived from the
      TIME variable (only if it exists).
    """

    def __init__(self, fileName):
        """
        Open ``fileName`` through ``netcdfGeneric`` and cache summary metadata.

        :param fileName: path of the netCDF file to read.
        """
        netcdfGeneric.__init__(self, fileName)
        if self.file['name'] == '':
            return

        variables = self.file['vars']

        if 'SDN_STATION' in variables:
            self.platform = str(self.getVar('SDN_STATION'))
        else:
            self.platform = 'unknown'

        if 'LATITUDE' in variables:
            # One read from disk instead of three.
            lats = self.getVar('LATITUDE')
            self.lat_min = np.nanmin(lats)
            self.lat_max = np.nanmax(lats)
            lat = np.nanmedian(lats)
            # int() truncates toward zero, keeping 5 decimals.
            self.lat = int(lat*100000.0)/100000.0
        if 'LONGITUDE' in variables:
            lons = self.getVar('LONGITUDE')
            self.lon_min = np.nanmin(lons)
            self.lon_max = np.nanmax(lons)
            lon = np.nanmedian(lons)
            self.lon = int(lon*100000.0)/100000.0
        if 'TIME' in variables:
            t = self.getVar('TIME').squeeze()
            # Median step * 24, rounded to the nearest 1/3600 (presumably
            # hours rounded to seconds if stdTimeUnits is in days -- TODO
            # confirm).
            p = np.median(np.diff(t))*24.0
            self.sample_period = round(p*3600.0)/3600.0
            # NOTE(review): strftime() is called without a format; this
            # relies on the object returned by waveval's num2date accepting
            # that (a plain datetime.datetime would not) -- confirm.
            self.time_start = num2date(t[0], stdTimeUnits).strftime()
            self.time_end = num2date(t[-1], stdTimeUnits).strftime()

    def getMetaData(self):
        """
        Return the cached metadata as a list.

        :returns: ``[platform, path, file, lat, lon, time_start, time_end,
                  sample_period]`` where ``path`` and ``file`` are the
                  directory and base name of the opened file.
        :raises AttributeError: if one of the attributes was not set by the
                  constructor because the file lacks the matching variable.
        """
        path, fname = os.path.split(self.file['name'])
        meta = [self.platform, path, fname, self.lat, self.lon,
                self.time_start, self.time_end, self.sample_period]
        return meta

    def getVar(self, varName):
        """
        Read a variable from the file with missing values set to NaN.

        TIME is converted from the units stored in the file to
        ``stdTimeUnits``. For every other variable, entries equal to the
        ``_FillValue`` attribute (when present) are set to NaN, single-byte
        character arrays (dtype ``|S1``) are joined and decoded as UTF-8,
        and the result is squeezed.

        :param varName: name of the netCDF variable.
        :returns: the data, or ``None`` (after printing a warning and the
                  list of available variables) if the variable does not
                  exist.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None

        with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
            if varName == 'TIME':
                timeUnits = self.getVarAttr('TIME', 'units')
                time, mask = get_nc_var(nc, 'TIME')
                var = date2num(num2date(time, timeUnits), stdTimeUnits)
                # get_nc_var() returns mask=None for unmasked data; indexing
                # with None would select (and blank) the whole array.
                if mask is not None:
                    var[mask] = np.nan
            else:
                var, mask = get_nc_var(nc, varName)
                if mask is not None:
                    var[mask] = np.nan
                if self.hasAttr(varName, '_FillValue'):
                    fv = self.getVarAttr(varName, '_FillValue')
                    # np.nan rather than np.NaN: the alias was removed in
                    # NumPy 2.0.
                    var[var == fv] = np.nan
                if var.dtype == '|S1':
                    # Character array -> single Python string.
                    var = (b''.join(list(var))).decode('utf-8')
                var = np.squeeze(var)
        return var

    def getQCVar(self, varName):
        """
        Read a variable together with a quality-control mask.

        For TIME the mask is True where ``TIME_SEADATANET_QC`` is masked or
        where the time stamp occurs more than once in the file. For any
        other variable the values that are masked, or whose
        ``<varName>_SEADATANET_QC`` flag is greater than 49, are set to NaN
        and the mask is True at those NaN positions. Both outputs are
        squeezed.

        :param varName: name of the netCDF variable.
        :returns: ``(var, mask)``; ``(None, None)`` (after printing a
                  warning) if the variable does not exist.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None, None

        if varName == 'TIME':
            timeUnits = self.getVarAttr('TIME', 'units')
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                times, _ = get_nc_var(nc, 'TIME')
                tqc, tqc_mask = get_nc_var(nc, 'TIME_SEADATANET_QC')
            var = date2num(num2date(times, timeUnits), stdTimeUnits)
            _, indx, cnts = np.unique(times, return_index=True,
                                      return_counts=True)
            # Start fully masked, then unmask only the stamps seen once.
            umask = np.ones(tqc.shape, dtype=bool)
            umask[indx[cnts == 1]] = False
            # tqc_mask is None when the QC variable was read unmasked.
            mask = umask if tqc_mask is None else (tqc_mask | umask)
        else:
            with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
                var, v_mask = get_nc_var(nc, varName)
                vqc, _ = get_nc_var(nc, varName+'_SEADATANET_QC')
            if v_mask is not None:
                var[v_mask] = np.nan
            # Flags above 49 are rejected (49 is presumably the byte value
            # of the character '1' -- TODO confirm against the SeaDataNet
            # flag scheme).
            var[vqc > 49] = np.nan
            mask = np.zeros(vqc.shape, dtype=bool)
            mask[np.isnan(var)] = True

        var = np.squeeze(var)
        mask = np.squeeze(mask)
        return var, mask

    def getQCTimeseries(self, varName, varIndex):
        """
        Extract a quality-controlled time series of a variable.

        :param varName: name of the netCDF variable.
        :param varIndex: not used by this class; kept so the signature
                  matches the other buoy readers.
        :returns: ``(times, values)`` restricted to the samples where neither
                  the TIME mask nor the variable mask is set.
        """
        t, tmsk = self.getQCVar('TIME')
        v, vmsk = self.getQCVar(varName)
        keep = ~(tmsk | vmsk)
        times = t[keep].copy()
        values = v.copy()[keep]
        return times, values

    def gapFillTimeseries(self, times, values):
        """
        Placeholder for gap filling: returns the inputs unchanged.

        :param times: time stamps of the series.
        :param values: values of the series.
        :returns: ``(times, values)`` exactly as passed in.
        """
        return times, values
# ====================== OceanSites Drifter Data ======================
class drifter(netcdfGeneric):
    """
    Class structure for reading the CMEMS formatted drifter netCDF files.

    When the file was opened successfully (``self.file['name']`` is not
    empty) the constructor caches the following attributes, each only if the
    matching global attribute / variable exists in the file:

    * ``platform``: the ``platform_code`` global attribute.
    * ``time_start`` / ``time_end``: the ``time_coverage_start`` /
      ``time_coverage_end`` global attributes with 'T' replaced by a space
      and 'Z' removed.
    * ``lat_min``, ``lat_max``, ``lat`` (and the ``lon`` equivalents):
      NaN-aware minimum, maximum and median of LATITUDE / LONGITUDE, the
      median being truncated to 5 decimals.
    * ``sample_period``: median step of TIME multiplied by 24 and rounded
      to the nearest 1/3600.
    """

    def __init__(self, fileName):
        """
        Open ``fileName`` through ``netcdfGeneric`` and cache summary metadata.

        :param fileName: path of the netCDF file to read.
        """
        netcdfGeneric.__init__(self, fileName)
        if self.file['name'] == '':
            return

        attribs = self.file['attribs']
        variables = self.file['vars']

        # Read all global attributes through a single handle that is closed
        # afterwards (previously one handle per attribute was opened and
        # never closed).
        wanted = ('platform_code', 'time_coverage_start', 'time_coverage_end')
        if any(name in attribs for name in wanted):
            with ncdata(fileName) as nc:
                if 'platform_code' in attribs:
                    self.platform = nc.platform_code
                if 'time_coverage_start' in attribs:
                    tstr = nc.time_coverage_start
                    self.time_start = tstr.replace('T', ' ').replace('Z', '')
                if 'time_coverage_end' in attribs:
                    tstr = nc.time_coverage_end
                    self.time_end = tstr.replace('T', ' ').replace('Z', '')

        if 'LATITUDE' in variables:
            # One read from disk instead of three.
            lats = self.getVar('LATITUDE')
            self.lat_min = np.nanmin(lats)
            self.lat_max = np.nanmax(lats)
            lat = np.nanmedian(lats)
            # int() truncates toward zero, keeping 5 decimals.
            self.lat = int(lat*100000.0)/100000.0
        if 'LONGITUDE' in variables:
            lons = self.getVar('LONGITUDE')
            self.lon_min = np.nanmin(lons)
            self.lon_max = np.nanmax(lons)
            lon = np.nanmedian(lons)
            self.lon = int(lon*100000.0)/100000.0
        if 'TIME' in variables:
            t = self.getVar('TIME')
            p = np.median(np.diff(t))*24.0
            self.sample_period = round(p*3600.0)/3600.0

    def getMetaData(self):
        """
        Return the cached metadata as a list.

        :returns: ``[platform, path, file, lat, lon, time_start, time_end,
                  sample_period]`` where ``path`` and ``file`` are the
                  directory and base name of the opened file.
        :raises AttributeError: if one of the attributes was not set by the
                  constructor because the file lacks the matching entry.
        """
        path, fname = os.path.split(self.file['name'])
        meta = [self.platform, path, fname, self.lat, self.lon,
                self.time_start, self.time_end, self.sample_period]
        return meta

    def getVar(self, varName):
        """
        Read a variable from the file as a plain (unmasked) array.

        TIME is converted from the units stored in the file to
        ``stdTimeUnits``. For every other variable, entries equal to the
        ``_FillValue`` attribute (when present) are set to NaN.

        :param varName: name of the netCDF variable.
        :returns: the data array, or ``None`` (after printing a warning and
                  the list of available variables) if the variable does not
                  exist.
        """
        if varName not in self.file['vars']:
            print('WARNING: Variable '+varName+' does not exist.')
            print('Available variables:')
            print(list(self.file['vars']))
            return None

        with ncdata(self.file['name'], 'r', format='NETCDF4') as nc:
            # Single read from disk. np.ma.getdata() strips the mask from a
            # masked array and hands back anything else as it is, so no
            # isMaskedArray() branching is needed.
            raw = np.ma.getdata(nc.variables[varName][:])
            if varName == 'TIME':
                timeUnits = self.getVarAttr('TIME', 'units')
                var = date2num(num2date(raw, timeUnits), stdTimeUnits)
            else:
                # The unmasked data is used here. A stray re-read of the
                # variable used to overwrite it with the (possibly masked)
                # array, which left the unmasking branch without effect.
                var = raw
                if self.hasAttr(varName, '_FillValue'):
                    fv = self.getVarAttr(varName, '_FillValue')
                    # np.nan rather than np.NaN: the alias was removed in
                    # NumPy 2.0.
                    var[var == fv] = np.nan
        return var

    def getTrackCoverage(self):
        """
        Return the latitude / longitude extent of the drifter track.

        :returns: a ``spatialCoverage`` whose ``lat_range`` and ``lon_range``
                  hold the NaN-aware [min, max] of LATITUDE and LONGITUDE;
                  left as constructed when either variable is missing.
        """
        coverage = spatialCoverage()
        if self.hasVar('LATITUDE') and self.hasVar('LONGITUDE'):
            lats = self.getVar('LATITUDE')
            lons = self.getVar('LONGITUDE')
            coverage.lat_range[0] = np.nanmin(lats)
            coverage.lat_range[1] = np.nanmax(lats)
            coverage.lon_range[0] = np.nanmin(lons)
            coverage.lon_range[1] = np.nanmax(lons)
        return coverage

    def crossesDomain(self, domain: spatialCoverage):
        """
        Placeholder: always reports that the track does not cross ``domain``.

        :param domain: spatial coverage to test against (currently ignored).
        :returns: ``False``.
        """
        # this belongs in a separate match module...
        crosses = False
        return crosses
# ----------------------------------------------------------------------------
#   FUNCTION DEFINITIONS
# ----------------------------------------------------------------------------
def is_masked(data):
    """
    Tell whether ``data`` is a numpy masked array.

    Subclasses of ``numpy.ma.MaskedArray`` count as masked arrays too.

    :param data: any object.
    :returns: ``True`` if ``data`` is a ``MaskedArray`` instance, else
              ``False``.
    """
    # isinstance() rather than an exact type comparison, so that
    # MaskedArray subclasses are recognised as well.
    return isinstance(data, np.ma.MaskedArray)
def get_nc_var(ncHnd, varName):
    """
    Read a whole variable from an open netCDF handle as data plus mask.

    Automatic masking is switched off on the handle when the variable's
    ``valid_min`` or ``valid_max`` attribute is a single blank (' '), and
    switched on otherwise (including when the attributes are missing).

    :param ncHnd: open netCDF dataset handle (must provide item access,
                  ``variables`` and ``set_auto_mask``).
    :param varName: name of the variable to read.
    :returns: ``(data, mask)``. For a masked read, ``data`` is the
              underlying array and ``mask`` a boolean array of the same
              shape; otherwise ``data`` is the array as read and ``mask``
              is ``None``.
    """
    try:
        ncVar = ncHnd[varName]
        blank_range = bool((ncVar.valid_min == ' ') or
                           (ncVar.valid_max == ' '))
    except Exception:
        # Missing variable/attribute or a non-scalar comparison: fall back
        # to automatic masking. Exception (not a bare except) so that
        # KeyboardInterrupt / SystemExit still propagate.
        blank_range = False
    # Set the flag on every call so that a previous call on the same handle
    # cannot leave masking disabled for this variable.
    ncHnd.set_auto_mask(not blank_range)

    var = ncHnd.variables[varName][:].copy()
    if is_masked(var):
        data = np.ma.getdata(var)
        # getmaskarray() always yields a full-shape boolean array, also when
        # the array carries the scalar "nomask".
        mask = np.ma.getmaskarray(var)
    else:
        data = var
        mask = None
    return data, mask