"""
Created on 03.09.2014
@author: Jan-Hendrik Prinz, David W.H. Swenson
"""
import abc
import logging
import itertools
from openpathsampling.netcdfplus import StorableNamedObject
import openpathsampling as paths
from future.utils import with_metaclass
logger = logging.getLogger(__name__)
init_log = logging.getLogger('openpathsampling.initialization')
# TODO: Make Full and Empty be Singletons to avoid storing them several times!
[docs]
def join_ensembles(ensemble_list):
"""Join several ensembles using a set theory union.
Parameters
----------
ensemble_list : list of :class:`.Ensemble`
list of ensembles to join
Returns
-------
:class:`.Ensemble`
union of all given ensembles
"""
ensemble = None
for ens in ensemble_list:
if ensemble is None:
ensemble = ens
else:
ensemble = ensemble | ens
return ensemble
def _get_list_traj(trajectory):
"""Return a list of (proxy) snapshots from either a list or Trajectory
Parameters
----------
trajectory : :class:`.Trajectory` or :class:`.list`
trajectory or list to convert into a list of snapshots
Returns
-------
:class:`.list`
list of (proxy) snapshots
Note
----
Due to possible UUID space restrictions, we don't want to slice
Trajectories if we can help it (as this will generate a new Trajectory
object with its own UUID). This is a convenience function that will either
turn a Trajectory into a list of (proxy) snapshots or just list to list
mapping, which you can slice without generating a new UUID
For a more in-depth discussion, please see:
https://github.com/openpathsampling/openpathsampling/pull/978
"""
itraj = getattr(trajectory, 'iter_proxies', trajectory.__iter__)
return list(itraj())
# note: the cache is not storable, because that would just be silly!
class EnsembleCache(object):
"""Object used by ensembles to enable fast algorithms for basic functions.
The contents stored in the `can_append`, `can_prepend`, `call`, and
`check_reverse` dictionaries will depend on the ensemble. Only two of
these dictionaries should be non-`None` at any time: either the pair
`call` and `can_append`, or the pair `check_reverse` and `can_prepend`.
This object also contains basic functions to manage the cache.
Attributes
----------
start_frame : :class:`openpathsampling.snapshot.Snapshot`
prev_last_frame : :class:`openpathsampling.snapshot.Snapshot`
direction : +1 or -1
contents : dictionary
"""
def __init__(self, direction=None):
self.start_frame = None
self.prev_last_frame = None
self.prev_last_index = None
self.last_length = None
self.direction = direction
self.contents = {}
self.trusted = False
self.debug_enabled = False
def bad_direction_error(self):
raise RuntimeError("EnsembleCache.direction = " +
str(self.direction) + " invalid.") # nocover
# def clear(self):
# self.start_frame = None
# self.prev_last_frame = None
# self.last_length = None
# self.contents = {}
def check(self, trajectory=None, reset=None):
"""Checks and resets (if necessary) the ensemble cache.
The trajectory is considered trustworthy based on checking several
factors, compared to the last time the cache was checked. For
forward caches (direction > 0), these are
* the first frame has not changed
* the length is the same, or has changed by 1
* if length unchanged, the final frame is the same; if length
changed by 1, the penultimate frame is the old final frame
Similar rules apply for backward caches (direction < 0), with
obvious changes of "final" and "first" frames.
If the trajectory is not trustworthy, we return True (should be
reset).
Parameters
----------
trajectory : :class:`.Trajectory`
the trajectory to test
reset : bool or None
force a value for reset. If None, the value is determined based
on the test criteria.
Returns
-------
bool :
the value of reset
"""
if self.debug_enabled:
logger.debug("Checking cache....")
# logger.debug("traj " + str([id(s) for s in trajectory]))
logger.debug("start_frame " + str(id(self.start_frame)))
logger.debug("prev_last " + str(id(self.prev_last_frame)))
logger.debug("prev_last_idx " + str(self.prev_last_index))
if trajectory is not None:
# this might get a list instead of Trajectory from internal
# functions
get_frame = getattr(trajectory, "get_as_proxy",
trajectory.__getitem__)
# if the first frame has changed, we should reset
if reset is None:
lentraj = len(trajectory)
if self.direction > 0:
if get_frame(0) != self.start_frame:
reset = True
else:
if lentraj == 1:
# makes no difference here; always reset
reset = True
elif lentraj == self.last_length:
reset = (get_frame(-1) != self.prev_last_frame)
elif lentraj == self.last_length + 1:
reset = (get_frame(-2) != self.prev_last_frame)
else:
reset = True
elif self.direction < 0:
if get_frame(-1) != self.start_frame:
reset = True
else:
if lentraj == 1:
reset = True
elif lentraj == self.last_length:
reset = (get_frame(0) != self.prev_last_frame)
elif lentraj == self.last_length + 1:
reset = (get_frame(1) != self.prev_last_frame)
else:
reset = True
else:
self.bad_direction_error()
else:
reset = True
self.trusted = not reset
self.last_length = len(trajectory)
if reset:
self.debug_enabled = logger.isEnabledFor(logging.DEBUG)
if self.debug_enabled:
logger.debug("Resetting cache " + str(self))
if self.direction > 0:
# TODO: this can be hit with trajectory is None?
self.start_frame = get_frame(0)
self.prev_last_frame = get_frame(-1)
self.last_length = len(trajectory)
self.contents = {}
elif self.direction < 0:
# TODO: this can be hit with trajectory is None?
self.start_frame = get_frame(-1)
self.prev_last_frame = get_frame(0)
self.last_length = len(trajectory)
self.contents = {}
else:
self.bad_direction_error()
else:
self.trusted = True
# by returning reset, we allow the functions that call this to reset
# other things as well
if self.direction > 0:
# TODO: this can be hit with trajectory is None?
self.prev_last_frame = get_frame(-1)
self.prev_last_index = len(trajectory) - 1
elif self.direction < 0:
# TODO: this can be hit with trajectory is None?
self.prev_last_frame = get_frame(0)
self.prev_last_index = 0
else:
self.bad_direction_error()
return reset
[docs]
class Ensemble(with_metaclass(abc.ABCMeta, StorableNamedObject)):
"""
Path ensemble object.
An Ensemble represents a path ensemble, effectively a set of trajectories.
Typical set operations are allowed, here: and, or, xor, -(without), ~
(inverse = all - x)
Notes
-----
Maybe replace - by / to get better notation. So far it has not been used
"""
#__metaclass__ = abc.ABCMeta
[docs]
def __init__(self):
"""
A path volume defines a set of paths.
"""
super(Ensemble, self).__init__()
self._saved_str = None # cached first time it is requested
# https://docs.python.org/3/reference/datamodel.html#object.__hash__
__hash__ = StorableNamedObject.__hash__
def __eq__(self, other):
if self is other:
return True
if (
self.__class__ == other.__class__
and self.__uuid__ == other.__uuid__
):
return True
return str(self) == str(other)
def __ne__(self, other):
return not self == other
@abc.abstractmethod
def __call__(self, trajectory, trusted=None, candidate=False):
"""
Return `True` if the trajectory is part of the path ensemble.
Parameters
----------
trajectory: :class:`.Trajectory`
The trajectory to be checked
trusted : boolean
For many ensembles, a faster algorithm can be used if we know
some information about the trajectory with one fewer frames.
The `trusted` flag tells the ensemble to use such an algorithm.
This is usually used in combination with an
:class:`.EnsembleCache` which makes short-cut calculations
possible.
"""
return False
def check_reverse(self, trajectory, trusted=False):
"""
See __call__; same thing, but potentially in reverse frame order
"""
return self(trajectory, trusted=False)
def check(self, trajectory):
"""Alias for __call__"""
return self(trajectory, trusted=False)
def trajectory_summary(self, trajectory):
"""
Return dict with info on how this ensemble "sees" the trajectory.
Parameters
----------
trajectory : `openpathsampling.Trajectory`
"""
return {}
def trajectory_summary_str(self, trajectory):
"""
Returns a string with the results of the trajectory_summary function.
Parameters
----------
trajectory : `openpathsampling.Trajectory`
"""
summ = self.trajectory_summary(trajectory)
if summ == {}:
return "No summary available"
else:
return str(summ)
def can_append(self, trajectory, trusted=False):
"""
Returns true, if the trajectory so far can still be in the ensemble
if it is appended by a frame. To check, it assumes that the
trajectory to length L-1 is okay. This is mainly for interactive
usage, when a trajectory is generated.
Parameters
----------
trajectory : :class:`openpathsampling.trajectory.Trajectory`
the actual trajectory to be tested
trusted : bool
If trusted=True, some ensembles can be computed more efficiently
(e.g., by checking only one frame)
Returns
-------
bool
Returns true or false if using a forward step (extending the
trajectory forward in time at its end) `trajectory` could still
be in the ensemble and thus makes sense to continue a simulation
"""
return True
def can_prepend(self, trajectory, trusted=False):
"""
Returns true, if the trajectory so far can still be in the ensemble
if it is prepended by a frame. To check, it assumes that the
trajectory from index 1 is okay. This is mainly for interactive
usage, when a trajectory is generated using a backward move.
Parameters
----------
trajectory : :class:`openpathsampling.trajectory.Trajectory`
the actual trajectory to be tested
trusted : bool
If trusted=True, some ensembles can be computed more efficiently
(e.g., by checking only one frame)
Returns
-------
bool
Returns true or false if using a backward step (extending the
trajectory backwards in time at its beginning) `trajectory`
could still be in the ensemble and thus makes sense to continue
a simulation
"""
return True
def strict_can_append(self, trajectory, trusted=False):
"""
Returns true if the trajectory can be the beginning of a trajectory
in the ensemble.
Parameters
----------
trajectory : :class:`.Trajectory`
trajectory to test
trusted : bool
If trusted=True, some ensembles can be computed more efficiently
(e.g., by checking only one frame)
Returns
-------
bool
True if and only if the given trajectory can be the beginning of
a trajectory in the ensemble.
"""
# default behavior is to be the same as can_append
return self.can_append(trajectory, trusted)
def strict_can_prepend(self, trajectory, trusted=False):
"""
Returns true if the trajectory can be the end of a trajectory in the
ensemble.
Parameters
----------
trajectory : :class:`.Trajectory`
trajectory to test
trusted : bool
If trusted=True, some ensembles can be computed more efficiently
(e.g., by checking only one frame)
Returns
-------
bool
True if and only if the given trajectory can be the end of a
trajectory in the ensemble.
"""
# default behavior is to be the same as can_prepend
return self.can_prepend(trajectory, trusted)
def iter_valid_slices(
self,
trajectory,
max_length=None,
min_length=1,
overlap=1,
reverse=False
):
"""
Return an iterator over slices of subtrajectories matching the ensemble
Parameters
----------
trajectory : :class:`openpathsampling.trajectory.Trajectory`
the actual trajectory to be splitted into ensemble parts
max_length : int > 0, optional
if set this determines the maximal size to be tested (is mainly
used in the recursion)
min_length : int > 0, optional
if set this determines the minimal size to be tested (in lazy
mode might no
overlap : int >= 0, optional
determines the allowed overlap of all trajectories to be found.
A value of x means that two sub-trajectorie can share up to x
frames at the beginning and x frames at the end. Default is 1
reverse : bool
if `True` this will start searching from the end of the trajectory.
Otherwise (default) it will start at the beginning.
Returns
-------
list of `slice`
Returns a list of index-slices for sub-trajectories in
trajectory that are in the ensemble.
"""
length = len(trajectory)
if max_length is None:
max_length = length
max_length = min(length, max_length)
min_length = max(1, min_length)
logger.debug("Looking for subtrajectories in " + str(trajectory))
old_tt_len = 0
if not reverse:
start = 0
end = start + min_length
while start <= length - min_length and end <= length:
# print start, end
tt = trajectory[start:end]
if len(tt) != old_tt_len + 1:
can_append_tt = self.strict_can_append(tt)
else:
can_append_tt = self.strict_can_append(tt, trusted=True)
old_tt_len = len(tt)
if end < length and can_append_tt:
end += 1
if end - start > max_length + 1:
start += 1
end = start + min_length
else:
if end - start <= max_length and self(tt, trusted=False):
yield slice(start, end)
pad = min(overlap, end - start - 1)
start = end - pad
if end == length:
# This means we have reached the end and should stop
# All other possible subtraj can only be contained
# in already existing ones
start = length
elif end - start >= min_length + 1 and \
self(tt[0:len(tt) - 1], trusted=False):
yield slice(start, end - 1)
pad = min(overlap + 1, end - start - 2)
start = end - pad
else:
# TODO: for some ensembles, there are better ways to
# change start. For frame-by-frame ensembles
# (AllInX, AllOutX) we know that we can completely
# stop for all subtrajectories.
start += 1
end = start + min_length
else:
end = length
start = end - min_length
while start >= 0 and end >= min_length:
tt = trajectory[start:end]
if len(tt) != old_tt_len + 1:
can_prepend_tt = self.can_prepend(tt)
else:
can_prepend_tt = self.can_prepend(tt, trusted=True)
old_tt_len = len(tt)
if start > 0 and can_prepend_tt:
start -= 1
if end - start > max_length + 1:
end -= 1
start = end - min_length
else:
if end - start <= max_length and self(tt, trusted=False):
yield slice(start, end)
pad = min(overlap, end - start - 1)
end = start + pad
if start == 0:
# This means we have reached the end and should stop
# All other possible subtraj can only be contained
# in already existing ones
end = 0
elif end - start >= min_length + 1 and \
self(tt[1:len(tt)], trusted=False):
yield slice(start + 1, end)
pad = min(overlap + 1, end - start - 2)
end = start + pad
else:
end -= 1
start = end - min_length
def iter_extendable_slices(
self,
trajectory,
max_length=None,
min_length=1,
overlap=1,
reverse=False
):
"""
Return an iterator over maxiaml slices of extendable subtrajectories
In comparison to the iter_valid_slices this will return maximal
subtrajectories that can potentially be extended into samples of the
ensemble. Shorter subparts will also always work. Where we always
use strict_can_append. So for forward extentable ensembles you can
cut at the end and for backward extendable ones you can cut at the
beginning.
Notes
-----
This feature is not yet fully tested and should be used with care!
Parameters
----------
trajectory : :class:`openpathsampling.trajectory.Trajectory`
the actual trajectory to be splitted into ensemble parts
max_length : int > 0, optional
if set this determines the maximal size to be tested (is mainly
used in the recursion)
min_length : int > 0, optional
if set this determines the minimal size to be tested (in lazy
mode might no
overlap : int >= 0, optional
determines the allowed overlap of all trajectories to be found.
A value of x means that two sub-trajectorie can share up to x
frames at the beginning and x frames at the end. Default is 1
reverse : bool
if `True` this will start searching from the end of the trajectory.
Otherwise (default) it will start at the beginning.
Returns
-------
list of `slice`
Returns a list of index-slices for sub-trajectories in
trajectory that are in the ensemble.
"""
length = len(trajectory)
logger.info('`iter_extendable_slices` is experimental. Use it on your '
'own risk!')
if max_length is None:
max_length = length
max_length = min(length, max_length)
min_length = max(1, min_length)
logger.debug("Looking for subtrajectories in " + str(trajectory))
old_tt_len = 0
if not reverse:
start = 0
end = start + min_length
while start <= length - min_length and end <= length:
# print start, end
tt = trajectory[start:end]
if len(tt) != old_tt_len + 1:
can_append_tt = self.strict_can_append(tt)
else:
can_append_tt = self.strict_can_append(tt, trusted=True)
old_tt_len = len(tt)
if end < length and can_append_tt:
end += 1
if end - start > max_length + 1:
start += 1
end = start + min_length
else:
if end - start <= max_length + 1:
yield slice(start, end - 1)
pad = min(overlap, end - start - 1)
start = end - pad
if end == length:
# This means we have reached the end and should stop
# All other possible subtraj can only be contained
# in already existing ones
start = length
else:
start += 1
end = start + min_length
else:
end = length
start = end - min_length
while start >= 0 and end >= min_length:
tt = trajectory[start:end]
if len(tt) != old_tt_len + 1:
can_prepend_tt = self.can_prepend(tt)
else:
can_prepend_tt = self.can_prepend(tt, trusted=True)
old_tt_len = len(tt)
if start > 0 and can_prepend_tt:
start -= 1
if end - start > max_length + 1:
end -= 1
start = end - min_length
else:
if end - start <= max_length + 1:
yield slice(start, end - 1)
pad = min(overlap, end - start - 1)
end = start + pad
if start == 0:
# This means we have reached the end and should stop
# All other possible subtraj can only be contained
# in already existing ones
end = 0
else:
end -= 1
start = end - min_length
def find_first_subtrajectory(self, trajectory):
"""
Return the first sub-trajectory that matches the ensemble
Parameters
----------
trajectory : :class:`openpathsampling.Trajectory`
the trajectory in which to look for sub-trajectories
Returns
-------
:class:`openpathsampling.Trajectory` or None
the found sub-trajectory or None if no sub-trajectory was found
"""
try:
return trajectory[
next(self.iter_valid_slices(trajectory))]
except StopIteration:
return None
def find_last_subtrajectory(self, trajectory):
"""
Return the last sub-trajectory that matches the ensemble
Parameters
----------
trajectory : :class:`openpathsampling.trajectory.Trajectory`
the trajectory in which to look for sub-trajectories
Returns
-------
:class:`openpathsampling.Trajectory` or None
the found sub-trajectory or None if no sub-trajectory was found
"""
try:
return trajectory[
next(self.iter_valid_slices(trajectory, reverse=True))]
except StopIteration:
return None
def iter_split(
self,
trajectory,
max_length=None,
min_length=1,
overlap=1,
reverse=False):
"""Return iterator over subtrajectories satisfying the given ensemble.
Parameters
----------
trajectory : :py:class:`openpathsampling.trajectory.Trajectory`
the actual trajectory to be splitted into ensemble parts
max_length : int > 0
if set this determines the maximal size to be tested (is mainly
used in the recursion)
min_length : int > 0
if set this determines the minimal size to be tested (in lazy
mode might no
overlap : int >= 0
determines the allowed overlap of all trajectories to be found.
A value of x means that two sub-trajectory can share up to x
frames at the beginning and x frames at the end. Default is 1
reverse : bool
if `True` this will start searching from the end of the trajectory.
Otherwise (default) it will start at the beginning.
Returns
-------
iterator of :class:`openpathsampling.trajectory.Trajectory`
Returns a list of sub-trajectories in trajectory that are in the
ensemble.
Notes
-----
This uses self.iter_valid_slices and returns the actual sub-trajectories
"""
for part in self.iter_valid_slices(
trajectory, max_length, min_length, overlap, reverse):
yield trajectory[part]
def split(
self,
trajectory,
max_length=None,
min_length=1,
overlap=1,
reverse=False,
n_results=0):
"""Return list of subtrajectories satisfying the given ensemble.
Parameters
----------
trajectory : :py:class:`openpathsampling.trajectory.Trajectory`
the actual trajectory to be splitted into ensemble parts
max_length : int > 0
if set this determines the maximal size to be tested (is mainly
used in the recursion)
min_length : int > 0
if set this determines the minimal size to be tested (in lazy
mode might no
overlap : int >= 0
determines the allowed overlap of all trajectories to be found.
A value of x means that two sub-trajectory can share up to x
frames at the beginning and x frames at the end. Default is 1
reverse : bool
if `True` this will start searching from the end of the trajectory.
Otherwise (default) it will start at the beginning.
n_results : int
if `0` this will return all results. If the integer is larger than
zero it will stop after the given number of slices has been found
Returns
-------
list of :class:`openpathsampling.trajectory.Trajectory`
Returns a list of sub-trajectories in trajectory that are in the
ensemble.
Notes
-----
This uses self.find_valid_slices and returns the actual sub-trajectories
"""
indices = self.iter_valid_slices(trajectory, max_length,
min_length, overlap, reverse)
if n_results > 0:
return [
trajectory[part]
for part in itertools.islice(indices, n_results)]
else:
return [trajectory[part] for part in indices]
@property
def extendable_sub_ensembles(self):
return {}
def get_sample_from_trajectories(
self, trajectories,
replica=0,
used_trajectories=None,
reuse_strategy='avoid-symmetric'
):
"""
Generate a sample in the ensemble by testing `trajectories`
Parameters
----------
trajectories : (list of) :class:`openpathsampling.trajectory.Trajectory`
single trajectory of list of trajectories to be used to create a
sample in this ensemble
replica : int
the replica id for the sample to be created
used_trajectories : (list of) :class:`openpathsampling.trajectory.Trajectory`
trajectories not taken into account in the first attempt
reuse_strategy : str
if `avoid` then in a second attempt the used trajectories are
tried
"""
trajectories = paths.Trajectory._to_list_of_trajectories(trajectories)
used_and_possible = []
for idx, traj in enumerate(trajectories):
if traj not in used_trajectories and (
not reuse_strategy.endswith('symmetric') or
traj.reversed not in used_trajectories):
if self(traj):
return paths.Sample(
trajectory=traj,
ensemble=self,
replica=replica
)
else:
used_and_possible.append(traj)
return self._handle_used_trajectories(
used_trajectories,
used_and_possible,
reuse_strategy)
def split_sample_from_trajectories(
self, trajectories,
replica=0,
used_trajectories=None,
reuse_strategy='avoid-symmetric',
unique='shortest'):
"""
Generate a sample in the ensemble by searching for sub-parts
Parameters
----------
trajectories : (list of) :class:`openpathsampling.trajectory.Trajectory`
single trajectory of list of trajectories to be used to create a
sample in this ensemble
replica : int
the replica id for the sample to be created
used_trajectories : (list of) :class:`openpathsampling.trajectory.Trajectory`
trajectories not taken into account in the first attempt
reuse_strategy : str
if `avoid` then in a second attempt the used trajectories are
tried
unique : str
If `first` the first found subtrajectory is selected. If
`shortest` then from all subparts the shortest one is used.
"""
trajectories = paths.Trajectory._to_list_of_trajectories(trajectories)
used_and_possible = []
for idx, traj in enumerate(trajectories):
parts = self._get_trajectory_parts_in_order(traj, unique)
for part in parts:
if part not in used_trajectories and (
not reuse_strategy.endswith('symmetric') or
part.reversed not in used_trajectories):
return paths.Sample(
trajectory=part,
ensemble=self,
replica=replica
)
else:
used_and_possible.append(part)
return self._handle_used_trajectories(
used_trajectories,
used_and_possible,
reuse_strategy)
def extend_sample_from_trajectories(
self,
trajectories,
engine,
replica=0,
unique='median',
level='complex',
on_error='retry',
attempts=2):
"""
Generate a sample in the ensemble by extending parts of `trajectories`
This will take an initial trajectory look for useable subparts and
try to extend them into a valid sample. This works by taking information
from an ensemble what are resonable subparts, this is returned by a
function `.extendable_sub_ensembles()` which is only defined for
complex ensembles like Minus or TIS ensemble.
As an example the minus could extend from the segment ensemble or even
a segment + parts completely in the inner ensemble. Of course the
ensemble itself is always valid.
The function tries to find extendable subparts from largest to smallest
ones, starting with the ensemble itself and ending with small subparts
If a list of trajectories is provided it will be attempt to find a
valid trajectory using all the trajectory parts.
Parameters
----------
trajectories : (list of) :class:`openpathsampling.trajectory.Trajectory`
single trajectory of list of trajectories to be used to create a
sample in this ensemble
engine : :class:`openpathsampling.dynamicsengine.DynamicsEngine`
engine to use for MD extension
replica : int
the replica id for the sample to be created
unique : str
If `first` the first found subtrajectory is selected. If
`shortest` then from all subparts the shortest one is used.
level : str
there are three levels you chose and not all are implemented for
an ensemble. For all ensembles you can use `native` which will
simply try to extend the ensemble itself, the mose simple one, which
is always possible.
Picking `complex` will use the largest (most complex)
sub-ensemble that makes sense. Like in the case of a Minus move
this is the segment ensemble.
The other choice is `minimal` which
choses the minimal necessary subtrajectory extending makes sense
from. For TIS or Minus Ensembles this will be crossing from the
(initial) core to the outside. You should try `complex` first and
then `minimal`. `complex` should be much faster.
on_error : str
if `retry` (default) then any error will trigger a retry and
eventually no sample will be retured. `fail` will raise the
exception. Typical things to happen are `MaxLengthError` or
`NaNError`, but also initialisation error can happen. `fail` should
only be used for debugging purposes since you will not get a
preliminary sampleset as a result but an exception.
attempts : int
the number of attemps on a trajectory to extend
"""
logger.info("Starting extend_sample_from_trajectories with level "
+ str(level))
if level == 'native':
sub_ensemble = self
else:
if not hasattr(self, 'extendable_sub_ensembles'):
logger.info("Missing ensemble.extendable_sub_ensembles")
return None
sub_ensembles = self.extendable_sub_ensembles
if level not in sub_ensembles:
logger.info("Missing level: " + repr(level))
return None
sub_ensemble = sub_ensembles[level]
trajectories = paths.Trajectory._to_list_of_trajectories(trajectories)
for idx, traj in enumerate(trajectories):
traj_parts = sub_ensemble._get_trajectory_parts_in_order(
traj, unique)
for orig in traj_parts:
for attempt in range(attempts):
part = paths.Trajectory(orig)
logger.info((
'extend - attempt [%d] : extending from initial '
'length %d\n') % (
attempt + 1,
len(part)
))
try:
if self.strict_can_append(part):
# seems we could extend forward
part = part[:-1] + \
engine.generate(
part[-1],
[paths.PrefixTrajectoryEnsemble(
self,
part
).strict_can_append],
direction=+1
)
if self.strict_can_prepend(part):
# and extend backward
part = engine.generate(
part[0].reversed,
[paths.SuffixTrajectoryEnsemble(
self,
part
).strict_can_prepend],
direction=-1
).reversed + part[1:]
logger.info("Candidate trajectory: " + str(part))
if self(part): # make sure we found a sample
return paths.Sample(
trajectory=part,
ensemble=self,
replica=replica
)
except paths.engines.EngineError as e:
if on_error == 'fail':
raise
elif on_error == 'retry':
pass
else:
# This should not happen!
pass
logger.info("Returning None because nothing worked")
return None
def _get_trajectory_parts_in_order(self, traj, unique='first'):
if unique == 'first':
# this returns an iterator and can thus be faster
parts = self.iter_split(traj)
elif unique == 'shortest':
parts = sorted(self.split(traj), key=len)
elif unique == 'median':
# resort the found trajectories so that the middle one is
# first, then the one right to it, then the one before, etc
# e.g. [0,1,2,3,4,5,6,7,8,9] is rearranges into
# [5,4,6,3,7,2,8,1,9,0]
ordered = sorted(self.split(traj), key=len)
parts = list([p for p2 in zip(
ordered[len(ordered) // 2:],
reversed(ordered[:len(ordered) // 2])
) for p in p2])
if len(ordered) & 1:
parts.append(ordered[-1])
elif unique == 'longest':
parts = sorted(self.split(traj), key=len, reverse=True)
else:
parts = []
try:
if len(parts) > 0:
lens = map(len, parts)
logger.info(
('splitting - found %d slices of lengths '
'[%d, ..., %d, ..., %d] '
'ordered by `%s`\n') % (
len(parts),
min(lens),
sorted(lens)[len(parts) / 2],
max(lens),
unique
))
except TypeError:
pass
return parts
def _handle_used_trajectories(
self,
used_trajectories,
used_and_possible,
reuse_strategy):
if reuse_strategy.startswith('avoid') \
and used_trajectories is not None:
for part in used_trajectories:
if part in used_and_possible:
if self(part):
# move the used one to the back of the list to
# not reuse it directly
del used_trajectories[used_trajectories.index(part)]
used_trajectories.append(part)
return paths.Sample(
trajectory=part,
ensemble=self
)
if reuse_strategy.endswith('symmetric'):
if part.reversed in used_and_possible:
if self(part):
# move the used one to the back of the list to
# not reuse it directly
del used_trajectories[used_trajectories.index(part)]
used_trajectories.append(part)
return paths.Sample(
trajectory=part,
ensemble=self
)
return None
def __str__(self):
if self._saved_str is None:
self._saved_str = self._str()
return self._saved_str
def _str(self):
"""
Returns a complete mathematical expression that defines the current
ensemble in a readable form.
Notes
-----
This should be cleaned up a little
"""
return 'Ensemble'
def __or__(self, other):
if self is other:
return self
elif type(other) is EmptyEnsemble:
return self
elif type(other) is FullEnsemble:
return other
else:
return UnionEnsemble(self, other)
# This is not correct for all ensembles.
# def __xor__(self, other):
# # TODO: return (self | other) & ~(self & other)
# # NOTE: that should also get the automatic special case handling
# # (other is self, Empty, or Full) from treatment in __and__/__or__
# if self is other:
# return EmptyEnsemble()
# elif type(other) is EmptyEnsemble:
# return self
# elif type(other) is FullEnsemble:
# return NegatedEnsemble(self)
# else:
# return SymmetricDifferenceEnsemble(self, other)
def __and__(self, other):
if self is other:
return self
elif type(other) is EmptyEnsemble:
return other
elif type(other) is FullEnsemble:
return self
else:
return IntersectionEnsemble(self, other)
# This is not correct for all ensembles.
# def __sub__(self, other):
# if self is other:
# return EmptyEnsemble()
# elif type(other) is EmptyEnsemble:
# return self
# elif type(other) is FullEnsemble:
# return EmptyEnsemble()
# else:
# return RelativeComplementEnsemble(self, other)
# This is not correct for all ensembles.
# def __invert__(self):
# return NegatedEnsemble(self)
@staticmethod
def _indent(s):
spl = s.split('\n')
spl = [' ' + p for p in spl]
return '\n'.join(spl)
[docs]
class EmptyEnsemble(Ensemble):
"""
The empty path ensemble of no trajectories.
"""
[docs]
def __init__(self):
super(EmptyEnsemble, self).__init__()
def __call__(self, trajectory, trusted=None, candidate=False):
return False
def can_append(self, trajectory, trusted=False):
return False
def can_prepend(self, trajectory, trusted=False):
return False
def __invert__(self):
return FullEnsemble()
def __sub__(self, other):
return EmptyEnsemble()
def __and__(self, other):
return self
def __xor__(self, other):
return other
def __or__(self, other):
return other
def _str(self):
return 'empty'
[docs]
class FullEnsemble(Ensemble):
"""
The full path ensemble of all possible trajectories.
"""
[docs]
def __init__(self):
super(FullEnsemble, self).__init__()
def __call__(self, trajectory, trusted=None, candidate=False):
return True
def can_append(self, trajectory, trusted=False):
return True
def can_prepend(self, trajectory, trusted=False):
return True
def __invert__(self):
return EmptyEnsemble()
def __sub__(self, other):
if type(other) is EmptyEnsemble:
return self
elif type(other) is FullEnsemble:
return EmptyEnsemble()
else:
return NegatedEnsemble(other)
def __and__(self, other):
return other
def __xor__(self, other):
if type(other) is EmptyEnsemble:
return self
elif type(other) is FullEnsemble:
return EmptyEnsemble()
else:
return NegatedEnsemble(other)
def __or__(self, other):
return self
def _str(self):
return 'all'
class NegatedEnsemble(Ensemble):
"""
Negates an Ensemble and simulates a `not` statement
"""
# TODO: this whole concept is false and this should be removed
def __init__(self, ensemble):
super(NegatedEnsemble, self).__init__()
self.ensemble = ensemble
def __call__(self, trajectory, trusted=None, candidate=False):
return not self.ensemble(trajectory, trusted, candidate)
def can_append(self, trajectory, trusted=False):
# We cannot guess the result here so keep on running forever
return True
def can_prepend(self, trajectory, trusted=False):
# We cannot guess the result here so keep on running forever
return True
def _str(self):
return 'not ' + str(self.ensemble)
[docs]
class EnsembleCombination(Ensemble):
"""
Logical combination of two ensembles
"""
[docs]
def __init__(self, ensemble1, ensemble2, fnc, str_fnc):
super(EnsembleCombination, self).__init__()
self.ensemble1 = ensemble1
self.ensemble2 = ensemble2
self.fnc = fnc
self.sfnc = str_fnc
self.debug = logger.isEnabledFor(logging.DEBUG)
def to_dict(self):
return {'ensemble1': self.ensemble1, 'ensemble2': self.ensemble2}
def _generalized_short_circuit(self, combo, f1, f2, trajectory, trusted,
fname=""):
"""
Handles short-circuit logic, all in one place for code simplicity.
Short-circuit logic skips the second part of the combination if the
result doesn't depend on it.
Note
----
If you want to enable debug logging for this, it either needs to
be enabled when the class is instantiated or set with the .debug
instance variable. This is to improve performance since this
method is called very frequently.
Parameters
----------
combo :
the combination function
f1 :
ensemble1's function. Takes trajectory, returns bool. Examples
include `__call__`, `can_append`, etc.
f2 :
ensemble2's function. As with f1, but for ensemble 2.
trajectory : :class:`.Trajectory`
input trajectory
trusted : bool
the `trusted` flag to send to f1 and f2
fname : string
name of the functions f1 and f2. Only used in debug output.
"""
if self.debug: # pragma: no cover
logger.debug("Combination is " + self.__class__.__name__)
a = f1(trajectory, trusted)
if self.debug: # pragma: no cover
logger.debug("Combination." + fname + ": " +
self.ensemble1.__class__.__name__ + " is " + str(a))
ens2 = f2(trajectory, trusted)
# logger.debug("Doing ens2_prime")
# ens2_prime = f2(trajectory, trusted)
logger.debug("Combination." + fname + ": " +
self.ensemble2.__class__.__name__ + " is " + str(ens2))
# assert(ens2 == ens2_prime)
logger.debug("Combination should return " + str(self.fnc(a, ens2)))
res_true = self.fnc(a, True)
res_false = self.fnc(a, False)
if res_false == res_true:
# result is independent of ensemble_b so ignore it
# logger.debug("Returning res_true == res_false ==" + str(res_true))
return res_true
else:
b = f2(trajectory, trusted)
# logger.debug("Needs test:" + str(a) + " " + str(self.fnc) +
# str(b) + str(self.fnc(a,b)))
return self.fnc(a, b)
def __call__(self, trajectory, trusted=None, candidate=False):
return self._generalized_short_circuit(
combo=self.fnc,
f1=self.ensemble1,
f2=self.ensemble2,
trajectory=trajectory,
trusted=trusted,
fname="__call__"
)
def can_append(self, trajectory, trusted=False):
return self._generalized_short_circuit(
combo=self.fnc,
f1=self.ensemble1.can_append,
f2=self.ensemble2.can_append,
trajectory=trajectory,
trusted=trusted,
fname="can_append"
)
def can_prepend(self, trajectory, trusted=False):
return self._generalized_short_circuit(
combo=self.fnc,
f1=self.ensemble1.can_prepend,
f2=self.ensemble2.can_prepend,
trajectory=trajectory,
trusted=trusted,
fname="can_prepend"
)
def strict_can_append(self, trajectory, trusted=False):
return self._generalized_short_circuit(
combo=self.fnc,
f1=self.ensemble1.strict_can_append,
f2=self.ensemble2.strict_can_append,
trajectory=trajectory,
trusted=trusted,
fname="strict_can_append"
)
def strict_can_prepend(self, trajectory, trusted=False):
return self._generalized_short_circuit(
combo=self.fnc,
f1=self.ensemble1.strict_can_prepend,
f2=self.ensemble2.strict_can_prepend,
trajectory=trajectory,
trusted=trusted,
fname="strict_can_prepend"
)
def _str(self):
# print self.sfnc, self.ensemble1, self.ensemble2,
# print self.sfnc.format(
# '(' + str(self.ensemble1) + ')',
# '(' + str(self.ensemble1) + ')')
return self.sfnc.format(
'(\n' + Ensemble._indent(str(self.ensemble1)) + '\n)',
'(\n' + Ensemble._indent(str(self.ensemble2)) + '\n)')
[docs]
class UnionEnsemble(EnsembleCombination):
[docs]
def __init__(self, ensemble1, ensemble2):
super(UnionEnsemble, self).__init__(ensemble1, ensemble2,
fnc=lambda a, b: a or b,
str_fnc='{0}\nor\n{1}')
[docs]
class IntersectionEnsemble(EnsembleCombination):
[docs]
def __init__(self, ensemble1, ensemble2):
super(IntersectionEnsemble, self).__init__(ensemble1, ensemble2,
fnc=lambda a, b: a and b,
str_fnc='{0}\nand\n{1}')
# class SymmetricDifferenceEnsemble(EnsembleCombination):
# # TODO: this is not yet supported. Should be removed. ~DWHS
# # should just be a shortcut for (ens1 | ens2) & ~(ens1 & ens2)
# # should probably not even be a class. Just have `ensemble.__xor__`
# # return (ens1 | ens2) & ~(ens1 & ens2)
# def __init__(self, ensemble1, ensemble2):
# super(SymmetricDifferenceEnsemble, self).__init__(
# ensemble1,
# ensemble2,
# fnc=lambda a, b: a ^ b,
# str_fnc='{0}\nxor\n{1}')
# class RelativeComplementEnsemble(EnsembleCombination):
# # TODO: this is not yet supported. Should be removed. ~DWHS
# # should be a shortcut for ens1 & ~ens2
# # should probably not even be a class. Just have `ensemble.__sub__`
# # return ens1 & ~ens2
# def __init__(self, ensemble1, ensemble2):
# super(RelativeComplementEnsemble, self).__init__(
# ensemble1,
# ensemble2,
# fnc=lambda a, b: a and not b,
# str_fnc='{0}\nand not\n{1}')
[docs]
class SequentialEnsemble(Ensemble):
"""Ensemble which satisfies several subensembles in sequence.
Attributes
----------
ensembles : tuple of Ensemble
The ensembles, in time-order of when they should occur in the
trajectory.
min_overlap : int or tuple of int
The minimum number of frames that overlap between two ensembles in
the sequence. A positive number n indicates that at least n frames
must be in both ensembles at the transition between them. A negative
number -n indicates that at least n frames in neither ensemble at
the transition between them. If given as a list, the list should be
of length len(ensembles)-1, with one value for each transition. If
given as an integer, that value will be used for all transitions.
max_overlap : int or list of int
The maximum number of frames that overlap between two ensembles in
the sequence. A positive number n indicates that no more than n
frames can be in both ensembles at the transition between them. A
negative number -n indicates no more than n frames in neither
ensemble at the transition between them. If given as a list, the
list should be of length len(ensembles)-1, with one value for each
transition. If given as an integer, that value will be used for all
transitions.
Notes
-----
TODO: Overlap features not implemented because ohmygod this was hard
enough already.
"""
[docs]
def __init__(self, ensembles, min_overlap=0, max_overlap=0, greedy=False):
# make tuples of the min/max overlaps
super(SequentialEnsemble, self).__init__()
if type(min_overlap) is int:
min_overlap = (min_overlap,) * (len(ensembles) - 1)
if type(max_overlap) is int:
max_overlap = (max_overlap,) * (len(ensembles) - 1)
self.ensembles = ensembles
self.min_overlap = min_overlap
self.max_overlap = max_overlap
self.greedy = greedy
self._use_cache = True # cache can be turned off
self._cache_can_append = EnsembleCache(+1)
self._cache_strict_can_append = EnsembleCache(+1)
self._cache_call = EnsembleCache(+1)
self._cache_can_prepend = EnsembleCache(-1)
self._cache_strict_can_prepend = EnsembleCache(-1)
self._cache_check_reverse = EnsembleCache(-1)
self._zero_traj = paths.Trajectory([])
# sanity checks
if len(self.min_overlap) != len(self.max_overlap):
raise ValueError("len(min_overlap) != len(max_overlap)")
if len(self.min_overlap) != len(self.ensembles) - 1:
raise ValueError(
"Number of overlaps doesn't match number of transitions")
for i in range(len(self.min_overlap)):
if min_overlap[i] > max_overlap[i]:
raise ValueError("min_overlap greater than max_overlap!")
@staticmethod
def update_cache(cache, ens_num, ens_from, subtraj_from):
"""Updates the given cache.
Parameters
----------
cache : `EnsembleCache`
the cache to be updated
ens_num : integer
current value of `ens_num` in the sequential ensemble
ens_from : integer
current "start" ensemble index. For forward-direction caches,
this is ens_first. For reverse-direction caches, this is
ens_final. The "initial" (in the appropriate direction) frame is
assigned to this ensemble
subtraj_from : integer
index of the "start" frame of the subtrajectory in this
subensemble. For forward-direction caches, this is the first
frame of the subtrajectory. For reverse-direction caches, this
is the final frame of the subtrajectory.
"""
if ens_num == "keep":
ens_num = cache.contents['ens_num']
if ens_from == "keep":
ens_from = cache.contents['ens_from']
if subtraj_from == "keep":
subtraj_from = cache.contents['subtraj_from']
cache.contents['ens_num'] = ens_num
cache.contents['ens_from'] = ens_from
cache.contents['subtraj_from'] = subtraj_from
logger.debug("Setting cache | ens_num " + str(ens_num) +
" | ens_from " + str(ens_from) +
" | subtraj_from " + str(subtraj_from))
logger.debug("Cache is Trusted: " + str(cache.trusted))
@staticmethod
def assign_frames(cache, ens_num,
subtraj_first=None, subtraj_final=None):
if ens_num is None:
cache.contents['assignments'] = {}
else:
cache.contents['assignments'][ens_num] = \
slice(subtraj_first, subtraj_final)
logger.debug("Cache assignments: " + str(cache.contents['assignments']))
def transition_frames(self, trajectory, trusted=None):
# it is easiest to understand this decision tree as a simplified
# version of the can_append decision tree; see that for detailed
# comments
# self._check_cache(trajectory, function="call")
ens_num = 0
subtraj_first = 0
traj_final = len(trajectory)
final_ens = len(self.ensembles) - 1
transitions = []
while True:
if ens_num <= final_ens:
subtraj_final = self._find_subtraj_final(trajectory,
subtraj_first, ens_num)
else:
return transitions
if subtraj_final - subtraj_first > 0:
# subtraj = trajectory[slice(subtraj_first, subtraj_final)]
if ens_num == final_ens:
if subtraj_final == traj_final:
# success
transitions.append(subtraj_final)
return transitions
else:
# fails because we have more frames to assign
transitions.append(subtraj_final)
return transitions
else:
ens_num += 1
transitions.append(subtraj_final)
subtraj_first = subtraj_final
else:
if ens_num <= final_ens and \
self.ensembles[ens_num](self._zero_traj):
ens_num += 1
transitions.append(subtraj_final)
subtraj_first = subtraj_final
else:
return transitions
def __call__(self, trajectory, trusted=None, candidate=False):
logger.debug("Looking for transitions in trajectory " + str(trajectory))
transitions = self.transition_frames(trajectory, trusted)
logger.debug("Found transitions: " + str(transitions))
# if we don't have the right number of transitions, or if the last
# print transitions
if len(transitions) != len(self.ensembles):
# print "Returns false b/c not enough ensembles"
return False
elif transitions[-1] != len(trajectory):
# print "Returns false b/c not all frames assigned"
return False
subtraj_first = 0
subtraj_i = 0
# Make a list before slicing
ltraj = _get_list_traj(trajectory)
while subtraj_i < len(self.ensembles):
subtraj_final = transitions[subtraj_i]
subtraj = ltraj[slice(subtraj_first, subtraj_final)]
if not self.ensembles[subtraj_i](subtraj):
# print "Returns false b/c ensemble", subtraj_i," fails"
return False
subtraj_i += 1
subtraj_first = subtraj_final
return True
def _find_subtraj_final(self, traj, subtraj_first, ens_num,
last_checked=None):
"""
Find the longest subtrajectory of trajectory which starts at
subtraj_first and satifies self.ensembles[ens_num].can_append
Returns
-------
int
Frame of traj which is the final frame for a subtraj starting at
subtraj_first and satisfying self.ensembles.can_append[ens_num]
"""
if last_checked is None:
subtraj_final = subtraj_first
else:
subtraj_final = max(last_checked, subtraj_first)
traj_final = len(traj)
ens = self.ensembles[ens_num]
# Make a list before slicing
ltraj = _get_list_traj(traj)
subtraj = ltraj[slice(subtraj_first, subtraj_final + 1)]
# if we're in the ensemble or could eventually be in the ensemble,
# we keep building the subtrajectory
# TODO: this doesn't actually reflect the cleanest behavior: should
# be the proper hybrid definition where we can append until/unless
# we overshoot
logger.debug("*Traj slice " + str(subtraj_first) + " " +
str(subtraj_final + 1) + " / " + str(traj_final))
# logger.debug("Ensemble " + str(ens.__class__.__name__))# + str(ens))
# logger.debug("Can-app " + str(ens.can_append(subtraj, trusted=True)))
# logger.debug("Call " + str(ens(subtraj, trusted=True)))
# TODO: the weird while condition is handling the OVERSHOOTING
while ((ens.can_append(subtraj, trusted=True) or
ens(subtraj, trusted=True)) and subtraj_final < traj_final):
subtraj_final += 1
subtraj = ltraj[slice(subtraj_first, subtraj_final + 1)]
logger.debug(" Traj slice " + str(subtraj_first) + " " +
str(subtraj_final + 1) + " / " + str(traj_final))
return subtraj_final
def _find_subtraj_first(self, traj, subtraj_final, ens_num,
last_checked=None):
if last_checked is None:
subtraj_first = subtraj_final - 1
else:
subtraj_first = min(last_checked, subtraj_final - 1)
traj_first = 0
ens = self.ensembles[ens_num]
# Make a list before slicing
ltraj = _get_list_traj(traj)
subtraj = ltraj[slice(subtraj_first, subtraj_final)]
logger.debug("*Traj slice " + str(subtraj_first) + " " +
str(subtraj_final) + " / " + str(len(traj)))
# logger.debug("Ensemble " + str(ens.__class__.__name__))# + str(ens))
# logger.debug("Can-app " + str(ens.can_prepend(subtraj, trusted=True)))
# logger.debug("Call " + str(ens(subtraj, trusted=True)))
# TODO: the weird while condition is handling the OVERSHOOTING
while ((ens.can_prepend(subtraj, trusted=True) or
ens.check_reverse(subtraj, trusted=True)
) and subtraj_first >= traj_first):
subtraj_first -= 1
subtraj = ltraj[slice(subtraj_first, subtraj_final)]
logger.debug(" Traj slice " + str(subtraj_first + 1) + " " +
str(subtraj_final) + " / " + str(len(traj)))
return subtraj_first + 1
def _generic_can_append(self, trajectory, trusted, strict):
# treat this like we're implementing a regular expression parser ...
# .*ensemble.+ ; but we have to do this for all possible matches
# There are three tests we consider:
# 1. subtraj_final - subtraj_first > 0: Do we obtain a subtrajectory?
# 2. subtraj_final == traj_final: Have we assigned all the frames?
# 3. ens_num == final_ens: are we looking at the last ensemble
# Various combinations of these result in three possible outcomes:
# (a) return True (we can append)
# (b) return False (we can't append)
# (c) loop around to text another subtrajectory (we can't tell)
# Returning false can only happen if all ensembles have been tested
# self._check_cache(trajectory, function="can_append")
cache = self._cache_can_append
if strict:
cache = self._cache_strict_can_append
if trusted:
cache.trusted = True
subtraj_first = 0
ens_num = 0
ens_first = 0
if self._use_cache:
_ = cache.check(trajectory)
if cache.contents == {}:
self.update_cache(cache, 0, 0, 0)
self.assign_frames(cache, None)
else:
subtraj_first = cache.contents['subtraj_from']
ens_num = cache.contents['ens_num']
ens_first = cache.contents['ens_from']
traj_final = len(trajectory)
final_ens = len(self.ensembles) - 1
# Make a list before slicing
ltraj = _get_list_traj(trajectory)
# print traj_final, final_ens
# logging startup
if cache.debug_enabled: # pragma: no cover
logger.debug(
"Beginning can_append with subtraj_first="
+ str(subtraj_first) + "; ens_first=" + str(ens_first)
+ "; ens_num=" + str(ens_num)
+ "; strict=" + str(strict)
)
logger.debug(
"Can-append sees a trusted cache: " + str(cache.trusted)
)
if cache.trusted:
logger.debug("Cache contents: " + str(cache.contents))
logger.debug("cache.prev_last_frame: " +
str(trajectory.index(cache.prev_last_frame)))
for i in range(len(self.ensembles)):
ens = self.ensembles[i]
logger.debug("Ensemble " + str(i) + " : "
+ ens.__class__.__name__)
while True: # main loop, with various exits
if self._use_cache and cache.trusted:
# TODO: trajectory.index is expensive... how to speed up?
# offset = 1
offset = 0
# if cache.last_length == len(trajectory):
# offset += 1
try:
last_checked_index = cache.prev_last_index - offset
except: # on any exception
# TODO: ideally, this won't be covered by tests, and can
# eventually be removed (along with try/except)
last_checked_index = \
trajectory.index(cache.prev_last_frame) - offset
#last_checked = trajectory.index(cache.prev_last_frame) - offset
last_checked = last_checked_index
else:
last_checked_index = None
last_checked = None
if cache.debug_enabled:
logger.debug("last_checked = " + str(last_checked))
subtraj_final = self._find_subtraj_final(
trajectory, subtraj_first, ens_num, last_checked
)
cache.last_length = subtraj_final
if cache.debug_enabled:
logger.debug(
"Subtraj for ens " + str(ens_num) + " : " +
"(" + str(subtraj_first) + "," + str(subtraj_final) + ")"
)
if subtraj_final - subtraj_first > 0:
subtraj = ltraj[slice(subtraj_first, subtraj_final)]
if ens_num == final_ens:
if subtraj_final == traj_final:
# we're in the last ensemble and the whole
# trajectory is assigned: can we append?
ens = self.ensembles[ens_num]
if cache.debug_enabled:
logger.debug("Returning can_append for " +
str(ens.__class__.__name__))
self.update_cache(cache, ens_num,
ens_first, subtraj_first)
return ens.can_append(subtraj, trusted=True)
else:
logger.debug(
"Returning false due to incomplete assigns: " +
str(subtraj_final) + "!=" + str(traj_final)
)
return False # in final ensemble, not all assigned
else:
# subtraj existed, but not yet final ensemble
# so we start with the next ensemble
end_traj = (subtraj_final == traj_final)
ensemble = self.ensembles[ens_num]
if not end_traj and not ensemble(subtraj,
trusted=cache.trusted):
logger.debug(
"Couldn't assign frames " + str(subtraj_first) +
" through " + str(subtraj_final) +
" to ensemble " + str(ens_num) + ": No match"
)
else:
logger.debug(
"Assigning frames " + str(subtraj_first) +
" through " + str(subtraj_final) +
" to ensemble " + str(ens_num)
)
self.assign_frames(cache, ens_num, subtraj_first,
subtraj_final)
self.update_cache(cache, ens_num, ens_first,
subtraj_first)
ens_num += 1
subtraj_first = subtraj_final
logger.debug("Moving to the next ensemble " + str(ens_num))
else: # no subtrajectory found
if subtraj_final == traj_final:
# all frames assigned, but not all ensembles finished;
# next frame might satisfy next ensemble
if self._use_cache:
prev_slice = cache.contents['assignments'][ens_num - 1]
prev_subtraj = ltraj[prev_slice]
prev_ens = self.ensembles[ens_num - 1]
if prev_ens.can_append(prev_subtraj, trusted=True):
logger.debug(
"Premature promotion: returning to ensemble " +
str(ens_num - 1)
)
ens_num -= 1
subtraj_first = "keep"
self.update_cache(cache, ens_num, ens_first,
subtraj_first)
logger.debug(
"All frames assigned, more ensembles to go: "
"returning True")
return True
elif self.ensembles[ens_num](self._zero_traj):
logger.debug(
"Moving on because of allowed zero-length ensemble")
ens_num += 1
subtraj_first = subtraj_final
self.update_cache(cache, ens_num, ens_first, subtraj_first)
else:
# not all frames assigned, couldn't find a sequence
# start over with sequences that begin with the next
# ensemble
if ens_first == final_ens:
logger.debug(
"Started with the last ensemble, got nothin'")
return False
elif strict is False:
logger.debug(
"Reassigning all frames, starting with ensemble " +
str(ens_first)
)
ens_first += 1
ens_num = ens_first
subtraj_first = 0
self.update_cache(cache, ens_num, ens_first,
subtraj_first)
else:
logger.debug(
"First ensemble fails and strict -- return false"
)
return False
def can_append(self, trajectory, trusted=False):
return self._generic_can_append(trajectory, trusted, strict=False)
def strict_can_append(self, trajectory, trusted=False):
return self._generic_can_append(trajectory, trusted, strict=True)
def _generic_can_prepend(self, trajectory, trusted, strict):
# based on .can_append(); see notes there for algorithm details
cache = self._cache_can_prepend
if strict:
cache = self._cache_strict_can_prepend
if trusted:
cache.trusted = True
traj_first = 0
first_ens = 0
subtraj_final = len(trajectory)
ens_final = len(self.ensembles) - 1
ens_num = ens_final
# Make list before slicing
ltraj = _get_list_traj(trajectory)
if self._use_cache:
_ = cache.check(trajectory)
if cache.contents == {}:
self.update_cache(cache, ens_num, first_ens, subtraj_final)
self.assign_frames(cache, None)
else:
logger.debug(
"len(traj)=" + str(len(trajectory))
+ "cache_from=" + str(cache.contents['subtraj_from'])
)
subtraj_from = cache.contents['subtraj_from']
if subtraj_from is None:
subtraj_from = 0
subtraj_final = len(trajectory) + subtraj_from
ens_num = cache.contents['ens_num']
ens_final = cache.contents['ens_from']
# logging startup
if logger.isEnabledFor(logging.DEBUG): # pragma: no cover
logger.debug(
"Beginning can_prepend with ens_num:" + str(ens_num)
+ " ens_final:" + str(ens_final) + " subtraj_final "
+ str(subtraj_final) + "; strict=" + str(strict)
)
if cache.trusted:
logger.debug("Cache contents: " + str(cache.contents))
logger.debug("cache.prev_start_frame: " +
str(trajectory.index(cache.start_frame)))
for i in range(len(self.ensembles)):
logger.debug(
"Ensemble " + str(i) +
" : " + self.ensembles[i].__class__.__name__
)
while True:
if self._use_cache and cache.trusted:
# offset = 1
offset = 0
try:
last_checked_index = cache.prev_last_index + offset
except: # on any exception
# TODO: ideally, this won't be covered by tests, and can
# eventually be removed (along with try/except)
last_checked_index = \
trajectory.index(cache.prev_last_frame) + offset
last_checked = trajectory.index(cache.prev_last_frame) + offset
else:
last_checked_index = None
last_checked = None
subtraj_first = self._find_subtraj_first(
trajectory, subtraj_final, ens_num, last_checked)
cache.last_length = len(trajectory) - subtraj_first
assign_final = subtraj_final - len(trajectory)
if assign_final == 0:
assign_final = None
logger.debug(
str(ens_num) + " : " +
"(" + str(subtraj_first) + "," + str(subtraj_final) + ")"
)
if subtraj_final - subtraj_first > 0:
subtraj = ltraj[slice(subtraj_first, subtraj_final)]
if ens_num == first_ens:
if subtraj_first == traj_first:
logger.debug("Returning can_prepend")
self.update_cache(cache, ens_num, ens_final,
assign_final)
return self.ensembles[ens_num].can_prepend(subtraj,
trusted=True)
else:
logger.debug(
"Returning false due to incomplete assigns: " +
str(subtraj_first) + "!=" + str(traj_first)
)
return False
else:
if subtraj_first != traj_first and \
not self.ensembles[ens_num](
subtraj, trusted=True):
logger.debug(
"Couldn't assign frames " + str(subtraj_first) +
" through " + str(subtraj_final) +
" to ensemble " + str(ens_num) + ": No match"
)
else:
logger.debug(
"Assigning frames " + str(subtraj_first) +
" through " + str(subtraj_final) +
" to ensemble " + str(ens_num)
)
assign_first = subtraj_first - len(trajectory)
self.assign_frames(cache, ens_num, assign_first,
assign_final)
self.update_cache(cache, ens_num, ens_final,
assign_final)
ens_num -= 1
subtraj_final = subtraj_first
logger.debug("Moving to the next ensemble " + str(ens_num))
else:
if subtraj_first == traj_first:
if self._use_cache:
prev_slice = cache.contents['assignments'][ens_num + 1]
logger.debug("prev_slice " + str(prev_slice))
prev_subtraj = ltraj[prev_slice]
logger.debug("prev_subtraj " + str(prev_subtraj))
logger.debug("traj " + str(trajectory))
prev_ens = self.ensembles[ens_num + 1]
if prev_ens.can_prepend(prev_subtraj, trusted=True):
logger.debug(
"Premature promotion: returning to ensemble " +
str(ens_num + 1)
)
ens_num += 1
assign_final = "keep"
logger.debug("(first, final)" + str((subtraj_first,
subtraj_final)))
self.update_cache(cache, ens_num, ens_final,
assign_final)
logger.debug(
"All frames assigned, more ensembles to go: "
"returning True")
return True
elif self.ensembles[ens_num](self._zero_traj):
logger.debug(
"Moving on because of allowed zero-length ensemble")
ens_num -= 1
subtraj_final = subtraj_first
self.update_cache(cache, ens_num, ens_final, subtraj_final)
else:
if ens_final == first_ens:
logger.debug(
"Started with the last ensemble, got nothin'")
return False
elif strict is False:
logger.debug(
"Reassigning all frames, starting with ensemble " +
str(ens_final)
)
ens_final -= 1
ens_num = ens_final
subtraj_final = len(trajectory)
self.update_cache(cache, ens_num, ens_final,
subtraj_final)
else:
logger.debug(
"First ensemble fails and strict -- return false"
)
return False
def can_prepend(self, trajectory, trusted=False):
return self._generic_can_prepend(trajectory, trusted, strict=False)
def strict_can_prepend(self, trajectory, trusted=False):
return self._generic_can_prepend(trajectory, trusted, strict=True)
def _str(self):
head = "[\n"
tail = "\n]"
sequence_str = ",\n".join([str(ens) for ens in self.ensembles])
return head + sequence_str + tail
[docs]
class LengthEnsemble(Ensemble):
"""
The ensemble of trajectories of a given length
"""
[docs]
def __init__(self, length):
"""
A path ensemble that describes path of a specific length
Parameters
----------
length : int or slice
The specific length (int) or the range of allowed trajectory
lengths (slice)
"""
#TODO: remove support for slice?
super(LengthEnsemble, self).__init__()
self.length = length
def __call__(self, trajectory, trusted=None, candidate=False):
length = len(trajectory)
if type(self.length) is int:
return length == self.length
else:
return length >= self.length.start and (
self.length.stop is None or length < self.length.stop)
def can_append(self, trajectory, trusted=False):
length = len(trajectory)
if type(self.length) is int:
return_value = (length < self.length)
logger.debug("LengthEnsemble.can_append: Segment length " +
str(length) + " < " + str(self.length) + " : " +
str(return_value))
return return_value
else:
return self.length.stop is None or length < self.length.stop - 1
def can_prepend(self, trajectory, trusted=False):
return self.can_append(trajectory)
def _str(self):
if type(self.length) is int:
return 'len(x) = {0}'.format(self.length)
else:
start = self.length.start
if start is None:
start = 0
stop = self.length.stop
if stop is None:
stop = 'infty'
else:
stop = str(self.length.stop - 1)
return 'len(x) in [{0}, {1}]'.format(start, stop)
[docs]
class VolumeEnsemble(Ensemble):
"""
Path ensembles based on the Volume object
"""
[docs]
def __init__(self, volume, trusted=True):
# TODO: does `trusted` actually mean anything or do anything as a
# property? it is about the condition of trusting the trajectory
# when we run it, so it relevant in functions. I don't think we need
# it here. ~DWHS
super(VolumeEnsemble, self).__init__()
self.volume = volume
self.trusted = trusted
self._use_cache = True
self._cache_can_append = EnsembleCache(+1)
self._cache_call = EnsembleCache(+1)
self._cache_can_prepend = EnsembleCache(-1)
self._cache_check_reverse = EnsembleCache(-1)
@property
def _volume(self):
"""
The volume that is used in the specification.
"""
return self.volume
[docs]
class AllInXEnsemble(VolumeEnsemble):
"""
Ensemble of trajectories with all frames in the given volume
"""
def _trusted_call(self, trajectory, cache):
"""
Generalized version of the call when trusted.
This uses a cache, which has the result for the previous trajectory
(`trajectory[:-1]` if forward, `trajectory[1:]` if backward) in the
`cache.contents['previous']`.
Paramters
---------
trajectory : paths.Trajectory
input trajectory to test
cache : paths.EnsembleCache
ensemble cache for this function
Returns
-------
bool :
result of __call__
"""
frame_num = -(cache.direction + 1) // 2 # 1 -> -1; -1 -> 0
reset = cache.check(trajectory)
if reset:
if len(trajectory) < 2:
cache.contents['previous'] = None
else:
# NOTE: is it possible that we'd reset a cache more than
# once in a single trajectory? that could mean that this
# starts to scale quadratically. I can't think of a case
# where this is a practical concern (short-circuit logic
# means the recache should only happen once per trajectory
# for All*XEnsembles, and the call should only happen once
# per trajectory for Part*XEnsembles.) In any case, the fix
# would be to implement a more complicated cache.reset,
# which checks whether the previous traj was a subtraj of
# this one (other than one frame less). ~~~DWHS
if frame_num == -1:
reset_value = self(trajectory[:-1], trusted=False)
elif frame_num == 0:
reset_value = self(trajectory[1:], trusted=False)
else: # pragma: no cover
raise RuntimeError("Bad value for frame_num: " +
str(frame_num))
cache.contents['previous'] = reset_value
cached_val = cache.contents['previous']
if cached_val or cached_val is None:
# need to check this frame (no prev traj, or prev traj is True)
# This sometimes gets a list instead of a full Trajectory
get_frame = getattr(trajectory, "get_as_proxy",
trajectory.__getitem__)
frame = get_frame(frame_num)
cache.contents['previous'] = self._volume(frame)
return cache.contents['previous']
else:
# cached_val is false, result must be false
return False
def can_append(self, trajectory, trusted=False):
if len(trajectory) == 0:
return True
elif trusted and self._use_cache:
return self._trusted_call(trajectory, self._cache_can_append)
else:
return self(trajectory)
def can_prepend(self, trajectory, trusted=False):
if len(trajectory) == 0:
return True
if trusted and self._use_cache:
return self._trusted_call(trajectory, self._cache_can_prepend)
else:
return self(trajectory)
def __call__(self, trajectory, trusted=None, candidate=False):
if len(trajectory) == 0:
return False
# TODO: We might be able to speed this up based on can_append
# being the same as call for this ensemble. Something like check
# the can_append cache instead of/as well as the call cache. May
# still have problems with overshooting -- but this might provide a
# speed-up in sequential ensemble's checking phase. ~~~DWHS
if trusted and self._use_cache:
return self._trusted_call(trajectory, self._cache_call)
else:
logger.debug("Untrusted VolumeEnsemble " + repr(self))
# logger.debug("Trajectory " + repr(trajectory))
# This can sometimes get a list instead of a Trajectory
# Make sure this is a proxy list
for frame in _get_list_traj(trajectory):
if not self._volume(frame):
return False
return True
def check_reverse(self, trajectory, trusted=False):
# order in this one only matters if it is trusted
if trusted and self._use_cache:
# print "Rev Trusted"
return self._trusted_call(trajectory, self._cache_check_reverse)
# frame = trajectory.get_as_proxy(0)
# return self._volume(frame)
else:
# print "Rev UnTrusted"
return self(trajectory) # in this case, order wouldn't matter
def __invert__(self):
return PartOutXEnsemble(self.volume, self.trusted)
def _str(self):
return 'x[t] in {0} for all t'.format(self._volume)
[docs]
class AllOutXEnsemble(AllInXEnsemble):
"""
Ensemble of trajectories with all frames outside the given volume
"""
[docs]
def __init__(self, volume, trusted=True):
super(AllOutXEnsemble, self).__init__(volume, trusted)
self._cached_volume = ~self.volume
@property
def _volume(self):
return self._cached_volume
def _str(self):
return 'x[t] in {0} for all t'.format(self._volume)
def __invert__(self):
return PartInXEnsemble(self.volume, self.trusted)
[docs]
class PartInXEnsemble(VolumeEnsemble):
"""
Ensemble of trajectory with at least one frame in the volume
"""
def _str(self):
return 'exists t such that x[t] in {0}'.format(self._volume)
def __call__(self, trajectory, trusted=None, candidate=False):
"""
Returns True if the trajectory is part of the PathEnsemble
Parameters
----------
trajectory : :class:`openpathsampling.trajectory.Trajectory`
The trajectory to be checked
"""
for frame in _get_list_traj(trajectory):
if self._volume(frame):
return True
return False
def __invert__(self):
return AllOutXEnsemble(self.volume, self.trusted)
[docs]
class PartOutXEnsemble(PartInXEnsemble):
"""
Ensemble of trajectories with at least one frame outside the volume
"""
[docs]
def __init__(self, volume, trusted=True):
super(PartOutXEnsemble, self).__init__(volume, trusted)
self._cached_volume = ~self.volume
@property
def _volume(self):
return self._cached_volume
def _str(self):
return 'exists t such that x[t] in {0}'.format(self._volume)
def __invert__(self):
return AllInXEnsemble(self.volume, self.trusted)
def __call__(self, trajectory, trusted=None, candidate=False):
# Don't load proxies if this is a Trajectory
for frame in _get_list_traj(trajectory):
if self._volume(frame):
return True
return False
[docs]
class WrappedEnsemble(Ensemble):
"""
Wraps an ensemble to alter it or the way it sees a trajectory
"""
[docs]
def __init__(self, ensemble):
super(WrappedEnsemble, self).__init__()
self.ensemble = ensemble
# you can also build wrapped ensembles with more flexibility when using
# a property for _new_ensemble
self._new_ensemble = self.ensemble
self.trusted = None
self._cache_can_append = EnsembleCache(+1)
self._cache_strict_can_append = EnsembleCache(+1)
self._cache_call = EnsembleCache(+1)
# cache_can_prepend has to think it is going forward because the
# frames given to it are from a forward growing trajectory... only
# later is everything turned around
self._cache_can_prepend = EnsembleCache(+1)
self._cache_strict_can_prepend = EnsembleCache(+1)
def __call__(self, trajectory, trusted=None, candidate=False):
return self._new_ensemble(self._alter(trajectory), trusted)
def _alter(self, trajectory):
return trajectory
def can_append(self, trajectory, trusted=None):
return self._new_ensemble.can_append(self._alter(trajectory),
trusted)
def can_prepend(self, trajectory, trusted=None):
return self._new_ensemble.can_prepend(self._alter(trajectory),
trusted)
def strict_can_append(self, trajectory, trusted=None):
return self._new_ensemble.strict_can_append(self._alter(trajectory),
trusted)
def strict_can_prepend(self, trajectory, trusted=None):
return self._new_ensemble.strict_can_prepend(self._alter(trajectory),
trusted)
def _str(self):
return str(self._new_ensemble)
class SlicedTrajectoryEnsemble(WrappedEnsemble):
"""
Alters trajectories given as arguments by taking Python slices.
"""
def __init__(self, ensemble, region):
super(SlicedTrajectoryEnsemble, self).__init__(ensemble)
if type(region) == int:
if region == -1:
self.region = slice(region, None)
else:
self.region = slice(region, region + 1)
else:
self.region = region
def _alter(self, trajectory):
return trajectory[self.region]
def _str(self):
# TODO: someday may add different string support for slices with
# only one frame
start = "" if self.region.start is None else str(self.region.start)
stop = "" if self.region.stop is None else str(self.region.stop)
step = "" if self.region.step is None else " every " + str(
self.region.step)
return ("(" + str(self.ensemble) +
" in {" + start + ":" + stop + "}" + step + ")")
[docs]
class SuffixTrajectoryEnsemble(WrappedEnsemble):
"""
Ensemble which prepends its trajectory to a given trajectory.
Used in backward shooting.
"""
[docs]
def __init__(self, ensemble, add_trajectory):
super(SuffixTrajectoryEnsemble, self).__init__(ensemble)
self.add_trajectory = add_trajectory
self._cached_trajectory = paths.Trajectory(add_trajectory.as_proxies())
def _alter(self, trajectory):
logger.debug("Starting Suffix._alter")
# logger.debug(
# "altered " + str([id(i) for i in self._cached_trajectory]))
reset = self._cache_can_prepend.check(trajectory)
# logger.debug(
# "altered " + str([id(i) for i in self._cached_trajectory]))
# logger.debug("traj " + str([id(i) for i in trajectory]))
# logger.debug("trajrev " + str([id(i) for i in trajectory.reversed]))
# reset = False
if not reset:
logger.debug("SuffixTrajectory was not reset")
first_frame = trajectory.get_as_proxy(-1)
if self._cached_trajectory.get_as_proxy(0) != first_frame:
self._cached_trajectory.insert(0, first_frame)
else:
self._cached_trajectory = trajectory.reversed + self.add_trajectory
# logger.debug("revtraj " + str([id(i) for i in revtraj]))
# logger.debug("add " + str([id(i) for i in self.add_trajectory]))
# logger.debug(
# "altered " + str([id(i) for i in self._cached_trajectory]))
return self._cached_trajectory
def can_append(self, trajectory, trusted=None):
raise RuntimeError("SuffixTrajectoryEnsemble.can_append is nonsense.")
def strict_can_append(self, trajectory, trusted=None):
# was overridden in WrappedEnsemble: here should raise same error as
# can_append does
return self.can_append(trajectory, trusted)
[docs]
class PrefixTrajectoryEnsemble(WrappedEnsemble):
"""
Ensemble which appends its trajectory to a given trajectory.
Used in forward shooting.
"""
[docs]
def __init__(self, ensemble, add_trajectory):
super(PrefixTrajectoryEnsemble, self).__init__(ensemble)
self.add_trajectory = add_trajectory
self._cached_trajectory = paths.Trajectory(add_trajectory.as_proxies())
def _alter(self, trajectory):
logger.debug("Starting _alter")
reset = self._cache_can_append.check(trajectory)
if not reset:
final_frame = trajectory.get_as_proxy(-1)
if self._cached_trajectory.get_as_proxy(-1) != final_frame:
self._cached_trajectory.append(final_frame)
else:
logger.debug("doing it oldstyle")
self._cached_trajectory = self.add_trajectory + trajectory
# DEBUG
# logger.debug("add " + str([i for i in self.add_trajectory]))
# logger.debug("traj " + str([i for i in trajectory]))
# logger.debug("cache " + str([i for i in self._cached_trajectory]))
# oldstyle = self.add_trajectory + trajectory
# for (t,b) in zip(self._cached_trajectory,
# self.add_trajectory+trajectory):
# logger.debug(str(t) + " ?=? " + str(b))
# assert(t == b)
# assert(len(self._cached_trajectory) == len(oldstyle))
return self._cached_trajectory
def can_prepend(self, trajectory, trusted=None):
raise RuntimeError("PrefixTrajectoryEnsemble.can_prepend is nonsense.")
def strict_can_prepend(self, trajectory, trusted=None):
# was overridden in WrappedEnsemble: here should raise same error as
# can_append does
return self.can_prepend(trajectory, trusted)
[docs]
class ReversedTrajectoryEnsemble(WrappedEnsemble):
"""
Ensemble based on reversing the trajectory.
"""
def _alter(self, trajectory):
return trajectory.reverse()
class AppendedNameEnsemble(WrappedEnsemble):
"""
Add string to ensemble name: allows multiple copies of an ensemble.
"""
def __init__(self, ensemble, label):
self.label = label
super(AppendedNameEnsemble, self).__init__(ensemble)
def _str(self):
return str(self.ensemble) + " " + self.label
[docs]
class OptionalEnsemble(WrappedEnsemble):
"""
An ensemble which is optional for SequentialEnsembles.
"""
[docs]
def __init__(self, ensemble):
super(OptionalEnsemble, self).__init__(ensemble)
self._new_ensemble = LengthEnsemble(0) | self.ensemble
def _str(self):
return "{" + str(self.ensemble) + "} (OPTIONAL)"
[docs]
class SingleFrameEnsemble(WrappedEnsemble):
"""
Convenience ensemble to `and` a LengthEnsemble(1) with a given ensemble.
Frequently used for SequentialEnsembles.
Attributes
----------
ensemble : :class:`openpathsampling.ensemble.Ensemble`
the ensemble which should be represented in the single frame
Notes
-----
We allow the user to choose to be stupid: if, for example, the user
tries to make a SingleFrameEnsemble from an ensemble which requires
more than one frame to be satisfied (e.g., a SequentialEnsemble with
more than one subensemble), it can be created, but no path will ever
satisfy it. Since we can't stop all possible mistakes, we don't bother
here.
"""
[docs]
def __init__(self, ensemble):
super(SingleFrameEnsemble, self).__init__(ensemble)
self._new_ensemble = LengthEnsemble(1) & self.ensemble
def _str(self):
return "{" + str(self.ensemble) + "} (SINGLE FRAME)"
[docs]
class MinusInterfaceEnsemble(WrappedEnsemble):
"""
This creates an ensemble for the minus interface.
The specific implementation allows us to use the multiple-segment minus
ensemble described by Swenson and Bolhuis. The minus interface was
originally developed by van Erp. For more details, see the section
"Anatomy of a PathMover: the Minus Move" in the OpenPathSampling
Documentation.
Parameters
----------
state_vol : :class:`.Volume`
The Volume which defines the state for this minus interface
innermost_vols : list of :class:`.Volume`
The Volume defining the innermost interface with which this minus
interface does its replica exchange.
n_l : integer (greater than one)
The number of segments crossing innermost_vol for this interface.
References
----------
T.S. van Erp. Phys. Rev. Lett.
D.W.H. Swenson and P.G. Bolhuis. J. Chem. Phys. 141, 044101 (2014).
doi:10.1063/1.4890037
"""
# don't store unnecessary stuff we recreate at initialization
# TODO: Check with David if it makes sense to store these and allow
# them being used in __init__ instead of the self-made ones
[docs]
def __init__(self, state_vol, innermost_vols, n_l=2, forbidden=None,
greedy=False):
if n_l < 2:
raise ValueError("The number of segments n_l must be at least 2")
self.state_vol = state_vol
try:
innermost_vols = list(innermost_vols)
except TypeError:
innermost_vols = [innermost_vols]
if forbidden is None:
forbidden = [paths.EmptyVolume()]
else:
try:
forbidden = list(forbidden)
except TypeError:
forbidden = [forbidden]
self.forbidden = forbidden
forbidden_volume = paths.join_volumes(forbidden)
forbidden_ensemble = paths.AllOutXEnsemble(forbidden_volume)
self.innermost_vols = innermost_vols
self.innermost_vol = paths.FullVolume()
for vol in self.innermost_vols:
self.innermost_vol = self.innermost_vol & vol
self.greedy = greedy
in_A = AllInXEnsemble(state_vol)
out_A = AllOutXEnsemble(state_vol)
in_X = AllInXEnsemble(self.innermost_vol)
leave_X = PartOutXEnsemble(self.innermost_vol)
# interstitial = out_A & in_X
interstitial = self.innermost_vol - state_vol
in_interstitial = AllInXEnsemble(interstitial)
segment_ensembles = [paths.TISEnsemble(state_vol, state_vol, inner)
for inner in self.innermost_vols]
self._segment_ensemble = join_ensembles(segment_ensembles)
# interstitial = AllInXEnsemble(self.innermost_vol - state_vol)
start = [
SingleFrameEnsemble(in_A),
OptionalEnsemble(in_interstitial),
]
loop = [
out_A, # & leave_X, # redundant b/c next stop for previous
in_X # & hitA # redundant due to stop req for previous outA
]
end = [
out_A, # & leave_X,
OptionalEnsemble(in_interstitial),
SingleFrameEnsemble(in_A)
]
sequence = start + loop * (n_l - 1) + end
ensemble = paths.SequentialEnsemble(sequence) & forbidden_ensemble
self.n_l = n_l
super(MinusInterfaceEnsemble, self).__init__(ensemble)
def to_dict(self):
dct = super(MinusInterfaceEnsemble, self).to_dict()
dct['state_vol'] = self.state_vol
dct['innermost_vols'] = self.innermost_vols
dct['innermost_vol'] = self.innermost_vol
dct['_segment_ensemble'] = self._segment_ensemble
dct['forbidden'] = self.forbidden
dct['n_l'] = self.n_l
return dct
@property
def extendable_sub_ensembles(self):
# A-X-A and the one from TISEnsemble
state_vol = self.state_vol
sub_ensembles = {}
in_A = AllInXEnsemble(state_vol)
out_A = AllOutXEnsemble(state_vol)
# this code is for potential
# in_X = AllInXEnsemble(self.innermost_vol)
# leave_X = PartOutXEnsemble(self.innermost_vol)
# interstitial = out_A & in_X
# segment_ensembles = [paths.TISEnsemble(state_vol, state_vol, inner)
# for inner in self.innermost_vols]
# start = [
# SingleFrameEnsemble(in_A),
# OptionalEnsemble(interstitial),
# ]
# loop = [
# out_A & leave_X,
# in_X # & hitA # redundant due to stop req for previous outA
# ]
# end = [
# out_A & leave_X,
# OptionalEnsemble(interstitial),
# SingleFrameEnsemble(in_A)
# ]
# do not add higher orders, you would
# for n_l in range(self.n_l - 2, 0, -1):
# # add ens with less loops
# sub_ensembles.append(
# SequentialEnsemble(start + loop * n_l + end))
sub_ensembles['complex'] = self._segment_ensemble
# and the simplest possible just crossing from in_state to outside
sub_ensembles['minimal'] = \
LengthEnsemble(2) & \
SequentialEnsemble([
SingleFrameEnsemble(in_A),
SingleFrameEnsemble(out_A)
])
return sub_ensembles
# def populate_minus_ensemble(self, partial_traj, minus_replica_id, engine):
# """
# Generate a sample for the minus ensemble by extending `partial_traj`
#
# Parameters
# ----------
# partial_traj : :class:`openpathsampling.trajectory.Trajectory`
# trajectory to extend
# minus_replica_id : int or str
# replica ID for this sample
# engine : :class:`openpathsampling.dynamicsengine.DynamicsEngine`
# engine to use for MD extension
# """
# last_frame = partial_traj[-1]
# if not self._segment_ensemble(partial_traj):
# raise RuntimeError(
# "Invalid input trajectory for minus extension. (Not A-to-A?)"
# )
# fwd_extend_ens = PrefixTrajectoryEnsemble(self, partial_traj)
# extension = engine.generate(last_frame,
# [fwd_extend_ens.can_append])
# first_minus = paths.Trajectory(partial_traj + extension[1:])
# assert self(first_minus)
# minus_samp = paths.Sample(
# replica=minus_replica_id,
# trajectory=first_minus,
# ensemble=self
# )
# logger.info(first_minus.summarize_by_volumes_str(
# {"A": self.state_vol,
# "I": ~self.state_vol & self.innermost_vol,
# "X": ~self.innermost_vol})
# )
# return minus_samp
# def populate_minus_ensemble_from_set(self, samples, minus_replica_id,
# engine):
# """
# Generate a sample for this minus ensemble by extending trajectory.
#
# Parameters
# ----------
# samples : iterable of :class:`.Sample`
# samples with trajectories that might be extended
# minus_replica_id : int or str
# replica ID for the return sample
# engine : :class:`openpathsampling.dynamicsengine.DynamicsEngine`
# engine to use for MD extension
#
# Returns
# -------
# :class:`.Sample` :
# a sample for this minus ensemble
# """
# partials = [s.trajectory for s in samples
# if self._segment_ensemble(s.trajectory)]
# if len(partials) == 0:
# # TODO: add support for trying to run backwards
# raise RuntimeError("No trajectories can be extended")
#
# samp = None
#
# good_sample = False
# while not good_sample:
# partial_traj = partials[0]
# # I think it should be impossible to RuntimeError in this
# samp = self.populate_minus_ensemble(
# partial_traj=partial_traj,
# minus_replica_id=minus_replica_id,
# engine=engine
# )
#
# good_sample = samp.ensemble(samp.trajectory)
#
# return samp
[docs]
class TISEnsemble(WrappedEnsemble):
"""An ensemble for TIS (or AMS).
Begin in `initial_states`, end in either `initial_states` or
`final_states`, and cross `interface`.
Attributes
----------
initial_states : `openpathsampling.volume.Volume` or list of `openpathsampling.volume.Volume`
Volume(s) that only the first or last frame may be in
final_states : `openpathsampling.volume.Volume` or list of `openpathsampling.volume.Volume`
Volume(s) that only the last frame may be in
interface : `openpathsampling.volume.Volume`
Volume which the trajectory must exit to be accepted
orderparameter : `openpathsampling.collectivevariable.CollectiveVariable`
CV to be used as order parameter for this
"""
@property
def extendable_sub_ensembles(self):
# this is tricky. The only extendable sub-ensembles are (In, Out)
# at the crossing of leaving or entering the core
# pick only the initial ones like for A-X-AB pick A
states = list(set(self.initial_states))
volume = paths.volume.join_volumes(states)
return {
'minimal':
LengthEnsemble(2) &
SequentialEnsemble([
SingleFrameEnsemble(AllInXEnsemble(volume)),
SingleFrameEnsemble(AllOutXEnsemble(volume))
])
}
[docs]
def __init__(self, initial_states, final_states, interface,
orderparameter=None, cv_max=None, lambda_i=None):
# regularize to list of volumes
# without orderparameter, some info can't be obtained
try:
_ = len(initial_states)
except TypeError:
initial_states = [initial_states]
try:
_ = len(final_states)
except TypeError:
final_states = [final_states]
volume_a = paths.volume.join_volumes(initial_states)
volume_b = paths.volume.join_volumes(final_states)
ensemble = SequentialEnsemble([
AllInXEnsemble(volume_a) & LengthEnsemble(1),
OptionalEnsemble(AllOutXEnsemble(volume_a | volume_b)),
AllInXEnsemble(volume_a | volume_b) & LengthEnsemble(1)
]) & PartOutXEnsemble(interface)
super(TISEnsemble, self).__init__(ensemble)
self.initial_states = initial_states
self.final_states = final_states
self.interface = interface
# self.name = interface.name
self.orderparameter = orderparameter # TODO: is this used? remove?
self.cv_max = cv_max
self.lambda_i = lambda_i
self._initial_volumes = volume_a
self._final_volumes = volume_b | volume_a
def __call__(self, trajectory, trusted=None, candidate=False):
logger.debug("TIS ENSEMBLE: candidate={0}".format(str(candidate)))
use_candidate = (candidate and self.lambda_i is not None)
if use_candidate and self.cv_max is not None:
logger.debug("Using candidate shortcut with self.cv_max")
return (
self._initial_volumes(trajectory[0])
& self._final_volumes(trajectory[-1])
& (self.cv_max(trajectory) > self.lambda_i)
)
elif use_candidate and self.orderparameter is not None:
logger.debug("Using candidate shortcut with max(orderparameter)")
# as a candidate trajectory, we assume that only the first and
# final frames can be in a state
#logger.debug("initial: " +
#str(self._initial_volumes(trajectory[0])))
#logger.debug("final: " +
#str(self._final_volumes(trajectory[0])))
#logger.debug("max: " +
#str(max(self.orderparameter(trajectory))))
return (
self._initial_volumes(trajectory[0])
& self._final_volumes(trajectory[-1])
& (max(self.orderparameter(trajectory)) > self.lambda_i)
)
else:
logger.debug("No shortcut possible")
# it still works fine if we use the slower algorithm
return super(TISEnsemble, self).__call__(trajectory, trusted)
def trajectory_summary(self, trajectory):
initial_state_i = None
final_state_i = None
for state_i in range(len(self.initial_states)):
if self.initial_states[state_i](trajectory.get_as_proxy(0)):
initial_state_i = state_i
break
all_states = self.initial_states + self.final_states
for state_i in range(len(all_states)):
if all_states[state_i](trajectory.get_as_proxy(-1)):
final_state_i = state_i
break
if self.orderparameter is not None:
lambda_traj = self.orderparameter(trajectory)
min_lambda = min(lambda_traj)
max_lambda = max(lambda_traj)
else:
min_lambda = None
max_lambda = None
return {
'initial_state': initial_state_i,
'final_state': final_state_i,
'max_lambda': max_lambda,
'min_lambda': min_lambda
}
def trajectory_summary_str(self, trajectory):
summ = self.trajectory_summary(trajectory)
all_states = self.initial_states + self.final_states
# TODO: remove the .name from this when string returns correctly
init_st_i = summ['initial_state']
fin_st_i = summ['final_state']
# TODO: how can we have None?
if init_st_i is None:
init_st = "None"
else:
init_st = str(self.initial_states[summ['initial_state']].name)
if fin_st_i is None:
fin_st = "None"
else:
fin_st = str(all_states[summ['final_state']].name)
# if self.orderparameter is not None:
# opname = self.orderparameter.name
# else:
# opname = "None"
min_l = str(summ['min_lambda'])
max_l = str(summ['max_lambda'])
mystr = (
"initial_state=" + init_st + " " +
"final_state=" + fin_st + " " +
"min_lambda=" + min_l + " " +
"max_lambda=" + max_l + " "
)
return mystr
def _str(self):
return str(self.ensemble)
# class EnsembleFactory(object):
# """
# Convenience class to construct Ensembles
# """
# @staticmethod
# def StartXEnsemble(volume):
# """
# Construct an ensemble that starts (x[0]) in the specified volume
# Parameters
# ----------
# volume : :class:`openpathsampling.volume.Volume`
# The volume to start in
# Returns
# -------
# ensemble : :class:`openpathsampling.ensemble.Ensemble`
# The constructed Ensemble
# """
# return AllInXEnsemble(volume, 0)
# @staticmethod
# def EndXEnsemble(volume):
# """
# Construct an ensemble that ends (x[-1]) in the specified volume
# Parameters
# ----------
# volume : :class:`openpathsampling.volume.Volume`
# The volume to end in
# Returns
# -------
# ensemble : :class:`openpathsampling.ensemble.Ensemble`
# The constructed Ensemble
# """
# return AllInXEnsemble(volume, -1)
# @staticmethod
# def A2BEnsemble(volume_a, volume_b, trusted=True):
# """
# Construct an ensemble that starts in `volume_a`, ends in
# `volume_b` and is in either volumes in between
# Parameters
# ----------
# volume_a : :class:`openpathsampling.Volume`
# The volume to start in
# volume_b : :class:`openpathsampling.Volume`
# The volume to end in
# Returns
# -------
# ensemble : :class:`openpathsampling.Ensemble`
# The constructed Ensemble
# """
# # TODO: this is actually only for flexible path length TPS now
# return SequentialEnsemble([
# SingleFrameEnsemble(AllInXEnsemble(volume_a)),
# AllOutXEnsemble(volume_a | volume_b),
# SingleFrameEnsemble(AllInXEnsemble(volume_b))
# ])
# @staticmethod
# def TISEnsembleSet(volume_a, volume_b, volumes_x, orderparameter,
# lambdas=None):
# if lambdas is None:
# lambdas = [None] * len(volumes_x)
# myset = [paths.TISEnsemble(volume_a, volume_b, vol, orderparameter,
# lambda_i)
# for (vol, lambda_i) in zip(volumes_x, lambdas)]
# return myset