Source code for openpathsampling.analysis.replica_network

import collections
import openpathsampling as paths
import pandas as pd
import scipy.sparse
from scipy.sparse.csgraph import reverse_cuthill_mckee
import networkx as nx
import logging
logger = logging.getLogger(__name__)

[docs] class ReplicaNetwork(object): """ Analysis tool for networks of replica exchanges. """
[docs] def __init__(self, scheme, steps, replicas=None): if replicas is None: replicas = steps[0].active.replica_list() try: self.n_replicas = len(replicas) except TypeError: replicas = [replicas] self.n_replicas = 1 self.replicas = replicas self.scheme = scheme self.ensembles = scheme.network.all_ensembles # set defaults self._ensemble_to_string = {} self.ensemble_order = scheme.network.all_ensembles self.traces = self._traces_from_steps(steps) self.transitions = self._transitions_from_traces(self.traces) self.analysis = self._analysis_from_steps(steps)
def to_dict(self): dct = { 'scheme': self.scheme, 'replicas': self.replicas, 'ensembles': self.ensembles, 'ensemble_order': self.ensemble_order, 'ensemble_to_string': self._ensemble_to_string, 'traces': self.traces, 'transitions': self.transitions, 'analysis': self.analysis } return dct @classmethod def from_dict(cls, dct): obj = cls.__new__() obj.scheme = dct['scheme'] obj.replicas = dct['replicas'] obj.n_replcias = len(obj.replicas) obj.ensembles = dct['ensembles'] obj.ensemble_order = dct['ensemble_order'] obj._ensemble_to_string = dct['ensemble_to_string'] obj.traces = dct['traces'] obj.transitions = dct['transitions'] obj.analysis = dct['analysis'] return obj @property def number_to_ensemble(self): return {i: ens for (i, ens) in enumerate(self.ensemble_order)} @property def ensemble_to_number(self): return {ens: i for (i, ens) in enumerate(self.ensemble_order)} @property def string_to_ensemble(self): return {v: k for (k, v) in self.ensemble_to_string.items()} @property def n_ensembles(self): return len(self.ensemble_order) @property def number_to_string(self): ens2str = self.ensemble_to_string num2ens = self.number_to_ensemble return {i: ens2str[ens] for (i, ens) in num2ens.items()} @property def string_to_number(self): str2ens = self.string_to_ensemble ens2num = self.ensemble_to_number return {s: ens2num[ens] for (s, ens) in str2ens.items()} @property def ensemble_order(self): return self._ensemble_order @ensemble_order.setter def ensemble_order(self, value): self._ensemble_order = value default_names = {ens: ens.name for ens in value} default_names.update(self.ensemble_to_string) self.ensemble_to_string = default_names @property def ensemble_to_string(self): return self._ensemble_to_string @ensemble_to_string.setter def ensemble_to_string(self, value): self._ensemble_to_string.update(value) def _analysis_from_steps(self, steps=None): if steps is None: raise RuntimeError("No steps given to analyze!") n_trials = 0 analysis = {} analysis['n_trials'] = {} analysis['n_accepted'] = {} prev = None for step in steps: canonical_mover = step.change.canonical.mover if canonical_mover and canonical_mover.is_ensemble_change_mover: n_trials += 1 hops = [] for old in prev.active: new = step.active if old.replica != new[old.ensemble].replica: # i.e., the prev and step have diff rep in same ens hops.append((old.ensemble, new[old.replica].ensemble)) for hop in hops: try: analysis['n_accepted'][hop] += 1 except KeyError: analysis['n_accepted'][hop] = 1 prev = step # TODO: n_trials no longer needs to be a dict, but other functions # expect that in output, so we return it for key in analysis['n_accepted'].keys(): analysis['n_trials'][key] = n_trials return analysis['n_trials'], analysis['n_accepted'] def _traces_from_steps(self, steps): """ Calculates all the traces (fixed replica or fixed ensemble). """ full_traces = collections.defaultdict(list) for step in steps: for sample in step.active: ens = sample.ensemble rep = sample.replica full_traces[ens].append(rep) full_traces[rep].append(ens) traces = {k: condense_repeats(full_traces[k]) for k in full_traces} return traces def _transitions_from_traces(self, traces): """ Calculate the transitions based on the trace of a given replica. This gives results normalized to *all* move types. Parameters ---------- traces: dict """ transitions = {} for replica in traces: trace = traces[replica] hops = [(trace[i][0], trace[i+1][0]) for i in range(len(trace)-1)] for hop in hops: try: transitions[hop] += 1 except KeyError: transitions[hop] = 1 return transitions def reorder_matrix(self, matrix, index_order): """Return dataframe with matrix row/columns in index_order. Parameters ---------- matrix : a SciPy COO sparse matrix input sparse matrix index_order : list of ensembles or None order to list ensembles. If None, defaults to reverse Cuthill-McKee order. Returns ------- pandas.DataFrame dataframe with rows/columns ordered as desired """ #""" matrix must be a coo_matrix (I think): do other have same `data` #attrib?""" if index_order is None: # reorder based on RCM from scipy.sparse.csgraph rcm_perm = reverse_cuthill_mckee(matrix.tocsr()) rev_perm_dict = {k : rcm_perm.tolist().index(k) for k in rcm_perm} perm_i = [rev_perm_dict[ii] for ii in matrix.row] perm_j = [rev_perm_dict[jj] for jj in matrix.col] new_matrix = scipy.sparse.coo_matrix( (matrix.data, (perm_i, perm_j)), shape=(self.n_ensembles, self.n_ensembles) ) reordered_labels = [self.number_to_string[k] for k in rcm_perm] else: reordered_labels = [self.number_to_string[k] for k in self.number_to_string.keys()] new_matrix = matrix reordered = pd.DataFrame(new_matrix.todense()) reordered.index = reordered_labels reordered.columns = reordered_labels return reordered def matrix_and_dataframe(self, ens_i, ens_j, data, index_order=None): """ Create sparse matrix and pandas.Dataframe from ensemble data. Parameters ---------- ens_i : list of ensembles the "from" ensemble ens_j : list of ensembles the "to" ensemble data : list of floats the data for the transition ensA->ensB, such that matrix[ensA, ensB] = data[k] with ens_i[k]=ensA, ens_j[k]=ensB index_order : order of ensembles for output see `reorder_matrix` """ i = [self.ensemble_to_number[e] for e in ens_i] j = [self.ensemble_to_number[e] for e in ens_j] matrix = scipy.sparse.coo_matrix( (data, (i, j)), shape=(self.n_ensembles, self.n_ensembles) ) df = self.reorder_matrix(matrix, index_order) return (matrix, df) def transition_matrix(self, index_order=None, force=False): """ Create the transition matrix. Parameters ---------- steps : iterable of :class:`.MCStep` input data index_order : list of ensembles or None see `reorder_matrix` force : bool (False) if True, recalculate cached values Returns ------- pandas.DataFrame transition matrix """ (n_try, n_acc) = self.analysis data = [] for k in n_try.keys(): try: n_acc_k = n_acc[k] except KeyError: n_acc_k = 0 data.append(float(n_acc_k) / n_try[k]) ens_i, ens_j = zip(*n_try.keys()) # this part should be the same for all matrices self.acceptance_matrix, df = self.matrix_and_dataframe( ens_i, ens_j, data, index_order ) return df def mixing_matrix(self, steps=None, index_order=None, force=False): """ Create the mixing matrix. Parameters ---------- steps : iterable of :class:`.MCStep` input data index_order : list of ensembles or None see `reorder_matrix` force : bool (False) if True, recalculate cached values Returns ------- pandas.DataFrame mixing matrix """ (n_try, n_acc) = self.analysis data = [] for k in n_try.keys(): try: n_acc_k = n_acc[k] except KeyError: n_acc_k = 0 data.append(float(n_acc_k) * 0.5 / n_try[k]) ens_ii, ens_jj = zip(*n_try.keys()) # symmetrize ens_i = ens_ii + ens_jj ens_j = ens_jj + ens_ii data += data self.mix_matrix, df = self.matrix_and_dataframe( ens_i, ens_j, data, index_order ) return df def flow(self, bottom, top, included_ensembles=None): """ Replica "flow" between ensembles `bottom` and `top`. Replica flow at a given ensemble measures the relative number of visits from replicas which has last visiting the "top" ensemble and those which had last visited the "bottom" ensemble. Ideal flow should be a straight line from 1.0 at "bottom" to 0.0 at "top". Parameters ---------- bottom : paths.Ensemble "bottom" ensemble for this flow calculation top : paths.Ensemble "top" ensemble for this flow calculation steps : iterable of :class:`.MCStep` input data force : bool (False) if True, recalculate cached values References ---------- Katzgraber, Trebst, Huse, and Troyer. J. Stat. Mech. 2006, P03018 (2006). doi:10.1088/1742-5468/2006/03/P03018 """ if included_ensembles is None: included_ensembles = self.ensembles traces = self.traces n_up = { ens : 0 for ens in self.ensembles } n_visit = { ens : 0 for ens in self.ensembles } for replica in self.replicas: trace = self.traces[replica] direction = 0 for (loc, count) in trace: if loc == top: direction = -1 elif loc == bottom: direction = +1 if direction != 0: n_visit[loc] += count if direction == 1: n_up[loc] += count self._flow_up = n_up self._flow_count = n_visit as_dict = {e : float(n_up[e])/n_visit[e] if n_visit[e] > 0 else 0.0 for e in included_ensembles} return as_dict def flow_pd(self, bottom, top): flow_dict = self.flow(bottom, top, steps, force) inverted = {flow_dict[k]: k for k in flow_dict.keys()} sorted_inverted = list(reversed(sorted(inverted.keys()))) re_keyed = {k: sorted_inverted[k] for k in range(len(sorted_inverted))} return pd.Series(re_keyed) def trips(self, bottom, top): """ Calculate round trips, up trips, and down trips. An "up" trip is the number of steps to get from ensemble `bottom` to ensemble `top`. A "down" trip is the reverse. A "round" trip consists of either an up trip followed by a down trip or vice versa. Parameters ---------- bottom : paths.Ensemble ensemble to be considered the "bottom" for these trips top : paths.Ensemble ensemble to be considered the "top" for these trips steps : iterable of :class:`.MCStep` input data force : bool (False) if True, recalculate cached Returns ------- dict keys "up", "down", "round", pointing to values which are a list of the lengths of each trip of that type """ traces = self.traces down_trips = [] up_trips = [] round_trips = [] for replica in self.replicas: trace = traces[replica] direction = None trip_counter = 0 first_direction = None local_down = [] local_up = [] for (loc, count) in trace: if loc == top and direction != +1: direction = +1 if trip_counter > 0: local_up.append(trip_counter) trip_counter = 0 elif loc == bottom and direction != -1: direction = -1 if trip_counter > 0: local_down.append(trip_counter) trip_counter = 0 if direction is not None: if first_direction is None: first_direction = direction trip_counter += count rt_pairs = [] if first_direction == 1: rt_pairs = zip(local_down, local_up) elif first_direction == -1: rt_pairs = zip(local_up, local_down) else: warnstr = "No first direction for replica "+str(replica)+": " warnstr += "Are there no 1-way trips?" logger.warn(warnstr) down_trips.extend(local_down) up_trips.extend(local_up) round_trips.extend([sum(pair) for pair in rt_pairs]) return {'down' : down_trips, 'up' : up_trips, 'round' : round_trips}
[docs] class ReplicaNetworkGraph(object): """ Wrapper for NetworkX graph object generated by replica exchange network. Attributes ---------- repx_network : paths.ReplicaNetwork replica exchange network object """
[docs] def __init__(self, repx_network): (n_try, n_acc) = repx_network.analysis self.graph = nx.Graph() n_accs_adj = {} for k in n_try.keys(): try: n_accs_adj[k] = n_acc[k] except KeyError: n_accs_adj[k] = 0 largest_weight = max(n_accs_adj.values()) for entry in n_try.keys(): self.graph.add_edge( entry[0], entry[1], weight=float(n_accs_adj[entry])/largest_weight ) self.weights = [10*self.graph[u][v]['weight'] for u,v in self.graph.edges()]
def draw(self, layout="graphviz"): """ Lay out and draw graph. Parameters ---------- layout : string ("graphviz") layout method. Default is "graphviz", which also requires installation of pygraphviz. """ if layout == "graphviz": pos = nx.graphviz_layout(self.graph) elif layout == "spring": pos = nx.spring_layout(self.graph) elif layout == "spectral": pos=nx.spectral_layout(self.graph) elif layout == "circular": pos=nx.circular_layout(self.graph) normal = [] msouter = [] minus = [] for node in self.graph.nodes(): if isinstance(node, paths.TISEnsemble): normal.append(node) elif isinstance(node, paths.MinusInterfaceEnsemble): minus.append(node) else: msouter.append(node) nx.draw_networkx_nodes(self.graph, pos, nodelist=normal, node_color='r', node_size=500) nx.draw_networkx_nodes(self.graph, pos, nodelist=minus, node_color='b', node_size=500) nx.draw_networkx_nodes(self.graph, pos, nodelist=msouter, node_color='g', node_size=500) nx.draw_networkx_edges(self.graph, pos, width=self.weights)
# TODO: convert these into functions that do the trace for all # replicas/ensembles in one loop def trace_ensembles_for_replica(replica, steps): """ List of which ensemble a given replica was in at each MC step. Parameters ---------- replica : replica ID steps : iterable of :class:`.MCStep` input data Returns ------- list list of ensembles """ return [s.active[replica].ensemble for s in steps] def trace_replicas_for_ensemble(ensemble, steps): """ List of which replica a given ensemble held at each MC step. Parameters ---------- ensemble : paths.Ensemble selected ensemble steps : iterable of :class:`.MCStep` MC steps to analyze (nonsense if not contiguous Returns ------- list list of replica IDs """ trace = [] for step in steps: sset = step.active trace.append(sset[ensemble].replica) return trace def condense_repeats(ll, use_is=True): """ Count the number of consecutive repeats in a list. Essentially, a way of doing `uniq -c` Parameters ---------- ll : list a list Returns ------- list of tuples list of 2-tuples in the format (element, repeats) where element is the element from the list, and repeats is the number of consecutive times it appeared """ count = 0 old = None vals = [] for e in ll: if (use_is and e is old) or (not use_is and e == old): count += 1 else: if old is not None: vals.append((old, count)) count = 1 old = e vals.append((old, count)) return vals