Source code for MatchUpDatabase

# -*- coding: utf-8 -*-
"""
Tools for generating an SQLite matchup database from a set of model extracts 
and an archive of wavebuoy data.

These tools are tied to the ResourceCode WaveWatch III model hindcast data,
but the underlying functions for opening, populating and querying an SQLite
database are generic.

:Dependencies [External]: os, re, sqlite3, csv, numpy
:Dependencies [Internal]: waveval.Geometry, waveval.TimeFuncs, waveval.ObsData, waveval.ModelData

"""
# ----------------------------------------------------------------------------
#   IMPORTS
# ----------------------------------------------------------------------------
# Standard Python Dependencies
import os
import re
import sqlite3
import csv
# Non-Standard Python Dependencies
import numpy as np
# Local Module Dependencies
from waveval.Geometry import spatialCoverage
from waveval.TimeFuncs import temporalCoverage, dateNumFromStr
import waveval.ObsData as OData
import waveval.ModelData as MData
# Other Dependencies

# ----------------------------------------------------------------------------
#   GLOBAL VARIABLES
# ----------------------------------------------------------------------------
# DDL for the catalogue of model extract files: one row per file with its
# platform, location, file path/name, date range, year/month and sample
# period. Created by create_rscd_table and filled by insert_rscd_records.
rscd_table_sql = """ CREATE TABLE IF NOT EXISTS rscd_files (
                        id integer PRIMARY KEY,
                        platform text NOT NULL,
                        filePath text NOT NULL,
                        fileName text NOT NULL,
                        latitude real NOT NULL,
                        longitude real NOT NULL,
                        beginDate timestamp NOT NULL,
                        endDate timestamp NOT NULL,
                        year integer NOT NULL,
                        month integer NOT NULL,
                        samplePeriod real NOT NULL
                                    ); """

# DDL for the catalogue of wave-buoy files: same layout as rscd_files but
# without the year/month columns. Filled by insert_buoy_records.
buoy_table_sql = """ CREATE TABLE IF NOT EXISTS buoy_files (
                        id integer PRIMARY KEY,
                        platform text NOT NULL,
                        filePath text NOT NULL,
                        fileName text NOT NULL,
                        latitude real NOT NULL,
                        longitude real NOT NULL,
                        beginDate timestamp NOT NULL,
                        endDate timestamp NOT NULL,
                        samplePeriod real NOT NULL
                                    ); """

# DDL for a table of (rscdRecID, buoyRecID, year, month) pairs filled by
# insert_match_record. No foreign keys are declared; the IDs presumably
# refer to rscd_files.id and buoy_files.id (verify against callers).
match_table_sql = """ CREATE TABLE IF NOT EXISTS match_files (
                        id integer PRIMARY KEY,
                        rscdRecID integer NOT NULL,
                        buoyRecID integer NOT NULL,
                        year integer NOT NULL,
                        month integer NOT NULL
                                    ); """

# Complete match-up query. Pairs each rscd_files row with each buoy_files
# row that either
#   * lies within 0.001 of it in both latitude and longitude, or
#   * has the same platform name, compared case-insensitively,
# AND whose buoy [beginDate, endDate] range covers the model file's range.
# Rows are ordered by model file name. The trailing backslashes inside the
# string join the lines, so the SQL is a single line. [table].[column] is
# SQLite's bracket-quoted identifier syntax.
match_recs_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month, \
rscd_files.latitude, rscd_files.longitude \
FROM buoy_files, rscd_files \
WHERE \
(((Abs([rscd_files].[latitude]-[buoy_files].[latitude])<0.001) \
AND \
(Abs([rscd_files].[longitude]-[buoy_files].[longitude])<0.001)) \
OR \
(upper([rscd_files].[platform]) == upper([buoy_files].[platform]))) \
AND \
((buoy_files.beginDate <= rscd_files.beginDate) \
AND \
(buoy_files.endDate >= rscd_files.endDate)) \
ORDER BY rscd_files.fileName;\
"""

# SELECT ... FROM prefix with no WHERE clause. Presumably callers append a
# WHERE built from the *_filter_sql fragments (not visible here; confirm).
match_select_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month \
FROM buoy_files, rscd_files \
"""

# As match_select_sql, but also returns the buoy date range and platform.
# NOTE: the beginDate/endDate line has no trailing backslash, so the string
# keeps a literal newline there; SQL treats it as whitespace.
val_site_select_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month, \
buoy_files.beginDate, buoy_files.endDate,
buoy_files.platform \
FROM buoy_files, rscd_files \
"""

# As match_select_sql, plus the model location and the buoy platform and
# date range. Also has no WHERE clause.
val_match_select_sql = \
"""\
SELECT rscd_files.filepath, rscd_files.filename, \
buoy_files.filepath, buoy_files.filename, \
rscd_files.year, rscd_files.month, \
rscd_files.latitude, rscd_files.longitude, \
buoy_files.platform, buoy_files.beginDate, buoy_files.endDate \
FROM buoy_files, rscd_files \
"""

# WHERE fragment: model and buoy positions agree to within 0.001 in both
# latitude and longitude.
location_filter_sql = \
"""\
((Abs([rscd_files].[latitude]-[buoy_files].[latitude])<0.001) \
AND \
(Abs([rscd_files].[longitude]-[buoy_files].[longitude])<0.001))\
"""
    
# WHERE fragment: platform names are equal, compared case-insensitively.
platform_filter_sql = \
"""\
(upper([rscd_files].[platform]) == upper([buoy_files].[platform]))\
"""

# WHERE fragment: the buoy file's [beginDate, endDate] fully covers the
# model file's [beginDate, endDate]. The string ends with a space.
time_window_filter_sql = \
"""\
((buoy_files.beginDate <= rscd_files.beginDate) \
AND \
(buoy_files.endDate >= rscd_files.endDate)) \
"""
    
# Model file names that load_rscd_archive skips outright. The reason each
# file is excluded is not recorded here.
excludeFiles = ['RSCD_WW3-RSCD-UG-62107_201705_freq.nc',
                'RSCD_WW3-RSCD-UG-62101_201704_freq.nc',
                'RSCD_WW3-RSCD-UG-63057_201706_freq.nc',
                'RSCD_WW3-RSCD-UG-62068_201710_freq.nc']

# ----------------------------------------------------------------------------
#   CLASS DEFINITIONS
# ----------------------------------------------------------------------------
class recordFilter:
    """
    Decide whether a data record falls inside a spatial and/or temporal
    coverage.

    The filter is configured from a list of coverage objects:
    - any ``spatialCoverage`` instance becomes the spatial domain;
    - any ``temporalCoverage`` instance becomes the time window.
    If several of the same kind are given, the last one wins. Other items
    are ignored.

    :param coverage: list of ``spatialCoverage`` / ``temporalCoverage``
        objects.
    """

    def __init__(self, coverage: list):
        for cover in coverage:
            if isinstance(cover, spatialCoverage):
                self.domain = cover
            if isinstance(cover, temporalCoverage):
                self.window = cover

    @staticmethod
    def _overlaps(t0, t1, lo, hi):
        """
        Return True when the interval [t0, t1] intersects [lo, hi].

        Both ends are inclusive, so an interval that only touches the
        window at either edge counts as overlapping.
        """
        return bool((t0 <= hi) and (t1 >= lo))

    def includeRecord(self, record, verbose=False):
        """
        Test whether *record* lies inside the configured coverage.

        Spatial test: ``record.lat`` and ``record.lon`` must lie strictly
        inside the ranges spanned by ``domain.lat_range`` and
        ``domain.lon_range``.

        Temporal test: ``record.time_start`` and ``record.time_end`` are
        converted with ``dateNumFromStr``. The resulting interval must
        intersect the range spanned by ``window.timeRangeNum``, with both
        ends inclusive.

        A record lacking the attributes needed by a configured test fails
        that test.

        :param record: object exposing ``lat``/``lon`` and/or
            ``time_start``/``time_end``, and optionally ``platform`` (used
            in messages).
        :param verbose: print why a record is rejected.
        :return: True if the record passes every configured test. False if
            it fails one, or if the filter has neither a domain nor a
            window.
        """
        def report(what):
            # Name the platform in the message when the record carries one.
            if hasattr(record, 'platform'):
                print(record.platform + ' outside ' + what + ' coverage.')
            else:
                print('record outside ' + what + ' coverage.')

        hasdomain = hasattr(self, 'domain')
        haswindow = hasattr(self, 'window')
        if not (hasdomain or haswindow):
            # Nothing configured: no record can be shown to be inside.
            return False

        include = True
        if hasdomain:
            if hasattr(record, 'lat') and hasattr(record, 'lon'):
                lat_lo = min(self.domain.lat_range)
                lat_hi = max(self.domain.lat_range)
                lon_lo = min(self.domain.lon_range)
                lon_hi = max(self.domain.lon_range)
                indomain = bool((lat_lo < record.lat < lat_hi)
                                and (lon_lo < record.lon < lon_hi))
                if not indomain and verbose:
                    report('spatial')
            else:
                if verbose:
                    print('WARNING: Record missing lat or lon data.')
                indomain = False
            include = indomain

        # The temporal test still runs after a spatial failure so that
        # verbose output reports both reasons.
        if haswindow:
            if hasattr(record, 'time_start') and hasattr(record, 'time_end'):
                t0 = dateNumFromStr(record.time_start)
                t1 = dateNumFromStr(record.time_end)
                trange = self.window.timeRangeNum
                inwindow = self._overlaps(t0, t1, min(trange), max(trange))
                if not inwindow and verbose:
                    report('temporal')
            else:
                if verbose:
                    print('WARNING: Record missing time data.')
                inwindow = False
            include = include and inwindow

        return include
# ---------------------------------------------------------------------------- # FUNCTION DEFINITIONS # ----------------------------------------------------------------------------
def tuples2csv(records, csv_file):
    """
    Write row sequences to *csv_file* as UTF-8 CSV, overwriting the file.

    :param records: iterable of tuples/lists, one per CSV row.
    :param csv_file: path of the CSV file to write.
    :return: None
    """
    # newline='' hands line-ending control to the csv writer.
    with open(csv_file, 'w', newline='', encoding='utf-8') as handle:
        csv.writer(handle).writerows(records)
def csv2tuples(csv_file):
    """
    Read a CSV file into a list of tuples of strings.

    This is the counterpart of ``tuples2csv``. The file is decoded as
    UTF-8, the encoding that function writes. It is opened with
    ``newline=''``, as the csv module requires, so quoted fields containing
    line breaks are read intact.

    :param csv_file: path of the CSV file to read.
    :return: list of tuples, one per row; every field is a str.
    """
    with open(csv_file, 'r', newline='', encoding='utf-8') as read_obj:
        return [tuple(row) for row in csv.reader(read_obj)]
def mapRecord(startYr, stopYr, record, dataset):
    """
    Expand one match-up record into one record per month of a year range.

    Items 0, 1, 4 and 5 of *record* are the model file directory, model
    file name, year string and month string. For every month of every year
    from *startYr* to *stopYr* inclusive, a copy of *record* is made in
    which:

    * item 0 becomes ``<prefix><YYYY>/<MM>/<dataset>``, where ``<prefix>``
      is the part of the original directory before the first occurrence of
      the original year string;
    * item 1 has the original ``<year><MM>`` stamp replaced by the new one;
    * items 4 and 5 hold the new year and the unpadded month as strings.

    All other items are copied unchanged.

    :param startYr: first year.
    :param stopYr: last year, inclusive.
    :param record: sequence with at least six items; items 4 and 5 are str.
    :param dataset: final directory component of each new path.
    :return: list of tuples in year-then-month order; empty when
        *stopYr* < *startYr*.
    """
    src_year = record[4]
    prefix = record[0].split(src_year)[0]
    src_stamp = src_year + record[5].zfill(2)
    expanded = []
    for year in range(startYr, stopYr + 1):
        yyyy = str(year)
        for month in range(1, 13):
            mm = str(month).zfill(2)
            row = list(record)
            row[0] = prefix + yyyy + '/' + mm + '/' + dataset
            row[1] = record[1].replace(src_stamp, yyyy + mm)
            row[4] = yyyy
            row[5] = str(month)
            expanded.append(tuple(row))
    return expanded
def getPlatformRecords(records, platform):
    """
    Select the match-up records that belong to one platform.

    Item 8 of each record is compared with *platform*. That is where the
    rows built by ``db_match_to_csv`` hold the buoy platform name.

    :param records: sequence of record tuples with at least nine items.
    :param platform: platform name to select.
    :return: list of the matching records, in input order; empty when
        *records* is empty or nothing matches.
    """
    # A direct scan avoids converting every record to a numpy string array
    # (which also fails on an empty list) just to read one column.
    return [rec for rec in records if rec[8] == platform]
def getPlatformList(records):
    """
    List the distinct platform names in a set of match-up records.

    Item 8 of each record is the platform name, as in the rows built by
    ``db_match_to_csv``.

    :param records: sequence of record tuples with at least nine items.
    :return: list of distinct platform names in first-seen order (a
        deterministic order, unlike a set); empty for empty input.
    """
    # dict.fromkeys de-duplicates while keeping insertion order.
    return list(dict.fromkeys(rec[8] for rec in records))
def touch(path):
    """
    Create *path* if it is missing and set its access and modification
    times to now. An existing file's contents are left untouched.

    :param path: file to touch.
    :return: None
    """
    # Append mode creates a missing file without truncating an existing one.
    with open(path, 'a'):
        os.utime(path, times=None)
def db_match_to_csv(db_path, db_name, csv_path, csv_fname):
    """
    Export every model/buoy match-up in a database to a CSV file.

    An ``rscd_files`` row is matched with a ``buoy_files`` row when both of
    these hold:
    - their positions agree to within 0.001 in latitude and longitude, or
      their platform names are equal ignoring case;
    - the buoy file's date range covers the model file's date range.

    Each CSV row is::

        (model filePath, model fileName, buoy filePath, buoy fileName,
         year, month, buoy beginDate, buoy endDate, buoy platform,
         buoy latitude, buoy longitude)

    Year and month are written as strings. Rows are ordered by model file
    name.

    :param db_path: directory containing the database.
    :param db_name: database file name without the ``.db`` extension.
    :param csv_path: directory of the output CSV file.
    :param csv_fname: output CSV file name.
    :return: None
    """
    db_file = os.path.join(db_path, db_name + '.db')
    db_conn = create_connection(db_file)
    cur = db_conn.cursor()
    # A single join fetches every output column. This replaces two extra
    # SELECT * look-ups per match.
    match_sql = (
        "SELECT rscd_files.filePath, rscd_files.fileName, "
        "buoy_files.filePath, buoy_files.fileName, "
        "rscd_files.year, rscd_files.month, "
        "buoy_files.beginDate, buoy_files.endDate, buoy_files.platform, "
        "buoy_files.latitude, buoy_files.longitude "
        "FROM buoy_files, rscd_files "
        "WHERE ((Abs(rscd_files.latitude - buoy_files.latitude) < 0.001 "
        "AND Abs(rscd_files.longitude - buoy_files.longitude) < 0.001) "
        "OR upper(rscd_files.platform) = upper(buoy_files.platform)) "
        "AND buoy_files.beginDate <= rscd_files.beginDate "
        "AND buoy_files.endDate >= rscd_files.endDate "
        "ORDER BY rscd_files.fileName;"
    )
    try:
        rows = cur.execute(match_sql).fetchall()
    finally:
        # Release the database even if the query fails.
        db_conn.close()

    reclist = [
        (modpath, modfile, obspath, obsfile, str(year), str(month),
         starttime, stoptime, pltfrm, lat, lon)
        for (modpath, modfile, obspath, obsfile, year, month,
             starttime, stoptime, pltfrm, lat, lon) in rows
    ]
    tuples2csv(reclist, os.path.join(csv_path, csv_fname))
def create_connection(db_file):
    """
    Open a connection to the SQLite database file *db_file*.

    Any ``sqlite3.Error`` raised while connecting is printed, not raised.

    :param db_file: path of the database file.
    :return: ``sqlite3.Connection`` on success, ``None`` on failure.
    """
    try:
        connection = sqlite3.connect(db_file)
    except sqlite3.Error as err:
        print(err)
        return None
    return connection
def close_database(db_conn):
    """
    Commit pending changes and close *db_conn*; do nothing for ``None``.

    :param db_conn: ``sqlite3.Connection`` or ``None``.
    :return: None
    """
    if db_conn is None:
        return
    db_conn.commit()
    db_conn.close()
def create_rscd_table(db_conn):
    """
    Create the ``rscd_files`` table from ``rscd_table_sql`` if it is absent.

    SQLite errors are printed, not raised.

    :param db_conn: open ``sqlite3.Connection``.
    :return: None
    """
    try:
        db_conn.execute(rscd_table_sql)
    except sqlite3.Error as err:
        print(err)
def create_buoy_table(db_conn):
    """
    Create the ``buoy_files`` table from ``buoy_table_sql`` if it is absent.

    SQLite errors are printed, not raised.

    :param db_conn: open ``sqlite3.Connection``.
    :return: None
    """
    try:
        db_conn.execute(buoy_table_sql)
    except sqlite3.Error as err:
        print(err)
def create_match_table(db_conn):
    """
    Create the ``match_files`` table from ``match_table_sql`` if it is
    absent.

    SQLite errors are printed, not raised.

    :param db_conn: open ``sqlite3.Connection``.
    :return: None
    """
    try:
        db_conn.execute(match_table_sql)
    except sqlite3.Error as err:
        print(err)
def create_database(db_path, db_name):
    """
    Open (creating if needed) ``<db_path>/<db_name>.db`` and ensure the
    rscd_files, buoy_files and match_files tables exist.

    :param db_path: directory of the database file.
    :param db_name: database file name without the ``.db`` extension.
    :return: the open connection; the caller is responsible for closing it.
    """
    db_conn = create_connection(os.path.join(db_path, db_name + '.db'))
    for make_table in (create_rscd_table, create_buoy_table,
                       create_match_table):
        make_table(db_conn)
    db_conn.commit()
    return db_conn
def insert_rscd_records(db_conn, records):
    """
    Bulk-insert model file records into ``rscd_files`` and commit.

    :param db_conn: open ``sqlite3.Connection``.
    :param records: iterable of 10-item sequences ordered as (platform,
        filePath, fileName, latitude, longitude, beginDate, endDate, year,
        month, samplePeriod).
    :return: the cursor's row count after ``executemany``.
    """
    insert_sql = ''' INSERT INTO rscd_files(platform, filePath, fileName, latitude, longitude, beginDate, endDate, year, month, samplePeriod) VALUES(?,?,?,?,?,?,?,?,?,?) '''
    # Connection.executemany creates the cursor and returns it.
    batch_cursor = db_conn.executemany(insert_sql, records)
    db_conn.commit()
    inserted = batch_cursor.rowcount
    del batch_cursor
    return inserted
def insert_buoy_records(db_conn, records):
    """
    Bulk-insert buoy file records into ``buoy_files`` and commit.

    :param db_conn: open ``sqlite3.Connection``.
    :param records: iterable of 8-item sequences ordered as (platform,
        filePath, fileName, latitude, longitude, beginDate, endDate,
        samplePeriod).
    :return: the cursor's row count after ``executemany``.
    """
    insert_sql = ''' INSERT INTO buoy_files(platform, filePath, fileName, latitude, longitude, beginDate, endDate, samplePeriod) VALUES(?,?,?,?,?,?,?,?) '''
    # Connection.executemany creates the cursor and returns it.
    batch_cursor = db_conn.executemany(insert_sql, records)
    db_conn.commit()
    inserted = batch_cursor.rowcount
    del batch_cursor
    return inserted
def get_match_records(db_path, db_name, sql_query):
    """
    Run *sql_query* against ``<db_path>/<db_name>.db`` and return every row.

    The connection is closed even if the query raises.

    :param db_path: directory of the database file.
    :param db_name: database file name without the ``.db`` extension.
    :param sql_query: SQL statement to execute.
    :return: list of result rows as returned by ``fetchall()``.
    """
    db_conn = create_connection(os.path.join(db_path, db_name + '.db'))
    cur = db_conn.cursor()
    try:
        records = cur.execute(sql_query).fetchall()
    finally:
        db_conn.close()
    return records
def insert_match_record(db_conn, records):
    """
    Bulk-insert (rscdRecID, buoyRecID, year, month) rows into
    ``match_files`` and commit.

    :param db_conn: open ``sqlite3.Connection``.
    :param records: iterable of 4-item sequences.
    :return: the cursor's row count after ``executemany``.
    """
    insert_sql = ''' INSERT INTO match_files(rscdRecID, buoyRecID, year, month) VALUES(?,?,?,?) '''
    batch_cursor = db_conn.executemany(insert_sql, records)
    db_conn.commit()
    return batch_cursor.rowcount
# Model files whose names match this pattern are never loaded. This is the
# union of the two selection regexes the loader has always applied.
_RSCD_SKIP_PATTERN = re.compile('[WE]......N....._2017')


def _is_rscd_candidate(file):
    """Return True if *file* is a model NetCDF file that should be loaded."""
    if os.path.splitext(file)[1] != '.nc':
        return False
    if _RSCD_SKIP_PATTERN.search(file):
        return False
    return file not in excludeFiles


def load_rscd_archive(db_conn, archivePath, dataSet, chunkSize=500):
    """
    Walk *archivePath* and catalogue eligible model NetCDF files in the
    ``rscd_files`` table.

    Only directories whose final path component equals *dataSet* are
    scanned. A file is loaded when all of these hold:
    - it has a ``.nc`` extension;
    - its name matches neither ``W......N....._2017`` nor
      ``E......N....._2017``;
    - it is not listed in ``excludeFiles``.

    Each file is opened with ``MData.ww3`` and its ``getMetaData()`` tuple
    is queued. Files that fail to open or to report metadata are printed as
    corrupt and skipped. Queued records are written with
    ``insert_rscd_records`` in batches of *chunkSize*.

    :param db_conn: open ``sqlite3.Connection`` holding an rscd_files table.
    :param archivePath: root directory to walk.
    :param dataSet: name of the directories to scan.
    :param chunkSize: number of queued records that triggers an insert.
    :return: total row count reported by the inserts.
    """
    records = []
    recs_added = 0
    print('Start time: ', str(MData.datetime.now()))
    for root, _subFolders, files in os.walk(archivePath):
        if os.path.split(root)[-1] != dataSet:
            continue
        print(root.replace('\\', '/'))
        print('Files to process = ', len(files))
        for file in files:
            if not _is_rscd_candidate(file):
                continue
            rscdFile = os.path.join(root, file).replace('\\', '/')
            try:
                rscd = MData.ww3(rscdFile)
                records.append(tuple(rscd.getMetaData()))
                del rscd
            except Exception:
                # Best effort: one unreadable file must not abort the load.
                # Exception, not a bare except, so Ctrl-C still interrupts.
                print('CORRUPT FILE: ', file)
            # Flush on queue size rather than file index, so batches are
            # never empty and never depend on how many files were skipped.
            if len(records) >= chunkSize:
                recs_added += insert_rscd_records(db_conn, records)
                print(recs_added, ' RSCD records added to the database')
                records = []
    if records:
        recs_added += insert_rscd_records(db_conn, records)
        print(recs_added, ' RSCD records added to the database')
    print('Time stamp: ', str(MData.datetime.now()))
    return recs_added
def load_buoy_archive(db_conn, archivePath: str, dataSet: str, dataFmt: str,
                      recFilter=None, chunkSize=500, verbose=False):
    """
    Walk *archivePath* and catalogue wave-buoy NetCDF files in the
    ``buoy_files`` table.

    Only directories whose final path component equals *dataSet* are
    scanned, and only ``.nc`` files are read. Each file is opened with the
    reader chosen by *dataFmt*. When *recFilter* is given, a file is kept
    only if ``recFilter.includeRecord(buoy, verbose)`` is true. The
    ``getMetaData()`` tuples of kept files are written with
    ``insert_buoy_records`` in batches of *chunkSize*.

    :param db_conn: open ``sqlite3.Connection`` holding a buoy_files table.
    :param archivePath: root directory to walk.
    :param dataSet: name of the directories to scan.
    :param dataFmt: ``'InSituTAC'`` (read with ``OData.wavebuoy``) or
        ``'SeaDataNet'`` (read with ``OData.emecwb``).
    :param recFilter: optional object with an ``includeRecord(record,
        verbose)`` method, e.g. a ``recordFilter``.
    :param chunkSize: number of queued records that triggers an insert.
    :param verbose: passed through to ``recFilter.includeRecord``.
    :raises ValueError: if *dataFmt* is not a supported format.
    :return: total row count reported by the inserts.
    """
    # Choose the reader before walking, so an unsupported format fails
    # immediately with a clear error. Otherwise ``buoy`` would be unbound
    # at the first .nc file.
    if dataFmt == 'InSituTAC':
        reader = OData.wavebuoy
    elif dataFmt == 'SeaDataNet':
        reader = OData.emecwb
    else:
        raise ValueError("Unsupported dataFmt " + repr(dataFmt)
                         + "; expected 'InSituTAC' or 'SeaDataNet'.")

    records = []
    recs_added = 0
    print('Start time: ', str(MData.datetime.now()))
    for root, _subFolders, files in os.walk(archivePath):
        if os.path.split(root)[-1] != dataSet:
            continue
        print(root.replace('\\', '/'))
        for file in files:
            if os.path.splitext(file)[1] != '.nc':
                continue
            buoyFile = os.path.join(root, file).replace('\\', '/')
            buoy = reader(buoyFile)
            if recFilter is None or recFilter.includeRecord(buoy, verbose):
                records.append(tuple(buoy.getMetaData()))
            del buoy
            # Flush on queue size so batches are never empty.
            if len(records) >= chunkSize:
                recs_added += insert_buoy_records(db_conn, records)
                print(recs_added, ' BUOY records added to the database')
                records = []
                print('Time stamp: ', str(MData.datetime.now()))
    if records:
        recs_added += insert_buoy_records(db_conn, records)
        print(recs_added, ' BUOY records added to the database')
    print('Time stamp: ', str(MData.datetime.now()))
    return recs_added
def construct_rscd_mdb(db_path: str, db_name: str, rscdpath: str,
                       rscddataset: str, months: list, buoypath: str,
                       buoydataset: str, buoydatafmt: str, bfilt=None,
                       chunkSize=500):
    """
    Build (or rebuild) a match-up database from model and buoy archives.

    Steps:
    1. Open ``<db_path>/<db_name>.db``.
    2. Ensure the rscd_files and buoy_files tables exist and delete all
       their rows.
    3. Load the model files found under ``<rscdpath>/<MM>`` for each
       requested month.
    4. Load the buoy files found under *buoypath*.

    :param db_path: directory of the database file.
    :param db_name: database file name without the ``.db`` extension.
    :param rscdpath: root of the model archive; month sub-directories are
        named with two-digit month numbers.
    :param rscddataset: model dataset directory name (see
        ``load_rscd_archive``).
    :param months: month numbers to load; an empty list or ``None`` means
        all twelve months.
    :param buoypath: root of the buoy archive.
    :param buoydataset: buoy dataset directory name.
    :param buoydatafmt: buoy file format (see ``load_buoy_archive``).
    :param bfilt: optional record filter applied to buoy files.
    :param chunkSize: insert batch size for both loaders.
    :return: tuple (model records added over ALL months, buoy records
        added).
    """
    # === Construct Match-up Database =======================================
    db_file = os.path.join(db_path, db_name + '.db')
    db_conn = create_connection(db_file)
    cur = db_conn.cursor()
    try:
        create_rscd_table(db_conn)
        create_buoy_table(db_conn)
        for table in ('buoy_files', 'rscd_files'):
            try:
                cur.execute('DELETE FROM ' + table + ';')
            except sqlite3.Error:
                print('No table ' + table + ', nothing deleted.')
        db_conn.commit()
        res = cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        print('DB Tables: ', res.fetchall())

        # === Populate Database =============================================
        # Model Data Records
        if not months:
            months = list(range(1, 13))
        rscd_rec_cnt = 0
        for month in months:
            fpath = os.path.join(rscdpath, str(month).zfill(2)).replace('\\', '/')
            # Accumulate: each call returns only that month's count.
            rscd_rec_cnt += load_rscd_archive(db_conn, fpath, rscddataset,
                                              chunkSize)
        # In situ Data Records
        buoy_rec_cnt = load_buoy_archive(db_conn, buoypath, buoydataset,
                                         buoydatafmt, bfilt, chunkSize)
        db_conn.commit()
    finally:
        # === Close Database ================================================
        db_conn.close()
    return rscd_rec_cnt, buoy_rec_cnt