Source code for openpathsampling.high_level.transition

import logging

import numpy as np

import openpathsampling as paths
from openpathsampling.numerics import (
    Histogram, histograms_to_pandas_dataframe, LookupFunction, Histogrammer
)
from openpathsampling.numerics import WHAM
from openpathsampling.netcdfplus import StorableNamedObject

from openpathsampling.analysis.tools import (
    pathlength, max_lambdas, guess_interface_lambda, minus_sides_summary,
    sampleset_sample_generator
)

logger = logging.getLogger(__name__)


[docs] class Transition(StorableNamedObject): """ Describes (in general) a transition between two states. """
[docs] def __init__(self, stateA, stateB): super(Transition, self).__init__() self.stateA = stateA self.stateB = stateB
# whoops: can't be set here, but must be set in subclass # TODO: make that work in a more sensible way #self.ensembles = [] @property def all_ensembles(self): return self.ensembles def to_dict(self): return { 'stateA' : self.stateA, 'stateB' : self.stateB, } @classmethod def from_dict(cls, dct): return Transition( stateA=dct['stateA'], stateB=dct['stateB'] )
[docs] class TPSTransition(Transition): """ Transition using TPS ensembles """
[docs] def __init__(self, stateA, stateB, name=None): super(TPSTransition, self).__init__(stateA, stateB) if name is not None: self.name = name if not hasattr(self, "ensembles"): self.ensembles = [self._tps_ensemble(stateA, stateB)]
def to_dict(self): return { 'stateA' : self.stateA, 'stateB' : self.stateB, 'ensembles' : self.ensembles, 'name' : self.name } @classmethod def from_dict(cls, dct): if 'name' not in dct: dct['name'] = None mytrans = TPSTransition(dct['stateA'], dct['stateB'], dct['name']) mytrans.ensembles = dct['ensembles'] return mytrans def _tps_ensemble(self, stateA, stateB): return paths.SequentialEnsemble([ paths.AllInXEnsemble(stateA) & paths.LengthEnsemble(1), paths.AllOutXEnsemble(stateA | stateB), paths.AllInXEnsemble(stateB) & paths.LengthEnsemble(1) ]) def add_transition(self, stateA, stateB): new_ens = self._tps_ensemble(stateA, stateB) try: self.ensembles[0] = self.ensembles[0] | new_ens except AttributeError: self.ensembles = [new_ens]
[docs] class FixedLengthTPSTransition(TPSTransition): """Transition using fixed length TPS ensembles"""
[docs] def __init__(self, stateA, stateB, length, name=None): self.length = length super(FixedLengthTPSTransition, self).__init__(stateA, stateB, name)
def to_dict(self): dct = super(FixedLengthTPSTransition, self).to_dict() dct['length'] = self.length return dct @classmethod def from_dict(cls, dct): mytrans = super(FixedLengthTPSTransition, cls).from_dict(dct) mytrans.length = dct['length'] return mytrans def _tps_ensemble(self, stateA, stateB): return paths.SequentialEnsemble([ paths.LengthEnsemble(1) & paths.AllInXEnsemble(stateA), paths.LengthEnsemble(self.length - 2), paths.LengthEnsemble(1) & paths.AllInXEnsemble(stateB) ])
[docs] class TISTransition(Transition): """ Transition using TIS ensembles. The additional information from the TIS ensembles allows us to set up all the analysis (assuming we built these are proper TIS ensembles, which we DO in the intitialization!) Parameters ---------- stateA : Volume Volume for the state from which the transition begins stateB : Volume Volume for the state in which the transition ends interfaces : list of Volume Volumes for the interfaces orderparameter : CollectiveVariable order parameter to be used in the analysis (does not need to be the parameter which defines the interfaces, although it usually is) name : string name for the transition """
[docs] def __init__(self, stateA, stateB, interfaces, orderparameter=None, name=None, name_suffix=""): super(TISTransition, self).__init__(stateA, stateB) self.stateA = stateA self.stateB = stateB self.interfaces = interfaces self.name_suffix = name_suffix if name is not None: self.name = name # If we reload from a storage file, we want to use the # ensembles from the file, not the automatically generated # ones here # build ensembles if we don't already have them self.orderparameter = orderparameter if not hasattr(self, "ensembles"): self._build_ensembles(self.stateA, self.stateB, self.interfaces, self.orderparameter) self.default_orderparameter = self.orderparameter self.total_crossing_probability_method = "wham" self.histograms = {} # caches for the results of our calculation self._flux = None self._rate = None self.hist_args = {} # shortcut to ensemble_histogram_info[].hist_args self.ensemble_histogram_info = { 'max_lambda' : Histogrammer( f=max_lambdas, f_args={'orderparameter' : self.orderparameter}, hist_args={} ), 'pathlength' : Histogrammer( f=pathlength, f_args={}, hist_args={} ) } self.minus_ensemble = paths.MinusInterfaceEnsemble( state_vol=stateA, innermost_vols=interfaces[0], forbidden=stateB ).named("Out " + stateA.name + " minus" + self.name_suffix)
def copy(self, with_results=True): copy = self.from_dict(self.to_dict()) copy.copy_analysis_from(self) return copy def copy_analysis_from(self, other): self.default_orderparameter = other.default_orderparameter self.total_crossing_probability_method = other.total_crossing_probability_method self.hist_args = other.hist_args self.ensemble_histogram_info = other.ensemble_histogram_info self.histograms = other.histograms self._flux = other._flux self._rate = other._rate try: self.tcp = other.tcp except AttributeError: pass try: self.ctp = other.ctp except AttributeError: pass def __str__(self): mystr = str(self.__class__.__name__) + ": " + str(self.name) + "\n" mystr += (str(self.stateA.name) + " -> " + str(self.stateA.name) + " or " + str(self.stateB.name) + "\n") for iface in self.interfaces: mystr += "Interface: " + str(iface.name) + "\n" return mystr def _build_ensembles(self, stateA, stateB, interfaces, orderparameter): try: cv = interfaces.cv except AttributeError: cv = orderparameter if cv is None: # in case interface.cv is None and orderparameter is not None cv = orderparameter try: cv_max = interfaces.cv_max except AttributeError: cv_max = None try: lambdas = [interfaces.get_lambda(iface_vol) for iface_vol in interfaces] except AttributeError: lambdas = [None] * len(interfaces) self.ensembles = [ paths.TISEnsemble( initial_states=stateA, final_states=stateB, interface=iface_vol, orderparameter=cv, cv_max=cv_max, lambda_i=lambda_i ) for (iface_vol, lambda_i) in zip(interfaces, lambdas) ] #self.ensembles = paths.EnsembleFactory.TISEnsembleSet( #stateA, stateB, self.interfaces, orderparameter #) for idx, ensemble in enumerate(self.ensembles): ensemble.named(self.name + " " + str(idx) + self.name_suffix) # parameters for different types of output def _ensemble_statistics(self, ensemble, samples, weights=None, force=False): """Calculate stats for a given ensemble: path length, crossing prob In general we do all of these at once because the extra cost of running through the samples twice is worse than doing the extra calculations. Parameters ---------- ensemble: Ensemble samples : iterator over samples """ # figure out which histograms need to updated for this ensemble run_it = [] if not force: # TODO figure out which need to be rerun pass else: run_it = list(self.ensemble_histogram_info.keys()) for hist in run_it: hist_info = self.ensemble_histogram_info[hist] if hist_info.hist_args == {} and self.hist_args[hist] != {}: hist_info.hist_args = self.hist_args[hist] if hist not in self.histograms.keys(): self.histograms[hist] = {} self.histograms[hist][ensemble] = Histogram(**(hist_info.hist_args)) in_ens_samples = (s for s in samples if s.ensemble.__uuid__ == ensemble.__uuid__) hist_data = {} buflen = -1 sample_buf = [] prev_sample = {h: None for h in run_it} prev_result = {h: None for h in run_it} for sample in in_ens_samples: for hist in run_it: if sample is prev_sample[hist]: hist_data_sample = prev_result[hist] else: hist_info = self.ensemble_histogram_info[hist] hist_data_sample = hist_info.f(sample, **hist_info.f_args) prev_result[hist] = hist_data_sample prev_sample[hist] = sample try: hist_data[hist].append(hist_data_sample) except KeyError: hist_data[hist] = [hist_data_sample] for hist in run_it: self.histograms[hist][ensemble].histogram(hist_data[hist], weights) self.histograms[hist][ensemble].name = (hist + " " + self.name + " " + ensemble.name) def _all_statistics(self, steps, weights=None, force=False): """ Run all statistics for all ensembles. """ # TODO: speed this up by just running over all samples once and # dealing them out to the appropriate histograms for ens in self.ensembles: samples = sampleset_sample_generator(steps) self._ensemble_statistics(ens, samples, weights, force) def pathlength_histogram(self, ensemble): """ Return the pathlength histogram for the given ensemble. """ # check existence and correctness of self.histograms[pl][ens] if "pathlength" not in self.histograms: self.histograms['pathlength'] = {} hist = self.histograms['pathlength'][ensemble] return hist.normalized() def crossing_probability(self, ensemble, n_blocks=1): """ Return the crossing probability for the given ensemble. """ # check existence and correctness of self.histograms[cp][ens] hist = self.histograms['crossing_probability'][ensemble] return hist.reverse_cumulative() def total_crossing_probability(self, steps=None, method="wham", force=False): """Return the total crossing probability using `method` Parameters ---------- steps : iterable of :class:`.MCStep` cycles to be analyzed method : "wham" (later: or "mbar" or "tram") approach to use to combine the histograms force : bool (False) if true, cached results are overwritten """ if method == "wham": run_ensembles = False for ens in self.ensembles: try: hist = self.histograms['max_lambda'][ens] except KeyError: run_ensembles = True if run_ensembles or force: if steps is None: raise RuntimeError("Unable to build histograms without steps source") self._all_statistics(steps, force=True) df = histograms_to_pandas_dataframe( self.histograms['max_lambda'].values(), fcn="reverse_cumulative" ).sort_index(axis=1) # if lambdas not set, returns None and WHAM uses fallback lambdas = self.interfaces.lambdas wham = WHAM(interfaces=lambdas) # wham.load_from_dataframe(df) # wham.clean_leading_ones() tcp = wham.wham_bam_histogram(df).to_dict() # elif method == "mbar": # pass else: raise ValueError("Only supported method is 'wham'. " + "'mbar' is not yet implemented!") self.tcp = LookupFunction(tcp.keys(), tcp.values()) return self.tcp def conditional_transition_probability(self, steps, ensemble, force=False): """ This transition's conditional transition probability for a given ensemble. The conditional transition probability for an ensemble is the probability that a path in that ensemble makes the transition from state A to state B. Parameters ---------- steps : iterable of :class:`.MCStep` cycles to analyze ensemble : Ensemble which ensemble to calculate the CTP for force : bool (False) if true, cached results are overwritten """ samples = sampleset_sample_generator(steps) n_acc = 0 n_try = 0 for samp in samples: if samp.ensemble is ensemble: if self.stateB(samp.trajectory.get_as_proxy(-1)): n_acc += 1 n_try += 1 ctp = float(n_acc)/n_try logger.info("CTP: " + str(n_acc) + "/" + str(n_try) + "=" + str(ctp) + "\n") try: self.ctp[ensemble] = ctp except AttributeError: self.ctp = { ensemble : ctp } return ctp def rate(self, steps, flux=None, outer_ensemble=None, outer_lambda=None, error=None, force=False): """Calculate the rate for this transition. For TIS transitions, this requires the result of an external calculation of the flux. Parameters ========== steps : iterable of :class:`.MCStep` flux : float outer_ensemble : openpathsampling.TISEnsemble error : list(3) or None """ logger.info("Rate for " + self.stateA.name + " -> " + self.stateB.name) # get the flux if flux is None: # TODO: find a way to raise error if bad flux flux = self._minus_move_flux(steps) if flux is not None: self._flux = flux if self._flux is None: raise ValueError( "No flux available to TISTransition. Cannot calculate rate" ) flux = self._flux # get the total crossing probability if not force and hasattr(self, 'tcp'): tcp = self.tcp else: tcp = self.total_crossing_probability(steps=steps, force=force) # get the conditional transition probability if outer_ensemble is None: outer_ensemble = self.ensembles[-1] logger.info("outer ensemble: " + outer_ensemble.name + " " + repr(outer_ensemble)) outer_cross_prob = self.histograms['max_lambda'][outer_ensemble] outer_lambda = self.interfaces.get_lambda(self.interfaces[-1]) if outer_lambda is None: outer_lambda = guess_interface_lambda(outer_cross_prob) # lambda_bin = -1 # outer_cp_vals = outer_cross_prob.reverse_cumulative().values() # # should be (almost) 1.0 for anything before correct lambda # while (abs(outer_cp_vals[lambda_bin+1] - 1.0) < 1e-7): # lambda_bin += 1 # outer_lambda = outer_cross_prob.bins[lambda_bin] logger.info("outer lambda: " + str(outer_lambda)) ctp = self.conditional_transition_probability(steps, outer_ensemble, force=force) outer_tcp = tcp(outer_lambda) #print outer_lambda #print flux, outer_tcp, ctp self._rate = flux*outer_tcp*ctp logger.info("RATE = " + str(self._rate)) logger.info("flux * outer_tcp * ctp = " + str(flux) + " * " + str(outer_tcp) + " * " + str(ctp)) return self._rate def to_dict(self): ret_dict = { 'stateA': self.stateA, 'stateB': self.stateB, 'orderparameter': self.orderparameter, 'interfaces': self.interfaces, 'name': self.name, 'name_suffix': self.name_suffix, 'ensembles': self.ensembles, 'minus_ensemble': self.minus_ensemble } return ret_dict @classmethod def from_dict(cls, dct): if 'name' not in dct: dct['name'] = None minus_ensemble = dct.pop('minus_ensemble') ensembles = dct.pop('ensembles') mytrans = TISTransition(**dct) mytrans.minus_ensemble = minus_ensemble mytrans.ensembles = ensembles return mytrans @property def all_ensembles(self): return self.ensembles + [self.minus_ensemble] def _minus_move_flux(self, steps, force=False): """ Calculate the flux based on the minus ensemble trajectories. """ if not force and self._flux != None: return self._flux self.minus_count_sides = {"in": [], "out": []} # NOTE: this assumes that minus mover is the only thing with the # minus mover's signature. TODO: switch this back to being # mover-based when we move all analysis out of the network objects minus_steps = ( step for step in steps if (self.minus_ensemble in [s.ensemble for s in step.change.trials] and step.change.accepted and step.change.mover is not None) ) #for move in minus_moves: #minus_samp = [s for s in move.results #if s.ensemble is self.minus_ensemble][0] minus_movers_used = {} for step in minus_steps: minus_samp = step.active[self.minus_ensemble] minus_trajectory = minus_samp.trajectory minus_summ = minus_sides_summary(minus_trajectory, self.minus_ensemble) for key in self.minus_count_sides.keys(): self.minus_count_sides[key].extend(minus_summ[key]) try: minus_movers_used[step.change.canonical.mover] += 1 except KeyError: minus_movers_used[step.change.canonical.mover] = 1 for key in self.minus_count_sides.keys(): if len(self.minus_count_sides[key]) == 0: logger.warn("No instances of "+str(key)+" for minus move.") # print minus_movers_used t_in_avg = np.array(self.minus_count_sides['in']).mean() t_out_avg = np.array(self.minus_count_sides['out']).mean() if len(set(minus_movers_used)) != 1: # TODO: someday, this may not need to be forbidden, although I # don't think it will be useful. For now, this is important for # testing. Minimum, important that all have the same timestep raise RuntimeError(str(len(minus_movers_used)) + " minus movers for the same ensemble?") engine_dt = list(minus_movers_used.keys())[0].engine.snapshot_timestep flux = 1.0 / (t_in_avg + t_out_avg) / engine_dt self._flux = flux return self._flux