import base64
import importlib
import numpy as np
from openpathsampling.integration_tools import is_simtk_quantity, unit
import math
import abc
from uuid import UUID
import ujson
import marshal
import types
import opcode
from .base import StorableObject
from openpathsampling.tools import word_wrap
from .cache import WeakValueCache
__author__ = 'Jan-Hendrik Prinz'
import sys
if sys.version_info > (3, ):
long = int
unicode = str
builtin_module = 'builtins'
get_code = lambda func: func.__code__
intify_byte = lambda b: b
decodebytes = lambda s: base64.decodebytes(s.encode())
get_callable_name = lambda c: c.__name__
import builtins
else:
builtin_module = '__builtin__'
get_code = lambda func: func.func_code
intify_byte = lambda b: ord(b)
decodebytes = base64.decodestring
get_callable_name = lambda c: c.func_name
import builtins
def make_callable_codec(safemode):
# starting in Python 3.11, we use SimStore to serialize callables
from openpathsampling.experimental.simstore.callable_codec \
import CallableCodec
only_req = ObjectJSON.prevent_unsafe_modules
codec = CallableCodec(settings={
'only_allow_required_modules': only_req,
'required_modules': ObjectJSON.safe_modules,
'safemode': safemode,
})
return codec
# in Python 3.6 the opcodes have changed width
if sys.version_info > (3, 6):
opcode_arg_width = 1
opcode_no_arg_width = 1
else:
opcode_arg_width = 2
opcode_no_arg_width = 0
if int(ujson.__version__.split(".")[0]) <= 2:
ujson_kwargs = dict()
else:
ujson_kwargs = {"reject_bytes": False}
[docs]
class ObjectJSON(object):
"""
A simple implementation of a pickle algorithm to create object that can be
converted to json and back
Attributes
----------
safemode: bool
If set to `True` the recreation of marshalled objects like functions is
switched off and these objects are replaced by None. Can be used to load
from incompatible python versions or potential unsafe trajectory files.
"""
allow_marshal = True
# switch to `true`, if you want more protection
prevent_unsafe_modules = False
allowed_storable_atomic_types = [
int, float, bool, long, str,
np.float32, np.float64,
np.int8, np.int16, np.int32, np.int64,
np.uint8, np.uint16, np.uint32, np.uint64,
]
safe_modules = [
'numpy',
'math',
'pandas',
'mdtraj',
'simtk',
'simtk.unit',
'simtk.openmm'
'openmm',
'openmm.unit',
]
[docs]
def __init__(self, unit_system=None):
self.excluded_keys = []
self.unit_system = unit_system
self.class_list = dict()
self.allowed_storable_types = dict()
self.type_names = {}
self.type_classes = {}
self.safemode = False
self.update_class_list()
def update_class_list(self):
self.class_list = StorableObject.objects()
self.type_names = {
cls.__name__: cls for cls in self.allowed_storable_atomic_types}
self.type_names.update(self.class_list)
self.type_classes = {
cls: name for name, cls in self.type_names.items()}
def simplify_object(self, obj):
return {
'_cls': obj.__class__.__name__,
'_dict': self.simplify(obj.to_dict(), obj.base_cls_name)
}
def simplify(self, obj, base_type=''):
if obj.__class__.__name__ == 'module':
# store an imported module
if obj.__name__.split('.')[0] in self.safe_modules:
return {'_import': obj.__name__}
else:
raise RuntimeError((
'The module reference "%s" you want to store is '
'not allowed!') % obj.__name__)
elif type(obj) is type or type(obj) is abc.ABCMeta:
# store a storable number type
if obj in self.type_classes:
return {'_type': obj.__name__}
else:
return None
elif type(obj) is float and math.isinf(obj):
return {
'_float': str(obj)}
elif type(obj) is int and math.isinf(obj):
return {
'_integer': str(obj)}
elif obj.__class__.__module__ != builtin_module:
if is_simtk_quantity(obj):
# This is number with a unit so turn it into a list
if self.unit_system is not None:
return {
'_value': self.simplify(
obj.value_in_unit_system(self.unit_system)),
'_units': self.unit_to_dict(
obj.unit.in_unit_system(self.unit_system))
}
else:
return {
'_value': self.simplify(obj / obj.unit, base_type),
'_units': self.unit_to_dict(obj.unit)
}
elif obj.__class__ is np.ndarray:
# this is maybe not the best way to store large numpy arrays!
return {
'_numpy': self.simplify(obj.shape),
'_dtype': str(obj.dtype),
'_data': base64.b64encode(obj.copy(order='C'))
}
elif hasattr(obj, 'to_dict'):
# the object knows how to dismantle itself into a json string
if hasattr(obj, '__uuid__'):
return {
'_cls': obj.__class__.__name__,
'_obj_uuid': str(UUID(int=obj.__uuid__)),
'_dict': self.simplify(obj.to_dict(), base_type)}
else:
return {
'_cls': obj.__class__.__name__,
'_dict': self.simplify(obj.to_dict(), base_type)}
elif type(obj) is UUID:
return {
'_uuid': str(UUID(int=obj))}
else:
return None
elif type(obj) is list:
return [self.simplify(o, base_type) for o in obj]
elif type(obj) is tuple:
return {'_tuple': [self.simplify(o, base_type) for o in obj]}
elif type(obj) is dict:
# we want to support storable objects as keys so we need to wrap
# dicts with care and store them using tuples
simple = [
key for key in obj.keys()
if type(key) is str or type(key) is int]
if len(simple) < len(obj):
# other keys than int or str
result = {
'_dict': [
self.simplify(tuple([key, o]))
for key, o in obj.items()
if key not in self.excluded_keys
]}
else:
# simple enough, do it the old way
# FASTER VERSION NORMALLY
result = {
key: self.simplify(o) for key, o in obj.items()
if key not in self.excluded_keys
}
# SLOWER VERSION FOR DEBUGGING
# result = {}
# for key, o in obj.items():
# logger.debug("Making dict entry of " + str(key) + " : "
# + str(o))
# if key not in self.excluded_keys:
# result[key] = self.simplify(o)
# else:
# logger.debug("EXCLUDED")
return result
elif type(obj) is slice:
return {
'_slice': [obj.start, obj.stop, obj.step]}
else:
oo = obj
return oo
@staticmethod
def _unicode2str(s):
if type(s) is unicode:
return str(s)
else:
return s
def build(self, obj):
if type(obj) is dict:
if '_units' in obj and '_value' in obj:
return self.build(
obj['_value']) * self.unit_from_dict(obj['_units'])
elif '_slice' in obj:
return slice(*obj['_slice'])
elif '_numpy' in obj:
return np.frombuffer(
decodebytes(obj['_data']),
dtype=np.dtype(obj['_dtype'])).reshape(
self.build(obj['_numpy'])
)
elif '_float' in obj:
return float(str(obj['_float']))
elif '_integer' in obj:
return float(str(obj['_integer']))
elif '_uuid' in obj:
return int(UUID(obj['_uuid']))
elif '_cls' in obj and '_dict' in obj:
if obj['_cls'] not in self.class_list:
self.update_class_list()
if obj['_cls'] not in self.class_list:
# updating did not help, so there is nothing we can do.
return None
# raise ValueError((
# 'Cannot create obj of class `%s`.\n' +
# 'Class is not registered as creatable! '
# 'You might have to define\n' +
# 'the class locally and call '
# '`update_storable_classes()` on your storage.') %
# obj['_cls'])
attributes = self.build(obj['_dict'])
return self.class_list[obj['_cls']].from_dict(attributes)
elif '_tuple' in obj:
return tuple([self.build(o) for o in obj['_tuple']])
elif '_type' in obj:
# return a type of a _built-in_ `netcdfplus` type
return self.type_names.get(obj['_type'])
elif '_dict' in obj:
return {
self._unicode2str(self.build(key)): self.build(o)
for key, o in self.build(obj['_dict'])
}
elif '_import' in obj:
module = obj['_import']
if module.split('.')[0] in self.safe_modules:
imp = importlib.import_module(module)
return imp
else:
return None
elif '_marshal' in obj or '_module' in obj or '_dilled' in obj:
if self.safemode:
return None
return self.callable_from_dict(obj)
else:
return {
self._unicode2str(key): self.build(o)
for key, o in obj.items()
}
elif type(obj) is list:
return [self.build(o) for o in obj]
elif type(obj) is unicode:
return str(obj)
else:
return obj
@staticmethod
def unit_to_symbol(unit):
return str(1.0 * unit).split()[1]
@staticmethod
def unit_to_dict(unit):
unit_dict = {
p.name: int(fac) for p, fac in unit.iter_base_or_scaled_units()}
return unit_dict
@staticmethod
def unit_from_dict(unit_dict):
# this will *only* work if openmm.unit is installed
this_unit = unit.Unit({})
for unit_name, unit_multiplication in unit_dict.items():
this_unit *= getattr(unit, unit_name) ** unit_multiplication
return this_unit
@staticmethod
def callable_to_dict(c):
"""
Turn a callable function of class into a dictionary
Used for conversion to JSON
Parameters
----------
c : callable (function or class with __call__)
the function to be turned into a dict representation
Returns
-------
dict
the dict representation of the callable
"""
f_module = c.__module__
root_module = f_module.split('.')[0]
# is_class = isinstance(c, (type, types.ClassType))
# try saving known external classes of functions, e.g. `msmbuilder`
if root_module in ObjectJSON.safe_modules:
# only store the function/class and the module
return {
'_module': c.__module__,
'_name': c.__name__
}
# if the easy way did not work, try saving it using bytecode
if ObjectJSON.allow_marshal and callable(c):
if sys.version_info > (3, 11):
codec = make_callable_codec(safemode=False)
return codec.default(c)
# back to normal for older Python versions
# use marshal
global_vars = ObjectJSON._find_var(c, opcode.opmap['LOAD_GLOBAL'])
import_vars = ObjectJSON._find_var(c, opcode.opmap['IMPORT_NAME'])
all_builtins = dir(builtins)
global_vars = list(set(
[var for var in global_vars if var not in all_builtins]))
import_vars = list(set(import_vars))
err = ''
if len(global_vars) > 0:
err += 'The function you try to save relies on globally set ' \
'variables and these cannot be saved since storage ' \
'has no access to the global scope which includes ' \
'imports! \n\n'
err += 'We require that the following globals: ' + \
str(global_vars) + ' either\n'
err += '\n1. be replaced by constants'
err += '\n2. be defined inside your function,' + \
'\n\n' + '\n'.join(
map(lambda x: ' ' * 8 + x + '= ...', global_vars)
) + '\n'
err += '\n3. imports need to be "re"-imported inside your ' \
'function' + \
'\n\n' + '\n'.join(
map(lambda x: ' ' * 8 + 'import ' + x, global_vars)
) + '\n'
err += '\n4. be passed as an external parameter ' \
'(not for imports!)'
err += '\n\n my_cv = FunctionCV("cv_name", ' + \
get_callable_name(c) + ', \n' + \
',\n'.join(
map(lambda x: ' ' * 20 + x + '=' + x, global_vars)
) + ')' + '\n'
err += '\n and change your function definition like this'
err += '\n\n def ' + \
get_callable_name(c) + '(snapshot, ..., ' + \
'\n' + ',\n'.join(
map(lambda x: ' ' * 16 + x, global_vars)
) + '):'
unsafe_modules = [
module for module in import_vars
if module not in ObjectJSON.safe_modules
]
if ObjectJSON.prevent_unsafe_modules and len(unsafe_modules) > 0:
if len(err) > 0:
err += '\n\n'
err += 'The function you try to save requires the following' \
' modules to be installed: ' + str(unsafe_modules) + \
' which are not marked as safe! '
err += 'You can change the list of safe modules using '
err += '\n\n ObjectJSON.safe_modules.extend(['
err += '\n' + ',\n'.join(
map(lambda x: ' ' * 12 + x, unsafe_modules)
)
err += '\n ])'
err += '\n\n'
err += 'include the import statement in your function like'
err += '\n\n' + '\n'.join(
[' ' * 8 + 'import ' + v for v in unsafe_modules])
if len(err) > 0:
raise RuntimeError('Cannot store function! \n\n' +
word_wrap(err, 60))
return {
'_marshal': ObjectJSON._to_marshal(c),
'_global_vars': global_vars,
'_module_vars': import_vars
}
raise RuntimeError('Locally defined classes are not storable yet')
@staticmethod
def _to_marshal(c):
return base64.b64encode(marshal.dumps(get_code(c)))
@staticmethod
def callable_from_dict(c_dict):
"""
Turn a dictionary back in a callable function or class
Used for conversion from JSON
Parameters
----------
c_dict : dict
the dictionary that contains the information
Returns
-------
callable
the reconstructed callable function or class
"""
c = None
if c_dict is not None:
if '_marshal' in c_dict:
if ObjectJSON.allow_marshal:
code = marshal.loads(base64.b64decode(c_dict['_marshal']))
c = types.FunctionType(code, globals(), code.co_name)
elif '_module' in c_dict:
module = c_dict['_module']
packages = module.split('.')
if packages[0] in ObjectJSON.safe_modules:
imp = importlib.import_module(module)
c = getattr(imp, c_dict['_name'])
elif '_dilled' in c_dict:
# safemode has to get handled by original netcdfplus, since
# this is a staticmethod
codec = make_callable_codec(safemode=False)
c = codec.object_hook(c_dict)
return c
@staticmethod
def _to_opcode(code):
"""
Yields the tuple opcode + argument for code
Parameters
----------
code : function
the python bytecode to be searched
Returns
-------
generator of (int, int)
generator the returns tuples of opcode + argument
"""
opcode_stream = get_code(code).co_code
i = 0
extended = 0
length = len(opcode_stream)
while i < length:
int_code = intify_byte(opcode_stream[i])
i += 1
if int_code >= opcode.HAVE_ARGUMENT:
opargs = intify_byte(opcode_stream[i]) + extended
if opcode_arg_width == 2:
# before Python 3.6
opargs += intify_byte(opcode_stream[i + 1]) * 256
extended = 0
i += opcode_arg_width
if int_code == opcode.EXTENDED_ARG:
extended = (256 ** opcode_arg_width) * opargs
continue
else:
i += opcode_no_arg_width
opargs = None
yield int_code, opargs
@staticmethod
def _find_var(code, op):
"""
Helper function to search in python bytecode for specific function calls
Parameters
----------
code : function
the python bytecode to be searched
op : int
the int code of the code to be found
Returns
-------
list of func_code.co_names
a list of co_names used in this function when calling op
"""
code_object = get_code(code)
if op in opcode.hasconst:
variable = code_object.co_consts
elif op in opcode.haslocal:
variable = code_object.co_varnames
elif op in opcode.hasname:
variable = code_object.co_names
elif op in opcode.hasfree:
variable = code_object.co_freevars
else:
return []
return list(set([
variable[arg] for code, arg in ObjectJSON._to_opcode(code)
if code == op and arg is not None]))
def to_json(self, obj, base_type=''):
simplified = self.simplify(obj, base_type)
return ujson.dumps(simplified, **ujson_kwargs)
def to_json_object(self, obj):
if hasattr(obj, 'base_cls') \
and type(obj) is not type and type(obj) is not abc.ABCMeta:
simplified = self.simplify_object(obj)
else:
simplified = self.simplify(obj)
try:
json_str = ujson.dumps(simplified, **ujson_kwargs)
except TypeError as e:
err = (
'Cannot convert object of type `%s` to json. '
'\n__dict__: %s\n'
'\nsimplified: %s\n'
'\nError: %s'
) % (
obj.__class__.__name__,
obj.__dict__,
simplified,
str(e)
)
raise ValueError(err)
return json_str
def from_json(self, json_string):
simplified = ujson.loads(json_string)
return self.build(simplified)
def unit_to_json(self, unit):
simple = self.unit_to_dict(unit)
return self.to_json(simple)
def unit_from_json(self, json_string):
return self.unit_from_dict(self.from_json(json_string))
[docs]
class StorableObjectJSON(ObjectJSON):
[docs]
def __init__(self, storage, unit_system=None):
super(StorableObjectJSON, self).__init__(unit_system)
self.excluded_keys = ['idx', 'json', 'identifier']
self.storage = storage
def simplify(self, obj, base_type=''):
if obj is self.storage:
return {'_storage': 'self'}
if obj.__class__.__module__ != builtin_module:
if obj.__class__ in self.storage._obj_store:
store = self.storage._obj_store[obj.__class__]
if not store.nestable or obj.base_cls_name != base_type:
# this also returns the base class name used for storage
# store objects only if they are not creatable. If so they
# will only be created in their top instance and we use
# the simplify from the super class ObjectJSON
idx = store.save(obj)
if idx is None:
raise RuntimeError(
'cannot store idx None in store %s' % store)
return {
'_idx': idx,
'_store': store.prefix}
return super(StorableObjectJSON, self).simplify(obj, base_type)
def build(self, obj):
if type(obj) is dict:
if '_storage' in obj:
if obj['_storage'] == 'self':
return self.storage
if '_idx' in obj and '_store' in obj:
store = self.storage._stores[obj['_store']]
result = store.load(obj['_idx'])
return result
return super(StorableObjectJSON, self).build(obj)
class UUIDObjectJSON(ObjectJSON):
def __init__(self, storage, unit_system=None):
super(UUIDObjectJSON, self).__init__(unit_system)
self.excluded_keys = ['json']
self.storage = storage
def simplify(self, obj, base_type=''):
if obj is self.storage:
return {'_storage': 'self'}
if obj.__class__.__module__ != builtin_module:
if obj.__class__ in self.storage._obj_store:
store = self.storage._obj_store[obj.__class__]
if not store.nestable or obj.base_cls_name != base_type:
# this also returns the base class name used for storage
# store objects only if they are not creatable. If so
# they will only be created in their top instance and we
# use the simplify from the super class ObjectJSON
store.save(obj)
return {
'_hex_uuid': hex(obj.__uuid__),
'_store': store.prefix}
# return {
# '_obj_uuid': str(UUID(int=obj.__uuid__)),
# '_store': store.prefix}
return super(UUIDObjectJSON, self).simplify(obj, base_type)
def build(self, obj):
if type(obj) is dict:
if '_storage' in obj:
if obj['_storage'] == 'self':
return self.storage
if '_obj_uuid' in obj and '_store' in obj:
store = self.storage._stores[obj['_store']]
result = store.load(int(UUID(obj['_obj_uuid'])))
return result
if '_hex_uuid' in obj and '_store' in obj:
store = self.storage._stores[obj['_store']]
result = store.load(int(obj['_hex_uuid'].strip('L'), 16))
return result
return super(UUIDObjectJSON, self).build(obj)
class CachedUUIDObjectJSON(ObjectJSON):
def __init__(self, unit_system=None):
super(CachedUUIDObjectJSON, self).__init__(unit_system)
self.excluded_keys = ['json']
self.uuid_cache = WeakValueCache()
def simplify(self, obj, base_type=''):
if obj.__class__.__module__ != builtin_module:
if hasattr(obj, 'to_dict') and hasattr(obj, '__uuid__'):
# the object knows how to dismantle itself into a json string
if obj.__uuid__ not in self.uuid_cache:
self.uuid_cache[obj.__uuid__] = obj
return {
'_cls': obj.__class__.__name__,
'_obj_uuid': str(UUID(int=obj.__uuid__)),
'_dict': self.simplify(obj.to_dict(), base_type)}
else:
return {
'_obj_uuid': str(UUID(int=obj.__uuid__))}
return super(CachedUUIDObjectJSON, self).simplify(obj, base_type)
def build(self, jsn):
if type(jsn) is dict:
if '_obj_uuid' in jsn:
uuid = UUID(jsn['_obj_uuid'])
if uuid in self.uuid_cache:
return self.uuid_cache[uuid]
elif '_cls' in jsn and '_dict' in jsn:
if jsn['_cls'] not in self.class_list:
self.update_class_list()
if jsn['_cls'] not in self.class_list:
raise ValueError((
'Cannot create jsn of class `%s`.\n' +
'Class is not registered as creatable! '
'You might have to define\n' +
'the class locally and call '
'`update_storable_classes()` on your storage.'
) % jsn['_cls'])
attributes = self.build(jsn['_dict'])
obj = self.class_list[jsn['_cls']].from_dict(attributes)
obj.__uuid__ = uuid
self.uuid_cache[uuid] = obj
return obj
else:
# this should not happen!
raise ('What happend here. JSN `%s`' % jsn)
pass
return super(CachedUUIDObjectJSON, self).build(jsn)
def to_json(self, obj, base_type=''):
# we need to clear the cache, since we have no idea, what the other end
# still knows. We can only cache stuff we are sending this time
self.uuid_cache.clear()
return super(CachedUUIDObjectJSON, self).to_json(obj, base_type)
# def from_json(self, json_string):
# # here we keep the cache. It could happen that an object is sent in
# # full, but we still have it and so we do not have to rebuild it which
# # saves some time
# simplified = ujson.loads(json_string)
# return self.build(simplified)