Source code for parcels.particlesets.baseparticleset

import numpy as np
from abc import ABC
from abc import abstractmethod
from datetime import datetime
from datetime import timedelta as delta
from os import path
import time as time_module
import cftime

import progressbar

from parcels.tools.statuscodes import StateCode
from parcels.tools.global_statics import get_package_dir
from parcels.compilation.codecompiler import GNUCompiler
from parcels.field import NestedField
from parcels.field import SummedField
from parcels.kernels.advection import AdvectionRK4
from parcels.kernel import Kernel
from parcels.tools.loggers import logger


[docs]class NDCluster(ABC): """Interface."""
[docs]class BaseParticleSet(NDCluster): """Base ParticleSet.""" _collection = None kernel = None fieldset = None time_origin = None repeat_starttime = None repeatlon = None repeatlat = None repeatdepth = None repeatpclass = None repeatkwargs = None def __init__(self, fieldset=None, pclass=None, lon=None, lat=None, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None, pid_orig=None, **kwargs): self._collection = None self.repeat_starttime = None self.repeatlon = None self.repeatlat = None self.repeatdepth = None self.repeatpclass = None self.repeatkwargs = None self.kernel = None self.fieldset = None self.time_origin = None def __iter__(self): """Allows for more intuitive iteration over a particleset, while in reality iterating over the particles in the collection. """ return iter(self._collection) def __getattr__(self, name): """ Access a single property of all particles. :param name: name of the property """ for v in self._collection.ptype.variables: if v.name == name: return getattr(self._collection, name) if name in self.__dict__ and name[0] != '_': return self.__dict__[name] else: return False @staticmethod def lonlatdepth_dtype_from_field_interp_method(field): if type(field) in [SummedField, NestedField]: for f in field: if f.interp_method == 'cgrid_velocity': return np.float64 else: if field.interp_method == 'cgrid_velocity': return np.float64 return np.float32 @property def collection(self): return self._collection
[docs] @abstractmethod def cstruct(self): """ 'cstruct' returns the ctypes mapping of the combined collections cstruct and the fieldset cstruct. This depends on the specific structure in question. """ pass
def __create_progressbar(self, starttime, endtime): pbar = None try: pbar = progressbar.ProgressBar(max_value=abs(endtime - starttime)).start() except: # for old versions of progressbar try: pbar = progressbar.ProgressBar(maxvalue=abs(endtime - starttime)).start() except: # for even older OR newer versions pbar = progressbar.ProgressBar(maxval=abs(endtime - starttime)).start() return pbar
[docs] @classmethod def from_list(cls, fieldset, pclass, lon, lat, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None, **kwargs): """Initialise the ParticleSet from lists of lon and lat :param fieldset: :mod:`parcels.fieldset.FieldSet` object from which to sample velocity :param pclass: mod:`parcels.particle.JITParticle` or :mod:`parcels.particle.ScipyParticle` object that defines custom particle :param lon: List of initial longitude values for particles :param lat: List of initial latitude values for particles :param depth: Optional list of initial depth values for particles. Default is 0m :param time: Optional list of start time values for particles. Default is fieldset.U.time[0] :param repeatdt: Optional interval (in seconds) on which to repeat the release of the ParticleSet :param lonlatdepth_dtype: Floating precision for lon, lat, depth particle coordinates. It is either np.float32 or np.float64. Default is np.float32 if fieldset.U.interp_method is 'linear' and np.float64 if the interpolation method is 'cgrid_velocity' Other Variables can be initialised using further arguments (e.g. v=... for a Variable named 'v') """ return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype, **kwargs)
[docs] @classmethod def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None): """Initialise the ParticleSet from start/finish coordinates with equidistant spacing Note that this method uses simple numpy.linspace calls and does not take into account great circles, so may not be a exact on a globe :param fieldset: :mod:`parcels.fieldset.FieldSet` object from which to sample velocity :param pclass: mod:`parcels.particle.JITParticle` or :mod:`parcels.particle.ScipyParticle` object that defines custom particle :param start: Starting point for initialisation of particles on a straight line. :param finish: End point for initialisation of particles on a straight line. :param size: Initial size of particle set :param depth: Optional list of initial depth values for particles. Default is 0m :param time: Optional start time value for particles. Default is fieldset.U.time[0] :param repeatdt: Optional interval (in seconds) on which to repeat the release of the ParticleSet :param lonlatdepth_dtype: Floating precision for lon, lat, depth particle coordinates. It is either np.float32 or np.float64. Default is np.float32 if fieldset.U.interp_method is 'linear' and np.float64 if the interpolation method is 'cgrid_velocity' """ lon = np.linspace(start[0], finish[0], size) lat = np.linspace(start[1], finish[1], size) if type(depth) in [int, float]: depth = [depth] * size return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype)
[docs] @classmethod @abstractmethod def monte_carlo_sample(cls, start_field, size, mode='monte_carlo'): """ Converts a starting field into a monte-carlo sample of lons and lats. :param start_field: :mod:`parcels.fieldset.Field` object for initialising particles stochastically (horizontally) according to the presented density field. returns list(lon), list(lat) """ pass
[docs] @classmethod def from_field(cls, fieldset, pclass, start_field, size, mode='monte_carlo', depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None): """Initialise the ParticleSet randomly drawn according to distribution from a field :param fieldset: :mod:`parcels.fieldset.FieldSet` object from which to sample velocity :param pclass: mod:`parcels.particle.JITParticle` or :mod:`parcels.particle.ScipyParticle` object that defines custom particle :param start_field: Field for initialising particles stochastically (horizontally) according to the presented density field. :param size: Initial size of particle set :param mode: Type of random sampling. Currently only 'monte_carlo' is implemented :param depth: Optional list of initial depth values for particles. Default is 0m :param time: Optional start time value for particles. Default is fieldset.U.time[0] :param repeatdt: Optional interval (in seconds) on which to repeat the release of the ParticleSet :param lonlatdepth_dtype: Floating precision for lon, lat, depth particle coordinates. It is either np.float32 or np.float64. Default is np.float32 if fieldset.U.interp_method is 'linear' and np.float64 if the interpolation method is 'cgrid_velocity' """ lon, lat = cls.monte_carlo_sample(start_field, mode) return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, lonlatdepth_dtype=lonlatdepth_dtype, repeatdt=repeatdt)
[docs] @classmethod @abstractmethod def from_particlefile(cls, fieldset, pclass, filename, restart=True, restarttime=None, repeatdt=None, lonlatdepth_dtype=None, **kwargs): """Initialise the ParticleSet from a netcdf ParticleFile. This creates a new ParticleSet based on locations of all particles written in a netcdf ParticleFile at a certain time. Particle IDs are preserved if restart=True :param fieldset: :mod:`parcels.fieldset.FieldSet` object from which to sample velocity :param pclass: mod:`parcels.particle.JITParticle` or :mod:`parcels.particle.ScipyParticle` object that defines custom particle :param filename: Name of the particlefile from which to read initial conditions :param restart: Boolean to signal if pset is used for a restart (default is True). In that case, Particle IDs are preserved. :param restarttime: time at which the Particles will be restarted. Default is the last time written. Alternatively, restarttime could be a time value (including np.datetime64) or a callable function such as np.nanmin. The last is useful when running with dt < 0. :param repeatdt: Optional interval (in seconds) on which to repeat the release of the ParticleSet :param lonlatdepth_dtype: Floating precision for lon, lat, depth particle coordinates. It is either np.float32 or np.float64. Default is np.float32 if fieldset.U.interp_method is 'linear' and np.float64 if the interpolation method is 'cgrid_velocity' """ pass
[docs] def show(self, with_particles=True, show_time=None, field=None, domain=None, projection=None, land=True, vmin=None, vmax=None, savefile=None, animation=False, **kwargs): """Method to 'show' a Parcels ParticleSet :param with_particles: Boolean whether to show particles :param show_time: Time at which to show the ParticleSet :param field: Field to plot under particles (either None, a Field object, or 'vector') :param domain: dictionary (with keys 'N', 'S', 'E', 'W') defining domain to show :param projection: type of cartopy projection to use (default PlateCarree) :param land: Boolean whether to show land. This is ignored for flat meshes :param vmin: minimum colour scale (only in single-plot mode) :param vmax: maximum colour scale (only in single-plot mode) :param savefile: Name of a file to save the plot to :param animation: Boolean whether result is a single plot, or an animation """
[docs] def density(self, field_name=None, particle_val=None, relative=False, area_scale=False): """Method to calculate the density of particles in a ParticleSet from their locations, through a 2D histogram. :param field: Optional :mod:`parcels.field.Field` object to calculate the histogram on. Default is `fieldset.U` :param particle_val: Optional numpy-array of values to weigh each particle with, or string name of particle variable to use weigh particles with. Default is None, resulting in a value of 1 for each particle :param relative: Boolean to control whether the density is scaled by the total weight of all particles. Default is False :param area_scale: Boolean to control whether the density is scaled by the area (in m^2) of each grid cell. Default is False """ pass
[docs] @abstractmethod def Kernel(self, pyfunc, c_include="", delete_cfiles=True): """Wrapper method to convert a `pyfunc` into a :class:`parcels.kernel.Kernel` object based on `fieldset` and `ptype` of the ParticleSet :param delete_cfiles: Boolean whether to delete the C-files after compilation in JIT mode (default is True) """ pass
[docs] @abstractmethod def ParticleFile(self, *args, **kwargs): """Wrapper method to initialise a :class:`parcels.particlefile.ParticleFile` object from the ParticleSet""" pass
@abstractmethod def _set_particle_vector(self, name, value): """Set attributes of all particles to new values. This is a fallback implementation, it might be slow. :param name: Name of the attribute (str). :param value: New value to set the attribute of the particles to. """ for p in self: setattr(p, name, value) @property @abstractmethod def error_particles(self): """Get an iterator over all particles that are in an error state. This is a fallback implementation, it might be slow. :return: Collection iterator over error particles. """ error_indices = [ i for i, p in enumerate(self) if p.state not in [StateCode.Success, StateCode.Evaluate]] return self.collection.get_multi_by_indices(indices=error_indices) @property def num_error_particles(self): """Get the number of particles that are in an error state.""" return len([True if p.state not in [StateCode.Success, StateCode.Evaluate] else None for p in self]) @abstractmethod def _impute_release_times(self, default): """Set attribute 'time' to default if encountering NaN values. This is a fallback implementation, it might be slow. :param default: Default release time. :return: Minimum and maximum release times. """ max_rt = None min_rt = None for p in self: if np.isnan(p.time): p.time = default if max_rt is None or max_rt < p.time: max_rt = p.time if min_rt is None or min_rt > p.time: min_rt = p.time return min_rt, max_rt
[docs] def execute(self, pyfunc=AdvectionRK4, endtime=None, runtime=None, dt=1., moviedt=None, recovery=None, output_file=None, movie_background_field=None, verbose_progress=None, postIterationCallbacks=None, callbackdt=None): """Execute a given kernel function over the particle set for multiple timesteps. Optionally also provide sub-timestepping for particle output. :param pyfunc: Kernel function to execute. This can be the name of a defined Python function or a :class:`parcels.kernel.Kernel` object. Kernels can be concatenated using the + operator :param endtime: End time for the timestepping loop. It is either a datetime object or a positive double. :param runtime: Length of the timestepping loop. Use instead of endtime. It is either a timedelta object or a positive double. :param dt: Timestep interval to be passed to the kernel. It is either a timedelta object or a double. Use a negative value for a backward-in-time simulation. :param moviedt: Interval for inner sub-timestepping (leap), which dictates the update frequency of animation. It is either a timedelta object or a positive double. None value means no animation. :param output_file: :mod:`parcels.particlefile.ParticleFile` object for particle output :param recovery: Dictionary with additional `:mod:parcels.tools.error` recovery kernels to allow custom recovery behaviour in case of kernel errors. :param movie_background_field: field plotted as background in the movie if moviedt is set. 'vector' shows the velocity as a vector field. :param verbose_progress: Boolean for providing a progress bar for the kernel execution loop. :param postIterationCallbacks: (Optional) Array of functions that are to be called after each iteration (post-process, non-Kernel) :param callbackdt: (Optional, in conjecture with 'postIterationCallbacks) timestep inverval to (latestly) interrupt the running kernel and invoke post-iteration callbacks from 'postIterationCallbacks' """ # check if pyfunc has changed since last compile. If so, recompile if self.kernel is None or (self.kernel.pyfunc is not pyfunc and self.kernel is not pyfunc): # Generate and store Kernel if isinstance(pyfunc, Kernel): self.kernel = pyfunc else: self.kernel = self.Kernel(pyfunc) # Prepare JIT kernel execution if self.collection.ptype.uses_jit: self.kernel.remove_lib() cppargs = ['-DDOUBLE_COORD_VARIABLES'] if self.collection.lonlatdepth_dtype else None self.kernel.compile(compiler=GNUCompiler(cppargs=cppargs, incdirs=[path.join(get_package_dir(), 'include'), "."])) self.kernel.load_lib() # Convert all time variables to seconds if isinstance(endtime, delta): raise RuntimeError('endtime must be either a datetime or a double') if isinstance(endtime, datetime): endtime = np.datetime64(endtime) elif isinstance(endtime, cftime.datetime): endtime = self.time_origin.reltime(endtime) if isinstance(endtime, np.datetime64): if self.time_origin.calendar is None: raise NotImplementedError('If fieldset.time_origin is not a date, execution endtime must be a double') endtime = self.time_origin.reltime(endtime) if isinstance(runtime, delta): runtime = runtime.total_seconds() if isinstance(dt, delta): dt = dt.total_seconds() outputdt = output_file.outputdt if output_file else np.infty if isinstance(outputdt, delta): outputdt = outputdt.total_seconds() if isinstance(moviedt, delta): moviedt = moviedt.total_seconds() if isinstance(callbackdt, delta): callbackdt = callbackdt.total_seconds() assert runtime is None or runtime >= 0, 'runtime must be positive' assert outputdt is None or outputdt >= 0, 'outputdt must be positive' assert moviedt is None or moviedt >= 0, 'moviedt must be positive' if runtime is not None and endtime is not None: raise RuntimeError('Only one of (endtime, runtime) can be specified') mintime, maxtime = self.fieldset.gridset.dimrange('time_full') if self.fieldset is not None else (0, 1) default_release_time = mintime if dt >= 0 else maxtime min_rt, max_rt = self._impute_release_times(default_release_time) # Derive _starttime and endtime from arguments or fieldset defaults _starttime = min_rt if dt >= 0 else max_rt if self.repeatdt is not None and self.repeat_starttime is None: self.repeat_starttime = _starttime if runtime is not None: endtime = _starttime + runtime * np.sign(dt) elif endtime is None: mintime, maxtime = self.fieldset.gridset.dimrange('time_full') endtime = maxtime if dt >= 0 else mintime execute_once = False if abs(endtime-_starttime) < 1e-5 or dt == 0 or runtime == 0: dt = 0 runtime = 0 endtime = _starttime logger.warning_once("dt or runtime are zero, or endtime is equal to Particle.time. " "The kernels will be executed once, without incrementing time") execute_once = True self._set_particle_vector('dt', dt) # First write output_file, because particles could have been added if output_file: output_file.write(self, _starttime) if moviedt: self.show(field=movie_background_field, show_time=_starttime, animation=True) if moviedt is None: moviedt = np.infty if callbackdt is None: interupt_dts = [np.infty, moviedt, outputdt] if self.repeatdt is not None: interupt_dts.append(self.repeatdt) callbackdt = np.min(np.array(interupt_dts)) time = _starttime if self.repeatdt: next_prelease = self.repeat_starttime + (abs(time - self.repeat_starttime) // self.repeatdt + 1) * self.repeatdt * np.sign(dt) else: next_prelease = np.infty if dt > 0 else - np.infty next_output = time + outputdt if dt > 0 else time - outputdt next_movie = time + moviedt if dt > 0 else time - moviedt next_callback = time + callbackdt if dt > 0 else time - callbackdt next_input = self.fieldset.computeTimeChunk(time, np.sign(dt)) if self.fieldset is not None else np.inf tol = 1e-12 if verbose_progress is None: walltime_start = time_module.time() if verbose_progress: pbar = self.__create_progressbar(_starttime, endtime) while (time < endtime and dt > 0) or (time > endtime and dt < 0) or dt == 0: if verbose_progress is None and time_module.time() - walltime_start > 10: # Showing progressbar if runtime > 10 seconds if output_file: logger.info('Temporary output files are stored in %s.' % output_file.tempwritedir_base) logger.info('You can use "parcels_convert_npydir_to_netcdf %s" to convert these ' 'to a NetCDF file during the run.' % output_file.tempwritedir_base) pbar = self.__create_progressbar(_starttime, endtime) verbose_progress = True if dt > 0: time = min(next_prelease, next_input, next_output, next_movie, next_callback, endtime) else: time = max(next_prelease, next_input, next_output, next_movie, next_callback, endtime) self.kernel.execute(self, endtime=time, dt=dt, recovery=recovery, output_file=output_file, execute_once=execute_once) if abs(time-next_prelease) < tol: pset_new = self.__class__( fieldset=self.fieldset, time=time, lon=self.repeatlon, lat=self.repeatlat, depth=self.repeatdepth, pclass=self.repeatpclass, lonlatdepth_dtype=self.collection.lonlatdepth_dtype, partitions=False, pid_orig=self.repeatpid, **self.repeatkwargs) for p in pset_new: p.dt = dt self.add(pset_new) next_prelease += self.repeatdt * np.sign(dt) if abs(time-next_output) < tol: if output_file: output_file.write(self, time) next_output += outputdt * np.sign(dt) if abs(time-next_movie) < tol: self.show(field=movie_background_field, show_time=time, animation=True) next_movie += moviedt * np.sign(dt) # ==== insert post-process here to also allow for memory clean-up via external func ==== # if abs(time-next_callback) < tol: if postIterationCallbacks is not None: for extFunc in postIterationCallbacks: extFunc() next_callback += callbackdt * np.sign(dt) if time != endtime: next_input = self.fieldset.computeTimeChunk(time, dt) if dt == 0: break if verbose_progress: pbar.update(abs(time - _starttime)) if output_file: output_file.write(self, time) if verbose_progress: pbar.finish()