"""Module controlling the writing of ParticleSets to NetCDF file"""
import os
import random
import shutil
import string
from glob import glob
import netCDF4
import numpy as np
try:
from mpi4py import MPI
except:
MPI = None
try:
from parcels._version import version as parcels_version
except:
raise EnvironmentError('Parcels version can not be retrieved. Have you run ''python setup.py install''?')
try:
from os import getuid
except:
# Windows does not have getuid(), so define to simply return 'tmp'
def getuid():
return 'tmp'
__all__ = ['ParticleFile']
def _set_calendar(origin_calendar):
if origin_calendar == 'np_datetime64':
return 'standard'
else:
return origin_calendar
class ParticleFile(object):
    """Initialise trajectory output.

    :param name: Basename of the output file
    :param particleset: ParticleSet to output
    :param outputdt: Interval which dictates the update frequency of file output
                     while ParticleFile is given as an argument of ParticleSet.execute()
                     It is either a timedelta object or a positive double.
    :param write_ondelete: Boolean to write particle data only when they are deleted. Default is False
    :param convert_at_end: Boolean to convert npy files to netcdf at end of run. Default is True
    :param tempwritedir: directories to write temporary files to during executing.
                         Default is out-XXXXXX where Xs are random capitals. Files for individual
                         processors are written to subdirectories 0, 1, 2 etc under tempwritedir
    :param pset_info: dictionary of info on the ParticleSet, stored in tempwritedir/XX/pset_info.npy,
                      used to create NetCDF file from npy-files.
    """

    def __init__(self, name, particleset, outputdt=np.inf, write_ondelete=False, convert_at_end=True,
                 tempwritedir=None, pset_info=None):
        # np.inf replaces the deprecated alias np.infty (removed in numpy 2.0);
        # the values are identical, so this is backward compatible.
        self.write_ondelete = write_ondelete
        self.convert_at_end = convert_at_end
        self.outputdt = outputdt
        self.lasttime_written = None  # variable to check if time has been written already
        self.dataset = None
        self.metadata = {}
        if pset_info is not None:
            # Reconstruct a ParticleFile from a previously dumped pset_info
            # dictionary (used for offline npy-to-NetCDF conversion).
            for v in pset_info.keys():
                setattr(self, v, pset_info[v])
        else:
            self.name = name
            self.particleset = particleset
            self.parcels_mesh = self.particleset.fieldset.gridset.grids[0].mesh
            self.time_origin = self.particleset.time_origin
            self.lonlatdepth_dtype = self.particleset.collection.lonlatdepth_dtype
            # Split the ParticleSet variables into per-timestep and write-once ones
            self.var_names = []
            self.var_names_once = []
            for v in self.particleset.collection.ptype.variables:
                if v.to_write == 'once':
                    self.var_names_once += [v.name]
                elif v.to_write is True:
                    self.var_names += [v.name]
            if len(self.var_names_once) > 0:
                self.written_once = []
                self.file_list_once = []
            self.file_list = []
            self.time_written = []
            self.maxid_written = -1

        if tempwritedir is None:
            tempwritedir = os.path.join(os.path.dirname(str(self.name)), "out-%s"
                                        % ''.join(random.choice(string.ascii_uppercase) for _ in range(8)))

        if MPI:
            mpi_rank = MPI.COMM_WORLD.Get_rank()
            # All ranks must agree on one base directory: rank 0's choice is broadcast
            self.tempwritedir_base = MPI.COMM_WORLD.bcast(tempwritedir, root=0)
        else:
            self.tempwritedir_base = tempwritedir
            mpi_rank = 0
        # Each rank writes into its own numbered subdirectory
        self.tempwritedir = os.path.join(self.tempwritedir_base, "%d" % mpi_rank)

        if not os.path.exists(self.tempwritedir):
            os.makedirs(self.tempwritedir)
        elif pset_info is None:
            raise IOError("output directory %s already exists. Please remove first." % self.tempwritedir)
def open_netcdf_file(self, data_shape):
    """Initialise NetCDF4.Dataset for trajectory output.

    The output follows the format outlined in the Discrete Sampling Geometries
    section of the CF-conventions:
    http://cfconventions.org/cf-conventions/v1.6.0/cf-conventions.html#discrete-sampling-geometries

    The current implementation is based on the NCEI template:
    http://www.nodc.noaa.gov/data/formats/netcdf/v2.0/trajectoryIncomplete.cdl

    :param data_shape: shape of the variables in the NetCDF4 file
    """
    extension = os.path.splitext(str(self.name))[1]
    fname = self.name if extension in ['.nc', '.nc4'] else "%s.nc" % self.name
    if os.path.exists(str(fname)):
        os.remove(str(fname))
    self.dataset = netCDF4.Dataset(fname, "w", format="NETCDF4")
    self.dataset.createDimension("obs", data_shape[1])
    self.dataset.createDimension("traj", data_shape[0])
    coords = ("traj", "obs")
    self.dataset.feature_type = "trajectory"
    self.dataset.Conventions = "CF-1.6/CF-1.7"
    self.dataset.ncei_template_version = "NCEI_NetCDF_Trajectory_Template_v2.0"
    self.dataset.parcels_version = parcels_version
    self.dataset.parcels_mesh = self.parcels_mesh

    # Create ID variable according to CF conventions
    self.id = self.dataset.createVariable("trajectory", "i8", coords, fill_value=-2**(63))  # minint64 fill_value
    self.id.long_name = "Unique identifier for each particle"
    self.id.cf_role = "trajectory_id"

    # Create time, lat, lon and z variables according to CF conventions:
    self.time = self.dataset.createVariable("time", "f8", coords, fill_value=np.nan)
    self.time.long_name = ""
    self.time.standard_name = "time"
    if self.time_origin.calendar is None:
        self.time.units = "seconds"
    else:
        self.time.units = "seconds since " + str(self.time_origin)
        # Use the module-level helper so the calendar-name mapping lives in one place
        self.time.calendar = _set_calendar(self.time_origin.calendar)
    self.time.axis = "T"

    # Horizontal/vertical coordinates use the ParticleSet's configured precision
    if self.lonlatdepth_dtype is np.float64:
        lonlatdepth_precision = "f8"
    else:
        lonlatdepth_precision = "f4"

    if ('lat' in self.var_names):
        self.lat = self.dataset.createVariable("lat", lonlatdepth_precision, coords, fill_value=np.nan)
        self.lat.long_name = ""
        self.lat.standard_name = "latitude"
        self.lat.units = "degrees_north"
        self.lat.axis = "Y"
    if ('lon' in self.var_names):
        self.lon = self.dataset.createVariable("lon", lonlatdepth_precision, coords, fill_value=np.nan)
        self.lon.long_name = ""
        self.lon.standard_name = "longitude"
        self.lon.units = "degrees_east"
        self.lon.axis = "X"
    if ('depth' in self.var_names) or ('z' in self.var_names):
        self.z = self.dataset.createVariable("z", lonlatdepth_precision, coords, fill_value=np.nan)
        self.z.long_name = ""
        self.z.standard_name = "depth"
        self.z.units = "m"
        self.z.positive = "down"

    # All remaining per-timestep variables are stored as generic f4 with unknown units
    for vname in self.var_names:
        if vname not in ['time', 'lat', 'lon', 'depth', 'id']:
            setattr(self, vname, self.dataset.createVariable(vname, "f4", coords, fill_value=np.nan))
            getattr(self, vname).long_name = ""
            getattr(self, vname).standard_name = vname
            getattr(self, vname).units = "unknown"

    # Write-once variables only span the trajectory dimension
    for vname in self.var_names_once:
        setattr(self, vname, self.dataset.createVariable(vname, "f4", "traj", fill_value=np.nan))
        getattr(self, vname).long_name = ""
        getattr(self, vname).standard_name = vname
        getattr(self, vname).units = "unknown"

    # Flush metadata that was buffered via add_metadata() before the file existed
    for name, message in self.metadata.items():
        setattr(self.dataset, name, message)
def __del__(self):
if self.convert_at_end:
self.close()
def close(self, delete_tempfiles=True):
    """Finalise the ParticleFile: export buffered npy data to NetCDF and clean up.

    :param delete_tempfiles: Boolean whether to remove the temporary npy
                             directory tree after the export (rank 0 only)
    """
    self.export()
    rank = MPI.COMM_WORLD.Get_rank() if MPI else 0
    # Only rank 0 owns the shared base directory, so only it may delete it
    if rank == 0 and delete_tempfiles:
        self.delete_tempwritedir(tempwritedir=self.tempwritedir_base)
    # Prevent __del__ from re-running the conversion
    self.convert_at_end = False
[docs] def add_metadata(self, name, message):
"""Add metadata to :class:`parcels.particleset.ParticleSet`
:param name: Name of the metadata variabale
:param message: message to be written
"""
if self.dataset is None:
self.metadata[name] = message
else:
setattr(self.dataset, name, message)
[docs] def dump_dict_to_npy(self, data_dict, data_dict_once):
"""Buffer data to set of temporary numpy files, using np.save"""
if len(data_dict) > 0:
tmpfilename = os.path.join(self.tempwritedir, str(len(self.file_list)) + ".npy")
with open(tmpfilename, 'wb') as f:
np.save(f, data_dict)
self.file_list.append(tmpfilename)
if len(data_dict_once) > 0:
tmpfilename = os.path.join(self.tempwritedir, str(len(self.file_list)) + '_once.npy')
with open(tmpfilename, 'wb') as f:
np.save(f, data_dict_once)
self.file_list_once.append(tmpfilename)
def dump_psetinfo_to_npy(self):
pset_info = {}
attrs_to_dump = ['name', 'var_names', 'var_names_once', 'time_origin', 'lonlatdepth_dtype',
'file_list', 'file_list_once', 'maxid_written', 'time_written', 'parcels_mesh',
'metadata']
for a in attrs_to_dump:
if hasattr(self, a):
pset_info[a] = getattr(self, a)
with open(os.path.join(self.tempwritedir, 'pset_info.npy'), 'wb') as f:
np.save(f, pset_info)
def write(self, pset, time, deleted_only=False):
    """Write all data from one time step to a temporary npy-file
    using a python dictionary, then snapshot the ParticleFile state.

    :param pset: ParticleSet object to write
    :param time: Time at which to write ParticleSet
    :param deleted_only: Flag to write only the deleted Particles
    """
    step_data, once_data = pset.to_dict(self, time, deleted_only=deleted_only)
    self.dump_dict_to_npy(step_data, once_data)
    self.dump_psetinfo_to_npy()
[docs] def read_from_npy(self, file_list, time_steps, var):
"""Read NPY-files for one variable using a loop over all files.
:param file_list: List that contains all file names in the output directory
:param time_steps: Number of time steps that were written in out directory
:param var: name of the variable to read
"""
data = np.nan * np.zeros((self.maxid_written+1, time_steps))
time_index = np.zeros(self.maxid_written+1, dtype=np.int64)
t_ind_used = np.zeros(time_steps, dtype=np.int64)
# loop over all files
for npyfile in file_list:
try:
data_dict = np.load(npyfile, allow_pickle=True).item()
except NameError:
raise RuntimeError('Cannot combine npy files into netcdf file because your ParticleFile is '
'still open on interpreter shutdown.\nYou can use '
'"parcels_convert_npydir_to_netcdf %s" to convert these to '
'a NetCDF file yourself.\nTo avoid this error, make sure you '
'close() your ParticleFile at the end of your script.' % self.tempwritedir)
id_ind = np.array(data_dict["id"], dtype=np.int64)
t_ind = time_index[id_ind] if 'once' not in file_list[0] else 0
t_ind_used[t_ind] = 1
data[id_ind, t_ind] = data_dict[var]
time_index[id_ind] = time_index[id_ind] + 1
# remove rows and columns that are completely filled with nan values
tmp = data[time_index > 0, :]
return tmp[:, t_ind_used == 1]
def export(self):
    """Exports outputs in temporary NPY-files to NetCDF file.

    Under MPI, every rank first reaches a barrier and only rank 0 performs
    the merge; the per-rank pset_info.npy snapshots are combined before the
    NetCDF file is written.
    """
    if MPI:
        # The export can only start once every rank has finished writing
        MPI.COMM_WORLD.Barrier()
        if MPI.COMM_WORLD.Get_rank() > 0:
            return  # export is performed by rank 0 only

    # Retrieve all per-rank temporary directories, sorted numerically
    rank_dirs = sorted(glob(os.path.join("%s" % self.tempwritedir_base, "*")),
                       key=lambda x: int(os.path.basename(x)))
    if not rank_dirs:
        raise RuntimeError("No npy files found in %s" % self.tempwritedir_base)

    # Merge the bookkeeping of all ranks
    merged_maxid = -1
    merged_times = []
    merged_files = []
    if len(self.var_names_once) > 0:
        merged_files_once = []
    for rank_dir in rank_dirs:
        if os.path.exists(rank_dir):
            local_info = np.load(os.path.join(rank_dir, 'pset_info.npy'), allow_pickle=True).item()
            merged_maxid = np.max([merged_maxid, local_info['maxid_written']])
            merged_times += local_info['time_written']
            merged_files += local_info['file_list']
            if len(self.var_names_once) > 0:
                merged_files_once += local_info['file_list_once']
    self.maxid_written = merged_maxid
    self.time_written = np.unique(merged_times)

    for var in self.var_names:
        data = self.read_from_npy(merged_files, len(self.time_written), var)
        if var == self.var_names[0]:
            # The array shape is only known once the first variable is read
            self.open_netcdf_file(data.shape)
        varout = 'z' if var == 'depth' else var
        getattr(self, varout)[:, :] = data

    if len(self.var_names_once) > 0:
        for var in self.var_names_once:
            getattr(self, var)[:] = self.read_from_npy(merged_files_once, 1, var)

    self.dataset.close()
[docs] def delete_tempwritedir(self, tempwritedir=None):
"""Deleted all temporary npy files
:param tempwritedir Optional path of the directory to delete
"""
if tempwritedir is None:
tempwritedir = self.tempwritedir
if os.path.exists(tempwritedir):
shutil.rmtree(tempwritedir)