import openpathsampling as paths
from openpathsampling.numerics import SparseHistogram
from openpathsampling.progress import SimpleProgress
from collections import Counter
import numpy as np
class VoxelInterpolator(object):
"""
Identify voxels visited during linear interpolation between two points
This class includes some conveniences to facilate this with
n-dimensional histograms.
This is an abstract class; the __call__ method needs to be implemented
by subclasses. The __call__ method should take the arguments ``old_pt``
and ``new_pt``, which are the points to interpolate between, and should
return the list of voxel identifiers (n-dimensional integer tuples) for
the visited voxels, excluding the voxel for ``old_pt``.
Parameters
----------
histogram : :class:`.PathHistogram`
the histogram that this will interpolate for
"""
def __init__(self, histogram):
self.histogram = histogram
@property
def left_bin_edges(self):
return self.histogram.left_bin_edges
@property
def bin_widths(self):
return self.histogram.bin_widths
def map_to_bins(self, point):
return self.histogram.map_to_bins(point)
def __call__(self, old_pt, new_pt):
raise NotImplementedError("Can't use abstract class Interpolator")
class NoInterpolation(VoxelInterpolator):
"""No interpolation.
Just returns the bin for the new point. The effect is that no
interpolation is done.
Parameters
----------
histogram : :class:`.PathHistogram`
the histogram that this will interpolate for
"""
def __call__(self, old_pt, new_pt):
return [self.map_to_bins(new_pt)]
class SubdivideInterpolation(VoxelInterpolator):
"""Interpolate by bisection.
Identifies all voxels that are visited between two points by
successively recursively taking the midpoint, until all voxels found are
adjacent (with special case handling for corners, where the line does
not go through a face).
Parameters
----------
histogram : :class:`.PathHistogram`
the histogram that this will interpolate for
"""
def _interpolated_bins(self, old_pt, new_pt):
"""Interpolate between trajectory points.
Parameters
----------
old_pt : array-like of float
previous point in CV space
new_pt : array-like of float
next point in CV space
Returns
-------
list of array-like of int
bins which interpolate between old_pt and new_pt (not including
the bin that old_pt is found in)
"""
# bins for this do not include the bin of the old point
# TODO: add a way to have this handle periodic variables as well
old_bin = self.map_to_bins(old_pt)
new_bin = self.map_to_bins(new_pt)
abs_dx = abs(np.asarray(new_bin) - np.asarray(old_bin))
manhattan_distance = sum(abs_dx)
bin_list = [new_bin]
# if the manhattan_distance is 1, we're adjacent
if manhattan_distance <= 1:
return bin_list
# otherwise, use one of the interpolation algos to find bins
bin_list = self._subdivide_interpolation(start_pt=old_pt,
end_pt=new_pt,
start_bin=old_bin,
end_bin=new_bin)
return list(set(bin_list) - set([old_bin]))
def _subdivide_interpolation(self, start_pt, end_pt, start_bin, end_bin):
"""Interpolate between bins using recursive division.
Note that this is probably not the very fastest possible algorithm,
but an easy one to prove works in arbitrary dimensions.
Paramters
---------
start_pt : array-like of float
initial point to interpolate from
end_pt : array-like of float
final point to interpolate to
start_bin : array-like of int
bin associated with initial point
end_bin : array-like of int
bin associated with final point
Returns
-------
list of array-like of int :
the bins associated with this path
"""
delta = np.asarray(end_pt) - np.asarray(start_pt)
mid_pt = start_pt + 0.5 * delta
mid_bin = self.map_to_bins(mid_pt)
# check for diagonal first
if np.all(abs(np.asarray(end_bin) - np.asarray(start_bin)) == 1):
left_edges = self.left_bin_edges + self.bin_widths * end_bin
test_array = (left_edges - start_pt) / delta
if np.allclose(test_array, test_array[0], atol=1e-6):
return [start_bin, end_bin]
elif np.allclose(delta, [0.0]*len(delta), atol=1e-6):
return [start_bin, end_bin]
manhattan_dist_start = sum(abs(np.asarray(mid_bin) -
np.asarray(start_bin)))
manhattan_dist_end = sum(abs(np.asarray(end_bin) -
np.asarray(mid_bin)))
# how much work we have to do depends on what's already adjacent
if start_bin == end_bin:
return [end_bin]
elif manhattan_dist_start == 1 and manhattan_dist_end == 1:
return [start_bin, mid_bin, end_bin]
# if we're in the same bin, only have one direction to go
elif mid_bin == start_bin:
return self._subdivide_interpolation(start_pt=mid_pt,
end_pt=end_pt,
start_bin=mid_bin,
end_bin=end_bin)
elif mid_bin == end_bin:
return self._subdivide_interpolation(start_pt=start_pt,
end_pt=mid_pt,
start_bin=start_bin,
end_bin=mid_bin)
elif manhattan_dist_start == 1:
return ([start_bin] +
self._subdivide_interpolation(start_pt=mid_pt,
end_pt=end_pt,
start_bin=mid_bin,
end_bin=end_bin))
elif manhattan_dist_end == 1:
return ([end_bin] +
self._subdivide_interpolation(start_pt=start_pt,
end_pt=mid_pt,
start_bin=start_bin,
end_bin=mid_bin))
else:
start_side = self._subdivide_interpolation(start_pt=start_pt,
end_pt=mid_pt,
start_bin=start_bin,
end_bin=mid_bin)
end_side = self._subdivide_interpolation(start_pt=mid_pt,
end_pt=end_pt,
start_bin=mid_bin,
end_bin=end_bin)
return start_side + end_side
def __call__(self, old_pt, new_pt):
return self._interpolated_bins(old_pt, new_pt)
class BresenhamInterpolation(VoxelInterpolator):
"""Interpolation based on the Bresenham line-drawing algorithm.
Basic idea from https://www.crisluengo.net/archives/400.
Parameters
----------
histogram : :class:`.PathHistogram`
the histogram that this will interpolate for
"""
def _interpolated_bins(self, new_pt, old_pt, new_bin, old_bin, delta,
n_steps):
step_size = delta / n_steps if n_steps > 0 else 0
# TODO: do I want rint or floor here?
bins = [np.rint(old_bin + (i+1) * step_size)
for i in range(n_steps)]
return bins
def __call__(self, old_pt, new_pt):
old_bin = self.map_to_bins(old_pt)
new_bin = self.map_to_bins(new_pt)
delta = np.asarray(new_bin) - np.asarray(old_bin)
n_steps = int(max(np.abs(delta)))
if n_steps == 0:
n_steps = 1
bins = self._interpolated_bins(new_pt, old_pt, new_bin, old_bin,
delta, n_steps)
return [tuple(b) for b in bins]
class BresenhamLikeInterpolation(BresenhamInterpolation):
"""Interpolation based on floating point analog to Bresenham algorithm.
Parameters
----------
histogram : :class:`.PathHistogram`
the histogram that this will interpolate for
"""
def _interpolated_bins(self, new_pt, old_pt, new_bin, old_bin, delta,
n_steps):
step_size = (np.asarray(new_pt) - np.asarray(old_pt)) / n_steps
interp_points = [old_pt + (i+1) * step_size for i in range(n_steps)]
bins = [self.map_to_bins(pt) for pt in interp_points]
return bins
# should path histogram be moved to the generic histogram.py? Seems to be
# independent of the fact that this is actually OPS
[docs]
class PathHistogram(SimpleProgress, SparseHistogram):
"""
N-dim sparse histogram for trajectories.
This allows features like interpolating between bins and normalizing the
histogram to the number of trajectories.
Parameters
----------
left_bin_edges : array-like
lesser side of the bin (for each direction)
bin_widths : array-like
bin (voxel) size
interpolate : bool or callable
how to interpolate missing bin visits. Default True gives
"subdivide" method, False gives no interpolation. Arbitrary callable
should take ``old_pt`` and ``new_pt``, and return the list of bins
that were visited, excluding the bin for ``old_pt``.
per_traj : bool
whether to normalize per trajectory (instead of per-snapshot)
"""
[docs]
def __init__(self, left_bin_edges, bin_widths, interpolate=True,
per_traj=True):
super(PathHistogram, self).__init__(left_bin_edges=left_bin_edges,
bin_widths=bin_widths)
if interpolate is True:
interpolate = BresenhamLikeInterpolation
elif interpolate is False:
interpolate = NoInterpolation
self.interpolate = interpolate(self)
self.per_traj = per_traj
def single_trajectory_counter(self, trajectory):
"""
Calculate the counter (local histogram) for an unweighted trajectory
Parameters
----------
trajectory : list of array-like
the reduced space trajectory
Returns
-------
collections.Counter
histogram counter for this trajectory
"""
# make a list of every bin visited, possibly interpolating gaps
bin_list = [self.map_to_bins(trajectory[0])]
for fnum in range(len(trajectory)-1):
bin_list += self.interpolate(trajectory[fnum],
trajectory[fnum+1])
# if self.interpolate:
# bin_list += self.interpolated_bins(trajectory[fnum],
# trajectory[fnum+1])
# else:
# bin_list += [self.map_to_bins(trajectory[fnum+1])]
local_hist = Counter(bin_list)
if self.per_traj:
# keys only exist once, so the counter gives 1 if key present
local_hist = Counter(local_hist.keys())
return local_hist
def add_data_to_histogram(self, trajectories, weights=None):
"""Adds data to the internal histogram counter.
Parameters
----------
trajectories : list of list of array-like
input data
weights : list or None
weight associated with each datapoint. Default `None` is same
weights for all
Returns
-------
collections.Counter :
copy of the current histogram counter
"""
if weights is None:
weights = [1.0] * len(trajectories)
for (traj, w) in self.progress(list(zip(trajectories, weights))):
# list so that progress can know the length
self.add_trajectory(traj, w)
return self._histogram.copy()
def add_trajectory(self, trajectory, weight=1.0):
"""Add a single trajectory to internal counter, with given weight
Parameters
----------
trajectory : list of array-like
the reduced space trajectory
weight : float
the weight of the trajectory. Default 1.0
"""
local_hist = self.single_trajectory_counter(trajectory)
local_hist = Counter({k : local_hist[k] * weight
for k in local_hist.keys()})
if self._histogram is None:
self._histogram = Counter({})
self._histogram += local_hist
self.count += weight
#TODO: some of this might be moved to a more generic TrajectoryHistogram,
# allowing reuse between PathDensityHistogram and FreeEnergyHistogram
[docs]
class PathDensityHistogram(PathHistogram):
"""Histogram for path density plot.
Parameters
----------
cvs : list of paths.CollectiveVariable
the collective variables to define the reduced space
left_bin_edges : array-like
lesser side of the bin (for each direction)
bin_widths : array-like
bin (voxel) size
interpolate : bool or string
whether to interpolate missing bin visits. String value determines
interpolation type (currently only "subdivide" allowed). Default
True gives "subdivide" method, False gives no interpolation.
"""
[docs]
def __init__(self, cvs, left_bin_edges, bin_widths, interpolate=True):
super(PathDensityHistogram, self).__init__(
left_bin_edges=left_bin_edges,
bin_widths=bin_widths,
interpolate=interpolate,
per_traj=True
)
self.cvs = cvs
def _add_ops_trajectory(self, trajectory, weight):
cv_traj = [cv(trajectory) for cv in self.cvs]
self.add_trajectory(list(zip(*cv_traj)), weight)
def add_data_to_histogram(self, trajectories, weights=None):
"""Adds data to the internal histogram counter.
Parameters
----------
trajectories : list of :class:`.Trajectory` or :class:`.Trajectory`
input data
weights : list or None
weight associated with each datapoint. Default `None` is same
weights for all
Returns
-------
collections.Counter :
copy of the current histogram counter
"""
if isinstance(trajectories, paths.Trajectory):
trajectories = [trajectories]
if weights is None:
weights = [1.0] * len(trajectories)
# TODO: add something so that we don't recalc the same traj twice
for (traj, w) in self.progress(list(zip(trajectories, weights))):
self._add_ops_trajectory(traj, w)
return self._histogram.copy()
def map_to_float_bins(self, trajectory):
"""Map trajectory to the bin value, without rounding bin number.
Unlike the :class:`.SparseHistogram` version, this allows input of
either a :class:`.Trajectory` (which is then mapped according to the
PathDensityHistogram's collective variables), or a list of numbers,
which is assumed to be the proper CV trajectory (and is the input
for the sparse histogram version, too).
Parameters
----------
trajectory : list of array-like or :class:`.Trajectory`
input trajectory or input CV-based trajectory
Returns
-------
list of array-like :
un-rounded bin value for each frame in the input trajectory
"""
if isinstance(trajectory, paths.Trajectory):
cv_traj = list(zip(*[cv(trajectory) for cv in self.cvs]))
else:
cv_traj = trajectory
return super(PathDensityHistogram, self).map_to_float_bins(cv_traj)