# -*- coding: utf-8 -*-
"""
Tools for generating an SQLite matchup database from a set of model extracts
and an archive of wavebuoy data.
These tools are tied to the ResourceCode WaveWatch III model hindcast data,
but the underlying functions for opening, populating and querying an SQLite
database are generic.
:Dependencies [External]: os, re, sqlite3, csv, numpy
:Dependencies [Internal]: waveval.Geometry, waveval.TimeFuncs, waveval.ObsData, waveval.ModelData
"""
# ----------------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
import os
import re
import sqlite3
import csv
# Non-Standard Python Dependencies
import numpy as np
# Local Module Dependencies
from waveval.Geometry import spatialCoverage
from waveval.TimeFuncs import temporalCoverage, dateNumFromStr
import waveval.ObsData as OData
import waveval.ModelData as MData
# Other Dependencies
# ----------------------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------------------
# DDL for the rscd_files table: one metadata row per model file (platform,
# file location, position, date range, year/month and sample period).
# Executed by create_rscd_table(); IF NOT EXISTS makes it safe to re-run.
rscd_table_sql = """ CREATE TABLE IF NOT EXISTS rscd_files (
id integer PRIMARY KEY,
platform text NOT NULL,
filePath text NOT NULL,
fileName text NOT NULL,
latitude real NOT NULL,
longitude real NOT NULL,
beginDate timestamp NOT NULL,
endDate timestamp NOT NULL,
year integer NOT NULL,
month integer NOT NULL,
samplePeriod real NOT NULL
); """
# DDL for the buoy_files table: same layout as rscd_files but without the
# year and month columns.  Executed by create_buoy_table().
buoy_table_sql = """ CREATE TABLE IF NOT EXISTS buoy_files (
id integer PRIMARY KEY,
platform text NOT NULL,
filePath text NOT NULL,
fileName text NOT NULL,
latitude real NOT NULL,
longitude real NOT NULL,
beginDate timestamp NOT NULL,
endDate timestamp NOT NULL,
samplePeriod real NOT NULL
); """
# DDL for the match_files table: pairs an rscd_files row id with a
# buoy_files row id for a given year and month.  Executed by
# create_match_table().
match_table_sql = """ CREATE TABLE IF NOT EXISTS match_files (
id integer PRIMARY KEY,
rscdRecID integer NOT NULL,
buoyRecID integer NOT NULL,
year integer NOT NULL,
month integer NOT NULL
); """
# Complete match-up query.  Pairs every buoy_files row with every rscd_files
# row and keeps the pairs where EITHER the latitudes and longitudes both
# differ by less than 0.001 OR the platform names are equal ignoring case,
# AND the buoy file's beginDate/endDate range spans the model file's range.
# Rows are ordered by the model file name.  The trailing backslashes join
# the statement into a single line of text.
match_recs_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month, \
rscd_files.latitude, rscd_files.longitude \
FROM buoy_files, rscd_files \
WHERE \
(((Abs([rscd_files].[latitude]-[buoy_files].[latitude])<0.001) \
AND \
(Abs([rscd_files].[longitude]-[buoy_files].[longitude])<0.001)) \
OR \
(upper([rscd_files].[platform]) == upper([buoy_files].[platform]))) \
AND \
((buoy_files.beginDate <= rscd_files.beginDate) \
AND \
(buoy_files.endDate >= rscd_files.endDate)) \
ORDER BY rscd_files.fileName;\
"""
# SELECT ... FROM fragments with no WHERE clause and no terminating ';'.
# NOTE(review): presumably callers append "WHERE" plus one or more of the
# filter fragments defined further down; the code that composes them is not
# part of this section - confirm against the callers.

# Basic column set: model path/name, buoy path/name, model year and month.
match_select_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month \
FROM buoy_files, rscd_files \
"""
# Basic column set plus the buoy date range and platform.
# NOTE: the line ending "buoy_files.endDate," has no trailing backslash, so
# unlike its neighbours this string contains a literal newline at that point.
val_site_select_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month, \
buoy_files.beginDate, buoy_files.endDate,
buoy_files.platform \
FROM buoy_files, rscd_files \
"""
# Basic column set plus the model position, buoy platform and buoy date range.
val_match_select_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month, \
rscd_files.latitude, rscd_files.longitude, \
buoy_files.platform, buoy_files.beginDate, buoy_files.endDate \
FROM buoy_files, rscd_files \
"""
# Condition fragment: model and buoy latitude AND longitude each differ by
# less than 0.001.
location_filter_sql = \
"""\
((Abs([rscd_files].[latitude]-[buoy_files].[latitude])<0.001) \
AND \
(Abs([rscd_files].[longitude]-[buoy_files].[longitude])<0.001))\
"""
# Condition fragment: model and buoy platform names are equal ignoring case.
platform_filter_sql = \
"""\
(upper([rscd_files].[platform]) == upper([buoy_files].[platform]))\
"""
# Condition fragment: the buoy file's date range spans the model file's
# date range (buoy begins no later and ends no earlier than the model file).
time_window_filter_sql = \
"""\
((buoy_files.beginDate <= rscd_files.beginDate) \
AND \
(buoy_files.endDate >= rscd_files.endDate)) \
"""
# Model file names that load_rscd_archive() skips when scanning an archive.
# The reason these particular files are excluded is not recorded here.
excludeFiles = ['RSCD_WW3-RSCD-UG-62107_201705_freq.nc',
'RSCD_WW3-RSCD-UG-62101_201704_freq.nc',
'RSCD_WW3-RSCD-UG-63057_201706_freq.nc',
'RSCD_WW3-RSCD-UG-62068_201710_freq.nc']
# ----------------------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------------------
class recordFilter:
    """
    Select records by spatial and/or temporal coverage.

    The filter is built from a list of coverage objects.  A
    ``spatialCoverage`` entry is stored as ``self.domain`` and a
    ``temporalCoverage`` entry as ``self.window``; either may be absent, in
    which case the corresponding attribute is simply not set.
    """

    def __init__(self, coverage: list):
        """
        :param coverage: iterable of coverage objects.  Entries that are
            neither ``spatialCoverage`` nor ``temporalCoverage`` instances
            are ignored; if several entries of one kind are given the last
            one wins.
        """
        for cover in coverage:
            # isinstance (rather than an exact type comparison) so that
            # subclasses of the coverage types are accepted as well.
            if isinstance(cover, spatialCoverage):
                self.domain = cover
            if isinstance(cover, temporalCoverage):
                self.window = cover

    def includeRecord(self, record, verbose=False):
        """
        Decide whether ``record`` lies inside the configured coverage.

        :param record: object exposing ``lat``/``lon`` (for the spatial
            test) and ``time_start``/``time_end`` (for the temporal test);
            an optional ``platform`` attribute is used in messages.
        :param verbose: when truthy, print why a record was rejected.
        :return: truthy if the record passes every configured test.  A
            filter with neither a domain nor a window rejects everything.
        """
        hasdomain = hasattr(self, 'domain')
        haswindow = hasattr(self, 'window')
        if not (hasdomain or haswindow):
            return False
        # Both tests are always evaluated (no short-circuit) so that every
        # applicable verbose message is printed.
        indomain = self._inDomain(record, verbose) if hasdomain else True
        inwindow = self._inWindow(record, verbose) if haswindow else True
        return indomain and inwindow

    def _inDomain(self, record, verbose):
        """Return whether the record position is strictly inside the domain."""
        if not (hasattr(record, 'lat') and hasattr(record, 'lon')):
            if verbose:
                print('WARNING: Record missing lat or lon data.')
            return False
        lat_lo, lat_hi = min(self.domain.lat_range), max(self.domain.lat_range)
        lon_lo, lon_hi = min(self.domain.lon_range), max(self.domain.lon_range)
        # Strict inequalities: a point exactly on the boundary is outside.
        indomain = (record.lat > lat_lo) and (record.lat < lat_hi) and \
                   (record.lon > lon_lo) and (record.lon < lon_hi)
        if verbose and not indomain:
            self._reportOutside(record, 'spatial')
        return indomain

    def _inWindow(self, record, verbose):
        """Return whether the record time span overlaps the time window."""
        if not (hasattr(record, 'time_start') and hasattr(record, 'time_end')):
            if verbose:
                print('WARNING: Record missing time data.')
            return False
        # NOTE(review): assumes dateNumFromStr yields values comparable with
        # window.timeRangeNum - confirm against waveval.TimeFuncs.
        t0 = dateNumFromStr(record.time_start)
        t1 = dateNumFromStr(record.time_end)
        w0 = min(self.window.timeRangeNum)
        w1 = max(self.window.timeRangeNum)
        case01 = (t0 >= w0) and (t1 <= w1)   # record entirely inside window
        case02 = (t0 < w0) and (t1 >= w0)    # record straddles window start
        case03 = (t0 < w1) and (t1 >= w1)    # record straddles window end
        case04 = (t0 < w0) and (t1 > w1)     # record encloses the window
        inwindow = case01 or case02 or case03 or case04
        if verbose and not inwindow:
            self._reportOutside(record, 'temporal')
        return inwindow

    @staticmethod
    def _reportOutside(record, kind):
        """Print the rejection message, naming the platform when known."""
        label = record.platform if hasattr(record, 'platform') else 'record'
        print(label + ' outside ' + kind + ' coverage.')
# ----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# ----------------------------------------------------------------------------
def tuples2csv(records, csv_file):
    """
    Write an iterable of row tuples to a CSV file.

    :param records: iterable of rows; each row is an iterable of fields.
    :param csv_file: path of the file to create (overwritten if present).
    :return: None
    """
    # newline='' lets the csv module control line endings itself.
    with open(csv_file, 'w', newline='', encoding='utf-8') as handle:
        csv.writer(handle).writerows(records)
def csv2tuples(csv_file):
    """
    Read a CSV file into a list of tuples (one tuple of strings per row).

    The file is opened with ``newline=''`` and UTF-8 encoding, matching how
    ``tuples2csv`` writes it; without ``newline=''`` line breaks embedded in
    quoted fields are altered by universal-newline translation.

    :param csv_file: path of the CSV file to read.
    :return: list of tuples of str.
    """
    with open(csv_file, 'r', newline='', encoding='utf-8') as read_obj:
        return [tuple(row) for row in csv.reader(read_obj)]
def mapRecord(startYr, stopYr, record, dataset):
    """
    Replicate one match record for every month of a range of years.

    ``record[0]`` (model path) is cut at the first occurrence of the year
    string ``record[4]`` and rebuilt as ``<root><yyyy>/<mm>/<dataset>``;
    in ``record[1]`` (model file name) the ``yyyymm`` stamp made from
    ``record[4]`` and zero-padded ``record[5]`` is replaced by the new
    stamp.  Fields 4 and 5 of each copy hold the new year and the new
    (unpadded) month as strings; all other fields are copied unchanged.

    :param startYr: first year (int, inclusive).
    :param stopYr: last year (int, inclusive).
    :param record: sequence whose items 0, 1, 4 and 5 are strings.
    :param dataset: name of the final path component.
    :return: list of tuples, 12 per year, in chronological order.
    """
    source_year = record[4]
    root_dir = record[0].split(source_year)[0]
    name_template = record[1]
    source_stamp = source_year + record[5].zfill(2)
    middle = tuple(record[2:4])
    tail = tuple(record[6:])

    mapped = []
    for year in range(startYr, stopYr + 1):
        year_txt = str(year)
        for month in range(1, 13):
            month_txt = '{:02d}'.format(month)
            new_path = root_dir + year_txt + '/' + month_txt + '/' + dataset
            new_name = name_template.replace(source_stamp, year_txt + month_txt)
            mapped.append(
                (new_path, new_name) + middle + (year_txt, str(month)) + tail
            )
    return mapped
def touch(path):
    """
    Create ``path`` if it does not exist and set its access and modification
    times to the current time.

    :param path: file path to touch.
    :return: None
    """
    handle = open(path, 'a')   # append mode creates the file, never truncates
    try:
        os.utime(path, None)
    finally:
        handle.close()
def db_match_to_csv(db_path, db_name, csv_path, csv_fname):
    """
    Export the model/buoy match-ups found in a database to a CSV file.

    A model file and a buoy file match when their latitudes and longitudes
    both differ by less than 0.001 or their platform names are equal
    ignoring case, and the buoy date range spans the model file date range.
    Each CSV row holds: model path, model file, buoy path, buoy file, year,
    month, buoy begin date, buoy end date, buoy platform, buoy latitude and
    buoy longitude.  Rows are ordered by model file name.

    All columns are fetched by name in one query, instead of fetching ids
    and issuing two string-built look-ups per match, and the connection is
    closed even if the query fails.

    :param db_path: directory containing the database.
    :param db_name: database file name without the '.db' extension.
    :param csv_path: directory for the output CSV file.
    :param csv_fname: name of the output CSV file.
    :return: None
    """
    db_file = db_path+'/'+db_name+'.db'
    csvfile = csv_path+'/'+csv_fname
    # Define match query string
    match_sql = """
        SELECT rscd_files.filePath, rscd_files.fileName,
               buoy_files.filePath, buoy_files.fileName,
               rscd_files.year, rscd_files.month,
               buoy_files.beginDate, buoy_files.endDate,
               buoy_files.platform,
               buoy_files.latitude, buoy_files.longitude
        FROM buoy_files, rscd_files
        WHERE ((Abs(rscd_files.latitude - buoy_files.latitude) < 0.001
                AND Abs(rscd_files.longitude - buoy_files.longitude) < 0.001)
               OR upper(rscd_files.platform) == upper(buoy_files.platform))
          AND buoy_files.beginDate <= rscd_files.beginDate
          AND buoy_files.endDate >= rscd_files.endDate
        ORDER BY rscd_files.fileName;
    """
    db_conn = create_connection(db_file)
    cur = db_conn.cursor()
    try:
        # Get matches from db
        rows = cur.execute(match_sql).fetchall()
    finally:
        db_conn.close()
    # Generate record list for writing to csv file; year and month are
    # converted to strings, the remaining fields are passed through.
    reclist = [
        tuple(row[:4]) + (str(row[4]), str(row[5])) + tuple(row[6:])
        for row in rows
    ]
    # Write records list to csv file
    tuples2csv(reclist, csvfile)
def create_connection(db_file):
    """
    Open a connection to the SQLite database stored in ``db_file``.

    An ``sqlite3.Error`` raised while connecting is printed, not propagated.

    :param db_file: database file
    :return: Connection object, or None if the connection failed
    """
    try:
        return sqlite3.connect(db_file)
    except sqlite3.Error as err:
        print(err)
        return None
def close_database(db_conn):
    """
    Commit pending changes and close the connection; do nothing for None.

    :param db_conn: open sqlite3 connection, or None.
    :return: None
    """
    if db_conn is None:
        return
    db_conn.commit()
    db_conn.close()
def create_rscd_table(db_conn):
    """
    Create the rscd_files table if it does not already exist.

    An ``sqlite3.Error`` raised by the statement is printed, not propagated.

    :param db_conn: open sqlite3 connection.
    :return: None
    """
    try:
        db_conn.cursor().execute(rscd_table_sql)
    except sqlite3.Error as err:
        print(err)
def create_buoy_table(db_conn):
    """
    Create the buoy_files table if it does not already exist.

    An ``sqlite3.Error`` raised by the statement is printed, not propagated.

    :param db_conn: open sqlite3 connection.
    :return: None
    """
    try:
        db_conn.cursor().execute(buoy_table_sql)
    except sqlite3.Error as err:
        print(err)
def create_match_table(db_conn):
    """
    Create the match_files table if it does not already exist.

    An ``sqlite3.Error`` raised by the statement is printed, not propagated.

    :param db_conn: open sqlite3 connection.
    :return: None
    """
    try:
        db_conn.cursor().execute(match_table_sql)
    except sqlite3.Error as err:
        print(err)
def create_database(db_path, db_name):
    """
    Open (creating if needed) ``<db_path>/<db_name>.db`` and make sure the
    rscd_files, buoy_files and match_files tables exist.

    :param db_path: directory containing the database.
    :param db_name: database file name without the '.db' extension.
    :return: the open connection; the caller is responsible for closing it.
    """
    db_conn = create_connection(os.path.join(db_path, db_name + '.db'))
    for make_table in (create_rscd_table, create_buoy_table, create_match_table):
        make_table(db_conn)
    db_conn.commit()
    return db_conn
def insert_rscd_records(db_conn, records):
    """
    Insert model file records into the rscd_files table and commit.

    :param db_conn: open sqlite3 connection.
    :param records: iterable of 10-item rows ordered as (platform, filePath,
        fileName, latitude, longitude, beginDate, endDate, year, month,
        samplePeriod).
    :return: row count reported by the cursor after the insert
    """
    statement = ''' INSERT INTO rscd_files(platform,
filePath, fileName,
latitude, longitude,
beginDate, endDate,
year, month, samplePeriod)
VALUES(?,?,?,?,?,?,?,?,?,?) '''
    # Connection.executemany creates a cursor, runs the statement and
    # returns that cursor.
    cursor = db_conn.executemany(statement, records)
    db_conn.commit()
    return cursor.rowcount
def insert_buoy_records(db_conn, records):
    """
    Insert buoy file records into the buoy_files table and commit.

    :param db_conn: open sqlite3 connection.
    :param records: iterable of 8-item rows ordered as (platform, filePath,
        fileName, latitude, longitude, beginDate, endDate, samplePeriod).
    :return: row count reported by the cursor after the insert
    """
    statement = ''' INSERT INTO buoy_files(platform,
filePath, fileName,
latitude, longitude,
beginDate, endDate,
samplePeriod)
VALUES(?,?,?,?,?,?,?,?) '''
    # Connection.executemany creates a cursor, runs the statement and
    # returns that cursor.
    cursor = db_conn.executemany(statement, records)
    db_conn.commit()
    return cursor.rowcount
def get_match_records(db_path, db_name, sql_query):
    """
    Run a query against ``<db_path>/<db_name>.db`` and return all rows.

    The connection is closed even when the query raises, so a bad query no
    longer leaves the database file open.

    :param db_path: directory containing the database.
    :param db_name: database file name without the '.db' extension.
    :param sql_query: complete SQL statement to execute; it is passed to
        SQLite unchanged, so it must come from a trusted source.
    :return: list of result row tuples.
    """
    db_file = db_path+'/'+db_name+'.db'
    db_conn = create_connection(db_file)
    cur = db_conn.cursor()
    try:
        return cur.execute(sql_query).fetchall()
    finally:
        db_conn.close()
def insert_match_record(db_conn, records):
    """
    Insert match-up records into the match_files table and commit.

    :param db_conn: open sqlite3 connection.
    :param records: iterable of 4-item rows ordered as (rscdRecID,
        buoyRecID, year, month).
    :return: row count reported by the cursor after the insert
    """
    statement = ''' INSERT INTO match_files(rscdRecID, buoyRecID, year, month)
VALUES(?,?,?,?) '''
    # Connection.executemany creates a cursor, runs the statement and
    # returns that cursor.
    cursor = db_conn.executemany(statement, records)
    db_conn.commit()
    return cursor.rowcount
def load_rscd_archive(db_conn, archivePath, dataSet, chunkSize=500):
    """
    Scan an archive for model files and load their metadata into rscd_files.

    Every directory below ``archivePath`` whose own name equals ``dataSet``
    is scanned.  A file is read when it has a '.nc' extension, its name
    matches neither of the two grid-point patterns, and it is not listed in
    ``excludeFiles``.  Records are buffered and written to the database each
    time the file index within a directory reaches a multiple of
    ``chunkSize``; whatever remains is written at the end.

    A file that cannot be read is reported as 'CORRUPT FILE' and skipped.
    Only ``Exception`` is caught, so Ctrl-C and interpreter exit are no
    longer swallowed and misreported as corrupt files.

    :param db_conn: open sqlite3 connection.
    :param archivePath: root directory to walk.
    :param dataSet: name of the directories holding the files to load.
    :param chunkSize: file-index interval between database writes.
    :return: total of the row counts reported by the inserts.
    """
    # Compiled once instead of once per file.
    west_pattern = re.compile('W......N....._2017')
    east_pattern = re.compile('E......N....._2017')

    records = []
    recs_added = 0
    print('Start time: ', str(MData.datetime.now()))
    for root, subFolders, files in os.walk(archivePath):
        if os.path.split(root)[-1] != dataSet:
            continue
        print(root.replace('\\', '/'))
        print('Files to process = ',len(files))
        for count, file in enumerate(files):
            wanted = (os.path.splitext(file)[1] == '.nc'
                      and not west_pattern.search(file)
                      and not east_pattern.search(file)
                      and file not in excludeFiles)
            if wanted:
                rscdFile = os.path.join(root, file).replace('\\', '/')
                try:
                    rscd = MData.ww3(rscdFile)
                    records.append(tuple(rscd.getMetaData()))
                    del rscd
                except Exception:
                    print('CORRUPT FILE: ', file)
            # Flush on the file index, whether or not this file was loaded.
            if count > 0 and (count % chunkSize) == 0:
                recs_added += insert_rscd_records(db_conn, records)
                print(recs_added,' RSCD records added to the database')
                records = []
    if len(records) > 0:
        recs_added += insert_rscd_records(db_conn, records)
        print(recs_added,' RSCD records added to the database')
    print('Time stamp: ', str(MData.datetime.now()))
    return recs_added
def load_buoy_archive(db_conn,
                      archivePath: str, dataSet: str, dataFmt: str,
                      recFilter=None, chunkSize=500,
                      verbose=False):
    """
    Scan an archive for buoy files and load their metadata into buoy_files.

    Every directory below ``archivePath`` whose own name equals ``dataSet``
    is scanned and each '.nc' file in it is opened with the reader selected
    by ``dataFmt``.  Records are buffered and written to the database each
    time the file index within a directory reaches a multiple of
    ``chunkSize``; whatever remains is written at the end.

    :param db_conn: open sqlite3 connection.
    :param archivePath: root directory to walk.
    :param dataSet: name of the directories holding the files to load.
    :param dataFmt: 'InSituTAC' (read with ``OData.wavebuoy``) or
        'SeaDataNet' (read with ``OData.emecwb``).
    :param recFilter: optional object with an ``includeRecord(buoy, verbose)``
        method; when given, only buoys it accepts are loaded.
    :param chunkSize: file-index interval between database writes.
    :param verbose: passed through to ``recFilter.includeRecord``.
    :return: total of the row counts reported by the inserts.
    :raises ValueError: if a '.nc' file is found and ``dataFmt`` is not one
        of the supported formats (previously an UnboundLocalError).
    """
    records = []
    recs_added = 0
    print('Start time: ', str(MData.datetime.now()))
    for root, subFolders, files in os.walk(archivePath):
        if os.path.split(root)[-1] != dataSet:
            continue
        print(root.replace('\\', '/'))
        for count, file in enumerate(files):
            if os.path.splitext(file)[1] == '.nc':
                buoyFile = os.path.join(root, file).replace('\\', '/')
                if dataFmt == 'InSituTAC':
                    buoy = OData.wavebuoy(buoyFile)
                elif dataFmt == 'SeaDataNet':
                    buoy = OData.emecwb(buoyFile)
                else:
                    raise ValueError(
                        "Unsupported dataFmt %r; expected 'InSituTAC' or "
                        "'SeaDataNet'." % (dataFmt,))
                if recFilter is None or recFilter.includeRecord(buoy, verbose):
                    records.append(tuple(buoy.getMetaData()))
                del buoy
            # Flush on the file index, whether or not this file was loaded.
            if count > 0 and (count % chunkSize) == 0:
                recs_added += insert_buoy_records(db_conn, records)
                print(recs_added,' BUOY records added to the database')
                records = []
                print('Time stamp: ', str(MData.datetime.now()))
    if len(records) > 0:
        recs_added += insert_buoy_records(db_conn, records)
        print(recs_added,' BUOY records added to the database')
    print('Time stamp: ', str(MData.datetime.now()))
    return recs_added
def construct_rscd_mdb(db_path: str,
                       db_name: str,
                       rscdpath: str,
                       rscddataset: str,
                       months: list,
                       buoypath: str,
                       buoydataset: str,
                       buoydatafmt: str,
                       bfilt=None,
                       chunkSize=500):
    """
    Build the match-up database from a model archive and a buoy archive.

    The rscd_files and buoy_files tables are created if missing and emptied,
    then repopulated: model records are loaded from ``rscdpath/<mm>`` for
    each requested month (zero-padded to two digits), buoy records from
    ``buoypath``.

    The returned model count is the total over all months; previously it was
    overwritten on every iteration, so only the last month's count came back.
    The connection is also closed when loading fails.

    :param db_path: directory containing the database.
    :param db_name: database file name without the '.db' extension.
    :param rscdpath: directory holding the per-month model sub-directories.
    :param rscddataset: model data set directory name, passed to
        ``load_rscd_archive``.
    :param months: month numbers to load; empty or None means 1 to 12.
    :param buoypath: root directory of the buoy archive.
    :param buoydataset: buoy data set directory name.
    :param buoydatafmt: buoy file format, passed to ``load_buoy_archive``.
    :param bfilt: optional record filter for the buoy records.
    :param chunkSize: file-index interval between database writes.
    :return: tuple (model records added, buoy records added).
    """
    # === Construct Match-up Database =======================================
    db_file = db_path+'/'+db_name+'.db'
    db_conn = create_connection(db_file)
    rscd_rec_cnt = 0
    try:
        create_rscd_table(db_conn)
        create_buoy_table(db_conn)
        cur = db_conn.cursor()
        # Empty both tables so a rebuild does not duplicate records.
        for table in ('buoy_files', 'rscd_files'):
            try:
                cur.execute("DELETE FROM " + table + ";")
            except sqlite3.Error:
                print('No table ' + table + ', nothing deleted.')
        db_conn.commit()
        res = cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        print('DB Tables: ',res.fetchall())
        # === Populate Database =============================================
        # Model Data Records
        if not months:
            months = list(range(1,13))
        for month in months:
            fpath = os.path.join(rscdpath,str(month).zfill(2)).replace('\\', '/')
            rscd_rec_cnt += load_rscd_archive(db_conn,
                                              fpath, rscddataset,
                                              chunkSize)
        # In situ Data Records
        buoy_rec_cnt = load_buoy_archive(db_conn,
                                         buoypath, buoydataset, buoydatafmt,
                                         bfilt, chunkSize)
        db_conn.commit()
    finally:
        # === Close Database ================================================
        if db_conn is not None:
            db_conn.close()
    return rscd_rec_cnt, buoy_rec_cnt