Source code for openpathsampling.analysis.channel_analysis

import collections
import pandas as pd
import numpy as np

from openpathsampling.netcdfplus import StorableNamedObject

[docs] class ChannelAnalysis(StorableNamedObject): """Analyze path sampling simulation for multiple channels. User defines several channels (e.g., mechanisms) as :class:`.Ensemble` objects. This checks which channels each path satisfies, and provides analysis of switching and residence. Parameters ---------- steps : iterable of :class:`.MCStep` the steps to analyze channels: dict of {string: :class:`.Ensemble`} names (keys) and ensembles (values) representing subtrajectories of the channels of interest replica: int replica ID to analyze from the steps, default is 0. """
[docs] def __init__(self, steps, channels, replica=0): super(ChannelAnalysis, self).__init__() self.channels = channels if steps is None: steps = [] self.replica = replica self._treat_multiples = 'all' self._results = {c: [] for c in list(self.channels.keys()) + [None]} if len(steps) > 0: self._analyze(steps)
def to_dict(self): return { 'results': self._results, 'treat_multiples': self._treat_multiples, 'channels': self.channels, 'replica': self.replica } @classmethod def from_dict(cls, dct): obj = cls(steps=None, channels=dct['channels'], replica=dct['replica']) obj._results = dct['results'] obj._treat_multiples = dct['treat_multiples'] return obj # separate this because I think much of the code might be generalized # later where step_num could be something else @staticmethod def _step_num(step): """Return ordinal number for the given input object. Abstracted so that other things might replace it. Parameters ---------- step : :class:`.MCStep` the step Returns ------- int : MC cycle number """ return step.mccycle def _analyze(self, steps): """Primary analysis routine. Converts the input steps to an internal ._results dictionary of channel name to list of (start, end) tuples for when that channel is occupied. Parameters ---------- steps : iterable of :class:`.MCStep` the steps to analyze """ # for now, this assumes only one ensemble per channel # (would like that to change in the future) prev_traj = None last_start = {c: None for c in self._results} for step in steps: step_num = self._step_num(step) traj = step.active[self.replica].trajectory if prev_traj is None: prev_result = {c: len(self.channels[c].split(traj)) > 0 for c in self.channels} prev_result[None] = not any(prev_result.values()) for c in last_start: if prev_result[c] is True: last_start[c] = step_num # re-use previous if the trajectory hasn't changed if traj is prev_traj: result = prev_result else: result = {c: len(self.channels[c].split(traj)) > 0 for c in self.channels} result[None] = not any(result.values()) changed = [c for c in result if result[c] != prev_result[c]] for c in changed: if result[c] is True: # switched from False to True: entered this label last_start[c] = step_num else: # switched from True to False: exited this label finish = step_num self._results[c] += [(last_start[c], finish)] last_start[c] = None prev_traj = traj prev_result = result # finish off any extras next_step = step_num + 1 # again, this can be changed for c in self._results: if last_start[c] is not None: if len(self._results[c]) > 0: # don't do double it if it's already there if self._results[c][-1][1] != step_num: self._results[c] += [(last_start[c], next_step)] # note: is the else: of the above even possible? # namely, do we need the if statement? should test that else: self._results[c] += [(last_start[c], next_step)] @property def treat_multiples(self): """ string : method for handling paths that match multiple channels Allowed values are: * 'newest': use the most recent channel entered * 'oldest': use the least recent channel entered * 'multiple': treat multiple channels as a new type of channel, e.g., 'a' and 'b' becomes 'a,b' * 'all': treat each channel individually, despite overlaps. For switching, this is the same as ???. For status, this is the same as 'multiple' """ return self._treat_multiples @treat_multiples.setter def treat_multiples(self, value): value = value.lower() if value not in ['all', 'newest', 'oldest', 'multiple']: raise ValueError("Invalid value for treat_multiples: " + str(value)) self._treat_multiples = value @staticmethod def _expand_results(results): """ Takes ._results dict and makes it into chronological list of events Note ---- The output of this is in terms of "channel events." It doesn't do anything to ensure that only one channel is defined at a given time --- so subsequent events can include overlapping times. Other functions relabel by time. See also -------- _labels_by_step_newest _labels_by_step_oldest _labels_by_step_multiple Parameters ---------- results : dict of {str: [(int, int), ...]} the results dictionary. The keys are the channel names, and the values are a list of tuples representing the start and finish step numbers for the range of steps while in this channel Returns ------- list of 3-tuples (int, int, frozenset) : the "events": each event is the tuple of start step, finish step, and channel name (as a frozenset containing one string), sorted according to the start step. """ expanded = [(domain[0], domain[1], frozenset([channel])) for channel in results for domain in results[channel]] return sorted(expanded, key=lambda tup: tup[0]) @staticmethod def _labels_by_step_newest(expanded_results): """ Makes one channel per step, based on most recent channel entered. See also -------- _expand_results _labels_by_step_oldest _labels_by_step_multiple labels_by_step Parameters ---------- expanded_results : list of 3-tuples (int, int, frozenset) input events; the output of _expand_results (see details there) Returns ------- list of 3-tuples (int, int, frozenset) events with ranges such that there is only one channel at any given step number, and that channel is the most recent entered """ relabeled = [] previous = expanded_results[0] for current in expanded_results[1:]: relabeled += [(previous[0], current[0], previous[2])] previous = current relabeled += [expanded_results[-1]] return relabeled @staticmethod def _labels_by_step_oldest(expanded_results): """ Makes one channel per step, based on least recent channel entered. See also -------- _expand_results _labels_by_step_newest _labels_by_step_multiple labels_by_step Parameters ---------- expanded_results : list of 3-tuples (int, int, frozenset) input events; the output of _expand_results (see details there) Returns ------- list of 3-tuples (int, int, frozenset) : events with ranges such that there is only one channel at any given step number, and that channel is the least recent entered """ relabeled = [] previous = expanded_results[0] for current in expanded_results[1:]: if current[1] > previous[1]: # ends after last one ended # if this isn't true, this one gets skipped # if it is true, then previous is used relabeled += [previous] # save the new starting point previous = (previous[1], current[1], current[2]) # NOTE: Tests include a case for the implicit "else" here, but # for some reason an empty `else: pass` doesn't show up as # covering the `pass` line; so removed them if relabeled[-1] != previous: relabeled += [previous] else: pass # for testing return relabeled @staticmethod def _labels_by_step_multiple(expanded_results): """Makes one channel label per step, combining all active channels. See also -------- _expand_results _labels_by_step_newest _labels_by_step_oldest labels_by_step Parameters ---------- expanded_results : list of 3-tuples (int, int, frozenset) input events; the output of _expand_results (see details there) Returns ------- list of 3-tuples (int, int, frozenset) : events such that there is only one event at any given step number, with the channel label as the set of all active channels """ relabeled = [] # start events are times when a channel is added to the active # finish events are when channel is removed from the active # both are dicts of time to a set of channels start_events = collections.defaultdict(set) finish_events = collections.defaultdict(set) for event in expanded_results: start_events[event[0]] |= set(event[2]) finish_events[event[1]] |= set(event[2]) all_event_steps = set(start_events.keys()) | set(finish_events.keys()) active_channels = set([]) prev_step_num = None # note to self: this is some elegant freaking code for step_num in sorted(list(all_event_steps)): if prev_step_num is not None: relabeled += [(prev_step_num, step_num, frozenset(active_channels))] # defaultdict gives empty if doesn't exist active_channels -= finish_events[step_num] active_channels |= start_events[step_num] prev_step_num = step_num return relabeled def labels_by_step(self, treat_multiples=None): """ Prepare internally stored results for primary analysis routines. Note ---- The results of this depend on the value of ``treat_multiples``. In fact, this method is just a switch for the specific ``treat_multiples`` values. See also -------- _labels_by_step_newest _labels_by_step_oldest _labels_by_step_multiple Parameters ---------- treat_multiples : 'all', 'newest', 'oldest', 'multiple', or None method to prepare output; see documentation on ``treat_multiples`` for details. Default is `None`, which uses the value in ``self.treat_multiples``. Returns ------- list of 3-tuples (int, int, frozenset) : events such that there is only one event at any give step """ if treat_multiples is None: treat_multiples = self.treat_multiples expanded_results = self._expand_results(self._results) method = { 'all': lambda x: x, 'newest': self._labels_by_step_newest, 'oldest': self._labels_by_step_oldest, 'multiple': self._labels_by_step_multiple }[treat_multiples] return method(expanded_results) @staticmethod def _labels_as_sets_sort_function(label): """Sort function for labels. The input labels are frozensets of lists of strings. The sort order is first by number of items in the list, and then by the list items. A list of None will be sorted into the first place. Parameters ---------- label: frozenset of list of (string or None) input label Returns ------- list: first element is the length of the input set, followed by the input as a sorted list """ label_list = list(label) if None in label_list: has_None = [None] label_list.remove(None) else: has_None = [] ll = sorted(label_list) return [len(ll)] + has_None + ll @staticmethod def label_to_string(label): """Convert set of string/None to comma-separated string. For example, frozenset(['c', 'a']) becomes 'a,c' (no space). Parameters ---------- label: frozenset of list of (string or None) input label Returns ------- string: the string for this label """ # separated for reusability return ",".join(sorted([str(l) for l in list(label)])) @property def switching_matrix(self): """ pandas.DataFrame : number of switches from one channel to another. Depends on ``treat_multiples``, see details there. """ labeled_results = self.labels_by_step() labels_in_order = [ll[2] for ll in labeled_results] labels_set = set(labels_in_order) sorted_set_labels = sorted(list(labels_set), key=self._labels_as_sets_sort_function) sorted_labels = [self.label_to_string(e) for e in sorted_set_labels] switches = [(self.label_to_string(labels_in_order[i]), self.label_to_string(labels_in_order[i+1])) for i in range(len(labeled_results)-1)] switch_count = collections.Counter(switches) df = pd.DataFrame(index=sorted_labels, columns=sorted_labels) for switch in switch_count: df.at[switch[0], switch[1]] = switch_count[switch] df = df.fillna(0) return df @property def residence_times(self): """ Dict[string, List[int]] : number of steps spent in each channel for each "stay" in that channel; allows calculations of distribution properties. Depends on ``treat_multiples``, see details there. """ labeled_results = self.labels_by_step() durations = [(self.label_to_string(step[2]), step[1] - step[0]) for step in labeled_results] results = collections.defaultdict(list) for dur in durations: results[dur[0]] += [dur[1]] return results @property def total_time(self): """ Dict[string, int] : total number of steps spent in each channel for each "stay" in that channel. Depends on ``treat_multiples``, see details there. """ residences = self.residence_times results = collections.defaultdict(int) for channel in residences: results[channel] = sum(residences[channel]) return results def status(self, step_number): """Reports which channel(s) are associated with a given step number. Note ---- Results will depend on the value of ``treat_multiples``. See details in the documentation for that. Parameters ---------- step_number : int the step number of interest Returns ------- string : the string label for the channel(s) """ treat_multiples = self.treat_multiples if self.treat_multiples == 'all': treat_multiples = 'multiple' labeled_results = self.labels_by_step(treat_multiples) for step in labeled_results: if step[0] <= step_number < step[1]: return self.label_to_string(step[2]) raise RuntimeError("Step " + str(step_number) + " outside of range." + " Max step: " + str(labeled_results[-1][1]))