import time
import sys
import logging
import numpy as np
import pandas as pd
from openpathsampling.netcdfplus import StorableNamedObject, StorableObject
import openpathsampling as paths
import openpathsampling.tools
import collections
from openpathsampling.pathmover import SubPathMover
from .ops_logging import initialization_logging
import abc
from future.utils import with_metaclass
logger = logging.getLogger(__name__)
init_log = logging.getLogger('openpathsampling.initialization')
# python 3 support
try:
xrange
except NameError:
xrange = range
class MCStep(StorableObject):
"""
A monte-carlo step in the main PathSimulation loop
It references all objects created and used in a MC step. The used mover,
and simulator as well as the initial and final sampleset, the step
number and the generated movechange.
Attributes
----------
simulation : PathSimulation
the running pathsimulation responsible for generating the step
mccycle : int
the step number counting from the root sampleset
previous : SampleSet
the initial (pre) sampleset
active : SampleSet
the final (post) sampleset
change : MoveChange
the movechange describing the transition from pre to post
"""
def __init__(self, simulation=None, mccycle=-1, previous=None,
active=None, change=None):
super(MCStep, self).__init__()
self.simulation = simulation
self.previous = previous
self.active = active
self.change = change
self.mccycle = mccycle
[docs]class PathSimulator(with_metaclass(abc.ABCMeta, StorableNamedObject)):
"""Abstract class for the "main" function of a simulation.
Parameters
----------
storage : :class:`.Storage`
Storage file for results
Attributes
----------
save_frequency : int
Results should be sync'd (saved to disk) after every
``save_frequency`` steps. Note: subclasses must directly implement
this, the attribute is just a placeholder.
output_stream : file
Subclasses should write output to this, allowing a standard way to
redirect any output.
allow_refresh : bool
Whether to allow the output to refresh an ipynb cell; default True.
This is likely to be overridden when a pathsimulator is wrapped in
another simulation.
"""
#__metaclass__ = abc.ABCMeta
calc_name = "PathSimulator"
_excluded_attr = ['sample_set', 'step', 'save_frequency',
'output_stream']
[docs] def __init__(self, storage):
super(PathSimulator, self).__init__()
self.storage = storage
# self.engine = engine
self.save_frequency = 1
self.step = 0
initialization_logging(
logger=init_log, obj=self,
entries=['storage']#, 'engine']
)
self.sample_set = None
self.output_stream = sys.stdout # user can change to file handler
self.allow_refresh = True
[docs] def sync_storage(self):
"""
Will sync all collective variables and the storage to disk
"""
if self.storage is not None:
self.storage.sync_all()
[docs] @abc.abstractmethod
def run(self, n_steps):
"""
Run the simulator for a number of steps
Parameters
----------
n_steps : int
number of step to be run
"""
pass
[docs] def save_initial_step(self):
"""
Save the initial state as an MCStep to the storage
"""
mcstep = MCStep(
simulation=self,
mccycle=self.step,
active=self.sample_set,
change=paths.AcceptedSampleMoveChange(self.sample_set.samples)
)
if self.storage is not None:
self.storage.steps.save(mcstep)
self.storage.sync_all()
class BootstrapPromotionMove(SubPathMover):
"""
Bootstrap promotion is the combination of an EnsembleHop (to the next
ensemble up) with incrementing the replica ID.
"""
def __init__(self, bias=None, shooters=None, ensembles=None):
"""
Parameters
----------
bias : None
not used yet, only for API consistency and later implementation
shooters : list of ShootingMovers
list of ShootingMovers for each ensemble
ensembles : list of Ensembles
list of ensembles the move should act on
Notes
-----
The bootstrapping will use the ensembles sequentially so it requires
that all ensembles have a reasonable overlab using shooting moves.
"""
self.shooters = shooters
self.bias = bias
self.ensembles = ensembles
initialization_logging(logger=init_log, obj=self,
entries=['bias', 'shooters', 'ensembles'])
ens_pairs = [[self.ensembles[i], self.ensembles[i+1]]
for i in range(len(self.ensembles)-1)]
# Bootstrapping sets numeric replica IDs. If the user wants it done
# differently, the user can change it.
self._ensemble_dict = {ens: rep for rep, ens in enumerate(ensembles)}
# Create all possible hoppers so we do not have to recreate these
# every time which will result in more efficient storage
mover = paths.LastAllowedMover([
# writing an algorithm this convoluted can get you shot in Texas
paths.PartialAcceptanceSequentialMover(
movers=[
shoot,
paths.EnsembleHopMover(
ensemble=enss[0],
target_ensemble=enss[1],
change_replica=self._ensemble_dict[enss[1]]
)
]
) for (enss, shoot) in zip(ens_pairs, shooters)
])
super(BootstrapPromotionMove, self).__init__(mover)
class Bootstrapping(PathSimulator):
"""Creates a SampleSet with one sample per ensemble.
The ensembles for the Bootstrapping pathsimulator must be one ensemble
set, in increasing order. Replicas are named numerically.
"""
calc_name = "Bootstrapping"
def __init__(
self,
storage,
engine=None,
movers=None,
trajectory=None,
ensembles=None
):
"""
Parameters
----------
storage : openpathsampling.storage.Storage
the storage all results should be stored in
engine : openpathsampling.DynamicsEngine
the dynamics engine to be used
movers : list of openpathsampling.PathMover
list of shooters to be used in the BootstrapPromotionMove
trajectory : openpathsampling.Trajectory
an initial trajectory to be started from
ensembles : nested list of openpathsampling.Ensemble
the ensembles this move should act on
"""
# TODO: Change input from trajectory to sample
super(Bootstrapping, self).__init__(storage)
self.engine = engine
paths.EngineMover.default_engine = engine # set the default
self.ensembles = ensembles
self.trajectory = trajectory
sample = paths.Sample(
replica=0,
trajectory=trajectory,
ensemble=self.ensembles[0]
)
self.sample_set = paths.SampleSet([sample])
if movers is None:
pass # TODO: implement defaults: one per ensemble, uniform sel
else:
self.movers = movers
initialization_logging(init_log, self,
['movers', 'ensembles'])
init_log.info("Parameter: %s : %s", 'trajectory', str(trajectory))
self._bootstrapmove = BootstrapPromotionMove(
bias=None,
shooters=self.movers,
ensembles=self.ensembles
)
def run(self, n_steps):
bootstrapmove = self._bootstrapmove
cvs = []
n_samples = 0
if self.storage is not None:
cvs = list(self.storage.cvs)
n_samples = len(self.storage.snapshots)
ens_num = len(self.sample_set)-1
if self.step == 0:
self.save_initial_step()
failsteps = 0
# if we fail n_steps times in a row, kill the job
while ens_num < len(self.ensembles) - 1 and failsteps < n_steps:
self.step += 1
logger.info("Step: " + str(self.step)
+ " Ensemble: " + str(ens_num)
+ " failsteps = " + str(failsteps)
)
paths.tools.refresh_output(
("Working on Bootstrapping cycle step %d" +
" in ensemble %d/%d .\n") %
(self.step, ens_num + 1, len(self.ensembles)),
output_stream=self.output_stream,
refresh=self.allow_refresh
)
movepath = bootstrapmove.move(self.sample_set)
samples = movepath.results
new_sampleset = self.sample_set.apply_samples(samples)
# samples = movepath.results
# logger.debug("SAMPLES:")
# for sample in samples:
# logger.debug("(" + str(sample.replica)
# + "," + str(sample.trajectory)
# + "," + repr(sample.ensemble)
# )
mcstep = MCStep(
simulation=self,
mccycle=self.step,
previous=self.sample_set,
active=new_sampleset,
change=movepath
)
# logger.debug("GLOBALSTATE:")
# for sample in self.sample_set:
# logger.debug("(" + str(sample.replica)
# + "," + str(sample.trajectory)
# + "," + repr(sample.ensemble)
# )
if self.storage is not None:
# compute all cvs now
for cv in cvs:
n_len = len(self.storage.snapshots)
cv(self.storage.snapshots[n_samples:n_len])
n_samples = n_len
self.storage.steps.save(mcstep)
self.sample_set = new_sampleset
old_ens_num = ens_num
ens_num = len(self.sample_set)-1
if ens_num == old_ens_num:
failsteps += 1
if self.step % self.save_frequency == 0:
self.sample_set.sanity_check()
self.sync_storage()
self.sync_storage()
paths.tools.refresh_output(
("DONE! Completed Bootstrapping cycle step %d"
+ " in ensemble %d/%d.\n") %
(self.step, ens_num + 1, len(self.ensembles)),
output_stream=self.output_stream,
refresh=self.allow_refresh
)
[docs]class FullBootstrapping(PathSimulator):
"""
Takes a snapshot as input; gives you back a sampleset with trajectories
for every ensemble in the transition.
This includes
Parameters
----------
transition : :class:`.TISTransition`
the TIS transition to fill by bootstrapping
snapshot : :class:`.Snapshot`
the initial snapshot
storage : :class:`.Storage`
storage file to record the steps (optional)
engine : :class:`.DynamicsEngine`
MD engine to use for dynamics
extra_interfaces : list of :class:`.Volume`
additional interfaces to make into TIS ensembles (beyond those in
the transition)
extra_ensembles : list of :class:`.Ensemble`
additional ensembles to sample after the TIS ensembles
forbidden_states : list of :class:`.Volume`
regions that are disallowed during the initial trajectory. Note that
these region *are* allowed during the interface sampling
initial_max_length : int
maximum length of the initial A->A trajectory
"""
calc_name = "FullBootstrapping"
[docs] def __init__(self, transition, snapshot, storage=None, engine=None,
extra_interfaces=None, extra_ensembles=None,
forbidden_states=None, initial_max_length=None):
super(FullBootstrapping, self).__init__(storage)
self.engine = engine
paths.EngineMover.default_engine = engine # set the default
if extra_interfaces is None:
extra_interfaces = list()
if forbidden_states is None:
forbidden_states = list()
interface0 = transition.interfaces[0]
ensemble0 = transition.ensembles[0]
state = transition.stateA
self.state = state
self.first_traj_ensemble = paths.SequentialEnsemble([
paths.OptionalEnsemble(paths.AllOutXEnsemble(state)),
paths.AllInXEnsemble(state),
paths.OptionalEnsemble(
paths.AllOutXEnsemble(state) & paths.AllInXEnsemble(interface0)
),
paths.OptionalEnsemble(paths.AllInXEnsemble(interface0)),
paths.AllOutXEnsemble(interface0),
paths.OptionalEnsemble(paths.AllOutXEnsemble(state)),
paths.SingleFrameEnsemble(paths.AllInXEnsemble(state))
]) & paths.AllOutXEnsemble(paths.join_volumes(forbidden_states))
self.initial_max_length = initial_max_length
if self.initial_max_length is not None:
self.first_traj_ensemble = (
paths.LengthEnsemble(slice(0, self.initial_max_length))
& self.first_traj_ensemble
)
if extra_ensembles is None:
extra_ensembles = []
self.extra_ensembles = [
paths.TISEnsemble(transition.stateA, transition.stateB, iface,
transition.orderparameter)
for iface in extra_interfaces
] + extra_ensembles
self.transition_shooters = [
paths.OneWayShootingMover(selector=paths.UniformSelector(),
ensemble=ens,
engine=self.engine)
for ens in transition.ensembles
]
self.extra_shooters = [
paths.OneWayShootingMover(selector=paths.UniformSelector(),
ensemble=ens,
engine=self.engine)
for ens in self.extra_ensembles
]
self.snapshot = snapshot.copy()
self.ensemble0 = ensemble0
self.all_ensembles = transition.ensembles + self.extra_ensembles
self.n_ensembles = len(self.all_ensembles)
self.error_max_rounds = True
[docs] def run(self, max_ensemble_rounds=None, n_steps_per_round=20,
build_attempts=20):
#print first_traj_ensemble #DEBUG
has_AA_path = False
subtraj = None
while not has_AA_path:
self.engine.current_snapshot = self.snapshot.copy()
self.engine.snapshot = self.snapshot.copy()
self.output_stream.write("Building first trajectory\n")
sys.stdout.flush()
first_traj = self.engine.generate(
self.engine.current_snapshot,
[self.first_traj_ensemble.can_append]
)
self.output_stream.write("Selecting segment\n")
sys.stdout.flush()
subtrajs = self.ensemble0.split(first_traj)
if len(subtrajs) > 0:
# if we have a short enough path go ahead
subtraj = subtrajs[0]
# check that this is A->A as well
has_AA_path = self.state(subtraj[-1]) \
and self.state(subtraj[0])
build_attempts -= 1
if build_attempts == 0:
raise RuntimeError(
'Too many attempts. Try another initial snapshot instead.')
self.output_stream.write("Sampling " + str(self.n_ensembles) +
" ensembles.\n")
bootstrap = paths.Bootstrapping(
storage=self.storage,
ensembles=self.all_ensembles,
movers=self.transition_shooters + self.extra_shooters,
trajectory=subtraj
)
bootstrap.output_stream = self.output_stream
self.output_stream.write("Beginning bootstrapping\n")
n_rounds = 0
n_filled = len(bootstrap.sample_set)
while n_filled < self.n_ensembles:
bootstrap.run(n_steps_per_round)
if n_filled == len(bootstrap.sample_set):
n_rounds += 1
else:
n_rounds = 0
if n_rounds == max_ensemble_rounds:
# hard equality instead of inequality so that None gives us
# effectively infinite (rounds add one at a time
msg = ("Too many rounds of bootstrapping: " + str(n_rounds)
+ " round of " + str(n_steps_per_round) + " steps.")
if self.error_max_rounds:
raise RuntimeError(msg)
else: # pragma: no cover
logger.warning(msg)
break
n_filled = len(bootstrap.sample_set)
return bootstrap.sample_set
[docs]class PathSampling(PathSimulator):
"""
General path sampling code.
Takes a single move_scheme and generates samples from that, keeping one
per replica after each move.
"""
calc_name = "PathSampling"
[docs] def __init__(self, storage, move_scheme=None, sample_set=None,
initialize=True):
"""
Parameters
----------
storage : :class:`openpathsampling.storage.Storage`
the storage where all results should be stored in
move_scheme : :class:`openpathsampling.MoveScheme`
the move scheme used for the pathsampling cycle
sample_set : :class:`openpathsampling.SampleSet`
the initial SampleSet for the Simulator
initialize : bool
if `False` the new PathSimulator will continue at the step and
not create a new SampleSet object to cut the connection to previous
steps
"""
super(PathSampling, self).__init__(storage)
self.move_scheme = move_scheme
if move_scheme is not None:
self.root_mover = move_scheme.move_decision_tree()
self._mover = paths.PathSimulatorMover(self.root_mover, self)
else:
self.root_mover = None
self._mover = None
initialization_logging(init_log, self,
['move_scheme', 'sample_set'])
self.live_visualizer = None
self.status_update_frequency = 1
if initialize:
samples = []
if sample_set is not None:
for sample in sample_set:
samples.append(sample.copy_reset())
self.sample_set = paths.SampleSet(samples)
mcstep = MCStep(
simulation=self,
mccycle=self.step,
active=self.sample_set,
change=paths.AcceptedSampleMoveChange(self.sample_set.samples)
)
self._current_step = mcstep
else:
self.sample_set = sample_set
self._current_step = None
self.root = self.sample_set
if self.storage is not None:
template_trajectory = self.sample_set.samples[0].trajectory
self.storage.save(template_trajectory)
self.save_current_step()
[docs] def to_dict(self):
return {
'root': self.root,
'move_scheme': self.move_scheme,
'root_mover': self.root_mover,
}
[docs] @classmethod
def from_dict(cls, dct):
# create empty object
obj = cls(None)
# and correct the content
obj.move_scheme = dct['move_scheme']
obj.root = dct['root']
obj.root_mover = dct['root_mover']
obj._mover = paths.PathSimulatorMover(obj.root_mover, obj)
return obj
@property
def current_step(self):
return self._current_step
[docs] def save_current_step(self):
"""
Save the current step to the storage
"""
if self.storage is not None and self._current_step is not None:
self.storage.steps.save(self._current_step)
[docs] @classmethod
def from_step(cls, storage, step, initialize=True):
"""
Parameters
----------
storage : :class:`openpathsampling.storage.Storage`
the storage to be used to hold the simulation results
step : :class:`openpathsampling.MCStep`
the step used to fill the initial parameters
initialize : bool
if `False` the new PathSimulator will continue at the given step and
not create a new SampleSet object to cut the connection to previous
steps.
Returns
-------
:class:`openpathsampling.PathSampling`
the new simulator object
"""
obj = cls(
storage,
step.simulation.move_scheme,
step.sample_set,
initialize=initialize
)
return obj
[docs] def restart_at_step(self, step, storage=None):
"""
Continue with a loaded pathsampling at a given step
Notes
-----
You can only continue from a step that is compatible in the sense
that it was previously generated from the pathsampling instance.
If you want to switch the move scheme you need to create a new
pathsampling instance. You can do so with the constructor or using
the classmethod `from_step` which simplifies the setup process
Parameters
----------
step : :class:`MCStep`
the step to be continued from. You are always free to chose any step
which can be used to fork a simulation but for analysis you may
only use one path of steps.
storage : :class:`Storage`
If given this will change the storage used to store the generated
steps
"""
if step.simulation is not self:
raise RuntimeWarning(
'Trying to continue from other step. Please use the '
'`.from_step` method to create a new PathSampling object '
'instead.')
if storage is not None:
self.storage = storage
self.step = step.mccycle
self.sample_set = step.active
self.root = step.simulation.root
self._current_step = step
def run_until(self, n_steps):
# if self.storage is not None:
# if len(self.storage.steps) > 0:
# self.step = len(self.storage.steps)
n_steps_to_run = n_steps - self.step
self.run(n_steps_to_run)
[docs] def run(self, n_steps):
mcstep = None
# cvs = list()
# n_samples = 0
# if self.storage is not None:
# n_samples = len(self.storage.snapshots)
# cvs = list(self.storage.cvs)
initial_time = time.time()
for nn in range(n_steps):
self.step += 1
logger.info("Beginning MC cycle " + str(self.step))
refresh = self.allow_refresh
if self.step % self.status_update_frequency == 0:
# do we visualize this step?
if self.live_visualizer is not None and mcstep is not None:
# do we visualize at all?
self.live_visualizer.draw_ipynb(mcstep)
refresh = False
elapsed = time.time() - initial_time
if nn > 0:
time_per_step = elapsed / nn
else:
time_per_step = 1.0
paths.tools.refresh_output(
"Working on Monte Carlo cycle number " + str(self.step)
+ "\n" + paths.tools.progress_string(nn, n_steps,
elapsed),
refresh=refresh,
output_stream=self.output_stream
)
time_start = time.time()
movepath = self._mover.move(self.sample_set, step=self.step)
samples = movepath.results
new_sampleset = self.sample_set.apply_samples(samples)
time_elapsed = time.time() - time_start
# TODO: we can save this with the MC steps for timing? The bit
# below works, but is only a temporary hack
setattr(movepath.details, "timing", time_elapsed)
mcstep = MCStep(
simulation=self,
mccycle=self.step,
previous=self.sample_set,
active=new_sampleset,
change=movepath
)
self._current_step = mcstep
self.save_current_step()
# if self.storage is not None:
# # I think this is done automatically when saving snapshots
# # for cv in cvs:
# # n_len = len(self.storage.snapshots)
# # cv(self.storage.snapshots[n_samples:n_len])
# # n_samples = n_len
#
# self.storage.steps.save(mcstep)
if self.step % self.save_frequency == 0:
self.sample_set.sanity_check()
self.sync_storage()
self.sample_set = new_sampleset
self.sync_storage()
if self.live_visualizer is not None and mcstep is not None:
self.live_visualizer.draw_ipynb(mcstep)
paths.tools.refresh_output(
"DONE! Completed " + str(self.step) + " Monte Carlo cycles.\n",
refresh=False,
output_stream=self.output_stream
)
class ShootFromSnapshotsSimulation(PathSimulator):
"""
Generic class for shooting from a set of snapshots.
This mainly serves as a base class for other simulation types
(committor, reactive flux, etc.) All of these take initial snapshots
from within some defined volume, modify the velocities in some way, and
run the dynamics until some ensemble tells them to stop.
While this is usually subclassed, it isn't technically abstract, so a
user can create a simulation of this sort on-the-fly for some weird
ensembles.
Parameters
----------
storage : :class:`.Storage`
the file to store simulations in
engine : :class:`.DynamicsEngine`
the dynamics engine to use to run the simulation
starting_volume : :class:`.Volume`
volume initial frames must be inside of
forward_ensemble : :class:`.Ensemble`
ensemble for shots in the forward direction
backward_ensemble : :class:`.Ensemble`
ensemble for shots in the backward direction
randomizer : :class:`.SnapshotModifier`
the method used to modify the input snapshot before each shot
initial_snapshots : list of :class:`.Snapshot`
initial snapshots to use
"""
def __init__(self, storage, engine, starting_volume, forward_ensemble,
backward_ensemble, randomizer, initial_snapshots):
super(ShootFromSnapshotsSimulation, self).__init__(storage)
self.engine = engine
# FIXME: this next line seems weird; but tests fail without it
paths.EngineMover.default_engine = engine
try:
initial_snapshots = list(initial_snapshots)
except TypeError:
initial_snapshots = [initial_snapshots]
self.initial_snapshots = initial_snapshots
self.randomizer = randomizer
self.starting_ensemble = (
paths.AllInXEnsemble(starting_volume) & paths.LengthEnsemble(1)
)
self.forward_ensemble = forward_ensemble
self.backward_ensemble = backward_ensemble
self.forward_mover = paths.ForwardExtendMover(
ensemble=self.starting_ensemble,
target_ensemble=self.forward_ensemble
)
self.backward_mover = paths.BackwardExtendMover(
ensemble=self.starting_ensemble,
target_ensemble=self.backward_ensemble
)
# subclasses will often override this
self.mover = paths.RandomChoiceMover([self.forward_mover,
self.backward_mover])
def to_dict(self):
dct = {
'engine': self.engine,
'initial_snapshots': self.initial_snapshots,
'randomizer': self.randomizer,
'starting_ensemble': self.starting_ensemble,
'forward_ensemble': self.forward_ensemble,
'backward_ensemble': self.backward_ensemble,
'mover': self.mover
}
return dct
@classmethod
def from_dict(cls, dct):
obj = cls.__new__(cls)
# user must manually set a storage!
super(ShootFromSnapshotsSimulation, obj).__init__(storage=None)
obj.engine = dct['engine']
obj.initial_snapshots = dct['initial_snapshots']
obj.randomizer = dct['randomizer']
obj.starting_ensemble = dct['starting_ensemble']
obj.forward_ensemble = dct['forward_ensemble']
obj.backward_ensemble = dct['backward_ensemble']
obj.mover = dct['mover']
return obj
def run(self, n_per_snapshot, as_chain=False):
"""Run the simulation.
Parameters
----------
n_per_snapshot : int
number of shots per snapshot
as_chain : bool
if as_chain is False (default), then the input to the modifier
is always the original snapshot. If as_chain is True, then the
input to the modifier is the previous (modified) snapshot.
Useful for modifications that can't cover the whole range from a
given snapshot.
"""
self.step = 0
snap_num = 0
for snapshot in self.initial_snapshots:
start_snap = snapshot
# do what we need to get the snapshot set up
for step in range(n_per_snapshot):
paths.tools.refresh_output(
"Working on snapshot %d / %d; shot %d / %d" % (
snap_num+1, len(self.initial_snapshots),
step+1, n_per_snapshot
),
output_stream=self.output_stream,
refresh=self.allow_refresh
)
if as_chain:
start_snap = self.randomizer(start_snap)
else:
start_snap = self.randomizer(snapshot)
sample_set = paths.SampleSet([
paths.Sample(replica=0,
trajectory=paths.Trajectory([start_snap]),
ensemble=self.starting_ensemble)
])
sample_set.sanity_check()
new_pmc = self.mover.move(sample_set)
samples = new_pmc.results
new_sample_set = sample_set.apply_samples(samples)
mcstep = MCStep(
simulation=self,
mccycle=self.step,
previous=sample_set,
active=new_sample_set,
change=new_pmc
)
if self.storage is not None:
self.storage.steps.save(mcstep)
if self.step % self.save_frequency == 0:
self.sync_storage()
self.step += 1
snap_num += 1
[docs]class CommittorSimulation(ShootFromSnapshotsSimulation):
"""Committor simulations. What state do you hit from a given snapshot?
Parameters
----------
storage : :class:`.Storage`
the file to store simulations in
engine : :class:`.DynamicsEngine`
the dynamics engine to use to run the simulation
states : list of :class:`.Volume`
the volumes representing the stable states
randomizer : :class:`.SnapshotModifier`
the method used to modify the input snapshot before each shot
initial_snapshots : list of :class:`.Snapshot`
initial snapshots to use
direction : int or None
if direction > 0, only forward shooting is used, if direction < 0,
only backward, and if direction is None, mix of forward and
backward. Useful if using no modification on the randomizer.
"""
[docs] def __init__(self, storage, engine=None, states=None, randomizer=None,
initial_snapshots=None, direction=None):
all_state_volume = paths.join_volumes(states)
no_state_volume = ~all_state_volume
# shoot forward until we hit a state
forward_ensemble = paths.SequentialEnsemble([
paths.AllOutXEnsemble(all_state_volume),
paths.AllInXEnsemble(all_state_volume) & paths.LengthEnsemble(1)
])
# or shoot backward until we hit a state
backward_ensemble = paths.SequentialEnsemble([
paths.AllInXEnsemble(all_state_volume) & paths.LengthEnsemble(1),
paths.AllOutXEnsemble(all_state_volume)
])
super(CommittorSimulation, self).__init__(
storage=storage,
engine=engine,
starting_volume=no_state_volume,
forward_ensemble=forward_ensemble,
backward_ensemble=backward_ensemble,
randomizer=randomizer,
initial_snapshots=initial_snapshots
)
self.states = states
self.direction = direction
# override the default self.mover given by the superclass
if self.direction is None:
self.mover = paths.RandomChoiceMover([self.forward_mover,
self.backward_mover])
elif self.direction > 0:
self.mover = self.forward_mover
elif self.direction < 0:
self.mover = self.backward_mover
[docs] def to_dict(self):
dct = super(CommittorSimulation, self).to_dict()
dct['states'] = self.states
dct['direction'] = self.direction
return dct
[docs] @classmethod
def from_dict(cls, dct):
obj = super(CommittorSimulation, cls).from_dict(dct)
obj.states = dct['states']
obj.direction = dct['direction']
return obj
[docs]class DirectSimulation(PathSimulator):
"""
Direct simulation to calculate rates and fluxes.
In practice, this is primarily used to calculate the flux if you want to
do so without saving the entire trajectory. However, it will also save
the trajectory, if you want it to.
Parameters
----------
storage : :class:`.Storage`
file to store the trajectory in. Default is None, meaning that the
trajectory isn't stored (also faster)
engine : :class:`.DynamicsEngine`
the engine for the molecular dynamics
states : list of :class:`.Volume`
states to look for transitions between
flux_pairs : list of 2-tuples of ``(state, interface)``
fluxes will calculate the flux out of `state` and through
`interface` for each pair in this list
initial_snapshot : :class:`.Snapshot`
initial snapshot for the MD
Attributes
----------
transitions : dict with keys 2-tuple of paths.Volume, values list of int
for each pair of states (from_state, to_state) as a key, gives the
number of frames for each transition from the entry into from_state
to entry into to_state
rate_matrix : pd.DataFrame
calculates the rate matrix, in units of per-frames
fluxes : dict with keys 2-tuple of paths.Volume, values float
flux out of state and through interface for each (state, interface)
key pair
n_transitions : dict with keys 2-tuple of paths.Volume, values int
number of transition events for each pair of states
n_flux_events : dict with keys 2-tuple of paths.Volume, values int
number of flux events for each (state, interface) pair
"""
[docs] def __init__(self, storage=None, engine=None, states=None,
flux_pairs=None, initial_snapshot=None):
super(DirectSimulation, self).__init__(storage)
self.engine = engine
self.states = states
self.flux_pairs = flux_pairs
if flux_pairs is None:
self.flux_pairs = []
self.initial_snapshot = initial_snapshot
self.save_every = 1
# TODO: might set these elsewhere for reloading purposes?
self.transition_count = []
self.flux_events = {pair: [] for pair in self.flux_pairs}
@property
def results(self):
return {'transition_count': self.transition_count,
'flux_events': self.flux_events}
def load_results(self, results):
self.transition_count = results['transition_count']
self.flux_events = results['flux_events']
[docs] def run(self, n_steps):
most_recent_state = None
first_interface_exit = {p: -1 for p in self.flux_pairs}
last_state_visit = {s: -1 for s in self.states}
was_in_interface = {p: None for p in self.flux_pairs}
local_traj = paths.Trajectory([self.initial_snapshot])
self.engine.current_snapshot = self.initial_snapshot
for step in xrange(n_steps):
frame = self.engine.generate_next_frame()
# update the most recent state if we're in a state
state = None # no state at all
for s in self.states:
if s(frame):
state = s
if state:
last_state_visit[state] = step
if state is not most_recent_state:
# we've made a transition: on the first entrance into
# this state, we reset the last_interface_exit
state_flux_pairs = [p for p in self.flux_pairs
if p[0] == state]
for p in state_flux_pairs:
first_interface_exit[p] = -1
# if this isn't the first change of state, we add the
# transition
if most_recent_state:
self.transition_count.append((state, step))
most_recent_state = state
# update whether we've left any interface
for p in self.flux_pairs:
state = p[0]
interface = p[1]
is_in_interface = interface(frame)
# by line: (1) this is a crossing; (2) the most recent state
# is correct; (3) this is the FIRST crossing
first_exit_condition = (
not is_in_interface and was_in_interface[p] # crossing
and state is most_recent_state # correct recent state
and first_interface_exit[p] < last_state_visit[state]
)
if first_exit_condition:
first_exit = first_interface_exit[p]
# successful exit
if 0 < first_exit < last_state_visit[state]:
flux_time_range = (step, first_exit)
self.flux_events[p].append(flux_time_range)
first_interface_exit[p] = step
was_in_interface[p] = is_in_interface
if self.storage is not None:
local_traj += [frame]
if self.storage is not None:
self.storage.save(local_traj)
@property
def transitions(self):
prev_state = None
prev_time = None
results = {}
for (new_state, time) in self.transition_count:
if prev_state is not None and prev_time is not None:
lag = time - prev_time
try:
results[(prev_state, new_state)] += [lag]
except KeyError:
results[(prev_state, new_state)] = [lag]
prev_state = new_state
prev_time = time
return results
@property
def rate_matrix(self):
transitions = self.transitions
try:
time_per_step = self.engine.snapshot_timestep
except AttributeError:
time_per_step = 1.0
total_time = {s: sum(sum((transitions[t] for t in transitions
if t[0] == s), [])) * time_per_step
for s in self.states}
rates = {t : len(transitions[t]) / total_time[t[0]]
for t in transitions}
# rates = {t : 1.0 / np.array(transitions[t]).mean()
# for t in transitions}
state_names = [s.name for s in self.states]
rate_matrix = pd.DataFrame(columns=state_names, index=state_names)
for t in rates:
rate_matrix.at[t[0].name, t[1].name] = rates[t]
return rate_matrix
@property
def fluxes(self):
results = {}
try:
time_per_step = self.engine.snapshot_timestep
except AttributeError:
time_per_step = 1.0
for p in self.flux_events:
lags = [t[0] - t[1] for t in self.flux_events[p]]
results[p] = 1.0 / np.mean(lags) / time_per_step
return results
# return {p : 1.0 / np.array(self.flux_events[p]).mean()
# for p in self.flux_events}
@property
def n_transitions(self):
transitions = self.transitions
return {t : len(transitions[t]) for t in transitions}
@property
def n_flux_events(self):
return {p : len(self.flux_events[p]) for p in self.flux_events}