Commit 34cb5a60 authored by Modellers Operational

Add support for running on ceto, plus assorted bug fixes

parent 20b16551
......@@ -8,30 +8,39 @@ comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
fvcom_file = sys.argv[1]
lower_left_ll = np.asarray(sys.argv[2].split(','), dtype=float)
upper_right_ll = np.asarray(sys.argv[3].split(','), dtype=float)
grid_res = sys.argv[4]
depth_layers = np.asarray(sys.argv[5].split(','), dtype=float)
var_list_raw = sys.argv[6].split(',')
varmatch = {'temp':'nodes', 'salinity':'nodes', 'u':'elements', 'v':'elements', 'zeta':'surface'}
varlist = {}
for this_var in var_list_raw:
    varlist[this_var] = varmatch[this_var]
# get the fvcom points within the extended grid and land mask the grid mesh
if rank == 0:
    # Load data, get time indices.
    fvcom_file = sys.argv[1]
    lower_left_ll = np.asarray(sys.argv[2].split(','), dtype=float)
    upper_right_ll = np.asarray(sys.argv[3].split(','), dtype=float)
    grid_res = float(sys.argv[4])
    depth_layers = np.asarray(sys.argv[5].split(','), dtype=float)
    var_list_raw = sys.argv[6].split(',')
    varmatch = {'temp':'nodes', 'salinity':'nodes', 'u':'elements', 'v':'elements', 'zeta':'surface'}
    varlist = {}
    for this_var in var_list_raw:
        varlist[this_var] = varmatch[this_var]
    fvcom = pf.read.FileReader(fvcom_file)
    nt = fvcom.dims.time
    start, stop = 0, nt
    global_time_indices = np.array_split(np.arange(start, stop), size)
else:
    fvcom_file = None
    lower_left_ll = None
    upper_right_ll = None
    grid_res = None
    depth_layers = None
    varlist = None
    global_time_indices = None
# get the fvcom points within the extended grid and land mask the grid mesh
fvcom_file = comm.bcast(fvcom_file, root=0)
lower_left_ll = comm.bcast(lower_left_ll, root=0)
upper_right_ll = comm.bcast(upper_right_ll, root=0)
grid_res = comm.bcast(grid_res, root=0)
depth_layers = comm.bcast(depth_layers, root=0)
varlist = comm.bcast(varlist, root=0)
time_indices = comm.scatter(global_time_indices, root=0)
......@@ -47,24 +56,29 @@ all_interped_data = comm.gather(interped_data, root=0)
# resort the data by timestep
if rank == 0:
    np.save('test_dict.npy', all_interped_data)
    collected_interp_data = {}
    for this_var in varlist.keys():
        output_list = []
        for this_dict in all_interped_data:
            output_list.append(this_dict[this_var])
        collected_interp_data[this_var] = np.ma.masked_invalid(np.vstack(output_list))
    # write to cmems format
    reg_grid_data = np.ma.masked_invalid(reg_grid_data)
    output_file = 'output.nc'
    fvcom = pf.read.FileReader(fvcom_file)
    dims = {'time':len(fvcom.time.datetime), 'depth':len(worker.regular_grid.dep_lays), 'lon':len(worker.regular_grid.lons),
            'lat':len(worker.regular_grid.lats)}
            'lat':len(worker.regular_grid.lats), 'DateStrLen': 26}
    all_attributes =
        {'lon':{'standard_name':'longitude', 'units':'degrees_east'}
        'lat':{'standard_name':'latitude', 'units':'degrees_north'}
        'temp':{'standard_name':'sea_water_potential_temperature','units':'C','_FillValue':'-32768s', 'missing_value':'-32768s'},
        'salinity':{'standard_name':'sea_water_salinity','units':'psu','_FillValue':'-32768s', 'missing_value':'-32768s'},
        'u':{'standard_name':'eastward_sea_water_velocity','units':'m s-1','_FillValue':'-32768s', 'missing_value':'-32768s'},
        'v':{'standard_name':'northward_sea_water_velocity','units':'m s-1','_FillValue':'-32768s', 'missing_value':'-32768s'},
        'zeta':{'standard_name':'sea_surface_height_above_geoid','units':'m','_FillValue':'-32768s', 'missing_value':'-32768s'}}
    all_attributes = {'lon':{'standard_name':'longitude', 'units':'degrees_east'},
                      'lat':{'standard_name':'latitude', 'units':'degrees_north'},
                      'depth':{'standard_name':'depth', 'units':'m'},
                      'temp':{'standard_name':'sea_water_potential_temperature', 'units':'C', 'missing_value':-32768},
                      'salinity':{'standard_name':'sea_water_salinity', 'units':'psu', 'missing_value':-32768},
                      'u':{'standard_name':'eastward_sea_water_velocity', 'units':'m s-1', 'missing_value':-32768},
                      'v':{'standard_name':'northward_sea_water_velocity', 'units':'m s-1', 'missing_value':-32768},
                      'zeta':{'standard_name':'sea_surface_height_above_geoid', 'units':'m', 'missing_value':-32768}}
    globals = {'type': 'OPERATIONAL MODEL REGULAR GRID OUTPUT',
               'title': 'Regularly gridded data interpolated from FVCOM output',
......@@ -72,19 +86,23 @@ if rank == 0:
               'filename': str(output_file),
               'Conventions': 'CF-1.0'}
    for this_val in collected_interp_data.values():
        np.ma.set_fill_value(this_val, -32768)
    ncopts = {}
    with pf.preproc.WriteForcing(output_file, dims, global_attributes=globals, clobber=True, format='NETCDF4') as outfile:
        # Add the variables.
        outfile.add_variable('lon', worker.regular_grid.lons, ['lon'], attributes=all_attributes['lon'])
        outfile.add_variable('lat', worker.regular_grid.lats, ['lat'], attributes=all_attributes['lat'])
        outfile.add_variable('depth', worker.regular_grid.dep_lays, ['depth'], attributes=all_attributes['depth'])
        for this_var, this_mode in varlist.items:
        for this_var, this_mode in varlist.items():
            if this_mode == 'surface':
                outfile.add_variable(this_var, collected_interp_data[this_var], ['time', 'lon', 'lat'],
                                     attributes=all_attributes[this_var])
                                     attributes=all_attributes[this_var], ncopts=ncopts)
            else:
                outfile.add_variable(this_var, collected_interp_data[this_var], ['time', 'lon', 'lat', 'depth'],
                                     attributes=all_attributes[this_var])
                                     attributes=all_attributes[this_var], ncopts=ncopts)
        outfile.write_fvcom_time(fvcom.time.datetime)
else:
......
[command]
default = module load mpi/openmpi-x86_64;
          mpiexec -n ${NPROCS} python3 grid_interp.py ${FVCOM_FILES} ${GRID_LOWER_LEFT} ${GRID_UPPER_RIGHT} ${HORIZ_RES} ${DEPTH_LAYERS} ${VARS};
          mkdir -p ${OUTPUT_DIR}/${TODAY_DATESTR};
          mv output.nc ${OUTPUT_DIR}/${TODAY_DATESTR}/${OUTPUT_FILENAME}
          mpiexec -n ${NPROCS} python3 grid_interp.py /${FVCOM_FILE} ${GRID_LOWER_LEFT} ${GRID_UPPER_RIGHT} ${HORIZ_RES} ${DEPTH_LAYERS} ${VARS};
          mkdir -p /${OUTPUT_DIR}/${FORECAST_DAY};
          mv output.nc /${OUTPUT_DIR}/${FORECAST_DAY}/${OUTPUT_FILENAME}
from mpi4py import MPI
import numpy as np
import sys
import PyFVCOM as pf

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    fvcom_file = sys.argv[1]
    lower_left_ll = np.asarray(sys.argv[2].split(','), dtype=float)
    upper_right_ll = np.asarray(sys.argv[3].split(','), dtype=float)
    grid_res = float(sys.argv[4])
    depth_layers = np.asarray(sys.argv[5].split(','), dtype=float)
    var_list_raw = sys.argv[6].split(',')
    varmatch = {'temp':'nodes', 'salinity':'nodes', 'u':'elements', 'v':'elements', 'zeta':'surface'}
    varlist = {}
    for this_var in var_list_raw:
        varlist[this_var] = varmatch[this_var]
    fvcom = pf.read.FileReader(fvcom_file)
    nt = fvcom.dims.time
    start, stop = 0, nt
    global_time_indices = np.array_split(np.arange(start, stop), size)
else:
    fvcom_file = None
    lower_left_ll = None
    upper_right_ll = None
    grid_res = None
    depth_layers = None
    varlist = None
    global_time_indices = None

# get the fvcom points within the extended grid and land mask the grid mesh
fvcom_file = comm.bcast(fvcom_file, root=0)
lower_left_ll = comm.bcast(lower_left_ll, root=0)
upper_right_ll = comm.bcast(upper_right_ll, root=0)
grid_res = comm.bcast(grid_res, root=0)
depth_layers = comm.bcast(depth_layers, root=0)
varlist = comm.bcast(varlist, root=0)
time_indices = comm.scatter(global_time_indices, root=0)

interped_data = {}
worker = pf.interpolate.MPIRegularInterpolateWorker(fvcom_file, time_indices, comm=comm, verbose=True)
worker.InitialiseGrid(lower_left_ll, upper_right_ll, grid_res, depth_layers, time_varying_depth=True)
for var, var_mode in varlist.items():
    interped_data[var] = worker.InterpolateRegular(var, mode=var_mode)

all_interped_data = comm.gather(interped_data, root=0)

# resort the data by timestep
if rank == 0:
    collected_interp_data = {}
    for this_var in varlist.keys():
        output_list = []
        for this_dict in all_interped_data:
            output_list.append(this_dict[this_var])
        collected_interp_data[this_var] = np.ma.masked_invalid(np.vstack(output_list))

    # write to cmems format
    output_file = 'output.nc'
    fvcom = pf.read.FileReader(fvcom_file)
    dims = {'time':len(fvcom.time.datetime), 'depth':len(worker.regular_grid.dep_lays), 'lon':len(worker.regular_grid.lons),
            'lat':len(worker.regular_grid.lats), 'DateStrLen': 26}
    all_attributes = {'lon':{'standard_name':'longitude', 'units':'degrees_east'},
                      'lat':{'standard_name':'latitude', 'units':'degrees_north'},
                      'depth':{'standard_name':'depth', 'units':'m'},
                      'temp':{'standard_name':'sea_water_potential_temperature', 'units':'C', 'missing_value':-32768},
                      'salinity':{'standard_name':'sea_water_salinity', 'units':'psu', 'missing_value':-32768},
                      'u':{'standard_name':'eastward_sea_water_velocity', 'units':'m s-1', 'missing_value':-32768},
                      'v':{'standard_name':'northward_sea_water_velocity', 'units':'m s-1', 'missing_value':-32768},
                      'zeta':{'standard_name':'sea_surface_height_above_geoid', 'units':'m', 'missing_value':-32768}}
    globals = {'type': 'OPERATIONAL MODEL REGULAR GRID OUTPUT',
               'title': 'Regularly gridded data interpolated from FVCOM output',
               'history': 'File created using PyFVCOM',
               'filename': str(output_file),
               'Conventions': 'CF-1.0'}
    for this_val in collected_interp_data.values():
        np.ma.set_fill_value(this_val, -32768)
    ncopts = {}
    with pf.preproc.WriteForcing(output_file, dims, global_attributes=globals, clobber=True, format='NETCDF4') as outfile:
        # Add the variables.
        outfile.add_variable('lon', worker.regular_grid.lons, ['lon'], attributes=all_attributes['lon'])
        outfile.add_variable('lat', worker.regular_grid.lats, ['lat'], attributes=all_attributes['lat'])
        outfile.add_variable('depth', worker.regular_grid.dep_lays, ['depth'], attributes=all_attributes['depth'])
        for this_var, this_mode in varlist.items():
            if this_mode == 'surface':
                outfile.add_variable(this_var, collected_interp_data[this_var], ['time', 'lon', 'lat'],
                                     attributes=all_attributes[this_var], ncopts=ncopts)
            else:
                outfile.add_variable(this_var, collected_interp_data[this_var], ['time', 'lon', 'lat', 'depth'],
                                     attributes=all_attributes[this_var], ncopts=ncopts)
        outfile.write_fvcom_time(fvcom.time.datetime)
else:
    print('Rank {} process finished'.format(rank))
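For readers unfamiliar with the decomposition used above: rank 0 splits the FVCOM time axis into contiguous index chunks with np.array_split, each MPI rank interpolates only its own time steps, and rank 0 gathers the per-rank blocks and re-stacks them along the time axis. The following is a minimal, self-contained sketch of that scatter/gather pattern only (mpi4py and numpy, no PyFVCOM); fake_interpolate and the array dimensions are illustrative placeholders, not part of the script or library above.

# Minimal sketch of the time-axis scatter/gather pattern used by grid_interp.py.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

nt, nz, ny, nx = 24, 5, 10, 10  # illustrative dimensions only


def fake_interpolate(indices):
    # Placeholder for the real per-rank interpolation: returns one
    # (depth, lat, lon) field per assigned time step.
    return np.random.rand(len(indices), nz, ny, nx)


if rank == 0:
    # Split the time axis into one contiguous chunk of indices per rank.
    global_time_indices = np.array_split(np.arange(nt), size)
else:
    global_time_indices = None

# Each rank receives its own chunk and works on it independently.
time_indices = comm.scatter(global_time_indices, root=0)
local_block = fake_interpolate(time_indices)

# Rank 0 collects the per-rank blocks and re-stacks them along the time
# axis, masking invalid values as the real script does with masked_invalid.
all_blocks = comm.gather(local_block, root=0)
if rank == 0:
    full_field = np.ma.masked_invalid(np.vstack(all_blocks))
    assert full_field.shape == (nt, nz, ny, nx)

Run with, for example, mpiexec -n 4 python3 scatter_gather_sketch.py (the file name is arbitrary); np.vstack concatenates the gathered blocks along the first (time) axis, which is why the full time series comes back in order.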
[command]
default =
    ln -s ${ROSE_DATAC}/${FVCOM_GRID_NAME}_0001.nc ${FVCOM_GRID_NAME}_0001.nc;
    set eu;
    module purge;
    module load ipd;
    module load intel/intel-2016;
    module load intel-mpi/5.1.2;
    np=$SLURM_NTASKS;
    ulimit -s unlimited;
    export OMP_NUM_THREADS=1;
    export I_MPI_PIN_PROCS=0-19;
    export I_MPI_EXTRA_FILESYSTEM=on;
    export I_MPI_EXTRA_FILESYSTEM_LIST=gpfs;
    export I_MPI_PMI_LIBRARY=/usr/lib64/libpmi.so;
    srun -n $np python3 grid_interp.py ${FVCOM_GRID_NAME}_0001.nc ${GRID_LOWER_LEFT} ${GRID_UPPER_RIGHT} ${HORIZ_RES} ${DEPTH_LAYERS} ${VARS};
    mv output.nc ${ROSE_DATAC}
[command]
default = dst=/pml${OUTPUT_DIR}/${FORECAST_DAY};
          mkdir -p $dst;
          src=${ROSE_DATAC}/output.nc;
          ssh ceto6 -t "rsync -aph --no-o --no-g $src $dst/${OUTPUT_FILENAME}";
[command]
default = cp /pml${FVCOM_FILE} ${ROSE_DATAC}/;
[jinja2:suite.rc]
## Run properties
INITIAL_START_DATE='2019-02-28T00:00:00Z'
INITIAL_START_DATE='2019-03-18T00:00:00Z'
FINAL_CYCLE_POINT='NONE'
MAIL_TO='mbe@pml.ac.uk'
NPROCS=12
USE_CETO=True
REMOTE_USER='modop'
NODES=3
TRIGGER_SUITE='fvcom_tamar'
TRIGGER_TASK='transfer_data'
# MODEL SETUP
FVCOM_GRID_NAME='tamar_v0'
FVCOM_OUTPUT_DIR='/data/sthenno1/scratch/modop/Model/FVCOM_tamar/output/'
GRID_LOWER_LEFT='-4.3, 50.24'
GRID_UPPER_RIGHT='-4.05, 50.55'
FVCOM_GRID_NAME='tamar_v2'
FVCOM_OUTPUT_DIR='data/sthenno1/scratch/modop/Model/FVCOM_tamar/output'
GRID_LOWER_LEFT='-4.3,50.24'
GRID_UPPER_RIGHT='-4.05,50.55'
HORIZ_RES=0.001
DEPTH_LAYERS='0,2,5,10,15,25,40,60'
DEPTH_LAYERS='0,2,5,10,15,25,40'
VARS='temp,salinity,u,v,zeta'
OUTPUT_DIR='/data/sthenno1/scratch/modop/Model/FVCOM_tamar/estuary_output/'
OUTPUT_FILENAME='test_out.nc'
OUTPUT_DIR='data/sthenno1/scratch/modop/Model/FVCOM_tamar/estuary_output'
OUTPUT_FILENAME='tamar_estuary_0001.nc'
......@@ -13,7 +13,13 @@
    [[dependencies]]
        [[[P1D]]]
            graph = """
                regrid_domain[-P1D]:finish => start_suite => suite_trigger <{{TRIGGER_SUITE}}::{{TRIGGER_TASK}}> => regrid_domain
                {% if USE_CETO %}
                regrid_domain_ceto[-P1D]:finish => start_cycle => suite_trigger <{{TRIGGER_SUITE}}::{{TRIGGER_TASK}}> => transfer_to_remote
                transfer_to_remote => regrid_domain_ceto => transfer_from_remote
                {% else %}
                regrid_domain[-P1D]:finish => start_cycle => suite_trigger <{{TRIGGER_SUITE}}::{{TRIGGER_TASK}}> => regrid_domain
                {% endif %}
                """
[runtime]
......@@ -34,19 +40,55 @@
            HORIZ_RES={{HORIZ_RES}}
            DEPTH_LAYERS={{DEPTH_LAYERS}}
            VARS={{VARS}}
            OUTPUT_DIR={{OUTPUT_DIR}}
            OUTPUT_FILENAME={{OUTPUT_FILENAME}}
            FORECAST_DAY=$(rose date --print-format='%Y-%m-%d' $CYLC_TASK_CYCLE_POINT)
            FVCOM_FILE={{FVCOM_OUTPUT_DIR}}/${FORECAST_DAY/${FVCOM_GRID_NAME}_0001.nc
            FVCOM_FILE={{FVCOM_OUTPUT_DIR}}/${FORECAST_DAY}/${FVCOM_GRID_NAME}_0001.nc
    [[start_cycle]]
        script = """
        """
    [[fvcom_suite_trigger]]
    [[start_cycle]]
        script = """
            echo 'Starting regrid'
        """
    [[suite_trigger]]
        script = ""
        [[[suite state polling]]]
            interval = PT10M
            max-polls = 1440
        [[[job]]]
            execution retry delays = 3*PT15M
{% if USE_CETO %}
    [[slurm_job]]
        [[[job]]]
            batch system = slurm
            submission polling intervals = PT10S
            execution polling intervals = PT10S, PT1M
        [[[directives]]]
            --nodes = {{NODES}}
            --ntasks-per-node=20
            --threads-per-core=1
            --time=24:00:00
        [[[remote]]]
            host = login.ceto.npm.ac.uk
            owner = {{REMOTE_USER}}
    [[remote_job]]
        [[[remote]]]
            host = login.ceto.npm.ac.uk
            owner = {{REMOTE_USER}}
    [[transfer_to_remote]]
        inherit = remote_job
    [[transfer_from_remote]]
        inherit = remote_job
    [[regrid_domain_ceto]]
        inherit = slurm_job
{% else %}
    [[regrid_domain]]
{% endif %}