"""
@author: JD Chodera
@author: JH Prinz
"""
import numpy as np
import mdtraj as md
import simtk.unit as u
import openpathsampling as paths
from openpathsampling.netcdfplus import StorableObject
# =============================================================================================
# SIMULATION TRAJECTORY
# =============================================================================================
[docs]class Trajectory(list, StorableObject):
"""
Simulation trajectory. Essentially a python list of snapshots
"""
engine = None
[docs] def __init__(self, trajectory=None):
"""
Create a simulation trajectory object
Parameters
----------
trajectory : :class:`openpathsampling.trajectory.Trajectory` or list of :class:`openpathsampling.snapshot.BaseSnapshot`
if specified, make a deep copy of specified trajectory
"""
# Initialize list.
list.__init__(self)
StorableObject.__init__(self)
self.path_probability = None # For future uses
if trajectory is not None:
# Try to make a copy out of whatever container we were provided
if hasattr(trajectory, 'atom_indices'):
self.atom_indices = trajectory.atom_indices
else:
self.atom_indices = None
if type(trajectory) is Trajectory:
self.extend(trajectory.iter_proxies())
else:
self.extend(trajectory)
else:
self.atom_indices = None
def extend(self, iterable):
if type(iterable) is Trajectory:
list.extend(self, iterable.iter_proxies())
else:
list.extend(self, iterable)
def __str__(self):
return 'Trajectory[' + str(len(self)) + ']'
def __repr__(self):
return 'Trajectory[' + str(len(self)) + ']'
[docs] def map(self, fnc, allow_fast=True):
"""
This runs a function and tries to be fast.
Fast here means that functions that are purely based on CVs can be
evaluated without actually loading the real Snapshot object. This
functions tries to do that and if it fails it does it the usual way
and creates the snapshot object. This bears the possibility that
the function uses the fake snapshots and returns a non-sense value.
It is up to the user to make sure this will not happen.
"""
if allow_fast:
try:
return [fnc(frame) for frame in list.__iter__(self)]
except:
pass
return [fnc(frame) for frame in self]
@property
def reversed(self):
"""
Returns a reversed (shallow) copy of the trajectory itself. Effectively
creates a new Trajectory object and then fills it with shallow reversed copies
of the contained snapshots.
Returns
-------
:class:`openpathsampling.trajectory.Trajectory`
the reversed trajectory
"""
return Trajectory([snap for snap in reversed(self)])
[docs] def prepend(self, snapshot):
"""
Prepend a snapshot
Just convenience method to replace insert(0, snapshot)
"""
self.insert(0, snapshot)
# And a generation of scientist-programmers who grew up learning
# "OPS trajectories are just Python lists" scream in pain when they
# find this after googling "python list.prepend not working".
# (Blame JHP. This was his doing.)
[docs] def coordinates(self):
"""
Return all coordinates as a numpy array
Returns
-------
coordinates : numpy.ndarray((n_frames, n_atoms, 3))
numpy.array of coordinates of size number of
frames 'n_frames' x number of atoms 'n_atoms' x 3 in x,y,z
"""
# Make sure snapshots are stored and have an index and then add the snapshot index to the trajectory
n_frames = len(self)
n_atoms = self.n_atoms
n_spatial = self.spatial
output = np.zeros([n_frames, n_atoms, n_spatial], np.float32)
for frame_index in range(n_frames):
if self.atom_indices is None:
output[frame_index, :, :] = self[frame_index].coordinates
else:
output[frame_index, :, :] = self[frame_index].coordinates[self.atom_indices, :]
return output
def xyz(self):
n_frames = len(self)
n_atoms = self.n_atoms
n_spatial = self.spatial
output = np.zeros([n_frames, n_atoms, n_spatial], np.float32)
for frame_index in range(n_frames):
if self.atom_indices is None:
output[frame_index, :, :] = self[frame_index].xyz
else:
output[frame_index, :, :] = self[frame_index].xyz[self.atom_indices, :]
return output
@property
def n_snapshots(self):
"""
Return the number of frames in the trajectory.
Returns
-------
length (int) - the number of frames in the trajectory
Notes
-----
Might be removed in later versions for len(trajectory) is more pythonic
See also
--------
n_frames, len
"""
return len(self)
@property
def n_frames(self):
"""
Return the number of frames in the trajectory.
Returns
-------
length (int) - the number of frames in the trajectory
See also
--------
n_snapshots, len
"""
return len(self)
[docs] def configurations(self):
"""
Return a list of the snapshots in the trajectory
Returns
-------
list of Configuration
the list of Configuration objects
"""
return [f.configuration for f in self]
[docs] def momenta(self):
"""
Return a list of the Momentum objects in the trajectory
Returns
-------
list of Momentum()
the list of Momentum objects
"""
return [f.momenta for f in self]
@property
def spatial(self):
if self.topology is None:
n_spatial = self[0].coordinates.shape[1]
else:
n_spatial = self.topology.n_spatial
return n_spatial
@property
def n_atoms(self):
"""
Return the number of atoms in the trajectory in the current view.
Returns
-------
n_atoms : int
number of atoms
Notes
-----
If a trajectory has been subsetted then this returns only the number
of the view otherwise if equals the number of atoms in the snapshots stored
"""
if self.atom_indices is None:
n_atoms = self[0].xyz.shape[0]
else:
n_atoms = len(self.atom_indices)
return n_atoms
# =============================================================================================
# LIST INHERITANCE FUNCTIONS
# =============================================================================================
def __getslice__(self, *args, **kwargs):
ret = list.__getslice__(self, *args, **kwargs)
if type(ret) is list:
ret = Trajectory(ret)
ret.atom_indices = self.atom_indices
return ret
def __hash__(self):
return object.__hash__(self)
def __getitem__(self, index):
# Allow for numpy style of selecting several indices using a list as index parameter
if hasattr(index, '__iter__'):
ret = [list.__getitem__(self, i) for i in index]
else:
ret = list.__getitem__(self, index)
if type(ret) is list:
ret = Trajectory(ret)
ret.atom_indices = self.atom_indices
elif hasattr(ret, '_idx'):
ret = ret.__subject__
return ret
def __reversed__(this):
class ObjectIterator:
def __init__(self):
self.trajectory = this
self.idx = len(this)
self.length = 0
def __iter__(self):
return self
def next(self):
if self.idx > self.length:
self.idx -= 1
snapshot = self.trajectory[self.idx]
return snapshot.reversed
else:
raise StopIteration()
return ObjectIterator()
[docs] def get_as_proxy(self, item):
"""
Get an actual contained element
This will also return lazy proxy objects and not the referenced ones
as does __iter__, __reversed__ or __getitem__. Useful for faster access to the elements
This is equal to use list.__getitem__(trajectory, item)
Returns
-------
:class:`openpathsampling.snapshot.Snapshot` or :class:`openpathsampling.netcdfplus.proxy.LoaderProxy`
"""
return list.__getitem__(self, item)
[docs] def as_proxies(self):
"""
Returns all contains all actual elements
This will also return lazy proxy objects and not the references ones
as does __iter__, __reversed__ or __getitme__. Useful for faster access to the elements
Returns
-------
list of :class:`openpathsampling.snapshot.Snapshot` or :class:`openpathsampling.netcdfplus.proxy.LoaderProxy`
"""
return list(self.iter_proxies())
[docs] def iter_proxies(self):
"""
Returns an iterator over all actual elements
This will also return lazy proxy objects and not the references ones
as does __iter__, __reversed__ or __getitme__. Useful for faster access to the elements
Returns
-------
Iterator() over list of :class:`openpathsampling.snapshot.Snapshot` or :class:`openpathsampling.netcdfplus.proxy.LoaderProxy`
"""
return list.__iter__(self)
[docs] def __iter__(this):
"""
Return an iterator over all snapshots in the storage
This will always give real :class:`openpathsampling.snapshot.Snapshot` objects and never proxies to snapshots.
If you prefer proxies (if available) use `.iteritems()`
Parameters
----------
iter_range : slice or None
if this is not `None` it confines the iterator to objects specified
in the slice
Returns
-------
Iterator()
The iterator that iterates the objects in the store
"""
class ObjectIterator:
def __init__(self):
self.trajectory = this
self.idx = 0
self.length = len(this)
def __iter__(self):
return self
def next(self):
if self.idx < self.length:
obj = self.trajectory[self.idx]
self.idx += 1
return obj
else:
raise StopIteration()
return ObjectIterator()
def __add__(self, other):
t = Trajectory(self)
t.extend(other)
return t
# =============================================================================================
# PATH ENSEMBLE FUNCTIONS
# =============================================================================================
[docs] def summarize_by_volumes(self, label_dict):
"""Summarize trajectory based on number of continuous frames in volumes.
This uses a dictionary of disjoint volumes: the volumes must be disjoint
so that every frame can be mapped to one volume. If the frame maps to
none of the given volumes, it returns the label None.
Parameters
----------
label_dict : dict
dictionary with labels for keys and volumes for values
Returns
-------
list of tuple
format is (label, number_of_frames)
"""
last_vol = None
count = 0
segment_labels = []
# list.__iter__ for speed
for frame in list.__iter__(self):
in_state = []
for key in label_dict.keys():
vol = label_dict[key]
if vol(frame):
in_state.append(key)
if len(in_state) > 1:
raise RuntimeError("Volumes given to summarize_by_volumes not disjoint")
if len(in_state) == 0:
current_vol = None
else:
current_vol = in_state[0]
if last_vol == current_vol:
count += 1
else:
if count > 0:
segment_labels.append( (last_vol, count) )
last_vol = current_vol
count = 1
segment_labels.append( (last_vol, count) )
return segment_labels
[docs] def summarize_by_volumes_str(self, label_dict, delimiter="-"):
"""
Return string version of the volumes visited by this trajectory.
See `Trajectory.summarize_by_volumes` for details.
Parameters
----------
label_dict : dict
dictionary with labels for keys and volumes for values
delimiter : string (default "-")
string used to separate volumes in output
Returns
-------
string
order in which this trajectory visits the volumes in
`label_dict`, separated by the `delimiter`
"""
summary = self.summarize_by_volumes(label_dict)
return delimiter.join([str(s[0]) for s in summary])
[docs] def pathHamiltonian(self):
"""
Compute the generalized path Hamiltonian of the trajectory.
Returns
-------
H : simtk.unit.Quantity with units of energy
the generalized path Hamiltonian
References
----------
For a description of the path Hamiltonian, see [1]:
[1] Chodera JD, Swope WC, Noe F, Prinz JH, Shirts MR, and Pande VS. Dynamical reweighting:
Improved estimates of dynamical properties from simulations at multiple temperatures.
"""
nsnapshots = len(self)
if nsnapshots > 0:
H = self[0].total_energy
for snapshot_index in range(1, nsnapshots - 1):
H += self[snapshot_index].kinetic_energy
else:
H = 0
return H
[docs] def computeActivity(self, atom_indices=None):
"""
Compute the (timeless!) activity of a given trajectory, defined in Ref. [1] as
.. math::
K[x(t)] / delta_t = delta_t \sum_{t=0}^{t_obs} \sum_{j=1}^N [r_j(t+delta_t) - r_j(t)]^2 / delta_t
RETURNS
-------
K : simtk.unit
activity K[x(t)] for the specified trajectory
NOTES
-----
Can we avoid dividing and multipying by nanometers to speed up?
"""
# Determine number of frames in trajectory.
nframes = len(self)
# Compute activity of component A.
K = 0.0
if atom_indices is None:
atom_indices = slice(None)
for frame_index in range(nframes - 1):
# Compute displacement of all atoms.
delta_r = self[frame_index + 1].coordinates - self[frame_index].coordinates
# Compute contribution to activity K.
K += ((delta_r[atom_indices, :] / u.nanometers) ** 2).sum()
return K * (u.nanometers ** 2)
[docs] def logEquilibriumTrajectoryProbability(self):
"""
Compute the (temperatureless!) log equilibrium probability
Up to an unknown additive constant of an unbiased trajectory evolved
according to Verlet dynamics with Andersen thermostatting.
Parameters
----------
trajectory : openpathsampling.Trajectory
the trajectory
Returns
-------
log_q : float
the log equilibrium probability of the trajectory divided by the
inverse temperature beta
NOTES
-----
This might be better places into the trajectory class. The trajectory
should know the system and ensemble? and so it is not necessarily
TPS specific
"""
nsnapshots = len(self)
log_q = - self[0].total_energy
for snapshot_index in range(1, nsnapshots - 1):
log_q += - self[snapshot_index].kinetic_energy
return log_q
# =============================================================================================
# ANALYSIS FUNCTIONS
# =============================================================================================
[docs] def shared_configurations(self, other):
"""
Returns a set of shared snapshots
Parameters
----------
other : :class:`openpathsampling.trajectory.Trajectory`
the second trajectory to use
Returns
-------
set of :class:`openpathsampling.snapshot.Snapshot`
the set of common snapshots
"""
return set([snap for snap in self]) & set([snap for snap in other])
[docs] def shared_subtrajectory(self, other):
"""
Returns a subtrajectory which only contains frames present in other
Parameters
----------
other : :class:`openpathsampling.trajectory.Trajectory`
the second trajectory to use
Returns
-------
:class:`openpathsampling.trajectory.Trajectory`
the shared subtrajectory
"""
shared = self.shared_configurations(other)
return Trajectory([snap for snap in self if snap in shared])
# =============================================================================================
# UTILITY FUNCTIONS
# =============================================================================================
[docs] def subset(self, atom_indices):
"""
Reduce the view of the trajectory to a subset of atoms specified.
This is only a view, no data will be changed or copied.
Returns
-------
:class:`openpathsampling.trajectory.Trajectory`
the trajectory showing the subsets of atoms
"""
t = Trajectory(self)
t.atom_indices = atom_indices
return t
@property
def solute(self):
"""
Reduce the view of the trajectory to a subset of solute atoms
specified in the associated DynamicsEngine
Returns
-------
:class:`openpathsampling.trajectory.Trajectory`
the trajectory showing the subsets of solute atoms
"""
# TODO: To remove the dependency of the dynamics engine we need to get the information
# TODO: about the solute_indices from somewhere else, preferrably the topology?
if Trajectory.engine is None:
raise ValueError("No engine specified to get solute_indices from !")
return self.subset(Trajectory.engine.solute_indices)
[docs] def full(self):
"""
Return a view of the trajectory with all atoms
Returns
-------
:class:`openpathsampling.trajectory.Trajectory`
the trajectory showing the subsets of solute atoms
"""
return self.subset(None)
[docs] def md(self, topology=None):
"""
Construct a mdtraj.Trajectory object from the Trajectory itself
Parameters
----------
topology : :class:`mdtraj.Topology`
If not None this topology will be used to construct the mdtraj
objects otherwise the topology object will be taken from the
configurations in the trajectory snapshots.
Returns
-------
:class:`mdtraj.Trajectory`
the trajectory
"""
if topology is None:
topology = self.topology.md
output = self.coordinates()
return md.Trajectory(output, topology)
@property
def topology(self):
"""
Return a Topology object representing the topology of the
current view of the trajectory
Returns
-------
:class:`openpathsampling.topology.Topology`
the topology object
"""
topology = None
if len(self) > 0 and self[0].topology is not None:
# if no topology is defined
topology = self[0].topology
if self.atom_indices is not None:
topology = topology.subset(self.atom_indices)
return topology