# Source code for openpathsampling.netcdfplus.stores.named

from openpathsampling.netcdfplus.base import StorableNamedObject

from .object import ObjectStore

import logging

# Module logger plus a dedicated channel for initialization messages.
logger = logging.getLogger(__name__)
init_log = logging.getLogger('openpathsampling.initialization')

import sys

# Python 3 has no separate ``long`` type; alias it to ``int`` so the
# ``isinstance(idx, (int, long))`` check used in this module works on
# both major versions.
if sys.version_info[0] >= 3:
    long = int


class NamedObjectStore(ObjectStore):
    """
    An object store whose entries carry a name in addition to their index.

    The store keeps an in-memory cache (``_name_idx``) that maps each
    non-empty name to the set of storage indices where an object with that
    name has been saved. Several objects may share the same name; loading
    by name returns the one with the highest index.
    """

    def __init__(self, content_class, json=True, nestable=False):
        """
        Parameters
        ----------
        content_class : class or `None`
            the class of the stored objects. If given, it must be a subclass
            of `StorableNamedObject`.
        json : bool
            passed on unchanged to `ObjectStore.__init__`
        nestable : bool
            passed on unchanged to `ObjectStore.__init__`

        Raises
        ------
        ValueError
            if `content_class` is not a subclass of `StorableNamedObject`
        """
        super(NamedObjectStore, self).__init__(
            content_class=content_class,
            json=json,
            nestable=nestable
        )

        self._names_loaded = False
        self._name_idx = dict()

        if self.content_class is not None \
                and not issubclass(self.content_class, StorableNamedObject):
            raise ValueError((
                'Content class "%s" must be subclassed from '
                'StorableNamedObject.') % self.content_class.__name__
            )

    def initialize(self):
        """
        Initialize the associated storage to allow for object storage.

        In addition to the parent initialization this creates the ``name``
        variable that holds the name of each stored object.
        """
        super(NamedObjectStore, self).initialize()

        self.create_variable(
            "name", 'str',
            description='The name of the object',
            chunksizes=tuple([65536])
        )

    @property
    def name_idx(self):
        """
        Returns a dictionary of all names pointing to stored indices

        Returns
        -------
        dict of str : set
            A dictionary that has all stored names as keys and the values are
            a set of indices where an object with this name is found.

        Notes
        -----
        The cache is not filled lazily here; it is populated by
        `update_name_cache` (called from `load_indices`) and by `save`.
        """
        return self._name_idx

    def load_indices(self):
        """Load the parent's indices and then fill the name cache."""
        super(NamedObjectStore, self).load_indices()
        self.update_name_cache()

    def update_name_cache(self):
        """
        Update the internal name cache with all stored names in the store.

        This allows to load by name for named objects. The storage is only
        read the first time this is called.
        """
        if not self._names_loaded:
            stored_names = self.storage.variables[self.prefix + "_name"][:]
            for idx, name in enumerate(stored_names):
                self._update_name_in_cache(name, idx)

            self._names_loaded = True

    def _update_name_in_cache(self, name, idx):
        """
        Register that the object at `idx` is stored under `name`.

        Empty names are ignored, so unnamed objects never appear in the cache.
        """
        # make sure to cast unicode to str
        name = str(name)
        if name:
            self._name_idx.setdefault(name, set()).add(idx)

    def cache_all(self):
        """Load all samples as fast as possible into the cache

        Reads the complete ``json`` and ``name`` variables once and
        reconstructs every object that is not cached yet.
        """
        if not self._cached_all:
            idxs = range(len(self))
            jsons = self.variables['json'][:]
            names = self.variables['name'][:]

            # An explicit loop is required: on Python 3 ``map`` is lazy and
            # would never call `add_single_to_cache_named` if its result is
            # not consumed.
            for idx, name, json in zip(idxs, names, jsons):
                self.add_single_to_cache_named(idx, name, json)

            self._cached_all = True

    def add_single_to_cache_named(self, idx, name, json):
        """
        Add a single object to cache by json

        Parameters
        ----------
        idx : int
            the index where the object was stored
        name : str
            the name of the object if it exists
        json : str
            json string the represents a serialized version of the stored
            object

        Returns
        -------
        object or `None`
            the reconstructed object, or `None` if `idx` was already cached
            (in which case nothing is done)
        """
        if idx not in self.cache:
            obj = self.simplifier.from_json(json)

            self._get_id(idx, obj)

            self.cache[idx] = obj
            self.index[obj.__uuid__] = idx

            setattr(obj, '_name', name)

            return obj

    def find(self, name):
        """
        Return last object with a given name

        Parameters
        ----------
        name : str
            the name to be searched for

        Returns
        -------
        :py:class:`openpathsampling.netcdfplus.base.StorableObject`
            the last object with a given name. This is to mimic immutable
            objects. Once you (re-)save with the same name you replace the
            old one and hence you need to load the last stored one.

        Raises
        ------
        ValueError
            if no object with this name is known to the store (raised by
            `load`)
        """
        return self.load(name)

    def find_indices(self, name):
        """
        Return indices for all objects with a given name

        Parameters
        ----------
        name : str
            the name to be searched for

        Returns
        -------
        list of int
            a sorted list of indices in the storage for all found objects,
            can be empty [] if no objects with that name exist
        """
        # use .get so that unknown names give [] as documented instead of
        # raising a KeyError
        return sorted(self._name_idx.get(name, ()))

    def find_all(self, name):
        """
        Return all objects stored under a given name

        Parameters
        ----------
        name : str
            the name to be searched for

        Returns
        -------
        the result of indexing the store with the sorted list of matching
        indices, or an empty list if no object with that name exists
        """
        indices = self.find_indices(name)
        if indices:
            return self[indices]

        return []

    # ==========================================================================
    # LOAD/SAVE DECORATORS FOR CACHE HANDLING
    # ==========================================================================

    def load(self, idx):
        """
        Returns an object from the storage.

        Parameters
        ----------
        idx : int or str
            either the integer index of the object to be loaded or a string
            (name) for named objects. This will always return the last object
            found with the specified name. This allows to effectively change
            existing objects. Integers of 1000000000 and above are not used
            as positions but looked up in ``self.index`` (and, if missing
            there, in the fallback stores).

        Returns
        -------
        :py:class:`openpathsampling.netcdfplus.base.StorableNamedObject`
            the loaded object, or `None` if the resolved index is negative
            or beyond the end of the store

        Raises
        ------
        ValueError
            if a name or large integer key cannot be found, or if `idx` is
            neither a string nor an integer
        """
        if isinstance(idx, str):
            # we want to load by name and it was not in cache.
            if idx not in self.name_idx:
                raise ValueError('str "' + idx + '" not found in storage')

            candidates = self.name_idx[idx]
            if len(candidates) > 1 and self._log_debug:
                logger.debug((
                    'Found name "%s" multiple (%d) times in storage! '
                    'Loading last!') % (idx, len(candidates)))

            # the last stored object with this name wins
            n_idx = max(candidates)

        # --- start super of ObjectStore ---
        elif isinstance(idx, (int, long)):
            if idx < 1000000000:
                n_idx = idx
            elif idx in self.index:
                n_idx = self.index[idx]
            else:
                if self.fallback_store is not None:
                    return self.fallback_store.load(idx)
                elif self.storage.fallback is not None:
                    return self.storage.fallback.stores[self.name].load(idx)
                else:
                    raise ValueError(
                        'str %s not found in storage or fallback' % idx)

        else:
            # without this branch `n_idx` would be unbound below
            raise ValueError((
                'indices of type "%s" are not allowed in named storage '
                '(only str and int)') % type(idx).__name__
            )

        if n_idx < 0:
            return None

        # if it is in the cache, return it
        try:
            obj = self.cache[n_idx]
            if self._log_debug:
                logger.debug(
                    'Found IDX #' + str(idx) + ' in cache. Not loading!')
            return obj

        except KeyError:
            pass

        if self._log_debug:
            logger.debug(
                'Calling load object of type `%s` @ IDX #%d' %
                (self.content_class.__name__, n_idx))

        if n_idx >= len(self):
            logger.warning(
                'Trying to load from IDX #%d > number of object %d' %
                (n_idx, len(self)))
            return None

        # negative indices have already been handled above
        obj = self._load(n_idx)

        if self._log_debug:
            logger.debug(
                'Calling load object of type %s and IDX # %d ... DONE' %
                (self.content_class.__name__, n_idx))

        if obj is not None:
            self._get_id(n_idx, obj)

            setattr(obj, '_name',
                    self.storage.variables[self.prefix + '_name'][n_idx])
            # make sure that you cannot change the name of loaded objects
            obj.fix_name()

            # update cache there might have been a change due to naming
            self.cache[n_idx] = obj

            if self._log_debug:
                logger.debug(
                    'Try loading UUID object of type %s and IDX # %d ... DONE' %
                    (self.content_class.__name__, n_idx))

        if self._log_debug:
            logger.debug(
                'Finished load object of type %s and IDX # %d ... DONE' %
                (self.content_class.__name__, n_idx))

        # --- end ---

        return obj

    def save(self, obj, idx=None):
        """
        Saves an object to the storage.

        Parameters
        ----------
        obj : :py:class:`openpathsampling.netcdfplus.base.StorableNamedObject`
            the object to be stored
        idx : string or `None`
            if a string, the object is renamed to it (via ``obj.name``)
            before saving. This is highly discouraged since it changes an
            immutable object (at least in the storage). It is better to store
            also the new object and just ignore the previously stored one.

        Returns
        -------
        the reference returned by `ObjectStore.save`

        Raises
        ------
        ValueError
            if `idx` is neither a string nor `None`
        AttributeError
            if the name of the object is `None`
        """
        is_str = isinstance(idx, str)

        if not is_str and idx is not None:
            raise ValueError(
                'Unsupported index type (only str or None allowed).')

        if is_str:
            obj.name = idx

        name = obj._name

        if name is None:
            # this should not happen!
            logger.debug(
                "Nameable object has not been initialized correctly. "
                "Has None in _name")
            raise AttributeError(
                '_name needs to be a string for nameable objects.')

        # remember the naming state so it can be restored if saving fails
        obj_fixed = obj._name_fixed
        obj_name = obj._name

        # we fix the name just in case we try in recursive saving to store
        # one object twice with different names. Storing with the same name
        # is fine!
        obj.fix_name()

        try:
            reference = super(NamedObjectStore, self).save(obj)
        except BaseException:
            # if saving did not work unlock the name if is was un-fixed before
            obj._name_fixed = obj_fixed
            obj._name = obj_name
            raise

        n_idx = self.index[obj.__uuid__]
        self.storage.variables[self.prefix + '_name'][n_idx] = name
        self._update_name_in_cache(name, n_idx)

        return reference
class UniqueNamedObjectStore(NamedObjectStore):
    """
    A named object store in which every name may be used only once.

    On top of the name cache of `NamedObjectStore` this keeps a set of names
    that are reserved while a save is in progress, so that nested saves
    cannot reuse a name that is about to be written.
    """

    # ==========================================================================
    # LOAD/SAVE DECORATORS FOR CACHE HANDLING
    # ==========================================================================

    def __init__(self, content_class, json=True, nestable=False):
        """
        Parameters
        ----------
        content_class : class or `None`
            passed on unchanged to `NamedObjectStore.__init__`
        json : bool
            passed on unchanged to `NamedObjectStore.__init__`
        nestable : bool
            passed on unchanged to `NamedObjectStore.__init__`
        """
        super(UniqueNamedObjectStore, self).__init__(
            content_class=content_class,
            json=json,
            nestable=nestable)

        # NOTE: despite the attribute name, this set holds the names that
        # are currently reserved (see `reserve_name` / `is_name_locked`).
        self._free_name = set()

    def reserve_name(self, name):
        """
        Locks a name as used

        The empty string is never reserved.

        Parameters
        ----------
        name : str
            the name to be locked for storage
        """
        if name != "":
            self._free_name.add(name)

    def release_name(self, name):
        """
        Releases a locked name

        Releasing a name that is not reserved is a no-op.

        Parameters
        ----------
        name : str
            the name to be released for being used as a name
        """
        self._free_name.discard(name)

    def is_name_locked(self, name):
        """
        Test whether in a unique name store a name is already taken

        Parameters
        ----------
        name : str or `None`
            the name to be tested.

        Returns
        -------
        bool
            the result of the test. If the name exists or is reserved during
            a saving event this will return `True` and return `False` if the
            name is free. `None` is never considered locked.
        """
        if name is None:
            return False

        return name in self.name_idx or name in self._free_name

    def save(self, obj, idx=None):
        """
        Saves an object to the storage.

        Before saving it is checked that the name to be used is not taken
        yet. If the object has a fixed name and is already in the index, its
        reference is returned without saving again.

        Parameters
        ----------
        obj : :py:class:`openpathsampling.netcdfplus.base.StorableNamedObject`
            the object to be stored
        idx : string or `None`
            the name to be used for storing. This is highly discouraged since
            it changes an immutable object (at least in the storage). It is
            better to store also the new object and just ignore the
            previously stored one.

        Returns
        -------
        the reference returned by `NamedObjectStore.save` or, for an already
        stored object, by ``self.reference(obj)``

        Raises
        ------
        ValueError
            if `idx` is neither a string nor `None`
        RuntimeWarning
            if the object cannot be saved under the requested or current
            name. The message lists all detected problems, one per line.
        """
        is_str = isinstance(idx, str)

        if not is_str and idx is not None:
            raise ValueError(
                'Unsupported index type (only str or None allowed).')

        name = obj._name
        fixed = obj._name_fixed

        err = list()

        if is_str:
            if fixed:
                if name != idx:
                    # saving fixed under different name is not possible.
                    if obj in self:
                        err.append(
                            ('Cannot rename object to "%s". '
                             'Already saved with name "%s" !') % (idx, name)
                        )
                    else:
                        err.append(
                            ('Cannot rename object to "%s". '
                             'Already fixed name "%s" !') % (idx, name)
                        )

                        # extra hint on whether saving under the current
                        # name would be possible
                        if self.is_name_locked(name):
                            err.append(
                                ('Current name "%s" is also already taken in '
                                 'unique name store. This means you cannot '
                                 'save object "%s" at all. '
                                 'In general this should not happen to unsaved '
                                 'objects unless you fixed the name of the '
                                 'object yourself. Check your code '
                                 'for the generation of objects of the same '
                                 'name.') % (name, obj)
                            )
                        else:
                            err.append(
                                ('Current name "%s" is still free. Saving '
                                 'without giving a specific name '
                                 'should work. If that is what you want '
                                 'to do.') % name
                            )
                else:
                    # already fixed, but with same name. Okay. Check if stored
                    # (the index is keyed by the UUID of the object)
                    if obj.__uuid__ in self.index:
                        return self.reference(obj)
            else:
                # name is not fixed yet. Check, if we can save or name is taken
                if self.is_name_locked(idx):
                    err.append(
                        ('New name "%s" already taken in unique name store. '
                         'Try different name instead.') % idx
                    )

                    # extra hint on whether saving under the current name
                    # would be possible
                    if self.is_name_locked(name):
                        err.append((
                            'Current name "%s" already taken in unique '
                            'name store. ') % name
                        )
                    else:
                        err.append(
                            ('Current name "%s" is still free. Saving without '
                             'giving a specific name should work') % name
                        )
        else:
            if fixed:
                # no new name, but fixed. Check if already stored.
                if obj.__uuid__ in self.index:
                    return self.reference(obj)

                # if not stored yet check if we could
                if self.is_name_locked(name):
                    err.append(
                        ('Current name "%s" is already taken in unique name '
                         'store. This means you cannot save object "%s" at '
                         'all. In general this should not happen to unsaved '
                         'objects unless you fixed the name of the object '
                         'yourself. Check your code for the generation of '
                         'objects of the same name.') % (name, obj)
                    )
            else:
                # no new name and not fixed. Just check if current name is taken
                if self.is_name_locked(name):
                    err.append(
                        ('Current name "%s" is already taken in unique name '
                         'store %s. Try renaming object or saving using other '
                         'name.') % (name, self.name)
                    )

        if err:
            # one problem per line
            raise RuntimeWarning('\n'.join(err))

        # no errors, reserve the name for nested saving and actually call save
        self.reserve_name(name)

        try:
            reference = super(UniqueNamedObjectStore, self).save(obj, idx)
        finally:
            # always release, also when saving failed
            self.release_name(name)

        return reference