Source code for openpathsampling.netcdfplus.netcdfplus

"""
author
"""

import abc
import logging
import os.path
from collections import OrderedDict
from uuid import UUID

import netCDF4
import numpy as np
from .dictify import UUIDObjectJSON
from .stores import NamedObjectStore, ObjectStore, PseudoAttributeStore
from .proxy import LoaderProxy

import sys
if sys.version_info > (3, ):
    unicode = str

logger = logging.getLogger(__name__)
init_log = logging.getLogger('openpathsampling.initialization')

from openpathsampling.integration_tools import unit as u
from openpathsampling.integration_tools import HAS_SIMTK_UNIT


# ==============================================================================
# Extended NetCDF Storage for multiple forked trajectories
# ==============================================================================

[docs] class NetCDFPlus(netCDF4.Dataset): """ Extension of the python netCDF wrapper for easier storage of python objects """ support_simtk_unit = HAS_SIMTK_UNIT @property def _netcdfplus_version_(self): import openpathsampling.netcdfplus.version as v version = v.short_version return version _type_conversion = { 'float': np.float32, 'int': np.int32, 'long': np.int64, 'index': np.int32, 'length': np.int32, 'bool': np.int16, 'str': str, 'json': str, 'jsonobj': str, 'numpy.float32': np.float32, 'numpy.float64': np.float64, 'numpy.int8': np.int8, 'numpy.int16': np.int16, 'numpy.int32': np.int32, 'numpy.int64': np.int64, 'numpy.uint8': np.uint8, 'numpy.uint16': np.uint16, 'numpy.uint32': np.uint32, 'numpy.uint64': np.uint64, 'store': str, 'obj': np.int32, 'lazyobj': np.int32, 'uuid': str } class ValueDelegate(object): """ Value delegate for objects that implement __getitem__ and __setitem__ It will basically just wrap values that are used in a dict like structure with getter and setter function to allow easier conversion delegate[x] is equivalent to delegate.getter(delegate.variable[x]) Attributes ---------- variable : dict-like the dict to be wrapped getter : function the function applied to results from running the __getitem__ on the variable setter : function the function applied to the value to be stored using __setitem__ on the variable store : openpathsampling.netcdfplus.ObjectStore a reference to an object store used for convenience in some cases """ def __init__(self, variable, getter=None, setter=None, store=None): self.variable = variable self.store = store if setter is None: # None should not be used setter = lambda v: v self.__setitem__ = variable.__setitem__ if getter is None: getter = lambda v: v self.__getitem__ = variable.__getitem__ self.getter = getter self.setter = setter if not HAS_SIMTK_UNIT: self.support_simtk_unit = False def __setitem__(self, key, value): self.variable[key] = self.setter(value) def __getitem__(self, key): # print(self.variable[key]) # print(type(self.variable[key])) return self.getter(self.variable[key]) def __getattr__(self, item): return getattr(self.variable, item) def __str__(self): return str(self.variable) def __repr__(self): return repr(self.variable) def __len__(self): return len(self.variable) @property def objects(self): """ Return a dictionary of all objects stored. This is similar to the netcdf `.variables` for all stored variables. This allows to write `storage.objects['samples'][idx]` like we write `storage.variables['ensemble_json'][idx]` """ return self._stores def find_store(self, obj): """ Return the default store used for an storable object Parameters ---------- obj : :class:`openpathsampling.netcdfplus.StorableObject` the storable object to be tested Returns ------- :class:`openpathsampling.netcdfplus.ObjectStore` the store that is used by default to store the given storable obj """ if type(obj) is type or type(obj) is abc.ABCMeta: if obj not in self._obj_store: raise ValueError( 'Objects of class "%s" are not storable in this store.' % obj.__name__) return self._obj_store[obj] else: if obj.__class__ not in self._obj_store: raise ValueError( 'Objects of class "%s" are not storable in this store.' % obj.__class__.__name__) return self._obj_store[obj.__class__] def update_storable_classes(self): self.simplifier.update_class_list() def _create_storages(self): """ Function to be called automatically to register all object stores This will usually only be called in subclassed storages. """ # todo: add CVStore, rename to attribute pass
[docs] def __init__(self, filename, mode=None, fallback=None): """ Create a storage for complex objects in a netCDF file Parameters ---------- filename : string filename of the netcdf file to be used or created mode : str the mode of file creation, one of 'w' (write), 'a' (append) or 'r' (read-only) None, which will append any existing files (equal to append), is the default. fallback : :class:`openpathsampling.Storage` the _fallback_ storage to be loaded from if an object is not present in this storage. By default you will not try to resave objects that could be found in the fallback. Note that the fall back does only work if `use_uuid` is enabled Notes ----- A single file can be opened by multiple storages, but only one can be used for writing """ if mode is None: mode = 'a' self.mode = mode exists = os.path.isfile(filename) if exists and mode == 'a': logger.info( "Open existing netCDF file '%s' for appending - " "appending existing file", filename) elif exists and mode == 'w': logger.info( "Create new netCDF file '%s' for writing - " "deleting existing file", filename) elif not exists and mode == 'a': logger.info( "Create new netCDF file '%s' for appending - " "appending non-existing file", filename) elif not exists and mode == 'w': logger.info( "Create new netCDF file '%s' for writing - " "creating new file", filename) elif not exists and mode == 'r': logger.info( "Open existing netCDF file '%s' for reading - " "file does not exist", filename) raise RuntimeError("File '%s' does not exist." % filename) elif exists and mode == 'r': logger.info( "Open existing netCDF file '%s' for reading - " "reading from existing file", filename) self._filename = os.path.abspath(filename) self.fallback = fallback # this can be set to false to re-store objects present in the fallback self.exclude_from_fallback = True # this can be set to false to re-store proxies from other stores self.exclude_proxy_from_other = False # call netCDF4-python to create or open .nc file super(NetCDFPlus, self).__init__(filename, mode) self._setup_class() if mode == 'w': logger.info("Setup netCDF file and create variables") self.setncattr('format', 'netcdf+') self.setncattr('ncplus_version', self._netcdfplus_version_) self.write_meta() # add shared scalar dimension for everyone self.create_dimension('scalar', 1) self.create_dimension('pair', 2) self.setncattr('use_uuid', 'True') self._create_simplifier() # create the store that holds stores store_stores = NamedObjectStore(ObjectStore) store_stores.name = 'stores' self.register_store('stores', store_stores) self.stores.initialize() self.stores.set_caching(True) self.update_delegates() # now create all storages in subclasses self._create_storages() self.create_store('attributes', PseudoAttributeStore()) # call the subclass specific initialization self._initialize() # this will create all variables in the storage for all new # added stores this is often already call inside of _initialize. # If not we just make sure self.finalize_stores() logger.info("Finished setting up netCDF file") self.sync() elif mode == 'a' or mode == 'r+' or mode == 'r': logger.debug("Restore the dict of units from the storage") self.check_version() # self.reference_by_uuid = hasattr(self, 'use_uuid') # self.reference_by_uuid = True self._create_simplifier() # open the store that contains all stores self.register_store('stores', NamedObjectStore(ObjectStore)) self.stores.set_caching(True) self.create_variable_delegate('stores_json') self.create_variable_delegate('stores_name') self.create_variable_delegate('stores_uuid') self.stores.restore() # Create a dict of openmm.Unit() instances for all netCDF.Variable() for variable_name in self.variables: variable = self.variables[variable_name] if self.support_simtk_unit: if hasattr(variable, 'unit_simtk'): unit_dict = self.simplifier.from_json( getattr(variable, 'unit_simtk')) if unit_dict is not None: unit = self.simplifier.unit_from_dict(unit_dict) else: unit = self.simplifier.unit_from_dict(u.Unit({})) self.units[str(variable_name)] = unit # register all stores that are listed in self.stores for store in self.stores: if store is not None: logger.debug("Register store %s in the storage" % store.name) self.register_store(store.name, store) store.register(self, store.name) self.update_delegates() self._restore_storages() # only if we have a new style file if hasattr(self, 'attributes'): for attribute, store in zip( self.attributes, self.attributes.vars['cache'] ): if store is not None: key_store = self.attributes.key_store(attribute) key_store.attribute_list[attribute] = store # call the subclass specific restore in case there is more stuff # to prepare self._restore() self.set_auto_mask(False)
# self.set_always_mask(False) ## didn't fix; errors older versions def _create_simplifier(self): self.simplifier = UUIDObjectJSON(self) @property def filename(self): return self._filename @property def file_size(self): return os.path.getsize(self.filename) @property def file_size_str(self): current = float(self.file_size) output_prefix = '' for prefix in ["k", "M", "G"]: if current >= 1024: output_prefix = prefix current /= 1024.0 return "{0:.2f}{1}B".format(current, output_prefix) @staticmethod def _cmp_version(v1, v2): # we only look at x.y.z parts def version_parts(v): return v.split('-')[0].split('+')[0].split('.')[:3] q1 = version_parts(v1) q2 = version_parts(v2) # q1 = v1.split('.')[:3] # q2 = v2.split('.')[:3] # q1 = v1.split('-')[0].split('.') # q2 = v2.split('-')[0].split('.') for v1, v2 in zip(q1, q2): if int(v1) > int(v2): return +1 elif int(v1) < int(v2): return -1 return 0 def check_version(self): try: s1 = self.getncattr('ncplus_version') except AttributeError: logger.info( 'Using netcdfplus Pre 1.0 version. ' 'No version detected using 0.0.0') s1 = '0.0.0' s2 = self._netcdfplus_version_ cp = self._cmp_version(s1, s2) if cp != 0: logger.info('Loading different netcdf version. ' 'Installed version is ' '%s and loaded version is %s' % (s2, s1)) if cp > 0: logger.info( 'Loaded version is newer consider upgrading your ' 'conda package!') else: logger.info( 'Loaded version is older. Should be no problem other then ' 'missing features and information') def write_meta(self): pass def _setup_class(self): """ Sets the basic properties for the storage """ self._stores = OrderedDict() self._objects = {} self._obj_store = {} self._storages_base_cls = {} self.vars = dict() self.units = dict() def create_store(self, name, store, register_attr=True): """ Create a special variable type `obj.name` that can hold storable objects Parameters ---------- name : str the name of the store inside this storage store : :class:`openpathsampling.netcdf.ObjectStore` the store to be added to this storage register_attr : bool if `True` the store will be added to the storage as an attribute with name `name` """ self.register_store(name, store, register_attr=register_attr) store.name = name self.stores.save(store) def finalize_stores(self): """ Run initializations for all added stores. This will make sure that all previously added stores are now useable. If you add more stores you need to call this again. The reason this is done at all is that stores might reference each other and so no unique order of creation can be found. Thus you first create stores with all their dependencies and then finalize all of them together. """ for store in self._stores.values(): if not store.is_created(): logger.info("Initializing store '%s'" % store.name) store.initialize() for store in self._stores.values(): if not store.is_created(): logger.info("Initializing store '%s'" % store.name) store.initialize() self.update_delegates() self.simplifier.update_class_list() def register_store(self, name, store, register_attr=True): """ Add a object store to the file An object store is a special type of variable that allows to store python objects Parameters ---------- name : str the name of the store under which the objects are accessible like `store.{name}` store : :class:`openpathsampling.storages.ObjectStore` instance of the object store register_attr : bool if set to false the store will not be accesible as an attribute. `True` is the default. """ store.register(self, name) if register_attr: if hasattr(self, name): raise ValueError('Store name %s is already in use!' % name) setattr(self, store.prefix, store) self._stores[name] = store if store.content_class is not None: self._objects[store.content_class] = store self._obj_store[store.content_class] = store self._obj_store.update( {cls: store for cls in store.content_class.descendants()}) def _initialize(self): """ Function run after a new file is created. This is used to setup all variables in the storage """ pass def _restore(self): """ Function run after an existing file is opened. This is used in special storages to complete reading existing files. """ pass def __repr__(self): return "Storage @ '" + self.filename + "'" def __getattr__(self, item): try: return self.__dict__[item] except KeyError: try: return self.__class__.__dict__[item] except KeyError: raise AttributeError( "'{cls}' object has no attribute '{itm}'".format( cls=self.__class__, itm=item ) ) def __setattr__(self, key, value): self.__dict__[key] = value def _init_storages(self): """ Run the initialization on all added classes Notes ----- Only runs when the storage is created. """ for storage in self._stores.values(): storage.initialize() self.update_delegates() def _restore_storages(self): """ Run the restore method on all added classes Notes ----- Only runs when an existing storage is opened. """ for storage in self._stores.values(): storage.restore() storage._created = True def list_stores(self): """ Return a list of registered stores Returns ------- list of str list of stores that can be accessed using `storage.[store]` """ return [store.prefix for store in self._stores.values()] def list_storable_objects(self): """ Return a list of storable object base classes Returns ------- list of type list of base classes that can be stored using `storage.save(obj)` """ return [ store.content_class for store in self.objects.values() if store.content_class is not None] def save(self, obj, idx=None): """ Save a storable object into the correct Storage in the netCDF file Parameters ---------- obj : :class:`StorableObject` the object to store idx : str the name to be stored by Returns ------- str the class name of the BaseClass of the stored object, which is needed when loading the object to identify the correct storage """ if type(obj) is list: # a list of objects will be stored one by one return [self.save(part, idx) for part in obj] elif type(obj) is tuple: # a tuple will store all parts return [self.save(part, idx) for part in obj] elif obj.__class__ in self._obj_store: # to store we just check if the base_class is present in the # storages also we assume that if a class has no base_cls store = self.find_store(obj) store_idx = self.stores.index[store.__uuid__] return store, store_idx, store.save(obj, idx) # Could not save this object. raise RuntimeWarning("Objects of type '%s' cannot be stored!" % obj.__class__.__name__) def __contains__(self, item): if type(item) is list: # a list of objects will be stored one by one return [part in self for part in item] elif type(item) is tuple: # a tuple will store all parts return tuple([part in self for part in item]) elif item.__class__ in self._obj_store: # to store we just check if the base_class is present in the # storages also we assume that if a class has no base_cls store = self.find_store(item) return item in store return False def load(self, uuid): """ Load an object from the storage Parameters ---------- uuid : uuid.UUID the uuid to be loaded Returns ------- :class:`openpathsampling.netcdfplus.StorableObject` the object loaded from the storage Notes ----- this only works in storages with uuids otherwise load directly from the substores """ for store in self.objects.values(): if uuid in store.index: return store[uuid] raise KeyError("UUID %s not found in storage" % uuid) def idx(self, obj): """ Return the index used to store the given object in this storage Parameters ---------- obj : object The stored object from which the index is to be returned """ if hasattr(obj, 'base_cls'): store = self._objects[obj.base_cls] return store.idx(obj) def repr_json(self, obj): """ Return the JSON representation in the storage if available Parameters ---------- obj : :class:`openpathsampling.netcdfplus.StorableObject` Returns ------- str the JSON string (usually in unicode) from the storage """ if hasattr(obj, 'base_cls'): store = self._objects[obj.base_cls] if store.json: return store.variables['json'][store.idx(obj)] return None def create_dimension(self, dim_name, size=None): """ Initialize a new dimension in the storage. Wraps the netCDF createDimension Parameters ---------- dim_name : str the name for the new dimension size : int the number of elements in this dimension. None (default) means an infinite dimension that extends when more objects are stored """ if dim_name not in self.dimensions: self.createDimension(dim_name, size) def cache_image(self): """ Return an dict containing information about all caches Returns ------- dict a nested dict containing information about the number and types of cached objects """ image = { 'weak': {}, 'strong': {}, 'total': {}, 'file': {}, 'index': {} } total_strong = 0 total_weak = 0 total_file = 0 total_index = 0 for name, store in self.objects.items(): size = store.cache.size count = store.cache.count profile = { 'count': count[0] + count[1], 'count_strong': count[0], 'count_weak': count[1], 'max': size[0], 'size_strong': size[0], 'size_weak': size[1], } total_strong += count[0] total_weak += count[1] total_file += len(store) total_index += len(store.index) image[name] = profile image['strong'][name] = count[0] image['weak'][name] = count[1] image['total'][name] = count[0] + count[1] image['file'][name] = len(store) # if hasattr(store, 'index'): image['index'][name] = len(store.index) # else: # image['index'][name] = 0 image['full'] = total_weak + total_strong image['total_strong'] = total_strong image['total_weak'] = total_weak image['file'] = total_file image['index'] = total_index return image def get_var_types(self): """ List all allowed variable type to be used in `create_variable` Returns ------- list of str the list of variable types """ types = list(NetCDFPlus._type_conversion.keys()) types += ['obj.' + x for x in self.objects.keys()] types += ['lazyobj.' + x for x in self.objects.keys()] types += ['uuid.' + x for x in self.objects.keys()] types += ['lazyuuid.' + x for x in self.objects.keys()] return sorted(types) @staticmethod def var_type_to_nc_type(var_type): """ Return the compatible netCDF variable type for var_type Parameters ---------- var_type : Returns ------- object A object of netcdf compatible varible types """ if 'obj.' in var_type: nc_type = str elif 'uuid.' in var_type: nc_type = str else: nc_type = NetCDFPlus._type_conversion[var_type] return nc_type def create_type_delegate(self, var_type): """ Create a variable value delegator for var_type The delegator will convert automatically between the given variable type and the netcdf compatible one Parameters ---------- var_type : str the variable type Returns ------- NetCDFPlus.Value_Delegate the delegator instance """ getter = None setter = None store = None if 'obj.' in var_type or 'uuid.' in var_type: store_name = str(var_type.split('.')[1]) store = self._stores[store_name] base_type = store.content_class # get_is_iterable = lambda v: \ # v.base_cls is not base_type if hasattr(v, 'base_cls') else \ # hasattr(v, '__iter__') get_numpy_iterable = lambda v: isinstance(v, np.ndarray) set_is_iterable = lambda v: \ v.base_cls is not base_type if hasattr(v, 'base_cls') else \ hasattr(v, '__iter__') if var_type == 'int': getter = lambda v: v.tolist() setter = lambda v: np.array(v) elif var_type == 'bool': getter = lambda v: v.astype(bool).tolist() setter = lambda v: np.array(v, dtype=np.int8) elif var_type == 'index': getter = lambda v: \ [None if int(w) < 0 else int(w) for w in v.tolist()] \ if hasattr(v, '__iter__') else None if int(v) < 0 else int(v) setter = lambda v: \ [-1 if w is None else w for w in v] \ if hasattr(v, '__iter__') else -1 if v is None else v elif var_type == 'float': getter = lambda v: v.tolist() setter = lambda v: np.array(v) elif var_type.startswith('numpy.'): pass elif var_type == 'jsonobj': setter = lambda v: self.simplifier.to_json_object(v) getter = lambda v: self.simplifier.from_json(v) elif var_type == 'json': setter = lambda v: self.simplifier.to_json(v) getter = lambda v: self.simplifier.from_json(v) elif var_type.startswith('obj.'): getter = lambda v: [ None if w[0] == '-' else store.load(int(UUID(w))) for w in v ] if get_numpy_iterable(v) else \ None if v[0] == '-' else store.load(int(UUID(v))) setter = lambda v: \ ''.join(['-' * 36 if w is None else str(UUID(int=store.save(w))) for w in list.__iter__(v)]) \ if set_is_iterable(v) else \ '-' * 36 if v is None else str(UUID(int=store.save(v))) elif var_type.startswith('lazyobj.'): getter = lambda v: [ None if w[0] == '-' else LoaderProxy.new(store, int(UUID(w))) for w in v ] if isinstance(v, np.ndarray) else \ None if v[0] == '-' else LoaderProxy.new(store, int(UUID(v))) setter = lambda v: \ ''.join([ '-' * 36 if w is None else str(UUID(int=store.save(w))) for w in list.__iter__(v) ]) if set_is_iterable(v) else \ '-' * 36 if v is None else str(UUID(int=store.save(v))) elif var_type == 'uuid': getter = lambda v: \ [None if w[0] == '-' else int(UUID(w)) for w in v] \ if type(v) is not unicode else None \ if v[0] == '-' else int(UUID(v)) setter = lambda v: \ ''.join([ '-' * 36 if w is None else str(UUID(int=w)) for w in list.__iter__(v) ]) if hasattr(v, '__iter__') else \ '-' * 36 if v is None else str(UUID(int=v)) elif var_type == 'store': setter = lambda v: v.prefix getter = lambda v: self.objects[v] return getter, setter, store to_uuid_chunks = staticmethod( lambda x: [x[i:i + 36] for i in range(0, len(x), 36)]) def create_variable_delegate(self, var_name): """ Create a delegate property that wraps the netcdf.Variable and takes care of type conversions Parameters ---------- var_name : str the name of the variable for which a delegate should be created """ if var_name not in self.vars: var = self.variables[var_name] if not hasattr(var, 'var_type'): return getter, setter, store = self.create_type_delegate(var.var_type) to_uuid_chunks = NetCDFPlus.to_uuid_chunks # to_uuid_chunks34 = NetCDFPlus.to_uuid_chunks34 if hasattr(var, 'var_vlen'): if var.var_type.startswith('obj.'): getter = lambda v: [[ None if u[0] == '-' else store.load(int(UUID(u))) for u in to_uuid_chunks(w) ] for w in v ] if isinstance(v, np.ndarray) else [ None if u[0] == '-' else store.load(int(UUID(u))) for u in to_uuid_chunks(v) ] elif var.var_type.startswith('lazyobj.'): getter = lambda v: [[ None if u[0] == '-' else LoaderProxy.new(store, int(UUID(u))) for u in to_uuid_chunks(w)] for w in v ] if isinstance(v, np.ndarray) else [ None if u[0] == '-' else LoaderProxy.new(store, int(UUID(u))) for u in to_uuid_chunks(v) ] if True or self.support_simtk_unit: if hasattr(var, 'unit_simtk'): if var_name not in self.units: self.update_simtk_unit(var_name) unit = self.units[var_name] def _get(my_getter): if my_getter is None: return lambda v: u.Quantity(v, unit) else: return lambda v: u.Quantity(my_getter(v), unit) def _set(my_setter): if my_setter is None: return lambda v: v / unit else: return lambda v: my_setter(v / unit) getter = _get(getter) setter = _set(setter) if True: if hasattr(var, 'maskable'): def _get2(my_getter): return lambda v: [ None if hasattr(w, 'mask') else my_getter(w) for w in v ] if type(v) is not str and len(v.shape) > 0 else \ None if hasattr(v, 'mask') else my_getter(v) if getter is not None: getter = _get2(getter) else: getter = _get2(lambda v: v) delegate = NetCDFPlus.ValueDelegate(var, getter, setter, store) # this is a trick to speed up the s/getter. If we do not need # to _cast_ because of python objects of units we can copy # the s/getter of the original var which is still bound to the # right object self.vars[var_name] = delegate else: raise ValueError("Variable '%s' is already taken!" % var_name) def create_variable(self, var_name, var_type, dimensions, description=None, chunksizes=None, simtk_unit=None, maskable=False): """ Create a new variable in the netCDF storage. This is just a helper function to structure the code better and add some convenience to creating more complex variables Parameters ========== var_name : str The name of the variable to be created var_type : str The string representing the type of the data stored in the variable. Allowed are strings of native python types in which case the variables will be treated as python or a string of the form 'numpy.type' which will refer to the numpy data types. Numpy is preferred sinec the api to netCDF uses numpy and thus it is faster. Possible input strings are `int`, `float`, `long`, `str`, `numpy.float32`, `numpy.float64`, `numpy.int8`, `numpy.int16`, `numpy.int32`, `numpy.int64`, `json`, `obj.<store>`, `lazyobj.<store>` dimensions : str or tuple of str A tuple representing the dimensions used for the netcdf variable. If not specified then the default dimension of the storage is used. If the last dimension is `'...'` then it is assumed that the objects are of variable length. In netCDF this is usually referred to as a VLType. We will treat is just as another dimension, but it can only be the last dimension. description : str A string describing the variable in a readable form. chunksizes : tuple of int A tuple of ints per number of dimensions. This specifies in what block sizes a variable is stored. Usually for object related stuff we want to store everything of one object at once so this is often (1, ..., ...) simtk_unit : str A string representing the units used for this variable. Can be used with all var_types although it makes sense only for numeric ones. maskable : bool, default: False If set to `True` the values in this variable can only partially exist and if they have not yet been written they are filled with a fill_value which is treated as a non-set variable. The created variable will interpret this values as `None` when returned """ ncfile = self if type(dimensions) is str: dimensions = [dimensions] dimensions = list(dimensions) new_dimensions = dict() for ix, dim in enumerate(dimensions): if type(dim) is int: dimensions[ix] = var_name + '_dim_' + str(ix) new_dimensions[dimensions[ix]] = dim if dimensions[-1] == '...': # last dimension is simply [] so we allow arbitrary length # and remove the last dimension variable_length = True dimensions = dimensions[:-1] else: variable_length = False if var_type == 'obj' or var_type == 'lazyobj': dimensions.append('pair') if chunksizes is not None: chunksizes = tuple(list(chunksizes) + [2]) nc_type = self.var_type_to_nc_type(var_type) for dim_name, size in new_dimensions.items(): ncfile.create_dimension(dim_name, size) dimensions = tuple(dimensions) # if chunk sizes are strings then replace it by # the actual size of the dimension if chunksizes is not None: chunksizes = list(chunksizes) for ix, dim in enumerate(chunksizes): if dim == -1: chunksizes[ix] = len(ncfile.dimensions[dimensions[ix]]) if type(dim) is str: chunksizes[ix] = len(ncfile.dimensions[dim]) chunksizes = tuple(chunksizes) if variable_length: vlen_t = ncfile.createVLType(nc_type, var_name + '_vlen') ncvar = ncfile.createVariable( var_name, vlen_t, dimensions, chunksizes=chunksizes ) setattr(ncvar, 'var_vlen', 'True') else: ncvar = ncfile.createVariable( var_name, nc_type, dimensions, chunksizes=chunksizes, ) setattr(ncvar, 'var_type', var_type) if self.support_simtk_unit and simtk_unit is not None: if isinstance(simtk_unit, u.Unit): unit_instance = simtk_unit symbol = unit_instance.get_symbol() elif isinstance(simtk_unit, u.BaseUnit): unit_instance = u.Unit({simtk_unit: 1.0}) symbol = unit_instance.get_symbol() elif type(simtk_unit) is str and hasattr(u, simtk_unit): unit_instance = getattr(u, simtk_unit) symbol = unit_instance.get_symbol() else: raise NotImplementedError( 'Unit by abbreviated string representation ' 'is not yet supported') json_unit = self.simplifier.unit_to_json(unit_instance) # store the unit in the dict inside the Storage object self.units[var_name] = unit_instance # Define units for a float variable setattr(ncvar, 'unit_simtk', json_unit) setattr(ncvar, 'unit', symbol) if maskable: setattr(ncvar, 'maskable', 'True') if description is not None: if type(dimensions) is str: dim_names = [dimensions] else: dim_names = ['#ix{0}:{1}'.format(*p) for p in enumerate(dimensions)] idx_desc = '[' + ']['.join(dim_names) + ']' description = var_name + idx_desc + ' is ' + \ description.format(idx=dim_names[0], ix=dim_names) # Define long (human-readable) names for variables. setattr(ncvar, "long_str", description) self.update_delegates() return ncvar def update_delegates(self): """ Updates the set of delegates in `self.vars` Should be called after new variables have been created or loaded. """ for name in self.variables: if name not in self.vars: self.create_variable_delegate(name) @staticmethod def get_value_parameters(value): """ Compute netcdfplus compatible parameters to store a value Parameters ---------- value Returns ------- dict A dictionary containing the approriate input parameters for `var_type`, `dimensions`, `simtk_unit` Notes ----- This is a utility function to create a CV using a template """ dimensions = None storable = True simtk_unit = None test_value = value test_type = value if NetCDFPlus.support_simtk_unit: if type(test_type) is u.Quantity: # could be a Quantity([..]) simtk_unit = test_type.unit test_type = test_type._value if type(test_type) is np.ndarray: dimensions = test_type.shape else: if hasattr(test_value, '__len__'): dimensions = len(test_value) test_type = test_value[0] if NetCDFPlus.support_simtk_unit and type(test_type) \ is u.Quantity: for val in test_value: if isinstance(val._value, type(test_value._value)): # all values must be of same type storable = False else: for val in test_value: if type(val) is not type(test_value): # all values must be of same type storable = False if NetCDFPlus.support_simtk_unit and type(test_type) is u.Quantity: # could also be [Quantity, ...] simtk_unit = test_type.unit test_type = test_type._value if storable: var_type = NetCDFPlus.identify_var_type(test_type) return { 'var_type': var_type, 'dimensions': dimensions, 'simtk_unit': simtk_unit } return { } @staticmethod def identify_var_type(instance): """ Identify common python and numpy types Parameters ---------- instance python variable instance to be tested for it numeric type Returns ------- str a string representation of the variable type """ ty = type(instance) known_types = [float, int, bool, str] if ty in known_types: return ty.__name__ elif hasattr(instance, 'dtype'): return 'numpy.' + instance.dtype.type.__name__ else: return 'None'