Source code for openpathsampling.pathsimulators.reactive_flux

import logging
import numpy as np
import openpathsampling as paths

logger = logging.getLogger(__name__)
from .shoot_snapshots import ShootFromSnapshotsSimulation

[docs] class ReactiveFluxSimulation(ShootFromSnapshotsSimulation): """ Reactive Flux simulations (effective positive flux). Parameters ---------- storage : :class:`.Storage` the file to store simulations in engine : :class:`.DynamicsEngine` the dynamics engine to use to run the simulation states : list of :class:`.Volume` the volumes representing the stable states, first state A then state B randomizer : :class:`.SnapshotModifier` the method used to modify the input snapshot before each shot initial_snapshots : list of :class:`.Snapshot` initial snapshots to use rc : :class:`.CollectiveVariable` reaction coordinate """
[docs] def __init__(self, storage, engine=None, states=None, randomizer=None, initial_snapshots=None, rc=None): # state definition self.states = states state_A = states[0] state_B = states[1] # get min/max reaction coordinate of initial snapshots self.rc = rc rc_array = np.array(self.rc(initial_snapshots)) rc_min = np.nextafter(rc_array.min(), -np.inf) rc_max = np.nextafter(rc_array.max(), np.inf) # define reaction coordinate region of initial snapshots # = starting_volume self.dividing_surface = paths.CVDefinedVolume(self.rc, rc_min, rc_max) # define volume between state A and the dividing surface (including A) self.volume_towards_A = paths.CVDefinedVolume(self.rc, -np.inf, rc_max) # shoot backward until we hit A but never cross the dividing surface backward_ensemble = paths.SequentialEnsemble([ paths.AllInXEnsemble(state_A) & paths.LengthEnsemble(1), paths.AllInXEnsemble(self.volume_towards_A - state_A) ]) # shoot forward until we hit state B without hitting A first # caution: since the mover will consist of backward and forward # shoot in sequence, the starting ensemble for the forward # shoot is the output of the backward shoot, i.e. a # trajectory that runs from A to the dividing surface and # not just a point there. forward_ensemble = paths.SequentialEnsemble([ paths.AllInXEnsemble(state_A) & paths.LengthEnsemble(1), paths.AllOutXEnsemble(state_A | state_B), paths.AllInXEnsemble(state_B) & paths.LengthEnsemble(1), ]) super(ReactiveFluxSimulation, self).__init__( storage=storage, engine=engine, starting_volume=self.dividing_surface, forward_ensemble=forward_ensemble, backward_ensemble=backward_ensemble, randomizer=randomizer, initial_snapshots=initial_snapshots ) # create backward mover (starting from single point) self.backward_mover = paths.BackwardExtendMover( ensemble=self.starting_ensemble, target_ensemble=self.backward_ensemble ) # create forward mover (starting from the backward ensemble) self.forward_mover = paths.ForwardExtendMover( ensemble=self.backward_ensemble, target_ensemble=self.forward_ensemble ) # create mover combining forward and backward shooting, # abort if backward mover fails self.mover = paths.NonCanonicalConditionalSequentialMover([ self.backward_mover, self.forward_mover ])
def to_dict(self): ret_dict = { 'states' : self.states, 'dividing_surface' : self.dividing_surface, 'volume_towards_A' : self.volume_towards_A, 'rc' : self.rc } return ret_dict @classmethod def from_dict(cls, dct): rf = cls.__new__(cls) # replace automatically created attributes with stored ones rf.states = dct['states'] rf.dividing_surface = dct['dividing_surface'] rf.volume_towards_A = dct['volume_towards_A'] rf.rc = dct['rc'] return rf