Source code for openpathsampling.pathsimulators.direct_md

import logging
import numpy as np
import pandas as pd

import openpathsampling as paths
from .path_simulator import PathSimulator, MCStep

logger = logging.getLogger(__name__)
init_log = logging.getLogger('openpathsampling.initialization')

# python 3 support
try:
    xrange
except NameError:
    xrange = range

[docs] class DirectSimulation(PathSimulator): """ Direct simulation to calculate rates and fluxes. In practice, this is primarily used to calculate the flux if you want to do so without saving the entire trajectory. However, it will also save the trajectory, if you want it to. Parameters ---------- storage : :class:`.Storage` file to store the trajectory in. Default is None, meaning that the trajectory isn't stored (also faster) engine : :class:`.DynamicsEngine` the engine for the molecular dynamics states : list of :class:`.Volume` states to look for transitions between flux_pairs : list of 2-tuples of ``(state, interface)`` fluxes will calculate the flux out of `state` and through `interface` for each pair in this list initial_snapshot : :class:`.Snapshot` initial snapshot for the MD Attributes ---------- transitions : dict with keys 2-tuple of paths.Volume, values list of int for each pair of states (from_state, to_state) as a key, gives the number of frames for each transition from the entry into from_state to entry into to_state rate_matrix : pd.DataFrame calculates the rate matrix, in units of per-frames fluxes : dict with keys 2-tuple of paths.Volume, values float flux out of state and through interface for each (state, interface) key pair n_transitions : dict with keys 2-tuple of paths.Volume, values int number of transition events for each pair of states n_flux_events : dict with keys 2-tuple of paths.Volume, values int number of flux events for each (state, interface) pair """
[docs] def __init__(self, storage=None, engine=None, states=None, flux_pairs=None, initial_snapshot=None): super(DirectSimulation, self).__init__(storage) self.engine = engine self.states = states self.flux_pairs = flux_pairs if flux_pairs is None: self.flux_pairs = [] self.initial_snapshot = initial_snapshot self.save_every = 1 # TODO: might set these elsewhere for reloading purposes? self.transition_count = [] self.flux_events = {pair: [] for pair in self.flux_pairs}
@property def results(self): return {'transition_count': self.transition_count, 'flux_events': self.flux_events} def load_results(self, results): self.transition_count = results['transition_count'] self.flux_events = results['flux_events'] def run(self, n_steps): most_recent_state = None first_interface_exit = {p: -1 for p in self.flux_pairs} last_state_visit = {s: -1 for s in self.states} was_in_interface = {p: None for p in self.flux_pairs} local_traj = paths.Trajectory([self.initial_snapshot]) self.engine.current_snapshot = self.initial_snapshot self.engine.start() for step in xrange(n_steps): frame = self.engine.generate_next_frame() # update the most recent state if we're in a state state = None # no state at all for s in self.states: if s(frame): state = s if state: last_state_visit[state] = step if state is not most_recent_state: # we've made a transition: on the first entrance into # this state, we reset the last_interface_exit state_flux_pairs = [p for p in self.flux_pairs if p[0] == state] for p in state_flux_pairs: first_interface_exit[p] = -1 # if this isn't the first change of state, we add the # transition if most_recent_state: self.transition_count.append((state, step)) most_recent_state = state # update whether we've left any interface for p in self.flux_pairs: state = p[0] interface = p[1] is_in_interface = interface(frame) # by line: (1) this is a crossing; (2) the most recent state # is correct; (3) this is the FIRST crossing first_exit_condition = ( not is_in_interface and was_in_interface[p] # crossing and state is most_recent_state # correct recent state and first_interface_exit[p] < last_state_visit[state] ) if first_exit_condition: first_exit = first_interface_exit[p] # successful exit if 0 < first_exit < last_state_visit[state]: flux_time_range = (step, first_exit) self.flux_events[p].append(flux_time_range) first_interface_exit[p] = step was_in_interface[p] = is_in_interface if self.storage is not None: local_traj += [frame] self.engine.stop(local_traj) if self.storage is not None: self.storage.save(local_traj) @property def transitions(self): prev_state = None prev_time = None results = {} for (new_state, time) in self.transition_count: if prev_state is not None and prev_time is not None: lag = time - prev_time try: results[(prev_state, new_state)] += [lag] except KeyError: results[(prev_state, new_state)] = [lag] prev_state = new_state prev_time = time return results @property def rate_matrix(self): transitions = self.transitions try: time_per_step = self.engine.snapshot_timestep except AttributeError: time_per_step = 1.0 total_time = {s: sum(sum((transitions[t] for t in transitions if t[0] == s), [])) * time_per_step for s in self.states} rates = {t : len(transitions[t]) / total_time[t[0]] for t in transitions} # rates = {t : 1.0 / np.array(transitions[t]).mean() # for t in transitions} state_names = [s.name for s in self.states] rate_matrix = pd.DataFrame(columns=state_names, index=state_names) for t in rates: rate_matrix.at[t[0].name, t[1].name] = rates[t] return rate_matrix @property def fluxes(self): results = {} try: time_per_step = self.engine.snapshot_timestep except AttributeError: time_per_step = 1.0 for p in self.flux_events: lags = [t[0] - t[1] for t in self.flux_events[p]] results[p] = 1.0 / np.mean(lags) / time_per_step return results # return {p : 1.0 / np.array(self.flux_events[p]).mean() # for p in self.flux_events} @property def n_transitions(self): transitions = self.transitions return {t : len(transitions[t]) for t in transitions} @property def n_flux_events(self): return {p : len(self.flux_events[p]) for p in self.flux_events}