Source code for exekall.engine

#! /usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2018, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import inspect
import collections.abc
from collections import OrderedDict
import copy
import itertools
import functools
import lzma
import pathlib
import contextlib
import pickle
import pprint
import pickletools
import re
import importlib
import sys
import io
import datetime
import io
import typing
import types
from operator import attrgetter

import exekall._utils as utils
from exekall._utils import NoValue, OrderedSet, FrozenOrderedSet


[docs] class NoOperatorError(Exception): """ Exception raised when no operator has been found to build objects of a needed type. """ pass
[docs] class IndentationManager: """ Manage the indentation level in a generated script. """ def __init__(self, style): self.style = style self.level = 0
[docs] def indent(self): self.level += 1
[docs] def dedent(self): self.level -= 1
def __str__(self): return str(self.style) * self.level
[docs] class ValueDB: """ Serializable object that contains a graph of :class:`FrozenExprVal`. This allows storing all the objects computed for each subexpression that was executed for later inspection. :param froz_val_seq_list: List of :class:`FrozenExprValSeq` to store at the root of the database. :type froz_val_seq_list: list(FrozenExprValSeq) :param adaptor_cls: A subclass of :class:`exekall.customization.AdaptorBase` that was used when setting up the expressions. This class can provide hooks that are called when deserializing the database. :type adaptor_cls: type The values of each expression is recorded in a root list of :class:`FrozenExprValSeq`. """ # Version 4 is available since Python 3.4 and improves a bit loading and # dumping speed. PICKLE_PROTOCOL = 4 def __init__(self, froz_val_seq_list, adaptor_cls=None): # Avoid storing duplicate FrozenExprVal sharing the same value/excep # UUID self.froz_val_seq_list = self._dedup_froz_val_seq_list(froz_val_seq_list) self.adaptor_cls = adaptor_cls @classmethod def _dedup_froz_val_seq_list(cls, froz_val_seq_list): """ Avoid keeping :class:`FrozenExprVal` that share the same value or excep UUID, since they are duplicates of each-other. """ # First pass: find all frozen values corresponding to a given UUID uuid_map = collections.defaultdict(set) def update_uuid_map(froz_val): uuid_map[froz_val.uuid].add(froz_val) return froz_val cls._froz_val_dfs(froz_val_seq_list, update_uuid_map) # If there is no more than one FrozenExprVal per UUID, we can skip the # rewriting for froz_val_set in uuid_map.values(): if len(froz_val_set) > 1: break else: return froz_val_seq_list # Select one FrozenExprVal for each UUID pair def select_froz_val(froz_val_set): candidates = [ froz_val for froz_val in froz_val_set # We discard candidates that have no parameters, as they # contain less information than the ones that do. This is # typically the case for PrebuiltOperator values if froz_val.param_map ] # At this point, there should be no more than one "original" value, # the other candidates were just values of PrebuiltOperator, or are # completely equivalent to the original value if candidates: return candidates[0] # If there was no better candidate, just return the first one else: return utils.take_first(froz_val_set) uuid_map = { uuid_: select_froz_val(froz_val_set) for uuid_, froz_val_set in uuid_map.items() } # Second pass: only keep one frozen value for each UUID def rewrite_graph(froz_val): if froz_val.uuid is None: return froz_val return uuid_map[froz_val.uuid] return cls._froz_val_dfs(froz_val_seq_list, rewrite_graph, rewrite=True)
[docs] @classmethod def merge(cls, db_list, roots_from=None): """ Merge multiple databases together. :param db_list: Lists of DB to merge :type db_list: list(ValueDB) :param roots_from: Only get the root values from the specified DB. The other DBs are only used to provide subexpressions values for expressions already present in that DB. That DB is also appended to ``db_list``. :type roots_from: ValueDB When two different :class:`FrozenExprVal` are available for a given UUID, the one that contains the most information will be selected. This allows natural removal of values created using an :class:`PrebuiltOperator` when the original object is also available. """ db_list = list(db_list) if not db_list: return cls([]) adaptor_cls_set = { db.adaptor_cls for db in db_list } if len(adaptor_cls_set - {None}) != 1: raise ValueError(f'Cannot merge ValueDB with different adaptor classes: {adaptor_cls_set}') # If None was used, assume it is compatible with anything adaptor_cls_set.discard(None) adaptor_cls = utils.take_first(adaptor_cls_set) if roots_from is not None: db_list.append(roots_from) froz_val_seq_list = list(itertools.chain.from_iterable( db.froz_val_seq_list for db in db_list )) db = cls(froz_val_seq_list, adaptor_cls=adaptor_cls) # Remove all roots that we don't want if roots_from is not None: allowed_roots = { froz_val.uuid for froz_val in roots_from.get_roots() } db.froz_val_seq_list = [ froz_val_seq for froz_val_seq in db.froz_val_seq_list # Only keep FrozenExprValSeq that only contains allowed # FrozenExprVal if {froz_val.uuid for froz_val in froz_val_seq} <= allowed_roots ] return db
[docs] @classmethod def from_path(cls, path, relative_to=None): """ Deserialize a :class:`ValueDB` from a file. At the moment, it is assumed to be an LZMA compressed Pickle file. :param path: Path to the file containing the serialized :class:`ValueDB`. :type path: str or pathlib.Path :param relative_to: If provided, ``path`` is interpreted as being relative to that folder, or to the containing folder if it is a file. This is mainly used to ease generation of scripts that can provide ``relative_to=__file__`` so that the artifact folder can be moved around easily. :type relative_to: str or pathlib.Path """ if relative_to is not None: relative_to = pathlib.Path(relative_to).resolve() if not relative_to.is_dir(): relative_to = pathlib.Path(relative_to).parent path = pathlib.Path(relative_to, path) with lzma.open(str(path), 'rb') as f: # Disabling garbage collection while loading result in significant # speed improvement, since it creates a lot of new objects in a # very short amount of time. with utils.disable_gc(): db = pickle.load(f) assert isinstance(db, cls) # Apply some post-processing on the DB with a known path cls._call_adaptor_reload(db, path=path) return db
@classmethod def _reload_serialized(cls, dct): db = cls.__new__(cls) db.__dict__ = dct # Apply some post-processing on the DB that was just reloaded, with no # path since we don't even know if that method was invoked on something # serialized in a file. cls._call_adaptor_reload(db, path=None) return db
[docs] def __reduce_ex__(self, protocol): """ Provide custom serialization that will call the adaptor's hooks. """ return (self._reload_serialized, (self.__dict__,))
@staticmethod def _call_adaptor_reload(db, path): adaptor_cls = db.adaptor_cls if adaptor_cls: db = adaptor_cls.reload_db(db, path=path) return db
[docs] def to_path(self, path, optimize=True): """ Write the DB to the given file. :param path: path to file to write the DB into :type path: pathlib.Path or str :param optimize: Optimize the representation of the DB. This may increase the dump time and memory consumption, but should speed-up loading/file size. :type optimize: bool """ protocol = self.PICKLE_PROTOCOL if optimize: def dumper(f): bytes_ = utils.ExceptionPickler.dump_bytestring(self, protocol=protocol) bytes_ = pickletools.optimize(bytes_) return f.write(bytes_) else: def dumper(f): return utils.ExceptionPickler.dump_file(f, self, protocol=protocol) with lzma.open(str(path), 'wb') as f: dumper(f)
@property @utils.once def _uuid_map(self): uuid_map = dict() def update_map(froz_val): uuid_map[froz_val.uuid] = froz_val return froz_val self._froz_val_dfs(self.froz_val_seq_list, update_map) return uuid_map @classmethod def _froz_val_dfs(cls, froz_val_seq_list, callback, rewrite=False): if rewrite: def _do_froz_val_dfs(froz_val): updated_froz_val = callback(froz_val) updated_froz_val.param_map = OrderedDict( (param, _do_froz_val_dfs(param_froz_val)) for param, param_froz_val in updated_froz_val.param_map.items() ) return updated_froz_val return [ FrozenExprValSeq( froz_val_list=[ _do_froz_val_dfs(froz_val) for froz_val in froz_val_seq ], param_map=OrderedDict( (param, _do_froz_val_dfs(froz_val)) for param, froz_val in froz_val_seq.param_map.items() ) ) for froz_val_seq in froz_val_seq_list ] # When we don't need to rewrite the graph, just call the callback so we # avoid creating loads of useless objects else: def _do_froz_val_dfs(froz_val): callback(froz_val) for parent in froz_val.param_map.values(): _do_froz_val_dfs(parent) for froz_val_seq in froz_val_seq_list: for froz_val in froz_val_seq: _do_froz_val_dfs(froz_val) for froz_val in froz_val_seq.param_map.values(): _do_froz_val_dfs(froz_val) return None
[docs] def get_by_uuid(self, uuid): """ Get a :class:`FrozenExprVal` by its UUID. """ return self._uuid_map[uuid]
[docs] def get_by_predicate(self, predicate, flatten=True, deduplicate=False): """ Get :class:`FrozenExprVal` matching the predicate. :param predicate: Predicate callable called with an instance of :class:`FrozenExprVal`. If it returns True, the value is selected. :type predicate: collections.abc.Callable :param flatten: If False, return a set of frozenset of objects. There is a frozenset set for each expression result that shared their parameters. If False, the top-level set is flattened into a set of objects matching the predicate. :type flatten: bool :param deduplicate: If True, there won't be duplicates across nested sets. :type deduplicate: bool """ froz_val_set_set = OrderedSet() # When we reload instances of a class from the DB, we don't # want anything else to be able to produce it, since we want to # run on that existing data set # Make sure we don't select the same froz_val twice if deduplicate: visited = set() def wrapped_predicate(froz_val): if froz_val in visited: return False else: visited.add(froz_val) return predicate(froz_val) else: wrapped_predicate = predicate for froz_val_seq in self.froz_val_seq_list: froz_val_set = OrderedSet() for froz_val in itertools.chain( # traverse all values, including the ones from the # parameters, even when there was no value computed # (because of a failed parent for example) froz_val_seq, froz_val_seq.param_map.values() ): froz_val_set.update(froz_val.get_by_predicate(wrapped_predicate)) froz_val_set_set.add(FrozenOrderedSet(froz_val_set)) if flatten: return set(utils.flatten_seq(froz_val_set_set)) else: return froz_val_set_set
[docs] def get_roots(self, flatten=True): """ Get all the root :class:`FrozenExprVal`. :param flatten: If True, a set of :class:`FrozenExprVal` is returned, otherwise a set of frozensets of :class:`FrozenExprVal` is returned. Each set will correspond to an expression, and values inside the frozenset will correspond to the values of that expression. :type flatten: bool Root values are the result of full expression (as opposed to subexpressions). """ froz_val_set_set = { FrozenOrderedSet(froz_val_seq) for froz_val_seq in self.froz_val_seq_list } if flatten: return set(utils.flatten_seq(froz_val_set_set)) else: return froz_val_set_set
[docs] def prune_by_predicate(self, predicate): """ Create a new :class:`ValueDB` with all :class:`FrozenExprVal` matching the predicate replaced by a terminal :class:`PrunedFrozVal`. :param predicate: Predicate callable called with an instance of :class:`FrozenExprVal`. If it returns True, the value is pruned out. :type predicate: collections.abc.Callable This allows trimming a :class:`ValueDB` to a smaller size by removing non-necessary content. """ def prune(froz_val, do_prune): if isinstance(froz_val, PrunedFrozVal): return froz_val elif do_prune: return PrunedFrozVal(froz_val) else: # Edit the param_map in-place, so we keep it potentially shared # if possible. for param, param_froz_val in list(froz_val.param_map.items()): froz_val.param_map[param] = prune( param_froz_val, do_prune=predicate(param_froz_val) ) return froz_val def make_froz_val_seq(froz_val_seq): froz_val_list = [ prune(copy_graph(froz_val), do_prune=False) for froz_val in froz_val_seq # Just remove the root PrunedFrozVal, since they are useless at # this level (i.e. nothing depends on them) if not predicate(froz_val) ] # All param_map will be the same in the list by construction try: param_map = froz_val_list[0].param_map except IndexError: param_map = OrderedDict() return FrozenExprValSeq( froz_val_list=froz_val_list, param_map=param_map, ) # Memoize the function so that all references to the same froz_val are # replaced by the exact same instance @functools.lru_cache(maxsize=None, typed=True) def copy_graph(froz_val): froz_val = copy.copy(froz_val) froz_val.param_map = OrderedDict( (param, copy_graph(param_froz_val)) for param, param_froz_val in froz_val.param_map.items() ) return froz_val return self.__class__( froz_val_seq_list=[ make_froz_val_seq(froz_val_seq) for froz_val_seq in self.froz_val_seq_list ], adaptor_cls=self.adaptor_cls, )
[docs] def get_all(self, **kwargs): """ Get all :class:`FrozenExprVal` contained in this database. :Variable keyword arguments: Forwarded to :meth:`ValueDB.get_by_predicate` """ return self.get_by_predicate(lambda froz_val: True, **kwargs)
[docs] def get_by_type(self, cls, include_subclasses=True, **kwargs): """ Get all :class:`FrozenExprVal` contained in this database which value has the specified type. :param cls: Class to match. :type cls: type :param include_subclasses: If True, the check is done using ``isinstance``, otherwise an exact type check is done using ``is``. :type include_subclasses: bool :Variable keyword arguments: Forwarded to :meth:`ValueDB.get_by_predicate` .. note:: If a subexpressions had a :class:`exekall._utils.NoValue` value, it will not be selected as type matching is done on the value itself, not the return type of the callable used for that sub expression. """ if include_subclasses: def predicate(froz_val): return isinstance(froz_val.value, cls) else: def predicate(froz_val): return froz_val.type_ is cls return self.get_by_predicate(predicate, **kwargs)
[docs] def get_by_id(self, id_pattern, qual=False, full_qual=False, **kwargs): """ Get all :class:`FrozenExprVal` contained in this database which ID matches the given pattern. :param id_pattern: :func:`fnmatch.fnmatch` pattern used to match the ID. :type id_pattern: str :param qual: If True, the match will be performed on the qualified ID. :type qual: bool :param full_qual: If True, the match will be performed on the fully qualified ID. :type full_qual: bool :Variable keyword arguments: Forwarded to :meth:`ValueDB.get_by_predicate` """ def predicate(froz_val): return utils.match_name( froz_val.get_id(qual=qual, full_qual=full_qual), [id_pattern] ) return self.get_by_predicate(predicate, **kwargs)
[docs] class ScriptValueDB: """ Class tying together a generated script and a :class:`ValueDB`. :param db: :class:`ValueDB` used. :type db: ValueDB :param var_name: Name of the variable used to represent the :class:`ValueDB` in the generated script. :type var_name: str """ def __init__(self, db, var_name='db'): self.db = db self.var_name = var_name
[docs] def get_snippet(self, expr_val, attr): return '{db}.get_by_uuid({uuid}).{attr}'.format( db=self.var_name, uuid=repr(expr_val.uuid), attr=attr, )
[docs] class CycleError(Exception): """ Exception raised when a cyclic dependency is detected when building the :class:`Expression` out of a set of callables. """ pass
[docs] class AlreadyVisitedException(Exception): """ Exception raised when a node has already been visited during a graph traversal. """ pass
[docs] class ExprHelpers(collections.abc.Mapping): """ Helper class used by all expression-like classes. It mainly implements the mapping protocol, with keys being parameters and values being subexpressions computing the value of these parameters. """
[docs] def __getitem__(self, k): return self.param_map[k]
[docs] def __len__(self): return len(self.param_map)
[docs] def __iter__(self): return iter(self.param_map)
# Keep the default behavior, to override the one from Mapping def __eq__(self, other): return self is other def __hash__(self): # consistent with definition of __eq__ return id(self)
[docs] def fold(self, f, init=None, visit_once=False): """ Fold the function ``f`` over the instance and all its parents listed in ``param_map`` attribute, deep first. :param f: Function to execute for each instance. It must take two parameters: * as first parameter: the return value of the previous invocation of ``f``. For the first call, ``init`` value is used. * as second parameter: the instance of :class:`ExprHelpers` that is being visited. :type f: collections.abc.Callable :param init: Initial value passed to ``f``. :type init: object :param visit_once: If ``True``, each :class:`ExprHelpers` will only be visited once. :type visit_once: bool """ visited = set() if visit_once else None return self._fold(f, init, visited=visited)
def _fold(self, f, x, visited): if visited is not None: if self in visited: raise AlreadyVisitedException else: visited.add(self) x = f(x, self) for param, param_expr in self.param_map.items(): with contextlib.suppress(AlreadyVisitedException): x = param_expr._fold(f, x, visited) return x
[docs] class ExpressionBase(ExprHelpers): """ Base class of all expressions proper. :param op: Operator to call for that expression :type op: Operator :param param_map: Mapping of parameter names to other :class:`ExpressionBase`. The mapping must maintain its order, so that it is possible to get the parameter list in the order it was defined in the sources. :type param_map: collections.OrderedDict """ def __init__(self, op, param_map): self.op = op # Map of parameters to other Expression self.param_map = param_map
[docs] @classmethod def cse(cls, expr_list): """ Apply a flavor of common subexpressions elimination to the list of :class:`ExpressionBase`. """ expr_map = {} return [ expr._cse(expr_map) for expr in expr_list ]
def _cse(self, expr_map): # Deep first self.param_map = OrderedDict( (param, param_expr._cse(expr_map=expr_map)) for param, param_expr in self.param_map.items() ) key = ( self.op.callable_, # get a nested tuple sorted by param name with the shape: # ((param, val), ...) tuple(sorted(self.param_map.items(), key=lambda k_v: k_v[0])) ) return expr_map.setdefault(key, self) def _find_shared_op_set(self, predicate, shared): shared_op_set = set() # propagate shared-ness to all parents if we are shared # otherwise, we call the clone predicate if not shared: shared = not predicate(self) if shared: shared_op_set.add(self.op) shared_op_set.update(utils.flatten_seq( param_expr._find_shared_op_set(predicate, shared=shared) for param_expr in self.param_map.values() )) return shared_op_set
[docs] def clone_by_predicate(self, predicate): """ Create a new :class:`ExpressionBase`, with the outer levels cloned and the inner sub expressions shared with the original one. :param predicate: Predicate called on :class:`ExpressionBase` used to know at what level the sub expressions should not be cloned anymore, but instead shared with the original :class:`ExpressionBase`. All parents of a shared expression will be shared no matter what to ensure consistent expressions. :type predicate: collections.abc.Callable """ shared_op_set = self._find_shared_op_set(predicate, False) return self._clone(shared_op_set)
def _clone(self, shared_op_set): op = self.op if op in shared_op_set: return self param_map = OrderedDict( (param, param_expr._clone(shared_op_set)) for param, param_expr in self.param_map.items() ) # create a new clone, with different UUID and ExprData return self.__class__( op=op, param_map=param_map, ) def __repr__(self): return '<Expression of {name} at {id}>'.format( name=self.op.get_name(full_qual=True, pretty=True), id=hex(id(self)) )
[docs] def format_structure(self, full_qual=True, graphviz=False): """ Format the expression in a human readable way. :param full_qual: If True, use fully qualified IDs. :type full_qual: bool :param graphviz: If True, return a graphviz description suitable for the ``dot`` graph rendering tool. :param graphviz: bool """ if graphviz: return self._format_graphviz_structure(full_qual, level=0, visited=set()) else: return self._format_structure(full_qual=full_qual)
@staticmethod def _format_structure_op_name(op): if isinstance(op, ConsumerOperator): return op.get_name(full_qual=True, pretty=True) elif isinstance(op, PrebuiltOperator): return '<provided>' else: return op.get_name(full_qual=True, pretty=True) def _format_structure(self, full_qual=True, indent=1): indent_str = 4 * ' ' * indent op_name = self._format_structure_op_name(self.op) out = '{op_name} ({value_type_name})'.format( op_name=op_name, value_type_name=utils.get_name(self.op.value_type, full_qual=full_qual, pretty=True, ), ) if self.param_map: out += ':\n' + indent_str + ('\n' + indent_str).join( '{param}: {desc}'.format(param=param, desc=desc._format_structure( full_qual=full_qual, indent=indent + 1 )) for param, desc in self.param_map.items() ) return out def _format_graphviz_structure(self, full_qual, level, visited): if self in visited: return '' else: visited.add(self) op_name = self._format_structure_op_name(self.op) # Use the Python id as it is guaranteed to be unique during the lifetime of # the object, so it is a good candidate to refer to a node uid = id(self) src_file, src_line = self.op.src_loc if src_file and src_line: src_loc = f'({src_file}:{src_line})' else: src_loc = '' out = ['{uid} [label="{op_name} {reusable}\\ntype: {value_type_name}\\n{loc}"]'.format( uid=uid, op_name=op_name, reusable='(reusable)' if self.op.reusable else '(non-reusable)', value_type_name=utils.get_name(self.op.value_type, full_qual=full_qual, pretty=True, ), loc=src_loc, )] if self.param_map: for param, param_expr in self.param_map.items(): out.append( '{param_uid} -> {uid} [label="{param}"]'.format( param_uid=id(param_expr), uid=uid, param=param, ) ) out.append( param_expr._format_graphviz_structure( full_qual=full_qual, level=level + 1, visited=visited, ) ) if level == 0: title = 'Structure of ' + self.get_id(qual=False) node_out = 'digraph structure {{\n{}\nlabel="' + title + '"\n}}' else: node_out = '{}' # dot seems to dislike empty line with just ";" return node_out.format(';\n'.join(line for line in out if line.strip()))
[docs] def get_id(self, *args, marked_expr_val_set=set(), **kwargs): """ Return the ID of the expression. :param marked_expr_val_set: If True, return a two-line strings with the second line containing marker characters under the ID of all :class:`ExpressionBase` specified in that set. :type marked_expr_val_set: set(ExpressionBase) :param with_tags: Add the tags extracted from the values of each :class:`ExprVal`. :type with_tags: bool :param remove_tags: Do not add the specified tags values. :type remove_tags: set(str) :param qual: If True, return the qualified ID. :type qual: bool :param full_qual: If True, return the fully qualified ID. :type full_qual: bool :param style: If ``"rst"``, return a Sphinx reStructuredText string with references to types. :type style: str or None :param hidden_callable_set: Hide the ID of all callables given in that set, including their parent's ID. :type hidden_callable_set: set(collections.abc.Callable) """ id_, marker = self._get_id(*args, marked_expr_val_set=marked_expr_val_set, **kwargs ) if marked_expr_val_set: return '\n'.join((id_, marker)) else: return id_
def _get_id(self, with_tags=True, remove_tags=set(), full_qual=True, qual=True, style=None, expr_val=None, marked_expr_val_set=None, hidden_callable_set=None): if hidden_callable_set is None: hidden_callable_set = set() # We always hide the Consumer operator since it does not add anything # to the ID. It is mostly an implementation detail. hidden_callable_set.update((Consumer, ExprData)) if expr_val is None: param_map = OrderedDict() # If we were asked about the ID of a specific value, make sure we # don't explore other paths that lead to different values else: param_map = expr_val.param_map return self._get_id_internal( param_map=param_map, expr_val=expr_val, with_tags=with_tags, remove_tags=remove_tags, marked_expr_val_set=marked_expr_val_set, hidden_callable_set=hidden_callable_set, full_qual=full_qual, qual=qual, style=style, ) def _get_id_internal(self, param_map, expr_val, with_tags, remove_tags, marked_expr_val_set, hidden_callable_set, full_qual, qual, style): separator = ':' marker_char = '^' get_id_kwargs = dict( full_qual=full_qual, qual=qual, style=style ) if marked_expr_val_set is None: marked_expr_val_set = set() # We only get the ID's of the parameter ExprVal that lead to the # ExprVal we are interested in param_id_map = OrderedDict( (param, param_expr._get_id( **get_id_kwargs, with_tags=with_tags, remove_tags=remove_tags, # Pass None when there is no value available, so we will get # a non-tagged ID when there is no value computed expr_val=param_map.get(param), marked_expr_val_set=marked_expr_val_set, hidden_callable_set=hidden_callable_set, )) for param, param_expr in self.param_map.items() if ( param_expr.op.callable_ not in hidden_callable_set # If the value is marked, the ID will not be hidden or param_map.get(param) in marked_expr_val_set ) ) def get_tags(expr_val): if expr_val is not None: if with_tags: tag = expr_val.format_tags(remove_tags) else: tag = '' return tag else: return '' def get_marker_char(expr_val): return marker_char if expr_val in marked_expr_val_set else ' ' tag_str = get_tags(expr_val) # No parameter to worry about if not param_id_map: id_ = self.op.get_id(**get_id_kwargs) + tag_str marker_str = get_marker_char(expr_val) * len(id_) return (id_, marker_str) # Recursively build an ID else: # Make a copy to be able to pop items from it _param_id_map = copy.copy(param_id_map) # Extract the first parameter to always use the prefix # notation, i.e. its value preceding the ID of the current # Expression param, (param_id, param_marker) = _param_id_map.popitem(last=False) # If the first param was not hidden, we handle it with the prefix # notation if param_id and param == utils.take_first(self.param_map.keys()): separator_spacing = ' ' * len(separator) param_str = param_id + separator param_id_map = _param_id_map else: separator_spacing = '' param_str = '' op_str = '{op}{tags}'.format( op=self.op.get_id(**get_id_kwargs), tags=tag_str, ) id_ = '{param_str}{op_str}'.format( param_str=param_str, op_str=op_str, ) marker_str = '{param_marker}{separator}{op_marker}'.format( param_marker=param_marker, separator=separator_spacing, op_marker=len(op_str) * get_marker_char(expr_val) ) # If there are some remaining parameters, show them in # parenthesis at the end of the ID if param_id_map: param_str = '(' + ','.join( param + '=' + param_id for param, (param_id, param_marker) # Sort by parameter name to have a stable ID in param_id_map.items() if param_id ) + ')' id_ += param_str param_marker = ' '.join( ' ' * (len(param) + 1) + param_marker for param, (param_id, param_marker) # Sort by parameter name to have a stable ID in param_id_map.items() if param_id ) + ' ' marker_str += ' ' + param_marker return (id_, marker_str)
[docs] def get_script(self, *args, **kwargs): """ Return a script equivalent to that :class:`ExpressionBase`. :Variable keyword arguments: Forwarded to :meth:`get_all_script`. """ return self.get_all_script([self], *args, **kwargs)
[docs] @classmethod def get_all_script(cls, expr_list, prefix='value', db_path='VALUE_DB.pickle.xz', db_relative_to=None, db=None, adaptor_cls=None): """ Return a script equivalent to executing the specified :class:`ExpressionBase`. :param expr_list: List of :class:`ExpressionBase` to turn into a script :type expr_list: list(ExpressionBase) :param prefix: Prefix used to name variables containing the values of expressions of ``expr_list``. :type prefix: str :param db_path: Path to the serialized :class:`ValueDB` that contains the :class:`FrozenExprVal` of the expressions in ``expr_list``. This is used to generate commented-out code allowing to deserialize values instead of calling the operator again. :type db_path: str :param relative_to: Passed to :meth:`ValueDB.from_path` when the :class:`ValueDB` is opened at the beginning of the script. This can typically be set to ``__file__``, so that the script will be able to refer to the :class:`ValueDB` using a relative path. :type relative_to: str :param db: :class:`ValueDB` containing the :class:`FrozenExprVal` that were computed when computing expressions of ``expr_list``. If None is provided, a new :class:`ValueDB` object will be built assuming ``expr_list`` is a list of :class:`ComputableExpression`. :type db: ValueDB or None :param adaptor_cls: If ``db=None``, used to build a new :class:`ValueDB`. :type adaptor_cls: type """ assert expr_list if db is None: froz_val_seq_list = FrozenExprValSeq.from_expr_list(expr_list) script_db = ScriptValueDB(ValueDB( froz_val_seq_list, adaptor_cls=adaptor_cls, )) else: script_db = ScriptValueDB(db) def make_comment(txt): joiner = '\n# ' return joiner + joiner.join( line for line in txt.splitlines() if line.strip() ) module_name_set = set() plain_name_cls_set = set() script = '' result_name_map = dict() reusable_outvar_map = dict() for i, expr in enumerate(expr_list): script += ( '#' * 80 + '\n# Computed expressions:' + make_comment(expr.get_id(mark_excep=True, full_qual=False)) + '\n' + make_comment(expr.format_structure()) + '\n\n' ) idt = IndentationManager(' ' * 4) expr_val_set = set(expr.get_all_vals()) result_name, snippet = expr._get_script( reusable_outvar_map=reusable_outvar_map, prefix=prefix + str(i), script_db=script_db, module_name_set=module_name_set, idt=idt, expr_val_set=expr_val_set, consumer_expr_stack=[], ) # ExprData must be printable to a string representation that can be # fed back to eval() expr_data = pprint.pformat(expr.data) expr_data_snippet = cls.EXPR_DATA_VAR_NAME + ' = ' + expr_data + '\n' script += ( expr_data_snippet + snippet + '\n' ) plain_name_cls_set.update(type(x) for x in expr.data.values()) result_name_map[expr] = result_name # Add all the imports header = ( '#! /usr/bin/env python3\n\n' + '\n'.join( f'import {name}' for name in sorted(module_name_set) if name != '__main__' ) + '\n' ) # Since the __repr__ output of ExprData will usually return snippets # assuming the class is directly available by its name, we need to make # sure it is imported properly for cls_ in plain_name_cls_set: mod_name = cls_.__module__ if mod_name == 'builtins': continue header += 'from {mod} import {cls}\n'.format( cls=cls_.__qualname__, mod=mod_name ) header += '\n\n' # If there is no ExprVal referenced by that script, we don't need # to access any ValueDB if expr_val_set: if db_relative_to is not None: db_relative_to = ', relative_to=' + db_relative_to else: db_relative_to = '' header += '{db} = {db_loader}({path}{db_relative_to})\n'.format( db=script_db.var_name, db_loader=utils.get_name(ValueDB.from_path, full_qual=True), path=repr(str(db_path)), db_relative_to=db_relative_to ) script = header + '\n' + script return (result_name_map, script)
EXPR_DATA_VAR_NAME = 'EXPR_DATA' def _get_script(self, reusable_outvar_map, *args, **kwargs): with contextlib.suppress(KeyError): outvar = reusable_outvar_map[self] return (outvar, '') outvar, script = self._get_script_internal( reusable_outvar_map, *args, **kwargs ) if self.op.reusable: reusable_outvar_map[self] = outvar return (outvar, script) def _get_script_internal(self, reusable_outvar_map, prefix, script_db, module_name_set, idt, expr_val_set, consumer_expr_stack, expr_val_seq_list=[]): def make_method_self_name(expr): return expr.op.value_type.__name__.replace('.', '') def make_var(name): # If the variable name already contains a double underscore, we use # 3 of them for the separator between the prefix and the name, so # it will avoid ambiguity between these cases: # prefix="prefix", name="my__name": # prefix___my__name # prefix="prefix__my", name="name": # prefix__my__name # Find the longest run of underscores nr_underscore = 0 current_counter = 0 for letter in name: if letter == '_': current_counter += 1 else: nr_underscore = max(current_counter, nr_underscore) current_counter = 0 sep = (nr_underscore + 1) * '_' name = sep + name if name else '' return prefix + name def make_comment(code, idt): prefix = idt + '# ' return prefix + prefix.join(code.splitlines(True)) + '\n' def make_serialized(expr_val, attr): obj = getattr(expr_val, attr) utils.is_serializable(obj, raise_excep=True) # When the ExprVal is from an Expression of the Consumer # operator, we directly print out the name of the function that was # selected since it is not serializable op = expr_val.expr.op if attr == 'value' and isinstance(op, ConsumerOperator): return Operator(obj).get_name(full_qual=True) elif attr == 'value' and isinstance(op, ExprDataOperator): return self.EXPR_DATA_VAR_NAME else: return script_db.get_snippet(expr_val, attr) def format_build_param(param_map): out = list() for param, expr_val in param_map.items(): try: value = format_expr_val(expr_val) # Cannot be serialized, so we skip it except utils.NotSerializableError: continue out.append('{param} = {value}'.format( param=param, value=value )) return '\n' + ',\n'.join(out) def format_expr_val(expr_val, com=lambda x: ' # ' + x): excep = expr_val.excep value = expr_val.value if excep is NoValue: comment = expr_val.get_id(full_qual=False) + ' (' + type(value).__name__ + ')' obj = make_serialized(expr_val, 'value') else: comment = type(excep).__name__ + ' raised when executing ' + expr_val.get_id() # Add extra comment marker for exception so the whole block can # be safely uncommented, without risking getting an exception # instead of the actual object. obj = '#' + make_serialized(expr_val, 'excep') comment = com(comment) if comment else '' return obj + comment # The parameter we are trying to compute cannot be computed and we will # just output a skeleton with a placeholder for the user to fill it is_user_defined = isinstance(self.op, PrebuiltOperator) and not expr_val_seq_list # Consumer operator is special since we don't compute anything to # get its value, it is just the name of a function if isinstance(self.op, ConsumerOperator): if not len(consumer_expr_stack) >= 2: return ('None', '') else: return (consumer_expr_stack[-2].op.get_name(full_qual=True), '') elif isinstance(self.op, ExprDataOperator): # When we actually have an ExprVal, use it so we have the right # UUID. if expr_val_set: # They should all have be computed using the same ExprData, # so we check that all values are the same expr_val_list = [expr_val.value for expr_val in expr_val_set] assert expr_val_list[1:] == expr_val_list[:-1] expr_data = utils.take_first(expr_val_set) return (format_expr_val(expr_data, lambda x: ''), '') # Prior to execution, we don't have an ExprVal yet else: is_user_defined = True if not prefix: prefix = self.op.get_name(full_qual=True) # That is not completely safe, but very unlikely to break in # practice prefix = prefix.replace('.', '_') script = '' # Create the code to build all the parameters and get their variable # name snippet_list = list() param_var_map = OrderedDict() # Reusable parameter values are output first, so that non-reusable # parameters will be inside the for loops if any to be recomputed # for every combination of reusable parameters. def get_param_map(reusable): return OrderedDict( (param, param_expr) for param, param_expr in self.param_map.items() if bool(param_expr.op.reusable) == reusable ) param_map_chain = itertools.chain( get_param_map(reusable=True).items(), get_param_map(reusable=False).items(), ) first_param = utils.take_first(self.param_map.keys()) for param, param_expr in param_map_chain: # Rename "self" parameter for more natural-looking output if param == first_param and self.op.is_method: is_meth_first_param = True pretty_param = make_method_self_name(param_expr) else: is_meth_first_param = False pretty_param = param param_prefix = make_var(pretty_param) # Get the set of ExprVal that were used to compute the # ExprVal given in expr_val_set param_expr_val_set = set() for expr_val in expr_val_set: # When there is no value for that parameter, that means it # could not be computed and therefore we skip that result with contextlib.suppress(KeyError): param_expr_val = expr_val.param_map[param] param_expr_val_set.add(param_expr_val) # Do a deep first traversal of the expression. param_outvar, param_out = param_expr._get_script( reusable_outvar_map, param_prefix, script_db, module_name_set, idt, param_expr_val_set, consumer_expr_stack=consumer_expr_stack + [self], ) snippet_list.append(param_out) if is_meth_first_param: # Save a reference for future manipulation obj = param_outvar else: param_var_map[pretty_param] = param_outvar script += ''.join(snippet_list) # We now know our current indentation. The parameters will have indented # us if they are generator functions. idt_str = str(idt) if param_var_map: param_spec = ', '.join( f'{param}={varname}' for param, varname in param_var_map.items() ) else: param_spec = '' do_not_call_callable = is_user_defined or isinstance(self.op, PrebuiltOperator) op_callable = self.op.get_name(full_qual=True) is_genfunc = self.op.is_genfunc # If it is a prebuilt operator and only one value is available, we just # replace the operator by it. That is valid since we will never end up # needing to call that operator in a different way. if ( isinstance(self.op, PrebuiltOperator) and ( not expr_val_seq_list or ( len(expr_val_seq_list) == 1 and len(expr_val_seq_list[0].expr_val_list) == 1 ) ) ): is_genfunc = False # The call expression is <obj>.<method>(...) instead of # <method>(self=<obj>, ...) elif self.op.is_method: op_callable = obj + '.' + self.op.callable_.__name__ module_name_set.add(self.op.mod_name) # If the operator is a true generator function, we need to indent all # the code that depdends on us if is_genfunc: idt.indent() # Name of the variable holding the result of this expression outname = make_var('') # Dump the source file and line information src_file, src_line = self.op.src_loc if src_file and src_line: src_loc = '({src_file}:{src_line})'.format( src_line=src_line, src_file=src_file, ) else: src_loc = '' script += '\n' script += make_comment('{id}{src_loc}'.format( id=self.get_id(with_tags=False, full_qual=False), src_loc='\n' + src_loc if src_loc else '' ), idt_str) # If no serialized value is available if is_user_defined: script += make_comment('User-defined:', idt_str) script += '{idt}{outname} = \n'.format( outname=outname, idt=idt_str, ) # Dump the serialized value for expr_val_seq in expr_val_seq_list: # Make a copy to allow modifying the parameter names param_map = copy.copy(expr_val_seq.param_map) expr_val_list = expr_val_seq.expr_val_list # Restrict the list of ExprVal we are considering to the ones # we were asked about expr_val_list = [ expr_val for expr_val in expr_val_list if expr_val in expr_val_set ] # Filter out values where nothing was computed and there was # no exception at this step either expr_val_list = [ expr_val for expr_val in expr_val_list if ( (expr_val.value is not NoValue) or (expr_val.excep is not NoValue) ) ] if not expr_val_list: continue # Rename "self" parameter to the name of the variable we are # going to apply the method on if self.op.is_method: first_param = utils.take_first(param_map) param_expr_val = param_map.pop(first_param) self_param = make_var(make_method_self_name(param_expr_val.expr)) param_map[self_param] = param_expr_val # Multiple values to loop over try: if is_genfunc: serialized_list = '\n' + idt.style + ('\n' + idt.style).join( format_expr_val(expr_val, lambda x: ', # ' + x) for expr_val in expr_val_list ) + '\n' serialized_instance = 'for {outname} in ({values}):'.format( outname=outname, values=serialized_list ) # Just one value elif expr_val_list: serialized_instance = '{outname} = {value}'.format( outname=outname, value=format_expr_val(expr_val_list[0]) ) # The values cannot be serialized so we hide them except utils.NotSerializableError: pass else: # Prebuilt operators use that code to restore the serialized # value, since they don't come from the execution of anything. if do_not_call_callable: script += ( idt_str + serialized_instance.replace('\n', '\n' + idt_str) + '\n' ) else: script += make_comment(serialized_instance, idt_str) # Show the origin of the values we have shown if param_map: origin = 'Built using:' + format_build_param( param_map ) + '\n' script += make_comment(origin, idt_str) # Dump the code to compute the values, unless it is a prebuilt op since it # has already been done if not do_not_call_callable: if is_genfunc: script += '{idt}for {output} in {op}({param}):\n'.format( output=outname, op=op_callable, param=param_spec, idt=idt_str ) else: script += '{idt}{output} = {op}({param})\n'.format( output=outname, op=op_callable, param=param_spec, idt=idt_str, ) return outname, script
[docs] class ComputableExpression(ExpressionBase): """ Expression that also contains its computed values. :param data: :class:`ExprData` to use when computing the values of the expression. The ``data`` of the root :class:`ComputableExpression` will be used for all the subexpressions as well during the execution. :type data: ExprData or None .. seealso:: :class:`ExpressionBase` Instances of this class contains values, whereas :class:`Expression` do not. """ def __init__(self, op, param_map, data=None): self.uuid = utils.create_uuid() self.expr_val_seq_list = list() self.data = data if data is not None else ExprData() super().__init__(op=op, param_map=param_map)
[docs] @classmethod def from_expr(cls, expr, **kwargs): """ Build an instance from an :class:`ExpressionBase` :Variable keyword arguments: Forwarded to ``__init__`` """ param_map = OrderedDict( (param, cls.from_expr(param_expr)) for param, param_expr in expr.param_map.items() ) return cls( op=expr.op, param_map=param_map, **kwargs, )
[docs] @classmethod def from_expr_list(cls, expr_list): """ Build an a list of instances from a list of :class:`ExpressionBase`. .. note:: Common Subexpression Elimination using :meth:`ExpressionBase.cse` will be applied on the resulting list. """ # Apply Common Subexpression Elimination to ExpressionBase before they # are run return cls.cse( cls.from_expr(expr) for expr in expr_list )
def _get_script(self, *args, **kwargs): return super()._get_script(*args, **kwargs, expr_val_seq_list=self.expr_val_seq_list )
[docs] def get_id(self, mark_excep=False, marked_expr_val_set=set(), **kwargs): """ Return the ID of the expression. :param mark_excep: Mark expressions listed by :meth:`get_excep`. :type mark_excep: bool :param marked_expr_val_set: If ``mark_excep=False`` mark these exceptions, otherwise it is ignored. :type marked_expr_val_set: bool .. seealso:: :meth:`ExpressionBase.get_id` """ # Mark all the values that failed to be computed because of an # exception marked_expr_val_set = self.get_excep() if mark_excep else marked_expr_val_set return super().get_id( marked_expr_val_set=marked_expr_val_set, **kwargs )
[docs] def find_expr_val_seq_list(self, param_map): """ Return a list of :class:`ExprValSeq` that were computed using the given parameters. :param param_map: Mapping of parameter names to values :type param_map: collections.OrderedDict .. note:: ``param_map`` will be checked as a subset of the parameters. """ def value_map(param_map): return ExprValParamMap( # Extract the actual value from ExprVal (param, expr_val.value) for param, expr_val in param_map.items() ) param_map = value_map(param_map) # Find the results that are matching the param_map return [ expr_val_seq for expr_val_seq in self.expr_val_seq_list # Check if param_map is a subset of the param_map # of the ExprVal. That allows checking for reusable parameters # only. if param_map.items() <= value_map(expr_val_seq.param_map).items() ]
[docs] @classmethod def execute_all(cls, expr_list, *args, **kwargs): """ Execute all expressions of ``expr_list`` after applying Common Expression Elimination and yield tuples of (:class:`ExpressionBase`, :class:`ExprVal`). :param expr_list: List of expressions to execute :type expr_list: list(ExpressionBase) :Variable keyword arguments: Forwarded to :meth:`execute`. .. seealso: :meth:`execute` and :meth:`from_expr_list`. """ for comp_expr in cls.from_expr_list(expr_list): for expr_val in comp_expr.execute(*args, **kwargs): yield (comp_expr, expr_val)
def _clone_consumer(self, consumer_expr_stack): expr = self if isinstance(expr.op, ConsumerOperator): try: consumer = consumer_expr_stack[-2].op.callable_ except IndexError: consumer = None # Build a new ConsumerOperator that references the right consumer, # and a new Expression to go with it expr = expr.__class__( op=ConsumerOperator(consumer), param_map=OrderedDict(), ) else: # Clone the Expressions referencing their consumer, so each of # their consumer will get a clone of them. Each clone will # reference a different consumer through its parameter. if any( isinstance(param_expr.op, ConsumerOperator) for param_expr in expr.param_map.values() ): expr = copy.copy(expr) expr.param_map = OrderedDict( (param, param_expr._clone_consumer( consumer_expr_stack + [expr], )) for param, param_expr in expr.param_map.items() ) return expr def _clone_expr_data(self, data): # Make sure that ExprDataOperator expressions are cloned, so we don't # end up sharing the ExprData between different expressions. if isinstance(self.op, ExprDataOperator): expr = ComputableExpression(ExprDataOperator(data), {}) else: expr = self self.param_map = OrderedDict( (param, param_expr._clone_expr_data(data)) for param, param_expr in expr.param_map.items() ) return expr
[docs] def prepare_execute(self): """ Prepare the expression for execution. This includes appropriate cloning of expressions using :class:`ConsumerOperator` and :class:`ExprData`. .. note:: Calling this method manually is only useful to get more accurate graphs when showing the structure of the expression, since it is done in any case by :meth:`execute`.` """ # Make sure the Expressions referencing their Consumer get # appropriately cloned. self._clone_consumer([]) self._clone_expr_data(self.data) return self
[docs] def execute(self, post_compute_cb=None): """ Execute the expression and yield its :class:`ExprVal`. :param post_compute_cb: Callback called after every computed value. It takes two parameters: 1) the :class:`ExprVal` that was just computed and 2) a boolean that is ``True`` if the :class:`ExprVal` was merely reused and ``False`` if it was actually computed. :type post_compute_cb: collections.abc.Callable .. note:: The :meth:`prepare_execute` is called prior to executing. """ # Call it in case it was not already done. self.prepare_execute() return self._execute(post_compute_cb)
def _execute(self, post_compute_cb): # Lazily compute the values of the Expression, trying to use # already computed values when possible # Check if we are allowed to reuse an instance that has already # been produced reusable = self.op.reusable def filter_param_exec_map(param_map, reusable): return OrderedDict( ((param, param_expr), param_expr._execute( post_compute_cb=post_compute_cb, )) for param, param_expr in param_map.items() if param_expr.op.reusable == reusable ) # Get all the generators for reusable parameters reusable_param_exec_map = filter_param_exec_map(self.param_map, True) # Consume all the reusable parameters, since they are generators for param_map in ExprValParamMap.from_gen_map_product(reusable_param_exec_map): # Check if some ExprVal are already available for the current # set of reusable parameters. Non-reusable parameters are not # considered since they would be different every time in any case. if reusable and not param_map.is_partial(ignore_error=True): # Check if we have already computed something for that # Expression and that set of parameter values expr_val_seq_list = self.find_expr_val_seq_list(param_map) if expr_val_seq_list: # Reusable objects should have only one ExprValSeq # that was computed with a given param_map assert len(expr_val_seq_list) == 1 expr_val_seq = expr_val_seq_list[0] yield from expr_val_seq.iter_expr_val() continue # Only compute the non-reusable parameters if all the reusable one # are available, otherwise that is pointless if not param_map.is_partial(): # Non-reusable parameters must be computed every time, and we # don't take their cartesian product since we have fresh values # for all operator calls. nonreusable_param_exec_map = filter_param_exec_map(self.param_map, False) param_map.update(ExprValParamMap.from_gen_map(nonreusable_param_exec_map)) # Propagate exceptions if some parameters did not execute # successfully. if param_map.is_partial(): expr_val = ExprVal(self, param_map) expr_val_seq = ExprValSeq.from_one_expr_val(expr_val) self.expr_val_seq_list.append(expr_val_seq) yield expr_val continue # If no value has been found, compute it and save the results in # a list. expr_val_seq = ExprValSeq.from_expr( expr=self, param_map=param_map, post_compute_cb=post_compute_cb ) self.expr_val_seq_list.append(expr_val_seq) yield from expr_val_seq.iter_expr_val()
[docs] def get_all_vals(self): """ Get all :class:`ExprVal` that were computed for that expression. """ return utils.flatten_seq( expr_val_seq.expr_val_list for expr_val_seq in self.expr_val_seq_list )
[docs] def get_excep(self): """ Get all :class:`ExprVal` containing an exception that are reachable from :class:`ExprVal` computed for that expression. """ return set(utils.flatten_seq( expr_val.get_excep() for expr_val in self.get_all_vals() ))
[docs] class ClassContext: """ Collect callables and types that put together will be used to create :class:`Expression`. :param op_map: Mapping of types to list of :class:`Operator` that can produce that type. :type op_map: dict(type, list(Operator)) :param cls_map: Mapping of types to a list of compatible types. A common "compatibility" relation is ``issubclass``. In that case, keys are classes and values are the list of all (direct and indirect) subclasses. :type cls_map: dict(type, list(type)) """ COMPAT_CLS = issubclass """ Callable defining the compatibility relation between two classes. It will be called on two classes and shall return ``True`` if the classes are compatible, ``False`` otherwise. """ def __init__(self, op_map, cls_map): self.op_map = op_map self.cls_map = cls_map @staticmethod def _build_cls_map(op_set, compat_cls): # Pool of classes that can be produced by the ops produced_pool = {op.value_type for op in op_set} # Set of all types that can be depended upon. All base class of types that # are actually produced are also part of this set, since they can be # dependended upon as well. cls_set = set() for produced in produced_pool: cls_set.update(utils.get_mro(produced)) cls_set.discard(object) cls_set.discard(type(None)) # Map all types to the subclasses that can be used when the type is # requested. return { # Make sure the list is deduplicated by building a set first cls: sorted({ subcls for subcls in produced_pool if compat_cls(subcls, cls) }, key=lambda cls: cls.__qualname__) for cls in cls_set } # Map of all produced types to a set of what operator can create them @staticmethod def _build_op_map(op_set, cls_map): # Make sure that the provided PrebuiltOperator will be the only ones used # to provide their types only_prebuilt_cls = set(itertools.chain.from_iterable( # Augment the list of classes that can only be provided by a prebuilt # Operator with all the compatible classes cls_map[op.obj_type] for op in op_set if isinstance(op, PrebuiltOperator) )) op_map = OrderedDict() for op in op_set: param_map, produced = op.prototype is_prebuilt_op = isinstance(op, PrebuiltOperator) if is_prebuilt_op or produced not in only_prebuilt_cls: op_map.setdefault(produced, OrderedSet()).add(op) return op_map @staticmethod def _filter_op_map(op_map, cls_map, restricted_pattern_set, forbidden_pattern_set): cls_map = copy.copy(cls_map) # Restrict the production of some types to a set of operators. restricted_op_set = { # Make sure that we only use what is available op for op in itertools.chain.from_iterable(op_map.values()) if utils.match_name(op.get_name(full_qual=True), restricted_pattern_set) } def apply_restrict(produced, op_set, restricted_op_set, cls_map): restricted_op_set = { op for op in restricted_op_set if op.value_type is produced } if restricted_op_set: # Make sure there is no other compatible type, so the only # operators that will be used to satisfy that dependency will # be one of the restricted_op_set item. cls_map[produced] = [produced] return restricted_op_set else: if utils.match_base_cls(produced, forbidden_pattern_set): return set() else: return op_set op_map = { produced: apply_restrict(produced, op_set, restricted_op_set, cls_map) for produced, op_set in op_map.items() } # Remove entries with empty op_set op_map = { produced: op_set for produced, op_set in op_map.items() if op_set } return (op_map, cls_map)
[docs] @classmethod def from_op_set(cls, op_set, forbidden_pattern_set=set(), restricted_pattern_set=set(), compat_cls=COMPAT_CLS): """ Build an :class:`ClassContext` out of a set of :class:`Operator`. :param op_set: Set of :class:`Operator` to consider. :type op_set: set(Operator) :param forbidden_pattern_set: Set :func:`fnmatch.fnmatch` type name patterns that are not allowed to be produced. :type forbidden_pattern_set: set(str) :parm restricted_pattern_set: Set of :func:`fnmatch.fnmatch` :class:`Operator` ID pattern. Operators matching that pattern will be the only one allowed to produce the type they are producing, or any other compatible type. :type restricted_pattern_set: set(str) :param compat_cls: Callable defining the compatibility relation between two classes. It will be called on two classes and shall return ``True`` if the classes are compatible, ``False`` otherwise. :type compat_cls: collections.abc.Callable """ # Build the mapping of compatible classes cls_map = cls._build_cls_map(op_set, compat_cls) # Build the mapping of classes to producing operators op_map = cls._build_op_map(op_set, cls_map) op_map, cls_map = cls._filter_op_map(op_map, cls_map, restricted_pattern_set, forbidden_pattern_set ) return cls( op_map=op_map, cls_map=cls_map )
[docs] def build_expr_list(self, result_op_set, non_produced_handler='raise', cycle_handler='raise'): """ Build a list of consistent :class:`Expression`. :param result_op_set: Set of :class:`Operator` that will constitute the roots of expressions. :type result_op_set: set(Operator) :param non_produced_handler: Handler to be used when a needed type is produced by no :class:`Operator`: * ``raise``: will raise a :class:`NoOperatorError` exception * ``ignore``: the expression will be ignored * a callback: called with the following parameters: * __qualname__ of the type that cannot be produced. * name of the :class:`Operator` for which a value of the type was needed. * name of the parameter for which a value of the type was needed. * a stack (tuple) of callables which is the path leading from the root expression to the operator for which the type was needed. :type non_produced_handler: str or collections.abc.Callable :param cycle_handler: Handler to be used when a cycle is detected in the built :class:`Expression`: * ``raise``: will raise a :class:`CycleError` exception * ``ignore``: the expression will be ignored * a callback: called with a tuple of callables constituting the cycle. :type cycle_handler: str or collections.abc.Callable All combinations of compatible classes and operators will be generated. """ op_map = copy.copy(self.op_map) cls_map = { cls: compat_cls_set for cls, compat_cls_set in self.cls_map.items() # If there is at least one compatible subclass that is produced, we # keep it, otherwise it will mislead _build_expr into thinking the # class can be built where in fact it cannot if compat_cls_set & op_map.keys() } # Dummy placeholders that will get fixed up later right before # execution op_map[Consumer] = {ConsumerOperator()} op_map[ExprData] = {ExprDataOperator()} cls_map[ExprData] = [ExprData] cls_map[Consumer] = [Consumer] expr_list = list() for result_op in result_op_set: expr_gen = self._build_expr(result_op, op_map, cls_map, op_stack=[], non_produced_handler=non_produced_handler, cycle_handler=cycle_handler, ) for expr in expr_gen: if expr.validate(cls_map): expr_list.append(expr) # Apply CSE to get a cleaner result return Expression.cse(expr_list)
@classmethod def _build_expr(cls, op, op_map, cls_map, op_stack, non_produced_handler, cycle_handler): new_op_stack = [op] + op_stack # We detected a cyclic dependency if op in op_stack: if cycle_handler == 'ignore': return elif callable(cycle_handler): cycle_handler(tuple(op.callable_ for op in new_op_stack)) return elif cycle_handler == 'raise': raise CycleError('Cyclic dependency found: {path}'.format( path=' -> '.join( op.name for op in new_op_stack ) )) else: raise ValueError('Invalid cycle_handler') op_stack = new_op_stack param_cls_map = op.prototype[0] if param_cls_map: param_list, cls_list = zip(*param_cls_map.items()) # When no parameter is needed else: yield Expression(op, OrderedDict()) return # Build all the possible combinations of types suitable as parameters cls_combis = [ cls_map.get(cls, [None]) for cls in cls_list ] # Only keep the classes for "self" on which the method can be applied if op.is_method: def keep_cls(cls): # When UnboundMethod are used, it is expected that they are # defined for each class where a given method is available. # Therefore we can directly check if it should be selected or # not if isinstance(op.callable_, UnboundMethod): return cls is op.callable_.cls try: looked_up = getattr(cls, op.callable_.__name__) except AttributeError: return True else: return looked_up is op.callable_ cls_combis[0] = [ cls for cls in cls_combis[0] # If the method with the same name would resolve to "op", then # we keep this class as a candidate for "self", otherwise we # discard it if cls is None or keep_cls(cls) ] param_map_list = [] # For all possible combinations of types for cls_combi in itertools.product(*cls_combis): cls_combi = list(cls_combi) op_combis = [ op_map[cls] if cls is not None else {None} for cls in cls_combi ] # For all the possible combinations of operators returning these # types for op_combi in itertools.product(*op_combis): op_combi = list(op_combi) # Build all the expressions that can produce a value for each # parameter expr_combis = [] for param, param_op in zip(param_list, op_combi): optional = param in op.optional_params if param_op is None: expr_list = [] else: expr_list = list(cls._build_expr( param_op, op_map, cls_map, op_stack, non_produced_handler if not optional else 'ignore', cycle_handler if not optional else 'ignore', )) # Use a None placeholder for the cases where we could # not build any expression. # We really don't want to add an empty expr_list to # expr_combis, since that would kill all the otherwise # possible expressions. The empty list is the "0" of the # cartesian product: # [] == list(itertools.product(..., [], ...)) expr_list = expr_list if expr_list else [None] expr_combis.append(expr_list) # For all the possible combinations of expressions producing a # value for the parameters for expr_combi in itertools.product(*expr_combis): param_map_list.append(OrderedDict(zip(param_list, expr_combi))) # Select the expressions to be run (handling parameters with default values) # and log errors for expressions that we cannot build for param in param_list: optional = param in op.optional_params # If we are never able to create a value for that optional # parameter, just ignore it by removing it from all the # param_map as it has a default value. if optional and all( param_map[param] is None for param_map in param_map_list ): for param_map in param_map_list: del param_map[param] # Otherwise, we just get rid of the expressions that would use # the default value. # If the parameter is not optional, we cannot do anything if there # is no expression to generate it, so we also get rid of these # expressions. else: param_map_list = [ param_map for param_map in param_map_list if param_map[param] is not None ] # If we just made the list empty because of removing some # expressions, we have an issue that needs to be logged if not param_map_list: wanted_cls = param_cls_map[param] if non_produced_handler == 'ignore': pass elif callable(non_produced_handler): non_produced_handler(wanted_cls.__qualname__, op.name, param, tuple(op.resolved_callable for op in op_stack) ) elif non_produced_handler == 'raise': raise NoOperatorError('No operator can produce instances of {cls} needed for {op} (parameter "{param}" along path {path})'.format( cls=wanted_cls.__qualname__, op=op.name, param=param, path=' -> '.join(op.name for op in op_stack) )) else: raise ValueError('Invalid non_produced_handler') continue for param_map in param_map_list: yield Expression(op, param_map)
[docs] class Expression(ExpressionBase): """ Static subclass :class:`ExpressionBase` tying :class:`Operator` with its parameters. Instances of this class do not contain any computed values, they can be considered as read-only structure. .. seealso:: :class:`ComputableExpression`. """
[docs] def validate(self, cls_map): """ Check that the Expression does not involve two classes that are compatible. This ensures that only one class of each "category" will be used in each expression, so that all references to that class will point to the same expression after :meth:`ExpressionBase.cse` is applied. """ valid, cls_used = self._get_used_cls() if not valid: return False # Use sets for faster inclusion test cls_map = { cls: set(cls_list) for cls, cls_list in cls_map.items() } return all( cls1 not in cls_map[cls2] and cls2 not in cls_map[cls1] for cls1, cls2 in itertools.combinations(cls_used, 2) )
def _get_used_cls(self): def go(expr, type_map): value_type = expr.op.value_type # If there was already an Expression producing that type, the Expression # is not valid try: found_callable = type_map[value_type] except KeyError: pass else: if found_callable is not expr.op.callable_: return False type_map[value_type] = expr.op.callable_ return all( go(param_expr, type_map) for param_expr in expr.param_map.values() ) type_map = {} valid = go(self, type_map) return (valid, set(type_map.keys()))
[docs] class AnnotationError(Exception): """ Exception raised when there is a missing or invalid PEP 484 annotation. """ pass
[docs] class PartialAnnotationError(AnnotationError): """ Exception raised when there is a missing PEP 484 annotation, but other parameters are annotated. This usually indicates a missing annotation in a function otherwise supposed to be annotated. """ pass
[docs] class ForcedParamType: """ Base class for types placeholders used when forcing the value of a parameter using :meth:`Operator.force_param`. """ pass
[docs] class UnboundMethod: """ Wrap a function in a similar way to Python 2 unbound methods. :param callable_: method to wrap. :type callable_: collections.abc.Callable :param cls: Class on which the method is available. :type cls: type .. note:: It is generally assumed that if a given method is wrapped in an :class:`UnboundMethod`, all subclasses will also have that method wrapped the same way. .. note:: :class:`UnboundMethod` can wrap things such as :class:`staticmethod` or :class:`classmethod` as well, it is simply a wrapper used to attach the class of origin. """ def __init__(self, callable_, cls): self.cls = cls self.__wrapped__ = callable_ self.__module__ = callable_.__module__ self.__name__ = callable_.__name__ self.__qualname__ = callable_.__qualname__ # Use a non-regular name for "self" so that "self" can be used again in the # kwargs def __call__(__UnboundMethod_self__, *args, **kwargs): return __UnboundMethod_self__.__wrapped__(*args, **kwargs) def __eq__(self, other): return ( isinstance(other, type(self)) and self.__wrapped__ == other.__wrapped__ and self.cls == other.cls ) def __hash__(self): return hash(self.__wrapped__) ^ hash(self.cls) def __repr__(self): return '<UnboundMethod of {cls}.{meth} at {id}>'.format( cls=self.cls.__qualname__, meth=self.__wrapped__.__name__, id=hex(id(self)) )
[docs] class Operator: """ Wrap a callable. :param callable_: callable to represent. :type callable_: collections.abc.Callable :param non_reusable_type_set: Set of non reusable types. If the callable produces a subclass of these types, it will be considered as non-reusable. :type non_reusable_type_set: set(type) :param tags_getter: Callback used to get the tags for the objects returned by the callable. It takes the object as argument, and is expected to return a mapping of tags names to values. :type tags_getter: collections.abc.Callable """ def __init__(self, callable_, non_reusable_type_set=None, tags_getter=None): if non_reusable_type_set is None: non_reusable_type_set = set() if not tags_getter: def tags_getter(v): return {} self.tags_getter = tags_getter assert callable(callable_) self.callable_ = callable_ if isinstance(callable_, UnboundMethod): sig_f = callable_.__wrapped__ elif inspect.isclass(callable_): sig_f = callable_.__init__ else: sig_f = callable_ def has_annotations(x): while True: try: x.__annotations__ except AttributeError: try: x = x.__wrapped__ except AttributeError: return False else: return True # This has functionally no use but provides a massive speedup by # skipping all the callables that have arguments without default values # but no annotation try: code = sig_f.__code__ except AttributeError: pass else: non_default_args = ( code.co_argcount + code.co_kwonlyargcount - # Discount the parameters that have a default value, as they don't # necessarily need an annotation to be useful len(sig_f.__defaults__ or tuple()) ) if non_default_args and not has_annotations(sig_f): raise AnnotationError( 'Missing annotation for operator "{op}"'.format( op=self.name, ) ) signature = inspect.signature(sig_f) annotations = { param.name: param.annotation for param in signature.parameters.values() if param.annotation != param.empty } if signature.return_annotation != signature.empty: annotations['return'] = signature.return_annotation self.annotations = annotations self.signature = signature self.ignored_param = { param for param, param_spec in self.signature.parameters.items() # Ignore the parameters that have a default value without any # annotation if ( param_spec.default != inspect.Parameter.empty and param_spec.annotation == inspect.Parameter.empty ) } self.optional_params = { param for param, param_spec in self.signature.parameters.items() # Parameters with a default value and and an annotation are # optional. if ( param_spec.default != inspect.Parameter.empty and param_spec.annotation != inspect.Parameter.empty ) } def check_prototype(prototype): # make sure we got some usable type annotations that only consist # in classes, rather than things coming from the typing module param_map, value_type = prototype def is_hint(annot): # Prevent things like typing.Dict[], since it will raise # exceptions when being used along issubclass() # # Note: We cannot use isinstance/issubclass as this would # result in: TypeError: Class typing.Generic cannot be used # with class or instance checks return ( typing.Generic in inspect.getmro(annot) or # Normal classes like dict can now be parametrized, e.g. # dict[int, str]. get_origin(dict) == None but # get_origin(dict[str, int]) == dict typing.get_origin(annot) is not None ) if not all( (hasattr(typing, 'Self') and annot == typing.Self) or isinstance(annot, type) and ( isinstance(annot, typing.TypeVar) or not is_hint(annot) ) for annot in {value_type, *param_map.values()} ): raise ValueError('Annotations must be classes or typing.TypeVar') # Tentative prototype, as it is needed by is_factory_cls_method and # value_type self.prototype = self._get_prototype() # Check the prototype before attempting to use it with # is_factory_cls_method or anything else check_prototype(self.prototype) # Special support of return type annotation for factory classmethod if self.is_factory_cls_method: # If the return annotation type is an (indirect) base class of # the original annotation, we replace the annotation by the # subclass That allows implementing factory classmethods # easily. try: self_cls = self.resolved_callable.__self__ except AttributeError: if inspect.isclass(self.callable_): self_cls = self.callable_ else: raise TypeError(f'Could not determine the return type of the factory method {self.callable_.__qualname__}') self.annotations['return'] = self.resolved_callable.__self__ # Refresh the prototype self.prototype = self._get_prototype() self.reusable = self.value_type not in non_reusable_type_set check_prototype(self.prototype)
[docs] @property def callable_globals(self): """ Returns a dictionnary of global variables as seen by the callable. """ def make_namespace(name, obj): splitted = name.split('.', 1) try: name, remainder = splitted except ValueError: name = splitted[0] remainder = '' if remainder: obj = types.SimpleNamespace( **make_namespace(remainder, obj) ) return {name: obj} try: globals_ = self.resolved_callable.__globals__ or {} except AttributeError: globals_ = {} if isinstance(self.callable_, UnboundMethod): globals_ = { **globals_, **self.callable_.cls.__dict__, } else: globals_ = globals_.copy() # Make sure the class name can be resolved if isinstance(self.callable_, UnboundMethod): # If we have a nested class, it will need a dummy container that it # can be looked up on ns = make_namespace( name=self.callable_.cls.__qualname__, obj=self.callable_.cls, ) # Only update the globals_ if it was not there already if not ns.keys() & globals_.keys(): globals_.update(ns) # Work around this bug: # https://bugs.python.org/issue43102 try: globals_['__builtins__'] = globals_['__builtins__'] or {} except KeyError: pass return globals_
def __repr__(self): return '<Operator of ' + str(self.callable_) + '>'
[docs] def force_param(self, param_value_map, tags_getter=None): """ Force the value of a given parameter of the callable. :param param_value_map: Mapping of parameter names to list of values that this parameter should take. :type param_value_map: dict(str, list(object)) :param tags_getter: Callable used to return the tags for the values of the parameter. Same as :class:`Operator`'s ``__init__`` parameter. :type tags_getter: collections.abc.Callable """ param_set = list(self.signature.parameters.keys()) unknown_param = sorted( param for param in param_value_map.keys() if param not in param_set ) if unknown_param: raise KeyError(unknown_param) prebuilt_op_set = set() for param, value_list in param_value_map.items(): # Get the most derived class that is in common between all # instances value_type = utils.get_common_base(type(v) for v in value_list) try: param_annot = self.annotations[param] except KeyError: pass else: # If there was an annotation, make sure the type we computed is # compatible with what the annotation specifies. assert ClassContext.COMPAT_CLS(value_type, param_annot) # We do not inherit from value_type, since it may not always work, # e.g. subclassing bool is forbidden. Therefore, it is purely used # as a unique marker. class ParamType(ForcedParamType): pass # References to this type won't be serializable with pickle, but # instances will be. This is because pickle checks that only one # type exists with a given __module__ and __qualname__. ParamType.__name__ = value_type.__name__ ParamType.__qualname__ = value_type.__qualname__ ParamType.__module__ = value_type.__module__ # Create an artificial new type that will only be produced by # the PrebuiltOperator self.annotations[param] = ParamType prebuilt_op_set.add( PrebuiltOperator(ParamType, value_list, tags_getter=tags_getter )) # Make sure the parameter is not optional anymore self.optional_params.discard(param) self.ignored_param.discard(param) return prebuilt_op_set
[docs] @property def resolved_callable(self): """ Fully unwrapped callable. If the callable is a class, it's ``__init__`` will be returned. """ unwrapped = self.unwrapped_callable # We use __init__ when confronted to a class if inspect.isclass(unwrapped): return unwrapped.__init__ return unwrapped
[docs] @property def unwrapped_callable(self): """ Fully unwrapped callable. .. seealso:: :func:`functools.wraps` """ return inspect.unwrap(self.callable_)
[docs] def get_name(self, *args, **kwargs): """ Get the name of the callable, or None if no name can be retrieved. """ try: return utils.get_name(self._unwrapped_unbound, *args, **kwargs) except AttributeError: return None
[docs] def get_id(self, full_qual=True, qual=True, style=None): """ Get the ID of the operator. :param full_qual: Fully qualified name, including the module name. :type full_qual: bool :param qual: Qualified name. :type qual: bool :param style: If ``rst``, a Sphinx reStructuredText string is returned. :type style: str or None """ if style == 'rst': if self.is_factory_cls_method: qualname = utils.get_name(self.value_type, full_qual=True, pretty=True) else: qualname = self.get_name(full_qual=True, pretty=True) name = self.get_id(full_qual=full_qual, qual=qual, style=None) if self.is_class: role = 'class' elif self.is_method or self.is_static_method or self.is_cls_method: role = 'meth' else: role = 'func' return f':{role}:`{name}<{qualname}>`' else: # Factory classmethods are replaced by the class name when not # asking for a qualified ID if not (qual or full_qual) and self.is_factory_cls_method: return utils.get_name(self.value_type, full_qual=full_qual, qual=qual, pretty=True) else: return self.get_name(full_qual=full_qual, qual=qual, pretty=True)
[docs] @property def name(self): """ Same as :meth:`get_name` """ return self.get_name()
[docs] @property def mod_name(self): """ Name of the module the callable is defined in. If the callable is an :class:`UnboundMethod`, the module of its class is returned. .. note:: The callable is first unwrapped. """ try: if isinstance(self.callable_, UnboundMethod): module = inspect.getmodule(self.callable_.cls) else: module = inspect.getmodule(self.unwrapped_callable) except Exception: name = self.callable_globals['__name__'] else: name = module.__name__ return name
[docs] @property def src_loc(self): """ Get the source location of the unwrapped callable. .. seealso:: :func:`exekall._utils.get_src_loc` """ return utils.get_src_loc(self.unwrapped_callable)
[docs] @property def value_type(self): """ Annotated return type of the callable. """ return self.prototype[1]
[docs] @property def is_genfunc(self): """ ``True`` if the callable is a generator function. """ x = self._unwrapped_unbound while True: if inspect.isgeneratorfunction(x): return True else: try: x = x.__wrapped__ except AttributeError: return False
[docs] @property def is_class(self): """ ``True`` if the callable is a class. """ return inspect.isclass(self.unwrapped_callable)
[docs] @property def is_static_method(self): """ ``True`` if the callable is a ``staticmethod``. """ callable_ = self.unwrapped_callable try: callable_globals = callable_.__globals__ # __globals__ is only defined for functions except AttributeError: return False try: cls = utils._get_class_from_name( callable_.__qualname__.rsplit('.', 1)[0], namespace=callable_globals ) except ValueError: return False if not inspect.isclass(cls): return False # We retrieve the function as it was defined in the class body, not as # it appears when accessed as a class attribute. That means we bypass # the descriptor protocol by reading the class' __dict__ directly, and # the staticmethod will not have a chance to "turn itself" into a # function. orig_callable = inspect.getattr_static(cls, callable_.__name__) return isinstance(orig_callable, staticmethod)
[docs] @property def is_method(self): """ ``True`` if the callable is a plain method. """ if self.is_cls_method or self.is_static_method: return False else: qualname = self.unwrapped_callable.__qualname__ # Get the rightmost group, in case the callable has been defined in # a function qualname = qualname.rsplit('<locals>.', 1)[-1] # Dots in the qualified name means this function has been defined # in a class. This could also happen for closures, and they would # get "<locals>." somewhere in their name, but we handled that # already. return '.' in qualname
@property def _unwrapped_unbound(self): callable_ = self.callable_ if isinstance(callable_, UnboundMethod): return callable_.__wrapped__ else: return callable_
[docs] @property def is_cls_method(self): """ ``True`` if the callable is a ``classmethod``. """ # Class methods appear as a bound method object when referenced through # their class. The method is bound to a class, which is not the case # if this is not a class method. callable_ = self._unwrapped_unbound return ( inspect.ismethod(callable_) and inspect.isclass(callable_.__self__) )
[docs] @property def is_factory_cls_method(self): """ ``True`` if the callable is a factory ``classmethod``, i.e. a classmethod that returns objects of the class it is defined in (or of a subclass of it), or has a return annotation of typing.Self. """ callable_ = self._unwrapped_unbound try: from typing import Self except ImportError: returns_self = False else: returns_self = (self.annotations.get('return') == Self) return self.is_cls_method and ( returns_self or issubclass(callable_.__self__, self.value_type) )
[docs] def make_expr_val_iter(self, expr, param_map): """ Make an iterator that will yield the computed :class:`ExprVal`. """ if self.is_genfunc: @functools.wraps(self.callable_) def genf(**kwargs): try: has_yielded = False for res in self.callable_(**kwargs): has_yielded = True yield (res, NoValue) # If no value at all were produced, we still need to yield # something if not has_yielded: yield (NoValue, NoValue) except Exception as e: yield (NoValue, e) else: @functools.wraps(self.callable_) def genf(**kwargs): # yield one value and then return try: val = self.callable_(**kwargs) except Exception as e: val = NoValue excep = e else: excep = NoValue yield (val, excep) kwargs = OrderedDict( # Extract the actual computed values wrapped in ExprVal (param, param_expr_val.value) for param, param_expr_val in param_map.items() ) for utc, log_map, (duration, (value, excep)) in utils.capture_log(utils.measure_time(genf(**kwargs))): log = ExprValLog(log_map=log_map, utc_datetime=utc) yield ExprVal( expr=expr, param_map=param_map, value=value, excep=excep, uuid=utils.create_uuid(), duration=duration, log=log, )
def _resolve_annotations(self, annotations): """ Basic reimplementation of typing.get_type_hints. Some Python versions do not have a typing module available, and it also avoids creating ``Optional[]`` when the parameter has a None default value. """ module_vars = self.callable_globals def resolve(x): # If we get a string, evaluate it in the global namespace of the # module in which the callable was defined if isinstance(x, str): try: x = eval(x, module_vars) except Exception as e: raise AnnotationError(f'Cannot handle annotation "{x}": {e}') # Handle associated types: # If the annotation is a typing.TypeVar, assume it was defined as a # class attribute, which has been overridden by the subclass, so we # can look it up on the subclass. if ( isinstance(x, typing.TypeVar) and isinstance(self.callable_, UnboundMethod) ): cls = self.callable_.cls try: _x = getattr(cls, x.__name__) except AttributeError: pass else: # Only use the class attribute if it turns out to be a # type, to avoid unfortunate clashes between TypeVar and # unrelated attributes if isinstance(_x, type): x = _x return x return { param: resolve(cls) for param, cls in annotations.items() } def _get_prototype(self): """ Return the prototype of the callable as a tuple of: * map of parameter names to types * return type """ sig = self.signature first_param = utils.take_first(sig.parameters) annotations = self.annotations annotation_map = self._resolve_annotations(annotations) pristine_annotation_map = copy.copy(annotation_map) extra_ignored_param = set() # If it is a class if self.is_class: produced = self.unwrapped_callable # Get rid of "self", since annotating it with the class would lead to # infinite recursion when computing it signature. It will be handled # by execute() directly. Also, "self" is not a parameter of the # class when it is called, so it makes sense not to include it. annotation_map.pop(first_param, None) extra_ignored_param.add(first_param) # If it is any other callable else: # When we have a method, we fill the annotations of the 1st # parameter with the name of the class it is defined in if ( self.is_method and first_param is not NoValue # If there is a valid annotation already, we don't want to mess # with it and first_param not in annotations ): if isinstance(self.callable_, UnboundMethod): cls_name = self.callable_.cls.__qualname__ else: cls_name = self.resolved_callable.__qualname__.split('.')[0] # Avoid modifying the original annotations = copy.copy(annotations) annotations[first_param] = cls_name # No return annotation is accepted and is equivalent to None return # annotation produced = annotation_map.get('return') # "None" annotation is accepted, even though it is not a type # strictly speaking if produced is None: produced = type(None) # Recompute after potentially modifying the annotations annotation_map = self._resolve_annotations(annotations) # Remove the return annotation, since we are handling that separately annotation_map.pop('return', None) # Check that we have annotations for all parameters that are not ignored for param, param_spec in sig.parameters.items(): if ( param not in annotation_map and param not in extra_ignored_param and param not in self.ignored_param and param_spec.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD) ): # If some parameters are annotated but not all, we raise a # slightly different exception to allow better reporting if pristine_annotation_map: excep_cls = PartialAnnotationError else: excep_cls = AnnotationError raise excep_cls('Missing annotation for "{param}" parameters of operator "{op}"'.format( param=param, op=self.name, )) # Iterate over keys and values of "mapping" in the same order as "keys" def iter_by_keys(mapping, keys): for key in keys: try: yield key, mapping[key] except KeyError: pass # Use an OrderedDict to retain the declaration order of parameters param_map = OrderedDict( (name, annotation) for name, annotation in iter_by_keys(annotation_map, sig.parameters.keys()) if not ( name in self.ignored_param or name in extra_ignored_param ) ) return (param_map, produced)
[docs] class PrebuiltOperator(Operator): """ :class:`Operator` that injects prebuilt objects. :param obj_type: Type of the objects that are injected. :type obj_type: type :param obj_list: List of objects to inject :type obj_list: list(object) :param id_: ID of the operator. :type id_: str or None :Variable keyword arguments: Forwarded to :class:`Operator` constructor. """ def __init__(self, obj_type, obj_list, id_=None, **kwargs): def make_info(obj): # Transparently copy the UUID to avoid having multiple UUIDs # refering to the same actual value. if isinstance(obj, FrozenExprVal): value = obj.value uuid_ = obj.uuid duration = obj.duration log = obj.log else: value = obj uuid_ = utils.create_uuid() duration = None log = None return dict( value=value, uuid=uuid_, duration=duration, log=log, ) self.values_info = [ make_info(obj) for obj in obj_list ] self.obj_type = obj_type self._id = id_ # Placeholder for the signature def callable_() -> self.obj_type: pass super().__init__(callable_, **kwargs)
[docs] def get_name(self, *args, **kwargs): return None
[docs] def get_id(self, *args, style=None, **kwargs): return self._id or utils.get_name(self.obj_type, *args, **kwargs)
[docs] @property def src_loc(self): return utils.get_src_loc(self.value_type)
[docs] @property def is_genfunc(self): return len(self.values_info) > 1
[docs] @property def is_method(self): return False
[docs] def make_expr_val_iter(self, expr, param_map): assert not param_map for kwargs in self.values_info: yield ExprVal( expr=expr, excep=NoValue, param_map=ExprValParamMap(), **kwargs )
[docs] class ConsumerOperator(PrebuiltOperator): """ Placeholder operator used to represent the consumer of the an expression asking for it. :param consumer: Callable that will consume the value of the expression refering to its consumer. :type consumer: collections.abc.Callable """ def __init__(self, consumer=None): obj_type = Consumer super().__init__( obj_type, [consumer], tags_getter=self._get_tag, ) # That allows easily making it hidden self.callable_ = obj_type def _get_tag(self, expr_val): return {'consumer': self.get_name()}
[docs] def get_name(self, *args, **kwargs): obj = Consumer if self.consumer is None else self.consumer return utils.get_name(obj, *args, **kwargs)
[docs] @property def consumer(self): return self.values_info[0]['value']
[docs] class ExprDataOperator(PrebuiltOperator): """ Placeholder operator for :class:`ExprData`. The :class:`ExprData` that will be used is the same throughout an expression, and is the one of the root expression. """ def __init__(self, data=None): obj_type = ExprData super().__init__( obj_type, [data], ) # That allows easily making it hidden self.callable_ = obj_type
[docs] @property def data(self): return self.values_info[0]['value']
@property def uuid_list(self): return [self.data.uuid] if self.data is not None else []
[docs] @uuid_list.setter def uuid_list(self, val): pass
[docs] class ExprValSeq: """ Sequence of :class:`ExprVal` produced by an :class:`ComputableExpression`. :param expr: :class:`ComputableExpression` that was used to compute the values. :type expr: ComputableExpression :param iterator: Iterator that yields the :class:`ExprVal`. This is used when the expressions are being executed. :type iterator: collections.abc.Iterator :param param_map: Ordered mapping of parameters name to :class:`ExprVal` used to compute the recored :class:`ExprVal`. :type param_map: collections.OrderedDict :param post_compute_cb: See :meth:`ComputableExpression.execute` :type post_compute_cb: collections.abc.Callable Since :class:`ComputableExpression` can represent generator functions, they are allowed to create multiple :class:`ExprVal`. """ def __init__(self, expr, iterator, param_map, post_compute_cb=None): self.expr = expr assert isinstance(iterator, collections.abc.Iterator) self.iterator = iterator self.expr_val_list = [] self.param_map = param_map self.post_compute_cb = post_compute_cb
[docs] @classmethod def from_one_expr_val(cls, expr_val): """ Build an :class:`ExprValSeq` out of a single :class:`ExprVal`. .. seealso:: :class:`ExprValSeq` for parameters description. """ new = cls( expr=expr_val.expr, iterator=iter([expr_val]), param_map=expr_val.param_map, # no post_compute_cb, since we are not really going to compute # anything post_compute_cb=None, ) # consume the iterator to make sure new.expr_val_list is updated for _ in new.iter_expr_val(): pass return new
[docs] @classmethod def from_expr(cls, expr, param_map, **kwargs): """ Build an :class:`ExprValSeq` out of a single :class:`ComputableExpression`. .. seealso:: :class:`ExprValSeq` for parameters description. """ iterator = expr.op.make_expr_val_iter(expr, param_map) return cls( expr=expr, iterator=iterator, param_map=param_map, **kwargs, )
[docs] def iter_expr_val(self): """ Iterate over the iterator and yield :class:`ExprVal`. ``post_compute_cb`` will be called when a value is computed or reused. """ callback = self.post_compute_cb if not callback: def callback(x, reused): return None def yielder(iterable, reused): for x in iterable: callback(x, reused=reused) yield x # Yield existing values yield from yielder(self.expr_val_list, True) # Then compute the remaining ones if self.iterator: for expr_val in self.iterator: callback(expr_val, reused=False) self.expr_val_list.append(expr_val) expr_val_list_len = len(self.expr_val_list) yield expr_val # If expr_val_list length has changed, catch up with the values # that were computed behind our back, so that this generator is # reentrant. if expr_val_list_len != len(self.expr_val_list): # This will yield all values, even if the list grows while # we are yielding the control back to another piece of code. yield from yielder( self.expr_val_list[expr_val_list_len:], True ) self.iterator = None
[docs] class ExprValParamMap(OrderedDict): """ Mapping of parameters to :class:`ExprVal` used when computing the value of a :class:`ComputableExpression`. """
[docs] def is_partial(self, ignore_error=False): """ Return ``True`` if the map is partial, i.e. some parameters don't have a value. That could be because one of them could not be computed due to an exception, or because it was skipped since it could not lead to a result anyway. """ def is_partial(expr_val): # Some arguments are missing: there was no attempt to compute # them because another argument failed to be computed if isinstance(expr_val, UnEvaluatedExprVal): return True # Or computation did take place but failed if expr_val.value is NoValue and not ignore_error: return True return False return any( is_partial(expr_val) for expr_val in self.values() )
[docs] @classmethod def from_gen_map(cls, param_gen_map): """ Build a :class:`ExprValParamMap` out of a mapping of parameters names and expressions to generators. :param param_gen_map: Mapping of tuple(param_name, param_expr) to an iterator that is ready to generate the possible values for the generator. :type param_gen_map: collections.OrderedDict Generators are assumed to only yield once. .. seealso:: :meth:`from_gen_map_product` for cases where the generator is expected to yield more than once. """ # Pre-fill UnEvaluatedExprVal with in case we exit the loop early param_map = cls( (param, UnEvaluatedExprVal(param_expr)) for (param, param_expr), _ in param_gen_map.items() ) for (param, param_expr), generator in param_gen_map.items(): val = next(generator) # There is no point in computing values of the other generators if # one failed to produce a useful value if val.value is NoValue: break else: param_map[param] = val return param_map
[docs] @classmethod def from_gen_map_product(cls, param_gen_map): """ Yield :class:`collections.OrderedDict` for each combination of parameter values. :param param_gen_map: Mapping of tuple(param_name, param_expr) to an iterator that is ready to generate the possible values for the generator. :type param_gen_map: collections.OrderedDict """ if not param_gen_map: yield cls() else: # Since param_gen_map is an OrderedDict, we will always consume # parameters in the same order param_spec_list, gen_list = zip(*param_gen_map.items()) param_list, param_expr_list = zip(*param_spec_list) for values in cls._product(gen_list): # We need to pad since we may truncate the list of values we # yield if we detect an error in one of them. values.extend( UnEvaluatedExprVal(param_expr) for param_expr in param_expr_list[len(values):] ) yield cls(zip(param_list, values))
@classmethod def _product(cls, gen_list): """ Similar to the cartesian product provided by itertools.product, with special handling of NoValue and some checks on the yielded sequences. It will only yield the combinations of values that are validated by :meth:`validate`. """ def validated(generator): """ Ensure we only yield valid lists of :class:`ExprVal` """ for expr_val_list in generator: if ExprVal.validate(expr_val_list): yield expr_val_list else: continue def acc_product(product_generator, generator): """ Combine a "cartesian-product-style" generator with a plain generator, giving a new "cartesian-product-style" generator. """ # We will need to use it more than once in the inner loop, so it # has to be "restartable" (like a list, and unlike a plain # iterator) product_iter = utils.RestartableIter(product_generator) for expr_val in generator: # The value is not useful, we can return early without calling # the other generators. That avoids spending time computing # parameters if they won't be used anyway. if expr_val.value is NoValue: # Returning an incomplete list will make the calling code # aware that some values were not computed at all yield [expr_val] else: for expr_val_list in product_iter: # prepend to the list to counter-act the effects of # reversed(gen_list) yield [expr_val] + expr_val_list def reducer(product_generator, generator): yield from validated(acc_product(product_generator, generator)) def initializer(): yield [] # reverse the gen_list so we get the rightmost generator varying the # fastest. Typically, margins-like parameter on which we do sweeps are # on the right side of the parameter list (to have a default value) return functools.reduce(reducer, reversed(gen_list), initializer())
[docs] class ExprValBase(ExprHelpers): """ Base class for classes representing the value of an expression. :param param_map: Map of parameter names of the :class:`Operator` that gave this value to their :class:`ExprValBase` value. :type param_map: dict(str, ExprValBase) :param value: Value that was computed. If no value was computed, :attr:`exekall._utils.NoValue` will be used. :type value: object :param excep: Exception that was raised while computing the value. If no excpetion was raised, :attr:`exekall._utils.NoValue` will be used. :type value: Exception :param uuid: UUID of the :class:`ExprValBase` :type uuid: str :param duration: Time it took to compute the value or the exception in seconds. :type duration: float or None :param log: Log collected during the computation of the value. :type log: ExprValLog """ def __init__(self, param_map, value, excep, uuid, duration, log): self.param_map = param_map self.value = value self.excep = excep self.uuid = uuid self.duration = duration self.log = log
[docs] @property def cumulative_duration(self): """ Sum of the duration of all :class:`ExprValBase` that were involved in the computation of that one. """ def f(duration, expr_val): return duration + (expr_val.duration or 0) return self.fold(f, 0, visit_once=True)
[docs] def get_full_log(self, level): """ Reconstruct a consistent log output at the given level by stitching the logs of all parent :class:`ExprValBase` that were involved in the computation of that value. :param level: Logging level to reconstruct. :type level: str """ def f(logs, expr_val): logs.add(expr_val.log) return logs logs = self.fold(f, set(), visit_once=True) logs.discard(None) logs = sorted(logs, key=attrgetter('utc_datetime')) level = level.upper() return '\n'.join( log.log_map[level] for log in logs if log.log_map.get(level) )
[docs] def get_by_predicate(self, predicate): """ Get a list of parents :class:`ExprValBase` for which the predicate returns ``True``. :type predicate: collections.abc.Callable """ def _get_by_predicate(self, predicate): if predicate(self): yield self for val in self.param_map.values(): yield from _get_by_predicate(val, predicate) return list(_get_by_predicate(self, predicate))
[docs] def get_excep(self): """ Get all the parents :class:`ExprValBase` for which an exception was raised. """ def predicate(val): return val.excep is not NoValue return self.get_by_predicate(predicate)
[docs] def get_by_type(self, cls, include_subclasses=True, **kwargs): """ Get a list of parents :class:`ExprValBase` having a value of the given type. :param cls: Type to look for. :type cls: type :param include_subclasses: If True, the check is done using ``isinstance``, otherwise an exact type check is done using ``is``. :type include_subclasses: bool """ if include_subclasses: def predicate(expr_val): return isinstance(expr_val.value, cls) else: def predicate(expr_val): return type(expr_val.value) is cls return self.get_by_predicate(predicate, **kwargs)
[docs] def format_structure(self, full_qual=True): """ Format the value and its parents in a human readable way. :param full_qual: If True, use fully qualified IDs. :type full_qual: bool """ value = self.value if type(value).__str__ is not object.__str__: value_str = str(value) else: value_str = '' value_str = value_str if '\n' not in value_str else '' idt = ' ' * 4 joiner = '\n' + idt params = joiner.join( '{param}: {value}'.format( param=param, value=expr_val.format_structure(full_qual=full_qual) ).replace('\n', joiner) for param, expr_val in self.param_map.items() ) return '{id} ({type}) UUID={uuid}{value}{joiner}{params}'.format( id=self.get_id(full_qual=full_qual), value=f' ({value_str})' if value_str else '', params=params, joiner=':' + joiner if params else '', uuid=self.uuid, type=utils.get_name(type(value), full_qual=full_qual, pretty=True), )
[docs] class FrozenExprVal(ExprValBase): """ Serializable version of :class:`ExprVal`. :param uuid: UUID of the :class:`ExprVal` :type uuid: str :param duration: Time it took to compute the value or the exception in seconds. :type duration: float or None :param log: Log collected during the computation of the value. :type log: ExprValLog :param callable_qualname: Qualified name of the callable that was used to compute the value, including module name. :type callable_qualname: str :param callable_name: Name of the callable that was used to compute the value. :type callable_name: str :param recorded_id_map: Mapping of :meth:`ExprVal.get_id` parameters to the corresponding ID. The parameters :type recorded_id_map: dict The most important instance attributes are: .. list-table:: :widths: auto * - ``value`` - Value that was computed, or :attr:`~exekall._utils.NoValue` if it was not computed. This could be because of an exception when computing it, or because computing the value was skipped. * - ``excep`` - Exception that was raised when trying to compute the value, or :attr:`~exekall._utils.NoValue`. * - ``uuid`` - String UUID of that value. This is unique and can be used to correlate with logs, or deduplicate across multiple :class:`ValueDB`. * - ``duration`` - Time it took to compute the value in seconds. Since it is a subclass of :class:`ExprValBase`, the :class:`FrozenExprVal` value of the parameters of the callable that was used to compute it can be accessed using the subscript operator ``[]``. Instances of this class will not refer to the callable that was used to create the values, and will record the ID of the values instead of recomputing it from the graph of :class:`ExpressionBase`. This allows manipulating :class:`FrozenExprVal` as standalone objects, with minimal references to the code, which improves robustness against API change that would make deserializing them impossible. .. seealso:: :class:`ExprValBase` """ def __init__(self, param_map, value, excep, uuid, duration, log, callable_qualname, callable_name, recorded_id_map, tags, ): self.callable_qualname = callable_qualname self.callable_name = callable_name self.recorded_id_map = recorded_id_map self.tags = tags super().__init__( param_map=param_map, value=value, excep=excep, uuid=uuid, duration=duration, log=log, ) if self.excep is not NoValue: self.excep_tb = utils.format_exception(self.excep) else: self.excep_tb = None
[docs] @property def callable_(self): """ Callable that produced the value. If it cannot be found, an :exc:`AttributeError` is raised. """ assert self.callable_qualname.endswith(self.callable_name) # This could also include the class name mod_name = self.callable_qualname[:-len(self.callable_name)] mod_name = mod_name.rstrip('.') # Given a.b.c, try in order and stop at first import: # a.b.c # a.b # a for mod_path in reversed(list(utils.powerset(mod_name.split('.')))): mod_name = '.'.join(mod_path) # Exception raised changed in 3.7: # https://docs.python.org/3/library/importlib.html#importlib.util.find_spec if sys.version_info >= (3, 7): try: mod = importlib.import_module(mod_name) except ModuleNotFoundError: continue else: try: mod = importlib.import_module(mod_name) # More or less the equivalent of ModuleNotFoundError except ImportError as e: if e.path is None: continue else: raise break # No "break" statement was executed else: return AttributeError(f'Producer of {self} not found') qualname = self.callable_qualname[len(mod_name):].lstrip('.') qualname = qualname.split('.') attr = mod attr_path = list(functools.accumulate(getattr)) if ( len(attr_path) > 1 and isinstance( attr, ( # Instance and static methods types.FunctionType, # Class methods types.MethodType, ) ) ): attr = UnboundMethod(attr, attr_path[-2]) return attr
[docs] @property @functools.lru_cache(maxsize=None) def type_(self): """ Type of the ``value``, as reported by the return annotation of the callable that produced it. If the callable cannot be found, the actual value type is used. If the value type is compatible with the return annotation of the callable (i.e. is a subclass), the value type is returned as well. """ value_type = type(self.value) try: callable_ = self.callable_ # If we cannot import the module or if the name has disappeared, we # have no choice except (ImportError, AttributeError) as e: return value_type # Use Operator to properly resolve the annotations op = Operator(callable_) return_type = op.value_type if return_type != inspect.Signature.empty: # If the function actually returned an instance of what it claimed # to return from its annotation, we take the actual type of the # value if ClassContext.COMPAT_CLS(value_type, return_type): return value_type # Otherwise, the type is probably a "phantom" type that is actually # never instantiated, but is used to give an existence at the type # level to some assumptions or some meaning attached to the value. else: return return_type else: return value_type
[docs] @property def type_names(self): return [ utils.get_name(type_, full_qual=True) for type_ in utils.get_mro(self.type_) if type_ is not object ]
[docs] @classmethod def from_expr_val(cls, expr_val, hidden_callable_set=None): """ Build a :class:`FrozenExprVal` from one :class:`ExprVal`. :param hidden_callable_set: Set of callables that should not appear in the ID. :type hidden_callable_set: set(collections.abc.Callable) """ value = expr_val.value if utils.is_serializable(expr_val.value) else NoValue excep = expr_val.excep if utils.is_serializable(expr_val.excep) else NoValue op = expr_val.expr.op # Reloading these values will lead to issues, and they are regenerated # for any new Expression that would be created anyway. if isinstance(op, (ExprDataOperator, ConsumerOperator)): value = NoValue excep = NoValue callable_qualname = op.get_name(full_qual=True) callable_name = op.get_name(full_qual=False, qual=False) # Pre-compute all the IDs so they are readily available once the value # is deserialized recorded_id_map = dict() for full_qual, qual, with_tags in itertools.product((True, False), repeat=3): key = cls._make_id_key( full_qual=full_qual, qual=qual, with_tags=with_tags ) recorded_id_map[key] = expr_val.get_id( **dict(key), hidden_callable_set=hidden_callable_set, ) param_map = ExprValParamMap( (param, cls.from_expr_val( param_expr_val, hidden_callable_set=hidden_callable_set, )) for param, param_expr_val in expr_val.param_map.items() ) froz_val = cls( uuid=expr_val.uuid, value=value, excep=excep, callable_qualname=callable_qualname, callable_name=callable_name, recorded_id_map=recorded_id_map, param_map=param_map, tags=expr_val.get_tags(), duration=expr_val.duration, log=expr_val.log, ) return froz_val
@staticmethod # Since tuples are immutable, reuse the same tuple by memoizing the # function. That allows more compact serialized representation in both YAML # and Pickle. @utils.once def _make_id_key(**kwargs): return tuple(sorted(kwargs.items()))
[docs] def get_id(self, full_qual=True, qual=True, with_tags=True, remove_tags=set()): """ Return recorded IDs generated using :meth:`ExprVal.get_id`. """ full_qual = full_qual and qual key = self._make_id_key( full_qual=full_qual, qual=qual, with_tags=with_tags ) id_ = self.recorded_id_map[key] for tag in remove_tags: id_ = re.sub(fr'\[{tag}=.*?\]', '', id_) return id_
[docs] def get_tags(self): return self.tags
[docs] class PrunedFrozVal(FrozenExprVal): """ Placeholder introduced by :meth:`ValueDB.prune_by_predicate` when a :class:`FrozenExprVal` is pruned. """ def __init__(self, froz_val): super().__init__( param_map=OrderedDict(), value=NoValue, excep=NoValue, uuid=froz_val.uuid, callable_qualname=froz_val.callable_qualname, callable_name=froz_val.callable_name, recorded_id_map=copy.copy(froz_val.recorded_id_map), tags=froz_val.get_tags(), duration=froz_val.duration, log=None, ) self._cumulative_duration = froz_val.cumulative_duration
[docs] @property def cumulative_duration(self): return self._cumulative_duration
[docs] class FrozenExprValSeq(collections.abc.Sequence): """ Sequence of :class:`FrozenExprVal` analogous to :class:`ExprValSeq`. :param froz_val_list: List of :class:`FrozenExprVal`. :type froz_val_list: list(FrozenExprVal) :param param_map: Parameter map that was used to compute the :class:`ExprVal`. See :class:`ExprValSeq`. :type param_map: dict Since it inherits from :class:`collections.abc.Sequence`, it can be iterated over directly. """ def __init__(self, froz_val_list, param_map): self.froz_val_list = froz_val_list self.param_map = param_map
[docs] def __getitem__(self, k): return self.froz_val_list[k]
[docs] def __len__(self): return len(self.froz_val_list)
[docs] @classmethod def from_expr_val_seq(cls, expr_val_seq, **kwargs): """ Build a :class:`FrozenExprValSeq` from an :class:`ExprValSeq`. :Variable keyword arguments: Forwarded to :meth:`FrozenExprVal.from_expr_val`. """ return cls( froz_val_list=[ FrozenExprVal.from_expr_val(expr_val, **kwargs) for expr_val in expr_val_seq.expr_val_list ], param_map=OrderedDict( (param, FrozenExprVal.from_expr_val(expr_val, **kwargs)) for param, expr_val in expr_val_seq.param_map.items() ) )
[docs] @classmethod def from_expr_list(cls, expr_list, **kwargs): """ Build a list of :class:`FrozenExprValSeq` from an list of :class:`ComputableExpression`. :param expr_list: List of :class:`ComputableExpression` to extract the :class:`ExprVal` from. :type expr_list: list(ComputableExpression) :Variable keyword arguments: Forwarded to :meth:`from_expr_val_seq`. """ expr_val_seq_list = utils.flatten_seq(expr.expr_val_seq_list for expr in expr_list) return [ cls.from_expr_val_seq(expr_val_seq, **kwargs) for expr_val_seq in expr_val_seq_list ]
[docs] class ExprValLog: """ Logging output created when computing an :class:`ExprValBase`. :param log_map: Mapping of log level name to log content. :type log_map: dict(str, str) :param utc_datetime: UTC timestamp as a datetime object corresponding to the beginning of the log. :type utc_datetime: datetime.datetime """ def __init__(self, log_map, utc_datetime): self.log_map = log_map self.utc_datetime = utc_datetime
[docs] class ExprVal(ExprValBase): """ Value computed when executing :class:`ComputableExpression`. :param expr: Expression for which this value was computed :type expr: ExpressionBase :param uuid: UUID of the value. :type uuid: str .. seealso:: :class:`ExprValBase` for the other parameters. """ def __init__(self, expr, param_map, value=NoValue, excep=NoValue, uuid=None, duration=None, log=None, ): uuid = uuid if uuid is not None else utils.create_uuid() self.expr = expr super().__init__( param_map=param_map, value=value, excep=excep, uuid=uuid, duration=duration, log=log, )
[docs] def get_tags(self): """ Return a dictionary of the tags. """ return { # Make sure there are no brackets in tag values, since that # would break all regex parsing done on IDs. k: str(v).replace('[', '').replace(']', '') for k, v in self.expr.op.tags_getter(self.value).items() }
[docs] def format_tags(self, remove_tags=set()): """ Return a formatted string for the tags of that :class:`ExprVal`. :param remove_tags: Do not include those tags :type remove_tags: set(str) """ tag_map = self.get_tags() if tag_map: return ''.join( f'[{k}={v}]' if k else f'[{v}]' for k, v in sorted(tag_map.items()) if k not in remove_tags ) else: return ''
[docs] @classmethod def validate(cls, expr_val_list): """ Check that the list contains only one :class:`ExprVal` for each :class:`ComputableExpression`, unless it is non reusable. """ def update_map(expr_map, expr_val1): # The check does not apply for non-reusable operators, since it is # expected that the same expression may reference multiple values # of the same Expression. if not expr_val1.expr.op.reusable: return expr_map expr_val2 = expr_map.setdefault(expr_val1.expr, expr_val1) # Check that there is only one ExprVal per Expression, for all # expressions that were (indirectly) involved into computation of # expr_val_list if expr_val2 is not expr_val1: raise ValueError return expr_map expr_map = {} try: for expr_val in expr_val_list: expr_val.fold(update_map, expr_map, visit_once=True) except ValueError: return False else: return True
[docs] def get_id(self, *args, with_tags=True, **kwargs): """ See :class:`ExpressionBase.get_id`. """ return self.expr.get_id( with_tags=with_tags, expr_val=self, *args, **kwargs )
[docs] class UnEvaluatedExprVal(ExprVal): """ Placeholder :class:`ExprVal` created when computing the value was known to not lead to anything useful. """ def __init__(self, expr): super().__init__( expr=expr, # Having an empty param_map is important to avoid selecting it as # an already-computed value param_map=ExprValParamMap(), uuid=None, value=NoValue, excep=NoValue, duration=None, log=None, )
[docs] class Consumer: """ Placeholder type used in PEP 484 annotations by callables to refer to the callable that will use their value. .. note:: This leads to cloning the expression refering to its consumer for each different consumer. """ def __init__(self): pass
[docs] class ExprData(dict): """ Placeholder type used in PEP 484 annotations by callables to refer to the expression-wide data dictionnary. """ def __init__(self): super().__init__() self.uuid = utils.create_uuid()