Source code for openpathsampling.snapshot_modifier

import logging
import copy
import abc
import functools

import numpy as np

from openpathsampling.netcdfplus import StorableNamedObject
from openpathsampling.deprecations import SNAPSHOTMODIFIER_PROB_RAT

logger = logging.getLogger(__name__)


[docs] class SnapshotModifier(StorableNamedObject): """Abstract class for snapshot modification. In general, a snapshot modifer will take a snapshot and return a modified version of that snapshot. These are useful for, e.g., two-way shooting or for committor analysis. Attributes ---------- subset_mask : list of int or None the subset to use (default None, meaning use all). The values select along the first axis of the input array. For example, in a typical shape=(n_atoms, 3) array, this will pick the atoms. Note ---- It is tempting to use the various indexing tricks in numpy to get a view on the data, instead of using this subset_mask. However, this can be dangerous: note that fancy indexing copies the data, whereas normal indexing returns a proper view. See http://stackoverflow.com/a/4371049. Rather than play with fire here, we'll just do something straightforward. We can try something more clever in the future. """ __metaclass__ = abc.ABCMeta
[docs] def __init__(self, subset_mask=None): super(SnapshotModifier, self).__init__() self.subset_mask = subset_mask
def extract_subset(self, full_array, subset=None): """Extracts elements from full_array according to self.subset_mask Note that, if ``subset`` and ``self.subset`` are None, this returns ``full_array``. If you intend to modify the object returned by this functions, you should ensure that your input ``full_array`` is a copy of any orginal immutable data. Parameters ---------- full_array : list-like the input array subset : list of int or None the subset to use; see ``SnapshotModifier.subset_mask``. Default (None) uses the value of ``self.subset_mask``. Returns ------- list-like the elements of full_array which are selected by self.subset_mask, or full_array if subset and self.subset_mask are None """ if subset is None: subset = self.subset_mask if subset is None: return full_array else: return [full_array[i] for i in subset] def apply_to_subset(self, full_array, modified, subset_mask=None): """Replaces elements of full_array according to the subset mask This returns the full_array, but the modification is done in-place. Parameters ---------- full_array : list-like array to modify modified : list-like array containing len(self.subset_mask) elements which will replace those in `full_array` subset_mask : list of int or None the subset to use; see ``SnapshotModifier.subset_mask``. Default (None) uses the value of ``self.subset_mask``. Returns ------- full_array : list-like modified version of the input array, where the elements specified by self.subset_mask have been replaced """ if subset_mask is None: subset_mask = self.subset_mask if subset_mask is None: subset_mask = range(len(full_array)) for (i, val) in zip(subset_mask, modified): full_array[i] = val return full_array @abc.abstractmethod def __call__(self, snapshot): raise NotImplementedError def probability_ratio(self, old_snapshot, new_snapshot): """Only here for backwards compatability, will raise in the future""" SNAPSHOTMODIFIER_PROB_RAT.warn(stacklevel=3) return 1.0
[docs] class NoModification(SnapshotModifier): """Modifier with no change: " Parameters ---------- as_copy : bool, default True if True calls return a copy of the snapshot, else the snapshot itself. """
[docs] def __init__(self, as_copy=True): # masking is nonsense for no modification, but used in testing so this # is here to conserve API super(NoModification, self).__init__(subset_mask=None) self.as_copy = as_copy
def __call__(self, snapshot): return snapshot.copy() if self.as_copy else snapshot def probability_ratio(self, old_snapshot, new_snapshot): """This modifier does not alter the snapshot, so equal probability.""" return 1.0
[docs] class RandomVelocities(SnapshotModifier): """Randomize velocities according to the Boltzmann distribution. Notes ----- This modifier will only work with snapshots that have the ``velocities`` feature and the ``masses`` feature. Furthermore, the units have to be such that the input ``beta`` and the features `masses` and `velocities` are all in the same unit system. In particular, ``1.0 / beta * masses`` must be in units of ``velocity**2``. For the OpenMMEngine, for example (after ``from openmm import unit as u``), the ``beta`` parameter for 300 K would be created with .. code-block:: python beta = 1.0 / (300.0 * u.kelvin * u.BOLTZMANN_CONSTANT_kB) Parameters ---------- beta : float or openmm.unit.Quantity inverse temperature (including kB) for the distribution engine : :class:`.DynamicsEngine` or None engine to be used for constraints; if None, use the snapshot's engine subset_mask : list of int or None the subset to use (default None, meaning use all). The values select along the first axis of the input array. For example, in a typical shape=(n_atoms, 3) array, this will pick the atoms. """
[docs] def __init__(self, beta=None, engine=None, subset_mask=None): super(RandomVelocities, self).__init__(subset_mask) self.beta = beta self.engine = engine
def _default_random_velocities(self, snapshot, beta, subset): if beta is None: raise RuntimeError("Engine can't use RandomVelocities") # raises AttributeError is snapshot doesn't support velocities velocities = copy.copy(snapshot.velocities) # copy.copy for units vel_subset = self.extract_subset(velocities, subset) # raises AttributeError if snapshot doesn't support masses feature all_masses = snapshot.masses masses = self.extract_subset(all_masses, subset) n_spatial = len(vel_subset[0]) n_atoms = len(vel_subset) for atom_i in range(n_atoms): radicand = 1.0 / (self.beta * masses[atom_i]) try: # preference that masses is quantity or np.array sigma = radicand.sqrt() except AttributeError: # if masses regular list sigma = np.sqrt(radicand) vel_subset[atom_i] = sigma * np.random.normal(size=n_spatial) self.apply_to_subset(velocities, vel_subset, subset) new_snap = snapshot.copy_with_replacement(velocities=velocities) # applying constraints, if they exist if self.engine is None: engine = new_snap.engine else: engine = self.engine try: apply_constraints = engine.apply_constraints except AttributeError: pass # fine if there isn't one else: new_snap = apply_constraints(new_snap) return new_snap def __call__(self, snapshot): # default value; we'll override if needed try: beta = self.beta if self.beta is not None else self.engine.beta except AttributeError: beta = None make_snapshot = functools.partial( self._default_random_velocities, beta=beta, subset=self.subset_mask ) if self.engine: try: make_snapshot = functools.partial( self.engine.randomize_velocities, beta=beta, subset=self.subset_mask ) except AttributeError: pass # use default new_snap = make_snapshot(snapshot) return new_snap def probability_ratio(self, old_snapshot, new_snapshot): # Should always be 1.0 becasue it is directly sampled from the right # distribution return 1.0
[docs] class GeneralizedDirectionModifier(SnapshotModifier): """ Snapshot modifier which changes velocity direction with constant energy. Abstract class with core implementation. The implementation here requires takes `delta_v` to be the standard deviation of a Gaussian distribution of velocity displacements. The velocities are modified according to that displacement, then renormalized so that each atom still has the same total momentum as before (although the direction can be changed). Parameters ---------- delta_v : float or array-like of float velocity change parameter, as the width of a Gaussian from which each degree of freedom's velocity change is chosen. Can be a list with length ``n_atoms`` (mapping to each atom); length equal to the subset mask (mapping to each element of the subset mask); or a float (mapping the same value to all atoms). subset_mask : list of int or None the subset to use (default None, meaning use all). The values select along the first axis of the input array. For example, in a typical shape=(n_atoms, 3) array, this will pick the atoms. remove_linear_momentum : bool whether the total linear momentum should be removed from the system, default is True See Also -------- VelocityDirectionModifier SingleAtomVelocityDirectionModifier """
[docs] def __init__(self, delta_v, subset_mask=None, remove_linear_momentum=True, engine=None): super(GeneralizedDirectionModifier, self).__init__(subset_mask) self.delta_v = delta_v self.remove_linear_momentum = remove_linear_momentum self.engine = engine
def _verify_snapshot(self, snapshot): """ Verifies that a snapshot has the right number of degrees of freedom. The approach implemented in this will not satisfy detailed balance if there are constraints on the atoms that are changing. This is a way of checking for that. Parameters ---------- snapshot : :class:`.Snapshot` input snapshot to check for validity """ try: has_constraints = self.engine.has_constraints() except AttributeError: pass else: if has_constraints: raise RuntimeError("Cannot use this snapshot modifier with " "an engine that has constraints.") try: n_dofs = snapshot.n_degrees_of_freedom except AttributeError: raise RuntimeError("Snapshot missing n_degrees_of_freedom. " + "Can't use this snapshot modifier.") # all engines should have n_spatial and n_atoms n_spatial = snapshot.engine.n_spatial n_atoms = snapshot.engine.n_atoms # NOTE: none of our engines currently explicitly remove angular # (but isn't angular momentum impossible in periodic condensed # phase) # try: # box_vectors = snapshot.box_vectors # except AttributeError: # box_vectors = None remove_angular = 0 # if box_vectors is None else 1 remove_linear = 1 if n_atoms != 1 else 0 if remove_linear: try: ignore = snapshot.engine.ignore_linear_momentum except AttributeError: ignore = False remove_linear = 0 if ignore else remove_linear n_motion_removers = n_spatial * (remove_linear + remove_angular) n_dofs_required = n_spatial * n_atoms - n_motion_removers if n_dofs != n_dofs_required: raise RuntimeError("Snapshot has " + str(n_dofs) + " degrees of freedom, not " + str(n_dofs_required) + ". " + "Are there constraints? Constraints can't" + " be used with this modifier.") def _select_atoms_to_modify(self, n_subset_atoms): raise NotImplementedError def _dv_widths(self, n_atoms, n_subset_atoms): """ Generate the list of velocity delta widths. Parameters ---------- n_atoms : int number of total atoms n_subset_atoms : int number of atoms in the subset to be (possibly) changed Returns ------- list, length n_subset_atoms list of velocity deltas associated with each atom from the subset """ try: dv_widths = list(self.delta_v) except TypeError: dv_widths = [self.delta_v] * n_subset_atoms if len(dv_widths) == n_atoms: dv_widths = self.extract_subset(dv_widths) # assert len(dv_widths) == n_subset_atoms return dv_widths @staticmethod def _remove_linear_momentum(velocities, masses): """ Remove COM motion. Parameters ---------- velocities : array-like, shape (n_atoms, n_spatial) input velocities (after snapshot change) masses : array-like, shape (n_atoms,) masses of each atom Returns ------- array-like velocities adjusted to have 0 linear momentum """ # TODO: initially, maybe see if there's an internal motion remover # to do most of this? and get KE from a snapshot feature? n_atoms = len(masses) inv_masses = 1.0 / masses momenta = velocities * masses[:, np.newaxis] total_momenta = sum(momenta, 0*momenta[0]) remove_momenta = total_momenta / n_atoms remove_velocities = inv_masses[:, np.newaxis] * remove_momenta velocities -= remove_velocities return velocities @staticmethod def _rescale_kinetic_energy(velocities, masses, double_KE): """ Rescale KE to desired value. Parameters ---------- velocities : array-like, shape (n_atoms, n_spatial) input velocities (after snapshot change) masses : array-like, shape (n_atoms,) masses of each atom double_KE : float or unitted Quantity the desired kinetic energy multiplied by 2.0 (because to avoid needing to multiple by 1/2 internally) Returns ------- array-like velocities adjusted to have the desired kinetic energy """ # from here, we're doing the KE rescaling # can't just use the dot product because of openmm.unit momenta = velocities * masses[:, np.newaxis] dof_ke = momenta * velocities zero_energy = 0 * dof_ke[0][0] new_ke = sum(sum(dof_ke, zero_energy), zero_energy) rescale_factor = np.sqrt(double_KE / new_ke) velocities *= rescale_factor return velocities def __call__(self, snapshot): """ Primary call function to be used by subclasses. Uses the :meth:`._select_atoms_to_modify` method to determine exactly which atoms from the subset should be modified. This method must be written in subclasses. Parameters ---------- snapshot : :class:`.Snapshot` initial snapshot Returns ------- :class:`.Snapshot` modified snapshot """ self._verify_snapshot(snapshot) velocities = copy.copy(snapshot.velocities) vel_subset = self.extract_subset(velocities) atoms_to_change = self._select_atoms_to_modify(len(vel_subset)) dv_widths = self._dv_widths(n_atoms=len(velocities), n_subset_atoms=len(vel_subset)) # zero_with_units is a hack for using units (or not) in `sum` zero_with_units = (vel_subset[0][0] - vel_subset[0][0])**2 for atom_i in atoms_to_change: initial_sum_sq_vel = sum([v**2 for v in vel_subset[atom_i]], zero_with_units) randoms = np.random.normal(size=len(vel_subset[atom_i])) delta_v = dv_widths[atom_i] * randoms vel_subset[atom_i] += delta_v final_sum_sq_vel = sum([v**2 for v in vel_subset[atom_i]], zero_with_units) rescale_factor = np.sqrt(initial_sum_sq_vel / final_sum_sq_vel) vel_subset[atom_i] *= rescale_factor self.apply_to_subset(velocities, vel_subset) # calculate the total KE so we can preserve it masses = snapshot.masses momenta = velocities * masses[:, np.newaxis] dof_double_KE = momenta * velocities zero_energy = 0 * dof_double_KE[0][0] double_KE = sum(sum(dof_double_KE, zero_energy), zero_energy) if self.remove_linear_momentum: velocities = self._remove_linear_momentum(velocities, masses) self._rescale_kinetic_energy(velocities, masses, double_KE) new_snap = snapshot.copy_with_replacement(velocities=velocities) # NOTE: no constraint correction here! constraints are not allowed! return new_snap def probability_ratio(self, old_snapshot, new_snapshot): # No constraints allowed, so this should always be 1.0 return 1.0
[docs] class VelocityDirectionModifier(GeneralizedDirectionModifier): """ Randomly change all the velocities (in the subset masked, at least) according to the given delta_v. Parameters ---------- delta_v : float or array-like of float velocity change parameter, as the width of a Gaussian from which each degree of freedom's velocity change is chosen. Can be a list with length ``n_atoms`` (mapping to each atom); length equal to the subset mask (mapping to each element of the subset mask); or a float (mapping the same value to all atoms). subset_mask : list of int or None the subset to use (default None, meaning use all). The values select along the first axis of the input array. For example, in a typical shape=(n_atoms, 3) array, this will pick the atoms. rescale_linear_momenta : bool whether the total linear momentum should be removed from the system, default is True See Also -------- GeneralizedDirectionModifier SingleAtomVelocityDirectionModifier """ def _select_atoms_to_modify(self, n_subset_atoms): return list(range(n_subset_atoms))
[docs] class SingleAtomVelocityDirectionModifier(GeneralizedDirectionModifier): """ Change a single atom according to the ``delta_v``. Note that even though this only uses the ``delta_v`` to change one atom, the velocities of other atoms will also be changed in order to preserve the initial kinetic energy and possibly also to remove linear momenta. Parameters ---------- delta_v : float or array-like of float velocity change parameter, as the width of a Gaussian from which each degree of freedom's velocity change is chosen. Can be a list with length ``n_atoms`` (mapping to each atom); length equal to the subset mask (mapping to each element of the subset mask); or a float (mapping the same value to all atoms). subset_mask : list of int or None the subset to use (default None, meaning use all). The values select along the first axis of the input array. For example, in a typical shape=(n_atoms, 3) array, this will pick the atoms. rescale_linear_momenta : bool whether the total linear momentum should be removed from the system, default is True See Also -------- GeneralizedDirectionModifier VelocityDirectionModifier """ def _select_atoms_to_modify(self, n_subset_atoms): return [np.random.choice(range(n_subset_atoms))]