Batch Processing - (IFREMER Datarmor HPC)

The ResourceCode hindcast archive has been developed using the IFREMER Datarmor HPC services. The hindcast data set was validated using the WaveVal tools implemented on Datarmor. This page provides information on how to set up and run the WaveVal tools on Datarmor, and how to generate and run a set of PBS batch jobs to automate the validation process. To use the Datarmor HPC service the user requires an IFREMER intranet login account, and an extranet login to remotely access the services.

Useful information on using the Datarmor services can be found at https://m.davidmkaplan.fr/cluster/cluster-use-instructions.html.

The methods described here should port to any HPC system that supports the python anaconda environment and runs a PBS job submission service.

Setup conda environment

Access to Datarmor is via a login node. On this node the user enters a bash shell by default. The anaconda python interface is accessed by changing to a conda shell using:

$ source /appli/anaconda/latest/etc/profile.d/conda.csh

Then the local conda runtime configuration file (~/.condarc) needs to be created containing the following lines:

.condarc
envs_dirs:
- $DATAWORK/conda-env
- /appli/conda-env
- /appli/conda-env/2.7
- /appli/conda-env/3.6
pkgs_dirs:
- $DATAWORK/conda/pkgs

This file needs to be in the root of the user's home directory. It can be created using any suitable text editor (e.g. vi, nano, etc.).

Once the ~/.condarc file exists, a conda environment can be created including the python packages required to run the validation tools. For the ResourceCode hindcast validation the conda environment buoyvalid was created using the command:

$ conda create --name buoyvalid

where the option --name is used to set the name of the conda environment created.

To check that the conda environment was generated use the command:

$ conda info --envs

The required python packages can be added to the buoyvalid conda environment as follows:

$ conda install --name buoyvalid numpy
$ conda install --name buoyvalid netCDF4
$ conda install --name buoyvalid astropy
$ conda install --name buoyvalid matplotlib
$ conda install --name buoyvalid cartopy
$ conda install --name buoyvalid spyder
$ conda config  --add  channels  conda-forge
$ conda install --name buoyvalid cartopy_offlinedata
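Once the environment has been activated, a quick way to confirm that the installs succeeded is to check that each package can be located by python. The following is a minimal sketch, not part of the WaveVal tools; the helper name missing_packages and the list REQUIRED are hypothetical, and spyder and cartopy_offlinedata are left out because the sketch only checks the core libraries.

```python
import importlib.util

# Import names of the core packages installed above (assumed to match the
# conda package names used in the install commands).
REQUIRED = ["numpy", "netCDF4", "astropy", "matplotlib", "cartopy"]


def missing_packages(names):
    """Return the names that cannot be found in the active environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages(REQUIRED)
    if missing:
        print("Missing packages: " + ", ".join(missing))
    else:
        print("All required packages were found.")
```

The check only looks up each package without importing it, so it runs quickly on a login node.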

To use the conda environment it needs to be activated using the command:

$ conda activate buoyvalid

Similarly, to close the conda environment use the command:

$ conda deactivate

Construct validation process scripts

The processing script needs to be constructed so that it can be called from a PBS batch job script, with option parameters passed in to control what the process does. The script datarmor_insitutac_validate.py was used to process the ResourceCode hindcast data set against the CMEMS InsituTAC wavebuoy data archive. The main function is called when this python script is run, and the optional parameters are parsed prior to calling the validation process. This script is designed to process a single wavebuoy location. The information about the buoy is taken from a record in a CSV file that provides a full set of unique model/wavebuoy data matches for a specific year and month, i.e. each record in the CSV file corresponds to a unique wavebuoy.

datarmor_insitutac_validate.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# ----------------------------------------------------------------------------
#   IMPORT PACKAGE DEPENDENCIES
# ----------------------------------------------------------------------------
import sys
import os
import getopt
from waveval.MatchUpDatabase import csv2tuples, mapRecord
from waveval.Validation import validate_records, save_tabulated_results

The header of the script sets the shebang line to identify the program required to run the script, and imports the required modules and module functions.

The first part of the main function parses the user input options used to control the process. There are three required parameters:

Option          Description
-p, --path      Path for validation results output
-f, --fname     CSV file of data match records; this must be in the directory defined by --path
-n, --recnum    Record number in the CSV file to process

and three optional parameters:

Option        Description                Default
--minYear     Minimum year to process    1994
--maxYear     Maximum year to process    2019
--genPlots    Generate plots flag        False

Note

The minimum year must not be less than the first year in the model hindcast archive, and the maximum year must not be greater than the last year in the model hindcast archive.
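For comparison, the same set of options can be described with the standard argparse module, which checks the required parameters and applies the defaults automatically. This is only a sketch under stated assumptions and is not the parser used by the script itself: the function names build_parser and str_to_flag are hypothetical, and the --fname spelling follows the option handling in the script listing.

```python
import argparse


def str_to_flag(value):
    """Treat any value starting with 't' or 'T' as True (assumed convention)."""
    return value[:1].upper() == "T"


def build_parser():
    """Build a parser for the three required and three optional parameters."""
    parser = argparse.ArgumentParser(prog="datarmor_insitutac_validate.py")
    parser.add_argument("-p", "--path", required=True,
                        help="path for validation results output")
    parser.add_argument("-f", "--fname", required=True,
                        help="CSV file of data match records")
    parser.add_argument("-n", "--recnum", required=True, type=int,
                        help="record number in the CSV file to process")
    parser.add_argument("--minYear", type=int, default=1994,
                        help="minimum year to process")
    parser.add_argument("--maxYear", type=int, default=2019,
                        help="maximum year to process")
    parser.add_argument("--genPlots", type=str_to_flag, default=False,
                        help="generate plots flag (True or False)")
    return parser


if __name__ == "__main__":
    # Example arguments only; the file and directory names are placeholders.
    args = build_parser().parse_args(["-p", "out", "-f", "records.csv", "-n", "9"])
    print(args.path, args.fname, args.recnum,
          args.minYear, args.maxYear, args.genPlots)
```

With this form, omitting -p, -f or -n produces a usage message and a non-zero exit status without any extra checks in the script.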

#--------------------------------------------------------------------------
# MAIN PROCESS
#--------------------------------------------------------------------------

def main(argv):

    usage = 'python3 datarmor_insitutac_validate.py -p <path> -f <fname> -n <recnum>'

    # Required parameters (no defaults)
    dat_path = None
    rec_file = None
    recNum = None
    # Optional parameters and their defaults
    minYear = 1994
    maxYear = 2019
    genPlots = False

    try:
        # Long option names are listed without the leading '--'; a trailing
        # '=' marks an option that takes an argument
        opts, args = getopt.getopt(argv,'hp:f:n:',["path=","fname=",
                                                   "recnum=","minYear=",
                                                   "maxYear=","genPlots="])
    except getopt.GetoptError:
        print(usage)
        sys.exit(4)
    if opts == []:
        print(usage)
        sys.exit(4)
    for opt, arg in opts:
        print(opt,arg)
        if opt == '-h':
            print(usage)
            sys.exit()
        elif opt in ("-p","--path"):
            dat_path = arg
        elif opt in ("-f","--fname"):
            rec_file = arg
        elif opt in ("-n","--recnum"):
            recNum = int(arg)
        elif opt == "--minYear":
            minYear = int(arg)
        elif opt == "--maxYear":
            maxYear = int(arg)
        elif opt == "--genPlots":
            # Any value starting with 't' or 'T' (e.g. True) enables plotting
            if arg[:1].upper() == "T":
                genPlots = True
    # Stop early if any of the three required parameters is missing
    if dat_path is None or rec_file is None or recNum is None:
        print(usage)
        sys.exit(4)

Once the input parameters are parsed, the input record list CSV file is converted to a list of tuples for processing by the Validation module, the requested year range is set, and the wave data format is defined.

    # =================== Get Records to Process =============================
    rec_list = csv2tuples(os.path.join(dat_path,rec_file))
    
    rec = rec_list[recNum]
    yearBgn = int(rec[6].split('-')[0])
    if yearBgn < minYear:
        yearBgn = minYear
    yearEnd = int(rec[7].split('-')[0])
    if yearEnd > maxYear:
        yearEnd = maxYear
    records = mapRecord(yearBgn,yearEnd,rec)
    platform = records[0][8]
    
    buoyFmt = 'InSituTAC'
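The year handling in the listing above amounts to clamping the record's date range to the requested limits. The following is a minimal standalone sketch of that step; the function name clamp_year_range is hypothetical, and it assumes only that the start and end dates are strings with the year before the first '-' character, as the split('-') calls in the script imply.

```python
def clamp_year_range(date_bgn, date_end, min_year=1994, max_year=2019):
    """Return (first_year, last_year) limited to [min_year, max_year].

    date_bgn and date_end are date strings whose first '-' separated
    field is the year (assumed format, e.g. '1990-01-01').
    """
    year_bgn = max(int(date_bgn.split("-")[0]), min_year)
    year_end = min(int(date_end.split("-")[0]), max_year)
    return year_bgn, year_end


if __name__ == "__main__":
    # A record spanning 1990 to 2021 is limited to the default year range.
    print(clamp_year_range("1990-01-01", "2021-12-31"))
```

A record that lies entirely outside the requested range gives a first year greater than the last year, which a caller could use to skip the record.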
    

Next the wave parameters to be processed are selected using an index list based on the following values:

Index    Integrated Wave Parameter
0        Hm0 - Significant wave height
1        Tp - Peak wave period
2        Tm02 - Mean zero-crossing wave period
3        Dir - Peak wave direction
4        Spr - Peak wave directional spreading

Each integrated wave parameter is processed separately, so the selected parameters are processed in a loop.

    # ========================= Process Record ==============================
    # Choose fields to process
    fields = [0,1,3,4]
    
    for field in fields:
        # Get internal variable name string
        if field == 0:
            oVarName = 'Hm0'      # Significant wave height
        if field == 1:
            oVarName = 'Tp'       # Peak wave period
        if field == 2:
            oVarName = 'Tm02'     # Mean zero crossing period
        if field == 3:
            oVarName = 'Dir'      # Wave direction
        if field == 4:
            oVarName = 'Spr'      # Wave directional spreading
        

The selected wave parameters then need to be mapped to the corresponding netCDF variable names within the model and wavebuoy data files. The model variable name is set in the variable mVarName and takes a single string value. The wavebuoy variable names are set in the list variable varOptions. The buoy data contain a range of similar wave parameters that could be used in place of the preferred variables; the list allows these to be used if the preferred variable is not available.

Note

It is recommended that the user only take the preferred wavebuoy variables for validation purposes: VHM0, VTPK, VTM02, VPED and VPSP.

The WaveWatch III model outputs peak and zero-crossing frequencies; these are converted to periods during the record processing stage.

        # Set MDB variable names
        if oVarName == 'Hm0':
            mVarName = 'hs'
            varOptions = ['VHM0']
        elif oVarName == 'Tp':
            mVarName = 'fp'
            varOptions = ['VTPK']
        elif oVarName == 'Tm02':
            mVarName = 'f02'
            varOptions = ['VTM02']
        elif oVarName == 'Dir':
            mVarName = 'dir'
            varOptions = ['VPED']
        elif oVarName == 'Spr':
            mVarName = 'spr'
            varOptions = ['VPSP']
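The index, internal name and variable name mappings above can also be held in a single lookup table, together with a flag for the model variables that are frequencies. This is an illustrative sketch only and not how the WaveVal code is organised: the names WAVE_PARAMS and frequency_to_period are hypothetical, and returning NaN for missing or non-positive frequencies is an assumption.

```python
import math

# index -> (internal name, model variable, buoy variable options,
#           True if the model variable is a frequency to be inverted)
WAVE_PARAMS = {
    0: ("Hm0", "hs", ["VHM0"], False),
    1: ("Tp", "fp", ["VTPK"], True),
    2: ("Tm02", "f02", ["VTM02"], True),
    3: ("Dir", "dir", ["VPED"], False),
    4: ("Spr", "spr", ["VPSP"], False),
}


def frequency_to_period(freq_hz):
    """Convert a frequency in Hz to a period in seconds (T = 1 / f).

    Missing or non-positive frequencies give NaN (assumed convention).
    """
    if freq_hz is None or math.isnan(freq_hz) or freq_hz <= 0.0:
        return math.nan
    return 1.0 / freq_hz


if __name__ == "__main__":
    for index in [0, 1, 3, 4]:
        name, model_var, buoy_vars, is_frequency = WAVE_PARAMS[index]
        print(index, name, model_var, buoy_vars, is_frequency)
    print(frequency_to_period(0.125))
```

Keeping the mapping in one table means that a new parameter only needs one new entry, rather than a change to two separate if blocks.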
        

The final steps are to calculate the validation statistics by calling Validation.validate_records(). This returns a count of the number of validations generated (n_valid) and a dictionary structure (valid_stats) containing the validation results. If there is at least one validation result, then the results are saved to both an ASCII and a binary file for post-processing. A separate record is produced for each integrated wave parameter for the current wavebuoy being processed.

        # ================== Generate Validation Stats ===========================
        plot_results = genPlots
        n_valid, valid_stats = validate_records(records,
                                         buoyFmt, platform,
                                         mVarName, varOptions,
                                         oVarName,
                                         dat_path,
                                         plot_results)

        # ================== Save Tabulated Results ===========================
        if n_valid > 0:
            save_tabulated_results(valid_stats, platform, oVarName, dat_path)
        else:
            print('No validation statistics generated from match up records.')
    

The interface section allows the script to be run from the command line or to be called from within another script. This feature is used to automate the processing of a large number of wavebuoy locations using the methods described in the following section.

#--------------------------------------------------------------------------
# INTERFACE
#--------------------------------------------------------------------------
if __name__ == "__main__":
    main(sys.argv[1:])

To run the script from the command line use:

$ python3 datarmor_insitutac_validate.py -p path/to/output/directory -f records_file.csv -n record_number

e.g.

$ python3 datarmor_insitutac_validate.py -p ../data/VALIDATION/RSCD_v3 -f validation_sites.csv -n 9

will write the output to the relative directory ../data/VALIDATION/RSCD_v3, read the list of unique model/observation matches from validation_sites.csv, and process record number 9. Record numbers are counted from 0, as in python, i.e. -n 0 processes the first record in the file and -n 9 processes the 10th record.

Note

For this script to be callable from another function its permissions need to be set to executable, e.g. apply chmod a+x datarmor_insitutac_validate.py to the script.

Generate PBS job scripts and submit to queue

To run the datarmor_insitutac_validate.py processing script on a Datarmor processing node it needs to be submitted to the job queue using a qsub call. The datarmor_insitutac_validate.py needs to be run in an anaconda shell environment, so this needs to be set up in the script submitted by the qsub call. Within the script the compute node resources need to be requested (i.e. the amount of memory required and the time required to run the process). It should be noted that if insufficient resources or walltime are requested the job will end without completion, so it pays to over-estimate the resource requirements, to avoid needing to rerun a process.

To automate the processing of a large number of wavebuoys, a separate PBS script is required for each wavebuoy. The approach taken to facilitate this is to define a base PBS script that contains the common components and can be modified to provide the specific information for each job.

The following PBS job script (base_insitutac.pbs) was used as the basis for generating the individual job submissions:

base_insitutac.pbs
1  #!/bin/csh -e
2
3  #PBS -l mem=5G
4  #PBS -l walltime=02:00:00
5
6  source /appli/anaconda/latest/etc/profile.d/conda.csh
7  conda activate buoyvalid
8  cd /home1/datawork/USER/pycode
9  python3 datarmor_insitutac_validate.py -p /home1/datawork/USER/VALIDATION/RSCD_v3 -f insitutac_validation_sites.csv

Line 1 sets the shell environment, lines 3 and 4 request the compute node resources, line 6 changes to a conda shell, line 7 activates the conda environment buoyvalid (as described above), line 8 changes to the location of the python scripts on the user's datawork space, and line 9 gives the common components of the call to run the datarmor_insitutac_validate.py script. Line 9 is missing the required -n input option, so this will return an error if submitted as is. The following script used to generate the PBS jobs adds the required record number to the call.

Note

The conda environment buoyvalid needs to be replaced with the environment you have set up.

Replace USER with your user name, pycode with the location of your python scripts, VALIDATION/RSCD_v3 with the location you want the results to be output to, and insitutac_validation_sites.csv with the name of the CSV file used for input.

The output directory must exist, and the records CSV file must exist in the output directory.
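Because a job that cannot find its inputs only fails once it reaches a compute node, it can be worth checking these two requirements before submitting anything. The following is a minimal sketch of such a check; the function name check_job_inputs is hypothetical and the paths in the example are placeholders.

```python
import os


def check_job_inputs(out_dir, csv_name):
    """Return a list of problems found; an empty list means the inputs look fine."""
    problems = []
    if not os.path.isdir(out_dir):
        problems.append("output directory not found: " + out_dir)
    elif not os.path.isfile(os.path.join(out_dir, csv_name)):
        problems.append("records file not found in output directory: " + csv_name)
    return problems


if __name__ == "__main__":
    # Placeholder values; replace with your own output directory and CSV file.
    for problem in check_job_inputs("/home1/datawork/USER/VALIDATION/RSCD_v3",
                                    "insitutac_validation_sites.csv"):
        print(problem)
```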

The script generate_validation_batch_jobs.py is used to generate and submit a PBS job for each record in the CSV matched data records file. The default PBS script base_insitutac.pbs, described above, is modified to match the record being processed. It is assumed that each record in the CSV file represents a unique wavebuoy; the data processing script called generates the set of separate year/month records required for processing a given buoy.

The only line in base_insitutac.pbs that needs to be modified is the last line, which calls the python processing script; the record number to be processed is added as an input option to the call. A new PBS job script is generated with the record number appended, then submitted to the job queue using the qsub command.

A subset of the records can be processed by setting the start_rec and num_recs values in this script before running it. The loop runs over range(start_rec, num_recs), so num_recs is the record number at which processing stops (exclusive) and must not be greater than the number of records in the CSV file. This feature can be used to re-run processes that failed.

generate_validation_batch_jobs.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os

start_rec = 0                     # first record number to process
num_recs = 154                    # the loop stops before this record number
baseFile = 'base_insitutac.pbs'   # template PBS job script

for irec in range(start_rec, num_recs):
    # Name the job script after the zero-padded record number
    ofname = 'insitutac_job_'+str(irec).zfill(3)+'.pbs'
    with open(baseFile,'r') as inFile, open(ofname,'w') as outFile:
        for astr in inFile:
            # Append the record number to the python3 call
            if 'python3' in astr:
                astr = astr.strip('\n')+' -n '+str(irec)+'\n'
            outFile.write(astr)
    # Submit the generated job script to the PBS queue
    procstr = 'qsub '+ofname
    os.system(procstr)
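The text substitution at the heart of this script can be separated from the file handling and the qsub call, which makes it possible to inspect a generated job script without submitting it. The following is a minimal sketch of that idea; the function names make_job_script and job_file_name are hypothetical, and the base script text in the example is a shortened placeholder.

```python
def make_job_script(base_text, irec):
    """Return the base PBS script text with ' -n <irec>' added to the python3 call."""
    lines = []
    for line in base_text.splitlines(keepends=True):
        if "python3" in line:
            line = line.rstrip("\n") + " -n " + str(irec) + "\n"
        lines.append(line)
    return "".join(lines)


def job_file_name(irec):
    """Return the job script file name for a record, e.g. insitutac_job_009.pbs."""
    return "insitutac_job_" + str(irec).zfill(3) + ".pbs"


if __name__ == "__main__":
    # Shortened placeholder for the base script; nothing is written or submitted.
    base = "#!/bin/csh -e\npython3 datarmor_insitutac_validate.py -p out -f sites.csv\n"
    print(job_file_name(9))
    print(make_job_script(base, 9), end="")
```

Printing the result for one or two record numbers is a cheap way to confirm the final command line before a full batch is queued.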

Post-process validation results

process_insitutac_stats.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# ----------------------------------------------------------------------------
#   IMPORT PACKAGE DEPENDENCIES
# ----------------------------------------------------------------------------
# Standard Python Dependencies
# Non-Standard Python Dependencies
import numpy as np
# Local Module Dependencies
from waveval.MatchUpDatabase import csv2tuples
from waveval.WaveStats import get_buoy_locations
from waveval.WaveStats import get_parameter_stats_global
from waveval.WaveStats import get_parameter_stats_by_buoy
from waveval.Graphics import Regional_Weighted
# Other Dependencies

# ----------------------------------------------------------------------------
#   GLOBAL VARIABLES
# ----------------------------------------------------------------------------
paramStr = ['Hm0','Tp','Tm02','Dir','Spr']
metricStr = ['R','MB','NMB','MAE','NMAE','RMSE','NRMSE','SI']

#--------------------------------------------------------------------------
# MAIN PROCESS
#--------------------------------------------------------------------------

dpath = 'E:/ResourceCode/Data/VALIDATION/RSCD_v3/INSITUTAC'
ppath = 'E:/ResourceCode/Data/VALIDATION/RSCD_v3/WAVEVAL/MAPS'
pfnam = 'INSITUTAC_TS_'

# Restrict processing to a subset of the parameters and metrics
# (this overrides the full lists defined above)
paramStr = ['Hm0','Tp']
metricStr = ['R','NMB','NRMSE']
savePlot = True

# Global statistics for each selected parameter
for param in paramStr:
    print('PARAMETER: '+param)
    get_parameter_stats_global(dpath, param, metricStr)

# Buoy locations taken from the match-up records file
recfile = 'E:/ResourceCode/Data/VALIDATION/RSCD_v3/rscd_2017_matchup_v03.csv'
records = csv2tuples(recfile)
buoyLoc = get_buoy_locations(records)

lat = np.asarray(buoyLoc['latitude'],dtype=float)
lon = np.asarray(buoyLoc['longitude'],dtype=float)
platforms = buoyLoc['platform']

for i in range(len(platforms)):
    platforms[i] = platforms[i].strip()

# Map of each metric at the buoy locations, for each selected parameter
for param in paramStr:

    for metric in metricStr:

        stats = get_parameter_stats_by_buoy(dpath, param, metric)

        pfrm = stats['platform']
        wght = stats['mean']
        dom = [-15, 15, 35, 70]
        pltTitle = 'IN SITU TAC (TS):  Parameter: '+param+',  Metric: '+metric

        idx = np.zeros((len(pfrm),), dtype=int)
        for i,p in enumerate(pfrm):
            try:
                indx = platforms.index(p)
            except ValueError:
                # Platform not in the buoy location list; idx[i] keeps its
                # initial value of 0
                continue
            idx[i] = indx
        if metric in ['R']:
            clims = [0.0, 1.0]
        elif metric in ['NMB', 'NMAE', 'NRMSE', 'SI']:
            clims = [0.0, 100]
        else:
            clims = [None]
        pname = pfnam+param+'_'+metric+'_by_buoy'
        Regional_Weighted(list(lat[idx]), list(lon[idx]), wght, dom, clims,
                          pltTitle, ppath, pname, savePlot)
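In the listing above, a platform that is not found in the buoy location list keeps the index value 0, so its statistic would be drawn at the position of the first buoy. One possible alternative, sketched here under stated assumptions, is to drop unmatched platforms before plotting. The function name match_buoys is hypothetical, and plain python lists are used in place of the arrays returned by the WaveVal functions.

```python
def match_buoys(stat_platforms, stat_values, platforms, lat, lon):
    """Pair each statistic with its buoy position, skipping unknown platforms.

    Returns three lists of equal length: latitudes, longitudes and values.
    """
    # First occurrence of each platform name wins, as with list.index()
    index_of = {}
    for i, name in enumerate(platforms):
        index_of.setdefault(name.strip(), i)

    out_lat, out_lon, out_val = [], [], []
    for name, value in zip(stat_platforms, stat_values):
        i = index_of.get(name.strip())
        if i is None:
            continue  # no location known for this platform
        out_lat.append(lat[i])
        out_lon.append(lon[i])
        out_val.append(value)
    return out_lat, out_lon, out_val


if __name__ == "__main__":
    # Made-up platform names and values, for illustration only.
    lats, lons, vals = match_buoys(["B2", "ZZ", "B1"], [0.9, 0.5, 0.8],
                                   ["B1 ", "B2"], [48.0, 50.0], [-5.0, -2.0])
    print(lats, lons, vals)
```

The three returned lists always have the same length, so they could be passed on together to a plotting routine.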