Source code for openpathsampling.engines.dynamics_engine

"""
Created on 01.07.2014

@author JDC Chodera
@author: JH Prinz
"""

import logging
import sys

from openpathsampling.netcdfplus import StorableNamedObject
from openpathsampling.integration_tools import is_simtk_unit_type

from .snapshot import BaseSnapshot
from .trajectory import Trajectory

from .delayedinterrupt import DelayedInterrupt

logger = logging.getLogger(__name__)

if sys.version_info > (3, ):
    basestring = str

# =============================================================================
# SOURCE CONTROL
# =============================================================================

__version__ = "$Id: NoName.py 1 2014-07-06 07:47:29Z jprinz $"


# =============================================================================
# Base dynamics engine class
# =============================================================================

class EngineError(Exception):
    def __init__(self, message, last_trajectory):
        # Call the base class constructor with the parameters it needs
        super(EngineError, self).__init__(message)

        # Now for your custom code...
        self.last_trajectory = last_trajectory


class EngineMaxLengthError(EngineError):
    pass


class EngineNaNError(EngineError):
    pass


[docs] class DynamicsEngine(StorableNamedObject): """ Wraps simulation tool (parameters, storage, etc.) Attributes ---------- on_nan : str set the behaviour of the engine when `NaN` is detected. Possible is 1. `fail` will raise an exception `EngineNaNError` 2. `retry` will rerun the trajectory in engine.generate, these moves do not satisfy detailed balance on_error : str set the behaviour of the engine when an exception happens. Possible is 1. `fail` will raise an exception `EngineError` 2. `retry` will rerun the trajectory in engine.generate, these moves do not satisfy detailed balance on_max_length : str set the behaviour if the trajectory length is `n_frames_max`. If `n_frames_max == 0` this will be ignored and nothing happens. Possible is 1. `fail` will raise an exception `EngineMaxLengthError` 2. `stop` will stop and return the max length trajectory (default) 3. `retry` will rerun the trajectory in engine.generate, these moves do not satisfy detailed balance retries_when_nan : int, default: 2 the number of retries (if chosen) before an exception is raised retries_when_error : int, default: 2 the number of retries (if chosen) before an exception is raised retries_when_max_length : int, default: 0 the number of retries (if chosen) before an exception is raised on_retry : str or callable the behaviour when a try is started. Since you have already generated some trajectory you might not restart completely. Possibilities are 1. `full` will restart completely and use the initial frames (default) 2. `keep_half` will cut the existing in half but keeping at least the initial 3. `remove_interval` will remove as many frames as the `interval` 4. a callable will be used as a function to generate the new from the old trajectories, e.g. `lambda t: t[:10]` would restart with the first 10 frames Notes ----- Should be considered an abstract class: only its subclasses can be instantiated. """ FORWARD = 1 BACKWARD = -1 _default_options = { 'n_frames_max': None, 'on_max_length': 'fail', 'on_nan': 'fail', 'retries_when_nan': 2, 'retries_when_error': 0, 'retries_when_max_length': 0, 'on_retry': 'full', 'on_error': 'fail' } #units = { #'length': u.Unit({}), #'velocity': u.Unit({}), #'energy': u.Unit({}) #} units = { 'length': None, 'velocity': None, 'energy': None } base_snapshot_type = BaseSnapshot clear_snapshot_cache = False
[docs] def __init__(self, options=None, descriptor=None): """ Create an empty DynamicsEngine object Notes ----- The purpose of an engine is to create trajectories and keep track of the results. The main method is 'generate' to create a trajectory, which is a list of snapshots and then can store the in the associated storage. In the initialization this storage is created as well as the related Trajectory and Snapshot classes are initialized. """ super(DynamicsEngine, self).__init__() self.descriptor = descriptor self._check_options(options) self.interrupter = DelayedInterrupt
@property def current_snapshot(self): return None @current_snapshot.setter def current_snapshot(self, snap): pass def to_dict(self): return { 'options': self.options, 'descriptor': self.descriptor } def _check_options(self, options=None): """ This will register all variables in the options dict as a member variable if they are present in either the `DynamicsEngine.default_options` or this classes default_options, no multiple inheritance is supported! It will use values with the priority in the following order - DynamicsEngine.default_options - self.default_options - self.options (usually not used) - options (function parameter) Parameters are only registered if 1. the variable name is present in the defaults 2. the type matches the one in the defaults 3. for variables with units also the units need to be compatible Parameters ---------- options : dict of { str : value } A dictionary Notes ----- Options are what is necessary to recreate the engine, but not runtime variables or independent variables like the actual initialization status, the runners or an attached storage. If there are non-default options present they will be ignored (no error thrown) """ # start with default options from a dynamics engine my_options = {} okay_options = {} # self.default_options overrides default ones from DynamicsEngine for variable, value in self.default_options.items(): my_options[variable] = value if hasattr(self, 'options') and self.options is not None: # self.options overrides default ones for variable, value in self.options.items(): my_options[variable] = value if options is not None: # given options override even default and already stored ones for variable, value in options.items(): my_options[variable] = value if my_options is not None: for variable, default_value in self.default_options.items(): # create an empty member variable if not yet present if not hasattr(self, variable): okay_options[variable] = None if variable in my_options: if type(my_options[variable]) is type(default_value): #if type(my_options[variable]) is u.Unit: if is_simtk_unit_type(my_options[variable]): if my_options[variable].unit.is_compatible( default_value): okay_options[variable] = my_options[variable] else: raise ValueError( 'Unit of option "' + str(variable) + '" (' + str(my_options[variable].unit) + ') not compatible to "' + str(default_value.unit) + '"') elif type(my_options[variable]) is list: if isinstance( my_options[variable][0], type(default_value[0])): okay_options[variable] = my_options[variable] else: raise \ ValueError( 'List elements for option "' + str(variable) + '" must be of type "' + str(type(default_value[0])) + '"') else: okay_options[variable] = my_options[variable] elif isinstance(my_options[variable], type(default_value)): okay_options[variable] = my_options[variable] elif isinstance(my_options[variable], basestring) \ and isinstance(default_value, basestring): okay_options[variable] = my_options[variable] elif default_value is None: okay_options[variable] = my_options[variable] else: raise ValueError( 'Type of option "' + str(variable) + '" (' + str(type(my_options[variable])) + ') is not "' + str(type(default_value)) + '"') self.options = okay_options else: self.options = {} def __getattr__(self, item): # first, check for errors that might be shadowed in properties if item in self.__class__.__dict__: # we should have this attribute p = self.__class__.__dict__[item] if isinstance(p, property): # re-run, raise the error inside the property try: result = p.fget(self) except: raise else: # alternately, trust the fixed result with # return result # miraculously fixed raise AttributeError( "Unknown problem occurred in property" + str(p.fget.__name__) + ": Second attempt returned" + str(result) ) # for now, items in dict that fail with AttributeError will just # give the default message; to change, add something here like: # raise AttributeError("Something went wrong with " + str(item)) # see, if the attribute is actually a dimension if self.descriptor is not None: if item in self.descriptor.dimensions: return self.descriptor.dimensions[item] # fallback is to look for an option and return it's value try: # extra step here seems to avoid recursion problem in Py3 option_dict = object.__getattribute__(self, 'options') return option_dict[item] except KeyError: # convert KeyError to AttributeError default_msg = "'{0}' has no attribute '{1}'" raise AttributeError( (default_msg + ", nor does its options dictionary").format( self.__class__.__name__, item ) ) @property def dimensions(self): if self.descriptor is None: return {} else: return self.descriptor.dimensions def set_as_default(self): import openpathsampling as p p.EngineMover.engine = self @property def default_options(self): default_options = {} default_options.update(DynamicsEngine._default_options) default_options.update(self._default_options) return default_options # def strip_units(self, item): # """Remove units and set in the standard unit set for this engine. # Each engine needs to know how to do its own unit system. The default # assumes there is no unit system. # Parameters # ---------- # item : object with units # the input with units # Returns # ------- # float or iterable # the result without units, in the engine's specific unit system # """ # return item def start(self, snapshot=None): if snapshot is not None: self.current_snapshot = snapshot def stop(self, trajectory): """Nothing special needs to be done for direct-control simulations when you hit a stop condition.""" pass def stop_conditions(self, trajectory, continue_conditions=None, trusted=True): """ Test whether we can continue; called by generate a couple of times, so the logic is separated here. Parameters ---------- trajectory : :class:`openpathsampling.trajectory.Trajectory` the trajectory we've generated so far continue_conditions : (list of) function(Trajectory) callable function of a 'Trajectory' that returns True or False. If one of these returns False the simulation is stopped. trusted : bool If `True` (default) the stopping conditions are evaluated as trusted. Returns ------- bool true if the dynamics should be stopped; false otherwise """ stop = False if callable(continue_conditions): continue_conditions = [continue_conditions] for condition in continue_conditions: stop = (not condition(trajectory, trusted)) or stop # TODO: Consider short-circuit logic (uncomment code below). # Pros: short circuit will be faster; avoid wasted effort. # Cons: there may be desired side effects from not shorting. # Current thought: No short circuit means that CVs may be # calculated as part of stop conditions here, so keep as is. # (NB: ensembles/volumes use short-circuit logic, so there isn't # really a promise that CVs get calculated.) # if stop: # return True return stop def generate(self, snapshot, running=None, direction=+1): r""" Generate a trajectory consisting of ntau segments of tau_steps in between storage of Snapshots. Parameters ---------- snapshot : :class:`.Snapshot` initial coordinates and velocities in form of a Snapshot object running : (list of) function(:class:`.Trajectory`) callable function of a 'Trajectory' that returns True or False. If one of these returns False the simulation is stopped. direction : -1 or +1 (DynamicsEngine.FORWARD or DynamicsEngine.BACKWARD) If +1 then this will integrate forward, if -1 it will reversed the momenta of the given snapshot and then prepending generated snapshots with reversed momenta. This will generate a _reversed_ trajectory that effectively ends in the initial snapshot Returns ------- trajectory : :class:`.Trajectory` generated trajectory of initial conditions, including initial coordinate set Notes ----- If the returned trajectory has length n_frames_max it can still happen that it stopped because of the stopping criterion. You need to check in that case. """ trajectory = None it = self.iter_generate( snapshot, running, direction, intervals=0, max_length=self.options['n_frames_max']) for trajectory in it: pass return trajectory def iter_generate(self, initial, running=None, direction=+1, intervals=10, max_length=0): r""" Return a generator that will generate a trajectory, returning the current trajectory in given intervals Parameters ---------- initial : :class:`.Snapshot` or :class:`.Trajectory` initial coordinates and velocities in form of a Snapshot object or a trajectory running : (list of) function(:class:`.Trajectory`) callable function of a 'Trajectory' that returns True or False. If one of these returns False the simulation is stopped. direction : -1 or +1 (DynamicsEngine.FORWARD or DynamicsEngine.BACKWARD) If +1 then this will integrate forward, if -1 it will reversed the momenta of the given snapshot and then prepending generated snapshots with reversed momenta. This will generate a _reversed_ trajectory that effectively ends in the initial snapshot intervals : int number steps after which the current status is returned. If `0` it will run until the end or a keyboard interrupt is detected max_length : int will limit the simulation length to a number of steps. Default is `0` which will run unlimited Yields ------ trajectory : :class:`openpathsampling.trajectory.Trajectory` generated trajectory of initial conditions, including initial coordinate set Notes ----- If the returned trajectory has length n_frames_max it can still happen that it stopped because of the stopping criterion. You need to check in that case. """ if direction == 0: raise RuntimeError( 'direction must be positive (FORWARD) or negative (BACKWARD).') try: iter(running) except TypeError: running = [running] if hasattr(initial, '__iter__'): initial = Trajectory(initial) else: initial = Trajectory([initial]) valid = False attempt_nan = 0 attempt_error = 0 attempt_max_length = 0 trajectory = initial final_error = None errors = [] while not valid and final_error is None: if attempt_nan + attempt_error > 1: # let's get a new initial trajectory the way the user wants to if self.on_retry == 'full': trajectory = initial elif self.on_retry == 'remove_interval': trajectory = \ trajectory[:max( len(initial), len(trajectory) - intervals)] elif self.on_retry == 'keep_half': trajectory = \ trajectory[:min( int(len(trajectory) * 0.9), max( len(initial), int(len(trajectory) / 2)))] elif hasattr(self.on_retry, '__call__'): trajectory = self.on_retry(trajectory) """ Case of run dying before first output""" if len(trajectory) >= 1: if direction > 0: self.current_snapshot = trajectory[-1] elif direction < 0: # backward simulation needs reversed snapshots self.current_snapshot = trajectory[0].reversed logger.info("Starting trajectory") self.start() frame = 0 # maybe we should stop before we even begin? stop = self.stop_conditions(trajectory=trajectory, continue_conditions=running, trusted=False) log_rate = 10 has_nan = False has_error = False snapshot = None # so it is in scope while not stop: if intervals > 0 and frame % intervals == 0: # return the current status logger.info("Through frame: %d", frame) yield trajectory elif frame % log_rate == 0: logger.info("Through frame: %d", frame) # Do integrator x steps self._clear_snapshot_cache(snapshot) # clear old snapshot snapshot = None try: with self.interrupter(): snapshot = self.generate_next_frame() # if self.on_nan != 'ignore' and \ if not self.is_valid_snapshot(snapshot): has_nan = True break except KeyboardInterrupt as e: # make sure we will report the last state for logger.info('Keyboard interrupt. Shutting down simulation') final_error = e break except: # any other error we start a retry e = sys.exc_info() errors.append(e) se = str(e).lower() if 'nan' in se and \ ('particle' in se or 'coordinates' in se): # this cannot be ignored because we cannot continue! has_nan = True break else: has_error = True break frame += 1 # Store snapshot and add it to the trajectory. # Stores also final frame the last time if direction > 0: trajectory.append(snapshot) elif direction < 0: trajectory.insert(0, snapshot.reversed) if 0 < max_length < len(trajectory): # hit the max length criterion on = self.on_max_length del trajectory[-1] if on == 'fail': final_error = EngineMaxLengthError( 'Hit maximal length of %d frames.' % self.options['n_frames_max'], trajectory ) break elif on == 'stop': logger.info('Trajectory hit max length. Stopping.') # fail gracefully stop = True elif on == 'retry': attempt_max_length += 1 if attempt_max_length > self.retries_when_max_length: if self.on_nan == 'fail': final_error = EngineMaxLengthError( 'Failed to generate trajectory without ' 'hitting max length after %d attempts' % attempt_max_length, trajectory) break if stop is False: # Check if we should stop. If not, continue simulation stop = self.stop_conditions(trajectory=trajectory, continue_conditions=running) # check what to do if stop is True if has_nan: on = self.on_nan if on == 'fail': final_error = EngineNaNError( '`nan` in snapshot', trajectory) elif on == 'retry': attempt_nan += 1 if attempt_nan > self.retries_when_nan: final_error = EngineNaNError( 'Failed to generate trajectory without `nan` ' 'after %d attempts' % attempt_error, trajectory) elif has_error: on = self.on_nan if on == 'fail': final_error = errors[-1][1] del errors[-1] elif on == 'retry': attempt_error += 1 if attempt_error > self.retries_when_error: final_error = EngineError( 'Failed to generate trajectory without `nan` ' 'after %d attempts' % attempt_error, trajectory) elif stop: valid = True self.stop(trajectory) if errors: logger.info('Errors occurred during generation :') for no, e in enumerate(errors): logger.info('[#%d] %s' % (no, repr(e[1]))) if final_error is not None: yield trajectory logger.info("Through frame: %d", len(trajectory)) self._clear_snapshot_cache(snapshot) raise final_error logger.info("Finished trajectory, length: %d", len(trajectory)) yield trajectory self._clear_snapshot_cache(snapshot) def _clear_snapshot_cache(self, snapshot): if self.clear_snapshot_cache and snapshot is not None: clear_cache = getattr(snapshot, 'clear_cache', lambda: None) clear_cache() def generate_next_frame(self): raise NotImplementedError('Next frame generation must be implemented!') def generate_n_frames(self, n_frames=1): """Generates n_frames, from but not including the current snapshot. This generates a fixed number of frames at once. If you desire the reversed trajectory, you can reverse the returned trajectory. Parameters ---------- n_frames : integer number of frames to generate Returns ------- paths.Trajectory() the `n_frames` of the trajectory following (and not including) the initial `current_snapshot` """ self.start() traj = Trajectory([self.generate_next_frame() for i in range(n_frames)]) self.stop(traj) return traj @staticmethod def is_valid_snapshot(snapshot): """ Test the snapshot to be valid. Usually not containing nan Returns ------- bool : True returns `True` if the snapshot is okay to be used """ return True @classmethod def check_snapshot_type(cls, snapshot): if not isinstance(snapshot, cls.base_snapshot_type): logger.warning( ('This engine is intended for "%s" and derived classes. ' 'You are using "%s". Make sure that this is intended.') % (cls.base_snapshot_type.__name__, snapshot.__class__.__name__) )
class NoEngine(DynamicsEngine): _default_options = {} def __init__(self, descriptor): super(NoEngine, self).__init__() self.descriptor = descriptor def generate_next_frame(self): pass