'''
Created on 03.09.2014
@author: Jan-Hendrik Prinz, David W.H. Swenson
'''
from . import range_logic
import abc
from openpathsampling.netcdfplus import StorableNamedObject
import numpy as np
import warnings
# TODO: Make Full and Empty be Singletons to avoid storing them several times!
def join_volumes(volume_list, name=None):
"""
Make the union of a list of volumes. (Useful shortcut.)
Parameters
----------
volume_list : list of :class:`openpathsampling.Volume`
the list to be joined together
name : str or callable
string for name, or callable that creates string for name from
``volume_list``
Returns
-------
:class:`openpathsampling.UnionVolume`
the union of the elements of the list, or EmptyVolume if list is
empty
"""
volume = EmptyVolume()
# EmptyVolume is smart and knows its OR just takes the other
for vol in volume_list:
volume = volume | vol
if name is not None:
try:
name_str = name(volume_list)
except TypeError:
name_str = name
volume = volume.named(name_str)
return volume
[docs]
class Volume(StorableNamedObject):
"""
A Volume describes a set of snapshots
"""
__metaclass__ = abc.ABCMeta
[docs]
def __init__(self):
super(Volume, self).__init__()
@abc.abstractmethod
def __call__(self, snapshot):
'''
Returns `True` if the given snapshot is part of the defined Region
'''
return False # pragma: no cover
def __str__(self):
'''
Returns a string representation of the volume
'''
return 'volume' # pragma: no cover
__hash__ = StorableNamedObject.__hash__
def __or__(self, other):
if self is other:
return self
elif type(other) is EmptyVolume:
return self
elif type(other) is FullVolume:
return other
else:
return UnionVolume(self, other)
def __xor__(self, other):
if self is other:
return EmptyVolume()
elif type(other) is EmptyVolume:
return self
elif type(other) is FullVolume:
return ~ self
else:
return SymmetricDifferenceVolume(self, other)
def __and__(self, other):
if self is other:
return self
elif type(other) is EmptyVolume:
return other
elif type(other) is FullVolume:
return self
else:
return IntersectionVolume(self, other)
def __sub__(self, other):
if self is other:
return EmptyVolume()
elif type(other) is EmptyVolume:
return self
elif type(other) is FullVolume:
return EmptyVolume()
else:
return RelativeComplementVolume(self, other)
def __invert__(self):
return NegatedVolume(self)
def __eq__(self, other):
return str(self) == str(other)
def __ne__(self, other):
return not self == other
[docs]
class VolumeCombination(Volume):
"""
Logical combination of volumes.
This should be treated as an abstract class. For storage purposes, use
specific subclasses in practice.
"""
[docs]
def __init__(self, volume1, volume2, fnc, str_fnc):
super(VolumeCombination, self).__init__()
self.volume1 = volume1
self.volume2 = volume2
self.fnc = fnc
self.sfnc = str_fnc
def __call__(self, snapshot):
# short circuit following JHP's implementation in ensemble.py
a = self.volume1(snapshot)
res_true = self.fnc(a, True)
res_false = self.fnc(a, False)
if res_false == res_true:
return res_true
else:
b = self.volume2(snapshot)
return self.fnc(a, b)
#return self.fnc(self.volume1.__call__(snapshot),
#self.volume2.__call__(snapshot))
def __str__(self):
return '(' + self.sfnc.format(str(self.volume1), str(self.volume2)) + ')'
def to_dict(self):
return {'volume1': self.volume1, 'volume2': self.volume2}
[docs]
class UnionVolume(VolumeCombination):
""" "Or" combination (union) of two volumes."""
[docs]
def __init__(self, volume1, volume2):
super(UnionVolume, self).__init__(
volume1=volume1,
volume2=volume2,
fnc=lambda a, b: a or b,
str_fnc='{0} or {1}'
)
[docs]
class IntersectionVolume(VolumeCombination):
""" "And" combination (intersection) of two volumes."""
[docs]
def __init__(self, volume1, volume2):
super(IntersectionVolume, self).__init__(
volume1=volume1,
volume2=volume2,
fnc=lambda a, b: a and b,
str_fnc='{0} and {1}'
)
[docs]
class SymmetricDifferenceVolume(VolumeCombination):
""" "Xor" combination of two volumes."""
[docs]
def __init__(self, volume1, volume2):
super(SymmetricDifferenceVolume, self).__init__(
volume1=volume1,
volume2=volume2,
fnc=lambda a, b: a ^ b,
str_fnc='{0} xor {1}'
)
[docs]
class RelativeComplementVolume(VolumeCombination):
""" "Subtraction" combination (relative complement) of two volumes."""
[docs]
def __init__(self, volume1, volume2):
super(RelativeComplementVolume, self).__init__(
volume1=volume1,
volume2=volume2,
fnc=lambda a, b: a and not b,
str_fnc='{0} and not {1}'
)
class NegatedVolume(Volume):
"""Negation (logical not) of a volume."""
def __init__(self, volume):
super(NegatedVolume, self).__init__()
self.volume = volume
def __call__(self, snapshot):
return not self.volume(snapshot)
def __str__(self):
return '(not ' + str(self.volume) + ')'
[docs]
class EmptyVolume(Volume):
"""Empty volume: no snapshot can satisfy"""
[docs]
def __init__(self):
super(EmptyVolume, self).__init__()
def __call__(self, snapshot):
return False
def __and__(self, other):
return self
def __or__(self, other):
return other
def __xor__(self, other):
return other
def __sub__(self, other):
return self
def __invert__(self):
return FullVolume()
def __str__(self):
return 'empty'
[docs]
class FullVolume(Volume):
"""Volume which all snapshots can satisfy."""
[docs]
def __init__(self):
super(FullVolume, self).__init__()
def __call__(self, snapshot):
return True
def __invert__(self):
return EmptyVolume()
def __and__(self, other):
return other
def __or__(self, other):
return self
def __xor__(self, other):
return ~ other
def __sub__(self, other):
return ~ other
def __str__(self):
return 'all'
[docs]
class CVDefinedVolume(Volume):
"""
Volume defined by a range of a collective variable `collectivevariable`.
Contains all snapshots `snap` for which `lamba_min <=
collectivevariable(snap)` and `lambda_max > collectivevariable(snap)`.
Parameters
----------
collectivevariable : :class:`.CollectiveVariable`
the CV to base the volume on
lambda_min : float
minimum value of the CV
lambda_max : float
maximum value of the CV
"""
[docs]
def __init__(self, collectivevariable, lambda_min=0.0, lambda_max=1.0):
super(CVDefinedVolume, self).__init__()
self.collectivevariable = collectivevariable
try:
self.lambda_min = lambda_min.__float__()
except AttributeError:
self.lambda_min = float(lambda_min)
try:
self.lambda_max = lambda_max.__float__()
except AttributeError:
self.lambda_max = float(lambda_max)
self._cv_returns_iterable = None # used to raise warnings
# Typically, the logical combinations are only done once. Because of
# this, it is worth passing these through a check to speed up the logic.
# To get all the usefulness of the range logic in a subclass, all you
# should need to override is _copy_with_new_range (so that it inits any
# extra info the subclass carries) and range_and/or/sub, so that they
# return the correct behavior for the new subclass. Everything else
# comes for free.
@property
def default_name(self):
return (str(self.lambda_min) + "<"
+ str(self.collectivevariable.name) + "<"
+ str(self.lambda_max))
def _copy_with_new_range(self, lmin, lmax):
"""Shortcut to make a CVDefinedVolume with all parameters the same as
this one except the range. This is useful for the range logic when
dealing with subclasses: just override this function to copy extra
information.
"""
return CVDefinedVolume(self.collectivevariable, lmin, lmax)
@staticmethod
def range_and(amin, amax, bmin, bmax):
return range_logic.range_and(amin, amax, bmin, bmax)
@staticmethod
def range_or(amin, amax, bmin, bmax):
return range_logic.range_or(amin, amax, bmin, bmax)
@staticmethod
def range_sub(amin, amax, bmin, bmax):
return range_logic.range_sub(amin, amax, bmin, bmax)
def _lrange_to_Volume(self, lrange):
"""Takes results from one of the range_logic functions and returns
the appropriate Volume.
Parameters
----------
lrange : None or 1 or list of 2-tuples
Key to the volume to be returned: None returns the EmptyVolume, 1
returns self, and a list of 2-tuples is __or__'d as (min,max) to
make a VolumeCombinations
Returns
-------
Volume
appriate volume according to lrange
Raises
------
ValueError
if the input lrange is not an allowed value
"""
if lrange is None:
return EmptyVolume()
elif lrange == 1:
return self
elif lrange == -1:
return FullVolume()
elif len(lrange) == 1:
return self._copy_with_new_range(lrange[0][0], lrange[0][1])
elif len(lrange) == 2:
return UnionVolume(
self._copy_with_new_range(lrange[0][0], lrange[0][1]),
self._copy_with_new_range(lrange[1][0], lrange[1][1])
)
else:
raise ValueError(
"lrange value not understood: {0}".format(lrange)
) # pragma: no cover
def __and__(self, other):
if (type(other) is type(self) and
self.collectivevariable == other.collectivevariable):
lminmax = self.range_and(self.lambda_min, self.lambda_max,
other.lambda_min, other.lambda_max)
return self._lrange_to_Volume(lminmax)
else:
return super(CVDefinedVolume, self).__and__(other)
def __or__(self, other):
if (type(other) is type(self) and
self.collectivevariable == other.collectivevariable):
lminmax = self.range_or(self.lambda_min, self.lambda_max,
other.lambda_min, other.lambda_max)
return self._lrange_to_Volume(lminmax)
else:
return super(CVDefinedVolume, self).__or__(other)
def __xor__(self, other):
if (type(other) is type(self) and
self.collectivevariable == other.collectivevariable):
# taking the shortcut here
return (self | other) - (self & other)
else:
return super(CVDefinedVolume, self).__xor__(other)
def __sub__(self, other):
if (type(other) is type(self) and
self.collectivevariable == other.collectivevariable):
lminmax = self.range_sub(self.lambda_min, self.lambda_max,
other.lambda_min, other.lambda_max)
return self._lrange_to_Volume(lminmax)
else:
return super(CVDefinedVolume, self).__sub__(other)
def _is_iterable(self, val):
try:
# openmm.Quantity erroneously allows iter, so use len
# besides, CVs shouldn't return generators
_ = len(val)
except TypeError:
return False
else:
cv = self.collectivevariable
warnings.warn("The CV '" + str(cv.name) + "' returns an "
"iterable. This may lead to problem in analysis.")
return True
def _get_cv_float(self, snapshot):
val = self.collectivevariable(snapshot)
if self._cv_returns_iterable is None:
self._cv_returns_iterable = self._is_iterable(val)
return val.__float__()
def __call__(self, snapshot):
l = self._get_cv_float(snapshot)
# we explicitly test for infinity to allow the user to
# define `lambda_min/max='inf'` also when using units
# an openmm unit cannot be compared to a python infinite float
if self.lambda_min != float('-inf') and self.lambda_min > l:
return False
if self.lambda_max != float('inf') and self.lambda_max <= l:
return False
return True
def __str__(self):
return '{{x|{2}(x) in [{0:g}, {1:g}]}}'.format(
self.lambda_min, self.lambda_max, self.collectivevariable.name)
[docs]
class PeriodicCVDefinedVolume(CVDefinedVolume):
"""
As with `CVDefinedVolume`, but for a periodic order parameter.
Defines a Volume containing all states where collectivevariable, a periodic
function wrapping into the range [period_min, period_max], is in the
given range [lambda_min, lambda_max].
Attributes
----------
period_min : float (optional)
minimum of the periodic domain
period_max : float (optional)
maximum of the periodic domain
"""
_excluded_attr = ['wrap']
[docs]
def __init__(
self, collectivevariable, lambda_min=0.0, lambda_max=1.0,
period_min=None, period_max=None):
super(PeriodicCVDefinedVolume, self).__init__(collectivevariable,
lambda_min, lambda_max)
self.period_min = period_min
self.period_max = period_max
if (period_min is not None) and (period_max is not None):
self._period_shift = period_min
self._period_len = period_max - period_min
if self.lambda_max - self.lambda_min > self._period_len:
raise Exception("Range of volume larger than periodic bounds.")
elif self.lambda_max-self.lambda_min == self._period_len:
# this is only the case that we really have a FullVolume
self.lambda_min = period_min
self.lambda_max = period_max
# hack: better to create factory, returning FullVolume
# this hack: https://stackoverflow.com/questions/38541015/
class MonkeyPatch(type(self)):
def __call__(self, *arg, **kwarg):
return True
self.__class__ = MonkeyPatch
else:
self.lambda_min = self.do_wrap(lambda_min)
self.lambda_max = self.do_wrap(lambda_max)
self.wrap = True
else:
self.wrap = False
def do_wrap(self, value):
"""Wraps `value` into the periodic domain."""
# this looks strange and mimics the modulo operation `%` while
# being fully compatible for openmm quantities and plain python as
# well working for ints and floats.
val = value - self._period_shift
# little trick to check for positivity without knowing the the units
# or if it actually has units
if val > val * 0:
return value - int(val / self._period_len) * self._period_len
else:
wrapped = value + int((self._period_len - val) / self._period_len) \
* self._period_len
if wrapped >= self._period_len:
wrapped -= self._period_len
return wrapped
# next few functions add support for range logic
def _copy_with_new_range(self, lmin, lmax):
return PeriodicCVDefinedVolume(self.collectivevariable, lmin, lmax,
self.period_min, self.period_max)
@staticmethod
def range_and(amin, amax, bmin, bmax):
return range_logic.periodic_range_and(amin, amax, bmin, bmax)
@staticmethod
def range_or(amin, amax, bmin, bmax):
return range_logic.periodic_range_or(amin, amax, bmin, bmax)
@staticmethod
def range_sub(amin, amax, bmin, bmax):
return range_logic.periodic_range_sub(amin, amax, bmin, bmax)
def __invert__(self):
# consists of swapping max and min
return PeriodicCVDefinedVolume(self.collectivevariable,
self.lambda_max, self.lambda_min,
self.period_min, self.period_max
)
def __call__(self, snapshot):
l = self._get_cv_float(snapshot)
if self.wrap:
l = self.do_wrap(l)
if self.lambda_min > self.lambda_max:
return l >= self.lambda_min or l < self.lambda_max
else:
return self.lambda_min <= l < self.lambda_max
def __str__(self):
if self.wrap:
fcn = 'x|({0}(x) - {2:g}) % {1:g} + {2:g}'.format(
self.collectivevariable.name,
self._period_len, self._period_shift)
if self.lambda_min < self.lambda_max:
domain = '[{0:g}, {1:g}]'.format(
self.lambda_min, self.lambda_max)
else:
domain = '[{0:g}, {1:g}] union [{2:g}, {3:g}]'.format(
self._period_shift, self.lambda_max,
self.lambda_min, self._period_shift+self._period_len)
return '{'+fcn+' in '+domain+'}'
else:
return '{{x|{2}(x) [periodic] in [{0:g}, {1:g}]}}'.format(
self.lambda_min, self.lambda_max,
self.collectivevariable.name)
class VoronoiVolume(Volume):
'''
Volume given by a Voronoi cell specified by a set of centers
Parameters
----------
collectivevariable : MultiRMSDCV
must be an MultiRMSDCV collectivevariable that returns several RMSDs
state : int
the index of the center for the chosen voronoi cell
Attributes
----------
collectivevariable : collectivevariable
the collectivevariable object
state : int
the index of the center for the chosen voronoi cell
'''
def __init__(self, collectivevariable, state):
super(VoronoiVolume, self).__init__()
self.collectivevariable = collectivevariable
self.state = state
def cell(self, snapshot):
'''
Returns the index of the voronoicell snapshot is in
Parameters
----------
snapshot : :class:`opensampling.engines.BaseSnapshot`
the snapshot to be tested
Returns
-------
int
index of the voronoi cell
'''
distances = self.collectivevariable(snapshot)
min_val = 1000000000.0
min_idx = -1
for idx, d in enumerate(distances):
if d < min_val:
min_val = d
min_idx = idx
return min_idx
def __call__(self, snapshot, state=None):
'''
Returns `True` if snapshot belongs to voronoi cell in state
Parameters
----------
snapshot : :class:`opensampling.engines.BaseSnapshot`
snapshot to be tested
state : int or None
index of the cell to be tested. If `None` (Default) then the
internal self.state is used
Returns
-------
bool
returns `True` is snapshot is on the specified voronoi cell
'''
if state is None:
state = self.state
return self.cell(snapshot) == state
# class VolumeFactory(object):
# @staticmethod
# def _check_minmax(minvals, maxvals):
# # if one is an integer, convert it to a list
# if type(minvals) == int or type(minvals) == float:
# if type(maxvals) == list:
# minvals = [minvals]*len(maxvals)
# else:
# raise ValueError("minvals is a scalar; maxvals is not a list")
# elif type(maxvals) == int or type(maxvals) == float:
# if type(minvals) == list:
# maxvals = [maxvals]*len(minvals)
# else:
# raise ValueError("maxvals is a scalar; minvals is not a list")
# if len(minvals) != len(maxvals):
# raise ValueError("len(minvals) != len(maxvals)")
# return (minvals, maxvals)
# @staticmethod
# def CVRangeVolumeSet(op, minvals, maxvals):
# # TODO: clean up to only use min_i or max_i in name if necessary
# minvals, maxvals = VolumeFactory._check_minmax(minvals, maxvals)
# myset = []
# for (min_i, max_i) in zip(minvals, maxvals):
# volume = CVDefinedVolume(op, min_i, max_i)
# myset.append(volume)
# return myset
# @staticmethod
# def CVRangeVolumePeriodicSet(op, minvals, maxvals,
# period_min=None, period_max=None):
# minvals, maxvals = VolumeFactory._check_minmax(minvals, maxvals)
# myset = []
# for i in range(len(maxvals)):
# myset.append(PeriodicCVDefinedVolume(op, minvals[i], maxvals[i],
# period_min, period_max))
# return myset