Source code for openpathsampling.numerics.histogram

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
from .lookup_function import LookupFunction, VoxelLookupFunction
import collections
import warnings
from functools import reduce


[docs] class SparseHistogram(object): """ Base class for sparse-based histograms. Parameters ---------- bin_widths : array-like bin (voxel) size left_bin_edges : array-like lesser side of the bin (for each direction) """
[docs] def __init__(self, bin_widths, left_bin_edges): self.bin_widths = np.array(bin_widths) if left_bin_edges is None: self.left_bin_edges = None else: self.left_bin_edges = np.array(left_bin_edges) self.count = 0 self.name = None self._histogram = None
def empty_copy(self): """Returns a new histogram with the same bin shape, but empty""" return type(self)(self.bin_widths, self.left_bin_edges) def histogram(self, data=None, weights=None): """Build the histogram. Parameters ---------- data : list of list of floats input data weights : list of floats weight for each input data point Returns ------- collection.Counter : copy of the current counter """ if data is None and self._histogram is None: raise RuntimeError("histogram() called without data!") elif data is not None: self._histogram = collections.Counter({}) return self.add_data_to_histogram(data, weights) else: return self._histogram.copy() @staticmethod def sum_histograms(hists): # (w, r) = (hists[0].bin_width, hists[0].bin_range) # newhist = Histogram(bin_width=w, bin_range=r) newhist = hists[0].empty_copy() newhist._histogram = collections.Counter({}) for hist in hists: if not newhist.compare_parameters(hist): raise RuntimeError newhist.count += hist.count newhist._histogram += hist._histogram return newhist def map_to_float_bins(self, trajectory): return (np.asarray(trajectory) - self.left_bin_edges) / self.bin_widths def map_to_bins(self, data): """ Parameters ---------- data : np.array input data Returns ------- tuple: the bin that the data represents """ # Reshape data to prevent accidental wrong output data = np.asarray(data).reshape(self.left_bin_edges.shape) return tuple(np.floor((data - self.left_bin_edges) / self.bin_widths)) def add_data_to_histogram(self, data, weights=None): """Adds data to the internal histogram counter. Parameters ---------- data : list or list of list input data weights : list or None weight associated with each datapoint. Default `None` is same weights for all Returns ------- collections.Counter : copy of the current histogram counter """ if self._histogram is None: return self.histogram(data, weights) if weights is None: weights = [1.0]*len(data) part_hist = sum((collections.Counter({self.map_to_bins(d): w}) for (d, w) in zip(data, weights)), collections.Counter({})) self._histogram += part_hist self.count += len(data) if weights is None else sum(weights) return self._histogram.copy() @staticmethod def _left_edge_to_bin_edge_type(left_bins, widths, bin_edge_type): if bin_edge_type == "l": return left_bins elif bin_edge_type == "m": return left_bins + 0.5 * widths elif bin_edge_type == "r": return left_bins + widths elif bin_edge_type == "p": pass # TODO: patches; give the range else: raise RuntimeError("Unknown bin edge type: " + str(bin_edge_type)) def xvals(self, bin_edge_type): """Position values for the bin Parameters ---------- bin_edge_type : 'l' 'm', 'r', 'p' type of values to return; 'l' gives left bin edges, 'r' gives right bin edges, 'm' gives midpoint of the bin, and 'p' is not implemented, but will give vertices of the patch for the bin Returns ------- np.array : The values of the bin edges """ int_bins = np.array(self._histogram.keys()) left_bins = int_bins * self.bin_widths + self.left_bin_edges return self._left_edge_to_bin_edge_type(left_bins, self.bin_widths, bin_edge_type) def __call__(self, bin_edge_type="m"): return VoxelLookupFunction(left_bin_edges=self.left_bin_edges, bin_widths=self.bin_widths, counter=self._histogram) def normalized(self, raw_probability=False, bin_edge="m"): """ Callable normalized version of the sparse histogram. Parameters ---------- raw_probability : bool if True, the voxel size is ignored and the sum of the counts adds to one. If False (default), the sum of the counts times the voxel volume adds to one. bin_edge : string not used; here for compatibility with 1D versions Returns ------- :class:`.VoxelLookupFunction` callable version of the normalized histogram """ voxel_vol = reduce(lambda x, y: x.__mul__(y), self.bin_widths) scale = voxel_vol if not raw_probability else 1.0 norm = 1.0 / (self.count * scale) counter = collections.Counter({k: self._histogram[k] * norm for k in self._histogram.keys()}) return VoxelLookupFunction(left_bin_edges=self.left_bin_edges, bin_widths=self.bin_widths, counter=counter) def compare_parameters(self, other): """Test whether the other histogram has the same parameters. Used to check whether we can simply combine these histograms. Parameters ---------- other : :class:`.SparseHistogram` histogram to compare with Returns ------- bool : True if these were set up with equivalent parameters, False otherwise """ # None returns false: use that as a quick test if other is None: return False if self.left_bin_edges is None or other.left_bin_edges is None: # this is to avoid a numpy warning on the next return self.left_bin_edges is other.left_bin_edges if self.left_bin_edges != other.left_bin_edges: return False if self.bin_widths != other.bin_widths: return False return True
[docs] class Histogram(SparseHistogram): """Wrapper for numpy.histogram with additional conveniences. In addition to the behavior in numpy.histogram, this provides a few additional calculations, as well as behavior that allows for better interactive use (tricks to assist caching by libraries using it, etc.) """
[docs] def __init__(self, n_bins=None, bin_width=None, bin_range=None): """Creates the parameters for the histogram. Either `n_bins` or `bin_width` must be given. If `bin_width` is used, then `bin_range` is required. If `n_bins` is used, then `bin_range` is optional. `n_bins` overrides `bin_width`. If no options are given, the default is to use 20 bins and the range generated by np.histogram. """ # this is to compare whether another histogram had the same setup, # and is useful for other programs that want to cache a histogram self._inputs = [n_bins, bin_width, bin_range] # regularize options self.bin_width = bin_width self.bin_range = bin_range if bin_range is not None: max_bin = max(bin_range) min_bin = min(bin_range) if bin_width is not None: self.n_bins = int(math.ceil((max_bin-min_bin)/self.bin_width)) # if this isn't actually divisible, you'll get one extra bin if n_bins is not None: self.n_bins = n_bins self.bin_width = (max_bin-min_bin)/(self.n_bins) self.bins = [min_bin + self.bin_width*i for i in range(self.n_bins+1)] else: if n_bins is not None: self.n_bins = n_bins self.bin_width = None else: self.n_bins = 20 # default self.bins = self.n_bins try: left_bin_edges = (self.bins[0],) except TypeError: left_bin_edges = None super(Histogram, self).__init__(bin_widths=(self.bin_width,), left_bin_edges=left_bin_edges)
def empty_copy(self): return type(self)(bin_width=self.bin_width, bin_range=self.bin_range) def histogram(self, data=None, weights=None): """Build the histogram based on `data`. Note ---- Calling this with new data overwrites the previous histogram. This is the expected behavior; in using this, you should check if the histogram parameters have changed from a previous run (using `compare_parameters`) and you should be aware whether your data has changed. If you want to add data to the histogram, you should use `add_data_to_histogram`. """ if self.left_bin_edges is not None: return super(Histogram, self).histogram(data, weights) if data is not None: max_val = max(data) min_val = min(data) self.bin_width = (max_val-min_val)/self.bins self.left_bin_edges = np.array((min_val,)) self.bin_widths = np.array((self.bin_width,)) return super(Histogram, self).histogram(data, weights) def xvals(self, bin_edge_type="l"): int_bins = np.array(list(self._histogram.keys()))[:, 0] # always include left_edge_bin as 0 point; always include 0 and # greater bin values (but allow negative) min_bin = min(min(int_bins), 0) n_bins = max(int_bins) - min_bin + 1 width = self.bin_widths[0] left_bins = (self.left_bin_edges[0] + np.arange(n_bins) * width) return self._left_edge_to_bin_edge_type(left_bins, width, bin_edge_type) def __call__(self, bin_edge="m"): """Return copy of histogram if it has already been built""" vals = self.xvals(bin_edge) hist = self.histogram() bins = sorted(hist.keys()) min_bin = min(bins[0][0], 0) max_bin = bins[-1][0] bin_range = range(int(min_bin), int(max_bin)+1) hist_list = [hist[(b,)] for b in bin_range] return LookupFunction(vals, hist_list) def compare_parameters(self, other): """Return true if `other` has the same bin parameters as `self`. Useful for checking whether a histogram needs to be rebuilt. """ if not super(Histogram, self).compare_parameters(other): return False if type(other.bins) is not int: if type(self.bins) is int: return False for (t, b) in zip(self.bins, other.bins): if t != b: return False else: return self._inputs == other._inputs return True def _normalization(self): """Return normalization constant (integral over this histogram).""" hist = self('l') bin_edges = self.xvals('l') dx = [bin_edges[i+1] - bin_edges[i] for i in range(len(bin_edges)-1)] dx += [dx[-1]] # assume the "forever" bin is same as last limited norm = np.dot(hist.values(), dx) return norm # Yes, the following could be cached. No, I don't think it is worth it. # Keep in mind that we need a separate cache for each one that we build, # and that typically it will take almost no time to build one of these # (runtime in linear in number of histogram bins). Adding caching # complicates the code for no real benefit (you're more likely to suffer # from L2 cache misses than to get a speedup). def normalized(self, raw_probability=False, bin_edge="m"): """Return normalized version of histogram. By default (`raw_probability` false), this returns the histogram normalized by its integral (according to rectangle-rule integration). If `raw_probability` is true, this returns the histogram normalized by the sum of the bin counts, with no consideration of the bin widths. """ normed_hist = self() # returns a copy nnorm = self._normalization() if not raw_probability else self.count norm = 1.0/nnorm normed_hist_list = [normed_hist(k) * norm for k in normed_hist.keys()] xvals = self.xvals(bin_edge) return LookupFunction(xvals, normed_hist_list) def cumulative(self, maximum=1.0, bin_edge="r"): """Cumulative from the left: number of values less than bin value. Use `maximum=None` to get the raw counts. """ cumul_hist = [] total = 0.0 hist = self(bin_edge) for k in sorted(hist.keys()): total += hist(k) cumul_hist.append(total) cumul_hist = np.array(cumul_hist) if total == 0: warnings.warn("No non-zero data in the histogram") elif maximum is not None: cumul_hist *= maximum / total xvals = self.xvals(bin_edge) return LookupFunction(xvals, cumul_hist) def reverse_cumulative(self, maximum=1.0, bin_edge="l"): """Cumulative from the right: number of values greater than bin value. Use `maximum=None` to get the raw counts. """ cumul_hist = [] total = 0.0 hist = self(bin_edge) for k in reversed(sorted(hist.keys())): total += hist(k) cumul_hist.insert(0, total) cumul_hist = np.array(cumul_hist) if total == 0: warnings.warn("No non-zero data in the histogram") elif maximum is not None: cumul_hist *= maximum / total xvals = self.xvals(bin_edge) return LookupFunction(xvals, cumul_hist) def rebinned(self, scaling): """Redistributes histogram bins of width binwidth*scaling Exact if scaling is an integer; otherwise uses the assumption that original bins were uniformly distributed. Note that the original data is not destroyed. """ # TODO pass def plot_bins(self, scaling=1.0): """Bins used in plotting. Scaling useful when plotting `rebinned`""" # TODO: add scaling support return self.bins[1:]
[docs] def histograms_to_pandas_dataframe(hists, fcn="histogram", fcn_args={}): """Converts histograms in hists to a pandas data frame""" keys = None frames = [] for hist in hists: # check that the keys match if keys is None: keys = hist.xvals() for (t, b) in zip(keys, hist.xvals()): if t != b: raise Warning("Bins don't match up") if hist.name is None: hist.name = int(hists.index(hist)) hist_data = { "histogram": hist, "normalized": hist.normalized, "reverse_cumulative": hist.reverse_cumulative, "cumulative": hist.cumulative, "rebinned": hist.rebinned }[fcn](**fcn_args).values() bin_edge = { "histogram": "m", "normalized": "m", "reverse_cumulative": "l", "cumulative": "r" }[fcn] xvals = hist.xvals(bin_edge) frames.append(pd.DataFrame({hist.name: hist_data}, index=xvals)) all_frames = pd.concat(frames, axis=1) return all_frames.fillna(0.0)
def write_histograms(fname, hists): """Writes all histograms in list `hists` to file named `fname` If the filename is the empty string, then output is to stdout. Assumes that all files should have the same bins. """ pass # TODO: might as well add a main function to this; read data / weight from # stdin and output an appropriate histogram depending on some options. Then # it is both a useful script and a library class!
[docs] class Histogrammer(object): """ Basically a dictionary to track what each histogram should be making. """
[docs] def __init__(self, f, f_args=None, hist_args=None): self.f = f self.f_args = f_args self._hist_args = hist_args self.empty_hist = Histogram(**self._hist_args)
@property def hist_args(self): return self._hist_args @hist_args.setter def hist_args(self, val): self._hist_args = val self.empty_hist = Histogram(**self._hist_args)
[docs] class HistogramPlotter2D(object): """ Convenience tool for plotting 2D histograms and plotting data atop them. The difficulty is that matplotlib uses the row/column *numbers* of a pandas.DataFrame as the actual internal axis. This class carries all the information to properly plot things (even mapping to CVs, if the histogram supports that). The descriptions below will discuss "real space," "bin space," and "frame space." Real space refers to the actual values of the input data. Bin space refers to the bins that come out of that for histogramming (made into continuous parameters). Frame space is bin space shifted such that the lowest bin values are 0. Parameters ---------- histogram : :class:`.SparseHistogram` input histogram to plot normed : bool whether to normalize the histogram (using raw_probability=True) xticklabels : list of float the desired locations for plot xticks, in real space yticklabels : list of float the desired locations for plot yticks, in real space xlim : 2-tuple of (float, float) horizontal (x-value) range of (minimum, maximum) bounds for displaying the plot ylim : 2-tuple of (float, float) vertical (y-value) range of (minimum, maximum) bounds for displaying the plot label_format : string Python format-style string for formatting tick labels. Default is '{:}'. """
[docs] def __init__(self, histogram, normed=True, xticklabels=None, yticklabels=None, xlim=None, ylim=None, label_format="{:}"): self.histogram = histogram self.normed = normed self.xticklabels = xticklabels self.yticklabels = yticklabels self.xlim = xlim self.ylim = ylim self.label_format = label_format self.xticks_, self.xlim_, self.yticks_, self.ylim_ = self.axes_setup( xticklabels, yticklabels, xlim, ylim )
def to_bins(self, alist, dof): """Convert real-space values to bin-space values for a given dof Parameters ---------- alist : list of float input in real-space dof : integer (0 or 1) degree of freedom; 0 is x, 1 is y Returns ------- list of float : the outputs in bin-space """ left_edge = self.histogram.left_bin_edges[dof] bin_width = self.histogram.bin_widths[dof] result = None if alist is not None: result = (np.asarray(alist) - left_edge) / bin_width return result def axis_input(self, hist, ticklabels, lims, dof): """Get ticks, range, and limits for a given DOF Parameters ---------- hist : list of float input data from the histogram (bin-space) ticklabels : list of float or None user-set tick labels for this DOF (real-space) lims : 2-tuple (float, float) or None user-set plot limits for this DOF dof : integer (0 or 1) degree of freedom; 0 is x, 1 is y Returns ------- ticks_ : list of float or None user-set ticks in bin-space range_ : list of float range for the pandas.DataFrame (bin-space) lims_ : 2-tuple (float, float) range for plot visualization (bin-space) """ ticks_ = self.to_bins(ticklabels, dof) lims_ = self.to_bins(lims, dof) ticks = [] if ticks_ is None else list(ticks_) lims = [] if lims_ is None else list(lims_) range_ = (int(min(list(hist) + ticks + lims)), int(max(list(hist) + ticks + lims))) if lims_ is None: lims_ = (0, range_[1] - range_[0]) else: lims_ = (lims_[0] - range_[0], lims_[1] - range_[0]) return (ticks_, range_, lims_) def axes_setup(self, xticklabels, yticklabels, xlim, ylim): r"""Set up both x-axis and y-axis for plotting. Also sets self.xrange\_ and self.yrange\_, which are the (bin-space) bounds for the pandas.DataFrame. Parameters ---------- xticklabels : list of float the desired locations for plot xticks, in real space yticklabels : list of float the desired locations for plot yticks, in real space xlim : 2-tuple of (float, float) horizontal (x-value) range of (minimum, maximum) bounds for displaying the plot ylim : 2-tuple of (float, float) vertical (y-value) range of (minimum, maximum) bounds for displaying the plot Returns ------- xticks_ : list of float or None user-set xticks in bin-space xlim_ : 2-tuple (float, float) range in x for plot visualization (bin-space) yticks_ : list of float or None user-set yticks in bin-space ylim_ : 2-tuple (float, float) range in y for plot visualization (bin-space) """ if xticklabels is None: xticklabels = self.xticklabels if yticklabels is None: yticklabels = self.yticklabels if xlim is None: xlim = self.xlim if ylim is None: ylim = self.ylim x, y = list(zip(*self.histogram._histogram.keys())) xticks_, xrange_, xlim_ = self.axis_input(x, xticklabels, xlim, dof=0) yticks_, yrange_, ylim_ = self.axis_input(y, yticklabels, ylim, dof=1) self.xrange_ = xrange_ self.yrange_ = yrange_ return (xticks_, xlim_, yticks_, ylim_) def ticks_and_labels(self, ticks, ax, dof): """Obtain the plot ticks and tick labels for given dof. Parameters ---------- ticks : list of float or None user-set input (bin-space) for tick locations ax : matplotlib.Axes axes from the plot dof : integer (0 or 1) degree of freedom; 0 is x, 1 is y Returns ------- ticks : list of float tick locations (bin-space, suitable for matplotlib) labels : list of string labels for the ticks """ if dof == 0: ax_ticks = ax.get_xticks() minval = self.xrange_[0] bw = self.histogram.bin_widths[0] edge = self.histogram.left_bin_edges[0] elif dof == 1: ax_ticks = ax.get_yticks() minval = self.yrange_[0] bw = self.histogram.bin_widths[1] edge = self.histogram.left_bin_edges[1] else: # pragma: no cover raise RuntimeError("Bad DOF: " + str(dof)) to_val = lambda n: (n + minval) * bw + edge ticks = ticks if ticks is not None else ax_ticks labels = [self.label_format.format(to_val(n)) for n in ticks] return (ticks, labels) def plot(self, normed=None, xticklabels=None, yticklabels=None, xlim=None, ylim=None, **kwargs): """Plot the histogram. Parameters ---------- normed : bool whether to normalize the histogram (using raw_probability=True) xticklabels : list of float the desired locations for plot xticks, in real space yticklabels : list of float the desired locations for plot yticks, in real space xlim : 2-tuple of (float, float) horizontal (x-value) range of (minimum, maximum) bounds for displaying the plot ylim : 2-tuple of (float, float) vertical (y-value) range of (minimum, maximum) bounds for displaying the plot kwargs : additional arguments to pass to plt.pcolormesh Returns ------- PolyCollection : return value of plt.pcolormesh """ if normed is None: normed = self.normed xticks_, xlim_, yticks_, ylim_ = self.axes_setup( xticklabels, yticklabels, xlim, ylim ) if normed: hist_fcn = self.histogram.normalized(raw_probability=True) else: hist_fcn = self.histogram() df = hist_fcn.df_2d(x_range=self.xrange_, y_range=self.yrange_) self.df = df mesh = plt.pcolormesh(df.fillna(0.0).transpose(), **kwargs) (xticks, xlabels) = self.ticks_and_labels(xticks_, mesh.axes, dof=0) (yticks, ylabels) = self.ticks_and_labels(yticks_, mesh.axes, dof=1) mesh.axes.set_xticks(xticks) mesh.axes.set_yticks(yticks) mesh.axes.set_xticklabels(xlabels) mesh.axes.set_yticklabels(ylabels) plt.xlim(xlim_[0], xlim_[1]) plt.ylim(ylim_[0], ylim_[1]) plt.colorbar() return mesh def plot_trajectory(self, trajectory, *args, **kwargs): """Plot a trajectory (or CV trajectory) on the axes. Additional arguments pass to plt.plot. Parameters ---------- trajectory : :class:`.Trajectory` or list of 2-tuple list to plot; paths.Trajectory allowed if the histogram can convert it to CVs. """ x, y = list(zip(*self.histogram.map_to_float_bins(trajectory))) px = np.asarray(x) - self.xrange_[0] py = np.asarray(y) - self.yrange_[0] plt.plot(px, py, *args, **kwargs)