Source code for glom.core

"""*glom gets results.*

The ``glom`` package has one central entrypoint,
:func:`glom.glom`. Everything else in the package revolves around that
one function. Sometimes, big things come in small packages.

A couple of conventional terms you'll see repeated many times below:

* **target** - glom is built to work on any data, so we simply
  refer to the object being accessed as the *"target"*
* **spec** - *(aka "glomspec", short for specification)* The
  accompanying template used to specify the structure of the return
  value.

Now that you know the terms, let's take a look around glom's powerful
semantics.

"""

from __future__ import print_function

import os
import sys
import pdb
import copy
import warnings
import weakref
import operator
from abc import ABCMeta
from pprint import pprint
import string
from collections import OrderedDict
import traceback

from face.helpers import get_wrap_width
from boltons.typeutils import make_sentinel
from boltons.iterutils import is_iterable
#from boltons.funcutils import format_invocation

basestring = str
_AbstractIterableBase = ABCMeta('_AbstractIterableBase', (object,), {})
from collections import ChainMap
from reprlib import Repr, recursive_repr

GLOM_DEBUG = os.getenv('GLOM_DEBUG', '').strip().lower()
GLOM_DEBUG = False if (GLOM_DEBUG in ('', '0', 'false')) else True

TRACE_WIDTH = max(get_wrap_width(max_width=110), 50)   # min width

PATH_STAR = True
# should * and ** be interpreted as parallel traversal in Path.from_text()?
# Changed to True in 23.1; this option to disable will go away soon.

_type_type = type

_MISSING = make_sentinel('_MISSING')
SKIP = make_sentinel('SKIP')
SKIP.__doc__ = """
The ``SKIP`` singleton can be returned from a function or included
via a :class:`~glom.Val` to cancel assignment into the output
object.

>>> target = {'a': 'b'}
>>> spec = {'a': lambda t: t['a'] if t['a'] == 'a' else SKIP}
>>> glom(target, spec)
{}
>>> target = {'a': 'a'}
>>> glom(target, spec)
{'a': 'a'}

Mostly used to drop keys from dicts (as above) or filter objects from
lists.

.. note::

   SKIP was known as OMIT in versions 18.3.1 and prior. Versions 19+
   will remove the OMIT alias entirely.
"""
OMIT = SKIP  # backwards compat, remove in 19+

STOP = make_sentinel('STOP')
STOP.__doc__ = """
The ``STOP`` singleton can be used to halt iteration of a list or
execution of a tuple of subspecs.

>>> target = range(10)
>>> spec = [lambda x: x if x < 5 else STOP]
>>> glom(target, spec)
[0, 1, 2, 3, 4]
"""

LAST_CHILD_SCOPE = make_sentinel('LAST_CHILD_SCOPE')
LAST_CHILD_SCOPE.__doc__ = """
Marker that can be used by parents to keep track of the last child
scope executed.  Useful for "lifting" results out of child scopes
for scopes that want to chain the scopes of their children together
similar to tuple.
"""

NO_PYFRAME = make_sentinel('NO_PYFRAME')
NO_PYFRAME.__doc__ = """
Used internally to mark scopes which are no longer wrapped
in a recursive glom() call, so that they can be cleaned up correctly
in case of exceptions
"""

MODE = make_sentinel('MODE')

MIN_MODE = make_sentinel('MIN_MODE')

CHILD_ERRORS = make_sentinel('CHILD_ERRORS')
CHILD_ERRORS.__doc__ = """
``CHILD_ERRORS`` is used by glom internals to keep track of
failed child branches of the current scope.
"""

CUR_ERROR = make_sentinel('CUR_ERROR')
CUR_ERROR.__doc__ = """
``CUR_ERROR`` is used by glom internals to keep track of
thrown exceptions.
"""

_PKG_DIR_PATH = os.path.dirname(os.path.abspath(__file__))

class GlomError(Exception):
    """The base exception for all the errors that might be raised from
    :func:`glom` processing logic.

    By default, exceptions raised from within functions passed to glom
    (e.g., ``len``, ``sum``, any ``lambda``) will not be wrapped in a
    GlomError.
    """
    @classmethod
    def wrap(cls, exc):
        # TODO: need to test this against a wide array of exception types
        # this approach to wrapping errors works for exceptions
        # defined in pure-python as well as C
        exc_type = type(exc)
        bases = (GlomError,) if issubclass(GlomError, exc_type) else (exc_type, GlomError)
        exc_wrapper_type = type("GlomError.wrap({})".format(exc_type.__name__), bases, {})
        try:
            wrapper = exc_wrapper_type(*exc.args)
            wrapper.__wrapped = exc
            return wrapper
        except Exception:
            # maybe exception can't be re-created
            return exc

    def _set_wrapped(self, exc):
        self.__wrapped = exc

    def _finalize(self, scope):
        # careful when changing how this functionality works; pytest seems to mess with
        # the traceback module or sys.exc_info(). we saw different stacks when originally
        # developing this in June 2020.
        etype, evalue, _ = sys.exc_info()
        tb_lines = traceback.format_exc().strip().splitlines()
        limit = 0
        for line in reversed(tb_lines):
            if _PKG_DIR_PATH in line:
                limit -= 1
                break
            limit += 1
        self._tb_lines = tb_lines[-limit:]
        # if the first line is trying to put a caret at a byte-code location on a line that
        # isn't being displayed, skip it
        if set(self._tb_lines[0]) <= {' ', '^', '~'}:
            self._tb_lines = self._tb_lines[1:]
        self._scope = scope

    def __str__(self):
        if getattr(self, '_finalized_str', None):
            return self._finalized_str
        elif getattr(self, '_scope', None) is not None:
            self._target_spec_trace = format_target_spec_trace(self._scope, self.__wrapped)
            parts = ["error raised while processing, details below.",
                     " Target-spec trace (most recent last):",
                     self._target_spec_trace]
            parts.extend(self._tb_lines)
            self._finalized_str = "\n".join(parts)
            return self._finalized_str

        # else, not finalized
        try:
            exc_get_message = self.get_message
        except AttributeError:
            exc_get_message = super(GlomError, self).__str__
        return exc_get_message()

def _unpack_stack(scope, only_errors=True): """ convert scope to [[scope, spec, target, error, [children]]] this is a convenience method for printing stacks only_errors=True means ignore branches which may still be hanging around which were not involved in the stack trace of the error only_errors=False could be useful for debugger / introspection (similar to traceback.print_stack()) """ stack = [] scope = scope.maps[0] while LAST_CHILD_SCOPE in scope: child = scope[LAST_CHILD_SCOPE] branches = scope[CHILD_ERRORS] if branches == [child]: branches = [] # if there's only one branch, count it as linear stack.append([scope, scope[Spec], scope[T], scope.get(CUR_ERROR), branches]) # NB: this id() business is necessary to avoid a # nondeterministic bug in abc's __eq__ see #189 for details if id(child) in [id(b) for b in branches]: break # if child already covered by branches, stop the linear descent scope = child.maps[0] else: # if break executed above, cur scope was already added stack.append([scope, scope[Spec], scope[T], scope.get(CUR_ERROR), []]) # push errors "down" to where they were first raised / first observed for i in range(len(stack) - 1): cur, nxt = stack[i], stack[i + 1] if cur[3] == nxt[3]: cur[3] = None if only_errors: # trim the stack to the last error # leave at least 1 to not break formatting func below # TODO: make format_target_spec_trace() tolerate an "empty" stack cleanly while len(stack) > 1 and stack[-1][3] is None: stack.pop() return stack def _format_trace_value(value, maxlen): s = bbrepr(value).replace("\\'", "'") if len(s) > maxlen: try: suffix = '... (len=%s)' % len(value) except Exception: suffix = '...' s = s[:maxlen - len(suffix)] + suffix return s def format_target_spec_trace(scope, root_error, width=TRACE_WIDTH, depth=0, prev_target=_MISSING, last_branch=True): """ unpack a scope into a multi-line but short summary """ segments = [] indent = " " + "|" * depth tick = "| " if depth else "- " def mk_fmt(label, t=None): pre = indent + (t or tick) + label + ": " fmt_width = width - len(pre) return lambda v: pre + _format_trace_value(v, fmt_width) fmt_t = mk_fmt("Target") fmt_s = mk_fmt("Spec") fmt_b = mk_fmt("Spec", "+ ") recurse = lambda s, last=False: format_target_spec_trace(s, root_error, width, depth + 1, prev_target, last) tb_exc_line = lambda e: "".join(traceback.format_exception_only(type(e), e))[:-1] fmt_e = lambda e: indent + tick + tb_exc_line(e) for scope, spec, target, error, branches in _unpack_stack(scope): if target is not prev_target: segments.append(fmt_t(target)) prev_target = target if branches: segments.append(fmt_b(spec)) segments.extend([recurse(s) for s in branches[:-1]]) segments.append(recurse(branches[-1], last_branch)) else: segments.append(fmt_s(spec)) if error is not None and error is not root_error: last_line_error = True segments.append(fmt_e(error)) else: last_line_error = False if depth: # \ on first line, X on last line remark = lambda s, m: s[:depth + 1] + m + s[depth + 2:] segments[0] = remark(segments[0], "\\") if not last_branch or last_line_error: segments[-1] = remark(segments[-1], "X") return "\n".join(segments) # TODO: not used (yet) def format_oneline_trace(scope): """ unpack a scope into a single line summary (shortest summary possible) """ # the goal here is to do a kind of delta-compression -- # if the target is the same, don't repeat it segments = [] prev_target = _MISSING for scope, spec, target, error, branches in _unpack_stack(scope, only_errors=False): segments.append('/') if type(spec) in (TType, Path): 
segments.append(bbrepr(spec)) else: segments.append(type(spec).__name__) if target != prev_target: segments.append('!') segments.append(type(target).__name__) if Path in scope: segments.append('<') segments.append('->'.join([str(p) for p in scope[Path]])) segments.append('>') prev_target = target return "".join(segments)
class PathAccessError(GlomError, AttributeError, KeyError, IndexError):
    """This :exc:`GlomError` subtype represents a failure to access an
    attribute as dictated by the spec. The most commonly-seen error
    when using glom, it maintains a copy of the original exception and
    produces a readable error message for easy debugging.

    If you see this error, you may want to:

       * Check the target data is accurate using :class:`~glom.Inspect`
       * Catch the exception and return a semantically meaningful error message
       * Use :class:`glom.Coalesce` to specify a default
       * Use the top-level ``default`` kwarg on :func:`~glom.glom()`

    In any case, be glad you got this error and not the one it was
    wrapping!

    Args:
       exc (Exception): The error that arose when we tried to access
          *path*. Typically an instance of KeyError, AttributeError,
          IndexError, or TypeError, and sometimes others.
       path (Path): The full Path glom was in the middle of accessing
          when the error occurred.
       part_idx (int): The index of the part of the *path* that caused
          the error.

    >>> target = {'a': {'b': None}}
    >>> glom(target, 'a.b.c')
    Traceback (most recent call last):
    ...
    PathAccessError: could not access 'c', part 2 of Path('a', 'b', 'c'), got error: ...

    """
    def __init__(self, exc, path, part_idx):
        self.exc = exc
        self.path = path
        self.part_idx = part_idx

    def get_message(self):
        path_part = Path(self.path).values()[self.part_idx]
        return ('could not access %r, part %r of %r, got error: %r'
                % (path_part, self.part_idx, self.path, self.exc))

    def __repr__(self):
        cn = self.__class__.__name__
        return '%s(%r, %r, %r)' % (cn, self.exc, self.path, self.part_idx)

class PathAssignError(GlomError):
    """This :exc:`GlomError` subtype is raised when an assignment fails,
    stemming from an :func:`~glom.assign` call or other
    :class:`~glom.Assign` usage.

    One example would be assigning to an out-of-range position in a list::

        >>> assign(["short", "list"], Path(5), 'too far')  # doctest: +SKIP
        Traceback (most recent call last):
        ...
        PathAssignError: could not assign 5 on object at Path(), got error: IndexError(...

    Other assignment failures could be due to assigning to an
    ``@property`` or exception being raised inside a ``__setattr__()``.

    """
    def __init__(self, exc, path, dest_name):
        self.exc = exc
        self.path = path
        self.dest_name = dest_name

    def get_message(self):
        return ('could not assign %r on object at %r, got error: %r'
                % (self.dest_name, self.path, self.exc))

    def __repr__(self):
        cn = self.__class__.__name__
        return '%s(%r, %r, %r)' % (cn, self.exc, self.path, self.dest_name)

class CoalesceError(GlomError):
    """This :exc:`GlomError` subtype is raised from within a
    :class:`Coalesce` spec's processing, when none of the subspecs
    match and no default is provided.

    The exception object itself keeps track of several values which
    may be useful for processing:

    Args:
       coal_obj (Coalesce): The original failing spec, see
          :class:`Coalesce`'s docs for details.
       skipped (list): A list of ignored values and exceptions, in the
          order that their respective subspecs appear in the original
          *coal_obj*.
       path: Like many GlomErrors, this exception knows the path at
          which it occurred.

    >>> target = {}
    >>> glom(target, Coalesce('a', 'b'))
    Traceback (most recent call last):
    ...
    CoalesceError: no valid values found. Tried ('a', 'b') and got (PathAccessError, PathAccessError) ...

    .. note::

       Coalesce is a *branching* specifier type, so as of v20.7.0, its
       exception messages feature an error tree. See
       :ref:`branched-exceptions` for details on how to interpret these
       exceptions.

    """
    def __init__(self, coal_obj, skipped, path):
        self.coal_obj = coal_obj
        self.skipped = skipped
        self.path = path

    def __repr__(self):
        cn = self.__class__.__name__
        return '%s(%r, %r, %r)' % (cn, self.coal_obj, self.skipped, self.path)

    def get_message(self):
        missed_specs = tuple(self.coal_obj.subspecs)
        skipped_vals = [v.__class__.__name__
                        if isinstance(v, self.coal_obj.skip_exc)
                        else '<skipped %s>' % v.__class__.__name__
                        for v in self.skipped]
        msg = ('no valid values found. Tried %r and got (%s)'
               % (missed_specs, ', '.join(skipped_vals)))
        if self.coal_obj.skip is not _MISSING:
            msg += ', skip set to %r' % (self.coal_obj.skip,)
        if self.coal_obj.skip_exc is not GlomError:
            msg += ', skip_exc set to %r' % (self.coal_obj.skip_exc,)
        if self.path is not None:
            msg += ' (at path %r)' % (self.path,)
        return msg

class BadSpec(GlomError, TypeError):
    """Raised when a spec structure is malformed, e.g., when a specifier
    type is invalid for the current mode."""

class UnregisteredTarget(GlomError):
    """This :class:`GlomError` subtype is raised when a spec calls for an
    unsupported action on a target type. For instance, trying to
    iterate on a non-iterable target:

    >>> glom(object(), ['a.b.c'])
    Traceback (most recent call last):
    ...
    UnregisteredTarget: target type 'object' not registered for 'iterate', expected one of registered types: (...)

    It should be noted that this is a pretty uncommon occurrence in
    production glom usage. See the :ref:`setup-and-registration`
    section for details on how to avoid this error.

    An UnregisteredTarget takes and tracks a few values:

    Args:
       op (str): The name of the operation being performed ('get' or 'iterate')
       target_type (type): The type of the target being processed.
       type_map (dict): A mapping of target types that do support this operation
       path: The path at which the error occurred.

    """
    def __init__(self, op, target_type, type_map, path):
        self.op = op
        self.target_type = target_type
        self.type_map = type_map
        self.path = path
        super(UnregisteredTarget, self).__init__(op, target_type, type_map, path)

    def __repr__(self):
        cn = self.__class__.__name__
        # <type %r> is because Python 3 inexplicably changed the type
        # repr from <type *> to <class *>
        return ('%s(%r, <type %r>, %r, %r)'
                % (cn, self.op, self.target_type.__name__, self.type_map, self.path))

    def get_message(self):
        if not self.type_map:
            return ("glom() called without registering any types for operation '%s'. see"
                    " glom.register() or Glommer's constructor for details." % (self.op,))
        reg_types = sorted([t.__name__ for t, h in self.type_map.items() if h])
        reg_types_str = '()' if not reg_types else ('(%s)' % ', '.join(reg_types))
        msg = ("target type %r not registered for '%s', expected one of"
               " registered types: %s" % (self.target_type.__name__, self.op, reg_types_str))
        if self.path:
            msg += ' (at %r)' % (self.path,)
        return msg

if getattr(__builtins__, '__dict__', None) is not None: # pypy's __builtins__ is a module, as is CPython's REPL, but at # normal execution time it's a dict? __builtins__ = __builtins__.__dict__ _BUILTIN_ID_NAME_MAP = dict([(id(v), k) for k, v in __builtins__.items()]) class _BBRepr(Repr): """A better repr for builtins, when the built-in repr isn't roundtrippable. """ def __init__(self): super().__init__() # turn up all the length limits very high for name in self.__dict__: if not isinstance(getattr(self, name), int): continue setattr(self, name, 1024) def repr1(self, x, level): ret = Repr.repr1(self, x, level) if not ret.startswith('<'): return ret return _BUILTIN_ID_NAME_MAP.get(id(x), ret) bbrepr = recursive_repr()(_BBRepr().repr) class _BBReprFormatter(string.Formatter): """ allow format strings to be evaluated where {!r} will use bbrepr instead of repr """ def convert_field(self, value, conversion): if conversion == 'r': return bbrepr(value).replace("\\'", "'") return super(_BBReprFormatter, self).convert_field(value, conversion) bbformat = _BBReprFormatter().format # TODO: push this back up to boltons with repr kwarg def format_invocation(name='', args=(), kwargs=None, **kw): """Given a name, positional arguments, and keyword arguments, format a basic Python-style function call. >>> print(format_invocation('func', args=(1, 2), kwargs={'c': 3})) func(1, 2, c=3) >>> print(format_invocation('a_func', args=(1,))) a_func(1) >>> print(format_invocation('kw_func', kwargs=[('a', 1), ('b', 2)])) kw_func(a=1, b=2) """ _repr = kw.pop('repr', bbrepr) if kw: raise TypeError('unexpected keyword args: %r' % ', '.join(kw.keys())) kwargs = kwargs or {} a_text = ', '.join([_repr(a) for a in args]) if isinstance(kwargs, dict): kwarg_items = [(k, kwargs[k]) for k in sorted(kwargs)] else: kwarg_items = kwargs kw_text = ', '.join(['%s=%s' % (k, _repr(v)) for k, v in kwarg_items]) all_args_text = a_text if all_args_text and kw_text: all_args_text += ', ' all_args_text += kw_text return '%s(%s)' % (name, all_args_text)
[docs]class Path(object): """Path objects specify explicit paths when the default ``'a.b.c'``-style general access syntax won't work or isn't desirable. Use this to wrap ints, datetimes, and other valid keys, as well as strings with dots that shouldn't be expanded. >>> target = {'a': {'b': 'c', 'd.e': 'f', 2: 3}} >>> glom(target, Path('a', 2)) 3 >>> glom(target, Path('a', 'd.e')) 'f' Paths can be used to join together other Path objects, as well as :data:`~glom.T` objects: >>> Path(T['a'], T['b']) T['a']['b'] >>> Path(Path('a', 'b'), Path('c', 'd')) Path('a', 'b', 'c', 'd') Paths also support indexing and slicing, with each access returning a new Path object: >>> path = Path('a', 'b', 1, 2) >>> path[0] Path('a') >>> path[-2:] Path(1, 2) To build a Path object from a string, use :meth:`Path.from_text()`. This is the default behavior when the top-level :func:`~glom.glom` function gets a string spec. """ def __init__(self, *path_parts): if not path_parts: self.path_t = T return if isinstance(path_parts[0], TType): path_t = path_parts[0] offset = 1 else: path_t = T offset = 0 for part in path_parts[offset:]: if isinstance(part, Path): part = part.path_t if isinstance(part, TType): sub_parts = part.__ops__ if sub_parts[0] is not T: raise ValueError('path segment must be path from T, not %r' % sub_parts[0]) i = 1 while i < len(sub_parts): path_t = _t_child(path_t, sub_parts[i], sub_parts[i + 1]) i += 2 else: path_t = _t_child(path_t, 'P', part) self.path_t = path_t _CACHE = {True: {}, False: {}} _MAX_CACHE = 10000 _STAR_WARNED = False @classmethod def from_text(cls, text): """Make a Path from .-delimited text: >>> Path.from_text('a.b.c') Path('a', 'b', 'c') This is the default behavior when :func:`~glom.glom` gets a string spec. """ def create(): segs = text.split('.') if PATH_STAR: segs = [ _T_STAR if seg == '*' else _T_STARSTAR if seg == '**' else seg for seg in segs] elif not cls._STAR_WARNED: if '*' in segs or '**' in segs: warnings.warn( "'*' and '**' have changed behavior in glom version 23.1." " Recommend switch to T['*'] or T['**'].") cls._STAR_WARNED = True return cls(*segs) cache = cls._CACHE[PATH_STAR] # remove this when PATH_STAR is default if text not in cache: if len(cache) > cls._MAX_CACHE: return create() cache[text] = create() return cache[text] def glomit(self, target, scope): # The entrypoint for the Path extension return _t_eval(target, self.path_t, scope) def __len__(self): return (len(self.path_t.__ops__) - 1) // 2 def __eq__(self, other): if type(other) is Path: return self.path_t.__ops__ == other.path_t.__ops__ elif type(other) is TType: return self.path_t.__ops__ == other.__ops__ return False def __ne__(self, other): return not self == other def values(self): """ Returns a tuple of values referenced in this path. >>> Path(T.a.b, 'c', T['d']).values() ('a', 'b', 'c', 'd') """ cur_t_path = self.path_t.__ops__ return cur_t_path[2::2] def items(self): """ Returns a tuple of (operation, value) pairs. 
>>> Path(T.a.b, 'c', T['d']).items() (('.', 'a'), ('.', 'b'), ('P', 'c'), ('[', 'd')) """ cur_t_path = self.path_t.__ops__ return tuple(zip(cur_t_path[1::2], cur_t_path[2::2])) def startswith(self, other): if isinstance(other, basestring): other = Path(other) if isinstance(other, Path): other = other.path_t if not isinstance(other, TType): raise TypeError('can only check if Path starts with string, Path or T') o_path = other.__ops__ return self.path_t.__ops__[:len(o_path)] == o_path def from_t(self): '''return the same path but starting from T''' t_path = self.path_t.__ops__ if t_path[0] is S: new_t = TType() new_t.__ops__ = (T,) + t_path[1:] return Path(new_t) return self def __getitem__(self, i): cur_t_path = self.path_t.__ops__ try: step = i.step start = i.start if i.start is not None else 0 stop = i.stop start = (start * 2) + 1 if start >= 0 else (start * 2) + len(cur_t_path) if stop is not None: stop = (stop * 2) + 1 if stop >= 0 else (stop * 2) + len(cur_t_path) except AttributeError: step = 1 start = (i * 2) + 1 if i >= 0 else (i * 2) + len(cur_t_path) if start < 0 or start > len(cur_t_path): raise IndexError('Path index out of range') stop = ((i + 1) * 2) + 1 if i >= 0 else ((i + 1) * 2) + len(cur_t_path) new_t = TType() new_path = cur_t_path[start:stop] if step is not None and step != 1: new_path = tuple(zip(new_path[::2], new_path[1::2]))[::step] new_path = sum(new_path, ()) new_t.__ops__ = (cur_t_path[0],) + new_path return Path(new_t) def __repr__(self): return _format_path(self.path_t.__ops__[1:])
def _format_path(t_path):
    path_parts, cur_t_path = [], []
    i = 0
    while i < len(t_path):
        op, arg = t_path[i], t_path[i + 1]
        i += 2
        if op == 'P':
            if cur_t_path:
                path_parts.append(cur_t_path)
                cur_t_path = []
            path_parts.append(arg)
        else:
            cur_t_path.append(op)
            cur_t_path.append(arg)
    if path_parts and cur_t_path:
        path_parts.append(cur_t_path)

    if path_parts or not cur_t_path:
        return 'Path(%s)' % ', '.join([_format_t(part) if type(part) is list else repr(part)
                                       for part in path_parts])
    return _format_t(cur_t_path)

class Spec(object):
    """Spec objects serve three purposes, here they are, roughly ordered
    by utility:

      1. As a form of compiled or "curried" glom call, similar to
         Python's built-in :func:`re.compile`.
      2. A marker object, indicating that a value should be interpreted
         as a glom spec rather than a literal value, in certain cases
         where that might be ambiguous.
      3. A way to update the scope within another Spec.

    In the second usage, Spec objects are the complement to
    :class:`~glom.Val`, wrapping a value and marking that it should be
    interpreted as a glom spec, rather than a literal value. This is
    useful in places where it would be interpreted as a value by
    default. (Such as T[key], Call(func) where key and func are assumed
    to be literal values and not specs.)

    Args:
        spec: The glom spec.
        scope (dict): additional values to add to the scope when
          evaluating this Spec

    """
    def __init__(self, spec, scope=None):
        self.spec = spec
        self.scope = scope or {}

    def glom(self, target, **kw):
        scope = dict(self.scope)
        scope.update(kw.get('scope', {}))
        kw['scope'] = ChainMap(scope)
        glom_ = scope.get(glom, glom)
        return glom_(target, self.spec, **kw)

    def glomit(self, target, scope):
        scope.update(self.scope)
        return scope[glom](target, self.spec, scope)

    def __repr__(self):
        cn = self.__class__.__name__
        if self.scope:
            return '%s(%s, scope=%r)' % (cn, bbrepr(self.spec), self.scope)
        return '%s(%s)' % (cn, bbrepr(self.spec))

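# A brief sketch of the first two Spec usages listed above: reusing a
# "compiled" spec, and marking a value as a spec where it would otherwise be
# taken literally (e.g., as a T[] index). The data shown is illustrative only.
#
#   >>> email_spec = Spec('contact.email')
#   >>> target = {'contact': {'email': 'a@example.com'}, 'key': 'contact'}
#   >>> glom(target, email_spec)
#   'a@example.com'
#   >>> glom(target, T[Spec('key')])
#   {'email': 'a@example.com'}
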
[docs]class Coalesce(object): """Coalesce objects specify fallback behavior for a list of subspecs. Subspecs are passed as positional arguments, and keyword arguments control defaults. Each subspec is evaluated in turn, and if none match, a :exc:`CoalesceError` is raised, or a default is returned, depending on the options used. .. note:: This operation may seem very familar if you have experience with `SQL`_ or even `C# and others`_. In practice, this fallback behavior's simplicity is only surpassed by its utility: >>> target = {'c': 'd'} >>> glom(target, Coalesce('a', 'b', 'c')) 'd' glom tries to get ``'a'`` from ``target``, but gets a KeyError. Rather than raise a :exc:`~glom.PathAccessError` as usual, glom *coalesces* into the next subspec, ``'b'``. The process repeats until it gets to ``'c'``, which returns our value, ``'d'``. If our value weren't present, we'd see: >>> target = {} >>> glom(target, Coalesce('a', 'b')) Traceback (most recent call last): ... CoalesceError: no valid values found. Tried ('a', 'b') and got (PathAccessError, PathAccessError) ... Same process, but because ``target`` is empty, we get a :exc:`CoalesceError`. .. note:: Coalesce is a *branching* specifier type, so as of v20.7.0, its exception messages feature an error tree. See :ref:`branched-exceptions` for details on how to interpret these exceptions. If we want to avoid an exception, and we know which value we want by default, we can set *default*: >>> target = {} >>> glom(target, Coalesce('a', 'b', 'c'), default='d-fault') 'd-fault' ``'a'``, ``'b'``, and ``'c'`` weren't present so we got ``'d-fault'``. Args: subspecs: One or more glommable subspecs default: A value to return if no subspec results in a valid value default_factory: A callable whose result will be returned as a default skip: A value, tuple of values, or predicate function representing values to ignore skip_exc: An exception or tuple of exception types to catch and move on to the next subspec. Defaults to :exc:`GlomError`, the parent type of all glom runtime exceptions. If all subspecs produce skipped values or exceptions, a :exc:`CoalesceError` will be raised. For more examples, check out the :doc:`tutorial`, which makes extensive use of Coalesce. .. _SQL: https://en.wikipedia.org/w/index.php?title=Null_(SQL)&oldid=833093792#COALESCE .. 
_C# and others: https://en.wikipedia.org/w/index.php?title=Null_coalescing_operator&oldid=839493322#C# """ def __init__(self, *subspecs, **kwargs): self.subspecs = subspecs self._orig_kwargs = dict(kwargs) self.default = kwargs.pop('default', _MISSING) self.default_factory = kwargs.pop('default_factory', _MISSING) if self.default and self.default_factory: raise ValueError('expected one of "default" or "default_factory", not both') self.skip = kwargs.pop('skip', _MISSING) if self.skip is _MISSING: self.skip_func = lambda v: False elif callable(self.skip): self.skip_func = self.skip elif isinstance(self.skip, tuple): self.skip_func = lambda v: v in self.skip else: self.skip_func = lambda v: v == self.skip self.skip_exc = kwargs.pop('skip_exc', GlomError) if kwargs: raise TypeError('unexpected keyword args: %r' % (sorted(kwargs.keys()),)) def glomit(self, target, scope): skipped = [] for subspec in self.subspecs: try: ret = scope[glom](target, subspec, scope) if not self.skip_func(ret): break skipped.append(ret) except self.skip_exc as e: skipped.append(e) continue else: if self.default is not _MISSING: ret = arg_val(target, self.default, scope) elif self.default_factory is not _MISSING: ret = self.default_factory() else: raise CoalesceError(self, skipped, scope[Path]) return ret def __repr__(self): cn = self.__class__.__name__ return format_invocation(cn, self.subspecs, self._orig_kwargs, repr=bbrepr)
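
# A brief sketch of the *skip* and *default* keyword behavior described in the
# Coalesce docstring above: skipped values count as misses, and *default*
# suppresses CoalesceError. The data shown is illustrative only.
#
#   >>> target = {'a': None, 'c': 3}
#   >>> glom(target, Coalesce('a', 'b', 'c', skip=None))
#   3
#   >>> glom({}, Coalesce('a', 'b', default=0))
#   0
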
[docs]class Inspect(object): """The :class:`~glom.Inspect` specifier type provides a way to get visibility into glom's evaluation of a specification, enabling debugging of those tricky problems that may arise with unexpected data. :class:`~glom.Inspect` can be inserted into an existing spec in one of two ways. First, as a wrapper around the spec in question, or second, as an argument-less placeholder wherever a spec could be. :class:`~glom.Inspect` supports several modes, controlled by keyword arguments. Its default, no-argument mode, simply echos the state of the glom at the point where it appears: >>> target = {'a': {'b': {}}} >>> val = glom(target, Inspect('a.b')) # wrapping a spec --- path: ['a.b'] target: {'a': {'b': {}}} output: {} --- Debugging behavior aside, :class:`~glom.Inspect` has no effect on values in the target, spec, or result. Args: echo (bool): Whether to print the path, target, and output of each inspected glom. Defaults to True. recursive (bool): Whether or not the Inspect should be applied at every level, at or below the spec that it wraps. Defaults to False. breakpoint (bool): This flag controls whether a debugging prompt should appear before evaluating each inspected spec. Can also take a callable. Defaults to False. post_mortem (bool): This flag controls whether exceptions should be caught and interactively debugged with :mod:`pdb` on inspected specs. All arguments above are keyword-only to avoid overlap with a wrapped spec. .. note:: Just like ``pdb.set_trace()``, be careful about leaving stray ``Inspect()`` instances in production glom specs. """ def __init__(self, *a, **kw): self.wrapped = a[0] if a else Path() self.recursive = kw.pop('recursive', False) self.echo = kw.pop('echo', True) breakpoint = kw.pop('breakpoint', False) if breakpoint is True: breakpoint = pdb.set_trace if breakpoint and not callable(breakpoint): raise TypeError('breakpoint expected bool or callable, not: %r' % breakpoint) self.breakpoint = breakpoint post_mortem = kw.pop('post_mortem', False) if post_mortem is True: post_mortem = pdb.post_mortem if post_mortem and not callable(post_mortem): raise TypeError('post_mortem expected bool or callable, not: %r' % post_mortem) self.post_mortem = post_mortem def __repr__(self): return '<INSPECT>' def glomit(self, target, scope): # stash the real handler under Inspect, # and replace the child handler with a trace callback scope[Inspect] = scope[glom] scope[glom] = self._trace return scope[glom](target, self.wrapped, scope) def _trace(self, target, spec, scope): if not self.recursive: scope[glom] = scope[Inspect] if self.echo: print('---') # TODO: switch from scope[Path] to the Target-Spec format trace above # ... but maybe be smart about only printing deltas instead of the whole # thing print('path: ', scope[Path] + [spec]) print('target:', target) if self.breakpoint: # TODO: real debugger here? self.breakpoint() try: ret = scope[Inspect](target, spec, scope) except Exception: if self.post_mortem: self.post_mortem() raise if self.echo: print('output:', ret) print('---') return ret
[docs]class Call(object): """:class:`Call` specifies when a target should be passed to a function, *func*. :class:`Call` is similar to :func:`~functools.partial` in that it is no more powerful than ``lambda`` or other functions, but it is designed to be more readable, with a better ``repr``. Args: func (callable): a function or other callable to be called with the target :class:`Call` combines well with :attr:`~glom.T` to construct objects. For instance, to generate a dict and then pass it to a constructor: >>> class ExampleClass(object): ... def __init__(self, attr): ... self.attr = attr ... >>> target = {'attr': 3.14} >>> glom(target, Call(ExampleClass, kwargs=T)).attr 3.14 This does the same as ``glom(target, lambda target: ExampleClass(**target))``, but it's easy to see which one reads better. .. note:: ``Call`` is mostly for functions. Use a :attr:`~glom.T` object if you need to call a method. .. warning:: :class:`Call` has a successor with a fuller-featured API, new in 19.10.0: the :class:`Invoke` specifier type. """ def __init__(self, func=None, args=None, kwargs=None): if func is None: func = T if not (callable(func) or isinstance(func, (Spec, TType))): raise TypeError('expected func to be a callable or T' ' expression, not: %r' % (func,)) if args is None: args = () if kwargs is None: kwargs = {} self.func, self.args, self.kwargs = func, args, kwargs def glomit(self, target, scope): 'run against the current target' r = lambda spec: arg_val(target, spec, scope) return r(self.func)(*r(self.args), **r(self.kwargs)) def __repr__(self): cn = self.__class__.__name__ return '%s(%s, args=%r, kwargs=%r)' % (cn, bbrepr(self.func), self.args, self.kwargs)
def _is_spec(obj, strict=False):
    # a little util for codifying the spec type checking in glom
    if isinstance(obj, TType):
        return True
    if strict:
        return type(obj) is Spec
    return _has_callable_glomit(obj)  # pragma: no cover

[docs]class Invoke(object): """Specifier type designed for easy invocation of callables from glom. Args: func (callable): A function or other callable object. ``Invoke`` is similar to :func:`functools.partial`, but with the ability to set up a "templated" call which interleaves constants and glom specs. For example, the following creates a spec which can be used to check if targets are integers: >>> is_int = Invoke(isinstance).specs(T).constants(int) >>> glom(5, is_int) True And this composes like any other glom spec: >>> target = [7, object(), 9] >>> glom(target, [is_int]) [True, False, True] Another example, mixing positional and keyword arguments: >>> spec = Invoke(sorted).specs(T).constants(key=int, reverse=True) >>> target = ['10', '5', '20', '1'] >>> glom(target, spec) ['20', '10', '5', '1'] Invoke also helps with evaluating zero-argument functions: >>> glom(target={}, spec=Invoke(int)) 0 (A trivial example, but from timestamps to UUIDs, zero-arg calls do come up!) .. note:: ``Invoke`` is mostly for functions, object construction, and callable objects. For calling methods, consider the :attr:`~glom.T` object. """ def __init__(self, func): if not callable(func) and not _is_spec(func, strict=True): raise TypeError('expected func to be a callable or Spec instance,' ' not: %r' % (func,)) self.func = func self._args = () # a registry of every known kwarg to its freshest value as set # by the methods below. the **kw dict is used as a unique marker. self._cur_kwargs = {}
[docs] @classmethod def specfunc(cls, spec): """Creates an :class:`Invoke` instance where the function is indicated by a spec. >>> spec = Invoke.specfunc('func').constants(5) >>> glom({'func': range}, (spec, list)) [0, 1, 2, 3, 4] """ return cls(Spec(spec))
[docs] def constants(self, *a, **kw): """Returns a new :class:`Invoke` spec, with the provided positional and keyword argument values stored for passing to the underlying function. >>> spec = Invoke(T).constants(5) >>> glom(range, (spec, list)) [0, 1, 2, 3, 4] Subsequent positional arguments are appended: >>> spec = Invoke(T).constants(2).constants(10, 2) >>> glom(range, (spec, list)) [2, 4, 6, 8] Keyword arguments also work as one might expect: >>> round_2 = Invoke(round).constants(ndigits=2).specs(T) >>> glom(3.14159, round_2) 3.14 :meth:`~Invoke.constants()` and other :class:`Invoke` methods may be called multiple times, just remember that every call returns a new spec. """ ret = self.__class__(self.func) ret._args = self._args + ('C', a, kw) ret._cur_kwargs = dict(self._cur_kwargs) ret._cur_kwargs.update({k: kw for k, _ in kw.items()}) return ret
[docs] def specs(self, *a, **kw): """Returns a new :class:`Invoke` spec, with the provided positional and keyword arguments stored to be interpreted as specs, with the results passed to the underlying function. >>> spec = Invoke(range).specs('value') >>> glom({'value': 5}, (spec, list)) [0, 1, 2, 3, 4] Subsequent positional arguments are appended: >>> spec = Invoke(range).specs('start').specs('end', 'step') >>> target = {'start': 2, 'end': 10, 'step': 2} >>> glom(target, (spec, list)) [2, 4, 6, 8] Keyword arguments also work as one might expect: >>> multiply = lambda x, y: x * y >>> times_3 = Invoke(multiply).constants(y=3).specs(x='value') >>> glom({'value': 5}, times_3) 15 :meth:`~Invoke.specs()` and other :class:`Invoke` methods may be called multiple times, just remember that every call returns a new spec. """ ret = self.__class__(self.func) ret._args = self._args + ('S', a, kw) ret._cur_kwargs = dict(self._cur_kwargs) ret._cur_kwargs.update({k: kw for k, _ in kw.items()}) return ret
[docs] def star(self, args=None, kwargs=None): """Returns a new :class:`Invoke` spec, with *args* and/or *kwargs* specs set to be "starred" or "star-starred" (respectively) >>> spec = Invoke(zip).star(args='lists') >>> target = {'lists': [[1, 2], [3, 4], [5, 6]]} >>> list(glom(target, spec)) [(1, 3, 5), (2, 4, 6)] Args: args (spec): A spec to be evaluated and "starred" into the underlying function. kwargs (spec): A spec to be evaluated and "star-starred" into the underlying function. One or both of the above arguments should be set. The :meth:`~Invoke.star()`, like other :class:`Invoke` methods, may be called multiple times. The *args* and *kwargs* will be stacked in the order in which they are provided. """ if args is None and kwargs is None: raise TypeError('expected one or both of args/kwargs to be passed') ret = self.__class__(self.func) ret._args = self._args + ('*', args, kwargs) ret._cur_kwargs = dict(self._cur_kwargs) return ret
def __repr__(self): base_fname = self.__class__.__name__ fname_map = {'C': 'constants', 'S': 'specs', '*': 'star'} if type(self.func) is Spec: base_fname += '.specfunc' args = (self.func.spec,) else: args = (self.func,) chunks = [format_invocation(base_fname, args, repr=bbrepr)] for i in range(len(self._args) // 3): op, args, _kwargs = self._args[i * 3: i * 3 + 3] fname = fname_map[op] if op in ('C', 'S'): kwargs = [(k, v) for k, v in _kwargs.items() if self._cur_kwargs[k] is _kwargs] else: kwargs = {} if args: kwargs['args'] = args if _kwargs: kwargs['kwargs'] = _kwargs args = () chunks.append('.' + format_invocation(fname, args, kwargs, repr=bbrepr)) return ''.join(chunks) def glomit(self, target, scope): all_args = [] all_kwargs = {} recurse = lambda spec: scope[glom](target, spec, scope) func = recurse(self.func) if _is_spec(self.func, strict=True) else self.func for i in range(len(self._args) // 3): op, args, kwargs = self._args[i * 3: i * 3 + 3] if op == 'C': all_args.extend(args) all_kwargs.update({k: v for k, v in kwargs.items() if self._cur_kwargs[k] is kwargs}) elif op == 'S': all_args.extend([recurse(arg) for arg in args]) all_kwargs.update({k: recurse(v) for k, v in kwargs.items() if self._cur_kwargs[k] is kwargs}) elif op == '*': if args is not None: all_args.extend(recurse(args)) if kwargs is not None: all_kwargs.update(recurse(kwargs)) return func(*all_args, **all_kwargs)
class Ref(object):
    """Name a part of a spec and refer to it elsewhere in the same spec,
    useful for trees and other self-similar data structures.

    Args:
       name (str): The name of the spec to reference.
       subspec: Pass a spec to name it *name*, or leave unset to refer
          to an already-named spec.
    """
    def __init__(self, name, subspec=_MISSING):
        self.name, self.subspec = name, subspec

    def glomit(self, target, scope):
        subspec = self.subspec
        scope_key = (Ref, self.name)
        if subspec is _MISSING:
            subspec = scope[scope_key]
        else:
            scope[scope_key] = subspec
        return scope[glom](target, subspec, scope)

    def __repr__(self):
        if self.subspec is _MISSING:
            args = bbrepr(self.name)
        else:
            args = bbrepr((self.name, self.subspec))[1:-1]
        return "Ref(" + args + ")"

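# A brief sketch of Ref on a self-similar structure, per the docstring above.
# The tree shape and field names are illustrative only.
#
#   >>> tree = {'id': 1, 'children': [{'id': 2, 'children': []}]}
#   >>> node = Ref('node', {'id': 'id', 'children': ('children', [Ref('node')])})
#   >>> glom(tree, node)
#   {'id': 1, 'children': [{'id': 2, 'children': []}]}
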
class TType(object): """``T``, short for "target". A singleton object that enables object-oriented expression of a glom specification. .. note:: ``T`` is a singleton, and does not need to be constructed. Basically, think of ``T`` as your data's stunt double. Everything that you do to ``T`` will be recorded and executed during the :func:`glom` call. Take this example: >>> spec = T['a']['b']['c'] >>> target = {'a': {'b': {'c': 'd'}}} >>> glom(target, spec) 'd' So far, we've relied on the ``'a.b.c'``-style shorthand for access, or used the :class:`~glom.Path` objects, but if you want to explicitly do attribute and key lookups, look no further than ``T``. But T doesn't stop with unambiguous access. You can also call methods and perform almost any action you would with a normal object: >>> spec = ('a', (T['b'].items(), list)) # reviewed below >>> glom(target, spec) [('c', 'd')] A ``T`` object can go anywhere in the spec. As seen in the example above, we access ``'a'``, use a ``T`` to get ``'b'`` and iterate over its ``items``, turning them into a ``list``. You can even use ``T`` with :class:`~glom.Call` to construct objects: >>> class ExampleClass(object): ... def __init__(self, attr): ... self.attr = attr ... >>> target = {'attr': 3.14} >>> glom(target, Call(ExampleClass, kwargs=T)).attr 3.14 On a further note, while ``lambda`` works great in glom specs, and can be very handy at times, ``T`` and :class:`~glom.Call` eliminate the need for the vast majority of ``lambda`` usage with glom. Unlike ``lambda`` and other functions, ``T`` roundtrips beautifully and transparently: >>> T['a'].b['c']('success') T['a'].b['c']('success') ``T``-related access errors raise a :exc:`~glom.PathAccessError` during the :func:`~glom.glom` call. .. note:: While ``T`` is clearly useful, powerful, and here to stay, its semantics are still being refined. Currently, operations beyond method calls and attribute/item access are considered experimental and should not be relied upon. .. note:: ``T`` attributes starting with __ are reserved to avoid colliding with many built-in Python behaviors, current and future. The ``T.__()`` method is available for cases where they are needed. For example, ``T.__('class__')`` is equivalent to accessing the ``__class__`` attribute. """ __slots__ = ('__ops__',) def __getattr__(self, name): if name.startswith('__'): raise AttributeError('T instances reserve dunder attributes.' ' To access the "{name}" attribute, use' ' T.__("{d_name}")'.format(name=name, d_name=name[2:])) return _t_child(self, '.', name) def __getitem__(self, item): return _t_child(self, '[', item) def __call__(self, *args, **kwargs): if self is S: if args: raise TypeError('S() takes no positional arguments, got: %r' % (args,)) if not kwargs: raise TypeError('S() expected at least one kwarg, got none') # TODO: typecheck kwarg vals? 
return _t_child(self, '(', (args, kwargs)) def __star__(self): return _t_child(self, 'x', None) def __starstar__(self): return _t_child(self, 'X', None) def __stars__(self): """how many times the result will be wrapped in extra lists""" t_ops = self.__ops__[1::2] return t_ops.count('x') + t_ops.count('X') def __add__(self, arg): return _t_child(self, '+', arg) def __sub__(self, arg): return _t_child(self, '-', arg) def __mul__(self, arg): return _t_child(self, '*', arg) def __floordiv__(self, arg): return _t_child(self, '#', arg) def __truediv__(self, arg): return _t_child(self, '/', arg) __div__ = __truediv__ def __mod__(self, arg): return _t_child(self, '%', arg) def __pow__(self, arg): return _t_child(self, ':', arg) def __and__(self, arg): return _t_child(self, '&', arg) def __or__(self, arg): return _t_child(self, '|', arg) def __xor__(self, arg): return _t_child(self, '^', arg) def __invert__(self): return _t_child(self, '~', None) def __neg__(self): return _t_child(self, '_', None) def __(self, name): return _t_child(self, '.', '__' + name) def __repr__(self): t_path = self.__ops__ return _format_t(t_path[1:], t_path[0]) def __getstate__(self): t_path = self.__ops__ return tuple(({T: 'T', S: 'S', A: 'A'}[t_path[0]],) + t_path[1:]) def __setstate__(self, state): self.__ops__ = ({'T': T, 'S': S, 'A': A}[state[0]],) + state[1:] def _t_child(parent, operation, arg): base = parent.__ops__ if base[0] is A and operation not in ('.', '[', 'P'): # whitelist rather than blacklist assignment friendly operations # TODO: error type? raise BadSpec("operation not allowed on A assignment path") t = TType() t.__ops__ = base + (operation, arg) return t def _s_first_magic(scope, key, _t): """ enable S.a to do S['a'] or S['a'].val as a special case for accessing user defined string variables """ err = None try: cur = scope[key] except KeyError as e: err = PathAccessError(e, Path(_t), 0) # always only one level depth, hence 0 if err: raise err return cur def _t_eval(target, _t, scope): t_path = _t.__ops__ i = 1 fetch_till = len(t_path) root = t_path[0] if root is T: cur = target elif root is S or root is A: # A is basically the same as S, but last step is assign if root is A: fetch_till -= 2 if fetch_till < 1: raise BadSpec('cannot assign without destination') cur = scope if fetch_till > 1 and t_path[1] in ('.', 'P'): cur = _s_first_magic(cur, t_path[2], _t) i += 2 elif root is S and fetch_till > 1 and t_path[1] == '(': # S(var='spec') style assignment _, kwargs = t_path[2] scope.update({ k: arg_val(target, v, scope) for k, v in kwargs.items()}) return target else: raise ValueError('TType instance with invalid root') # pragma: no cover pae = None while i < fetch_till: op, arg = t_path[i], t_path[i + 1] arg = arg_val(target, arg, scope) if op == '.': try: cur = getattr(cur, arg) except AttributeError as e: pae = PathAccessError(e, Path(_t), i // 2) elif op == '[': try: cur = cur[arg] except (KeyError, IndexError, TypeError) as e: pae = PathAccessError(e, Path(_t), i // 2) elif op == 'P': # Path type stuff (fuzzy match) get = scope[TargetRegistry].get_handler('get', cur, path=t_path[2:i+2:2]) try: cur = get(cur, arg) except Exception as e: pae = PathAccessError(e, Path(_t), i // 2) elif op in 'xX': nxt = [] get_handler = scope[TargetRegistry].get_handler if op == 'x': # increases arity of cur each time through # TODO: so many try/except -- could scope[TargetRegistry] stuff be cached on type? 
_extend_children(nxt, cur, get_handler) elif op == 'X': sofar = set() _extend_children(nxt, cur, get_handler) for item in nxt: if id(item) not in sofar: sofar.add(id(item)) _extend_children(nxt, item, get_handler) nxt.insert(0, cur) # handle the rest of the t_path in recursive calls cur = [] todo = TType() todo.__ops__ = (root,) + t_path[i+2:] for child in nxt: try: cur.append(_t_eval(child, todo, scope)) except PathAccessError: pass break # we handled the rest in recursive call, break loop elif op == '(': args, kwargs = arg scope[Path] += t_path[2:i+2:2] cur = scope[glom]( target, Call(cur, args, kwargs), scope) # call with target rather than cur, # because it is probably more intuitive # if args to the call "reset" their path # e.g. "T.a" should mean the same thing # in both of these specs: T.a and T.b(T.a) else: # arithmetic operators try: if op == '+': cur = cur + arg elif op == '-': cur = cur - arg elif op == '*': cur = cur * arg #elif op == '#': # cur = cur // arg # TODO: python 2 friendly approach? elif op == '/': cur = cur / arg elif op == '%': cur = cur % arg elif op == ':': cur = cur ** arg elif op == '&': cur = cur & arg elif op == '|': cur = cur | arg elif op == '^': cur = cur ^ arg elif op == '~': cur = ~cur elif op == '_': cur = -cur except (TypeError, ZeroDivisionError) as e: pae = PathAccessError(e, Path(_t), i // 2) if pae: raise pae i += 2 if root is A: op, arg = t_path[-2:] if cur is scope: op = '[' # all assignment on scope is setitem _assign_op(dest=cur, op=op, arg=arg, val=target, path=_t, scope=scope) return target # A should not change the target return cur def _assign_op(dest, op, arg, val, path, scope): """helper method for doing the assignment on a T operation""" if op == '[': dest[arg] = val elif op == '.': setattr(dest, arg, val) elif op == 'P': _assign = scope[TargetRegistry].get_handler('assign', dest) try: _assign(dest, arg, val) except Exception as e: raise PathAssignError(e, path, arg) else: # pragma: no cover raise ValueError('unsupported T operation for assignment') def _extend_children(children, item, get_handler): try: # dict or obj-like keys = get_handler('keys', item) get = get_handler('get', item) except UnregisteredTarget: try: iterate = get_handler('iterate', item) except UnregisteredTarget: pass else: try: # list-like children.extend(iterate(item)) except Exception: pass else: try: for key in keys(item): try: children.append(get(item, key)) except Exception: pass except Exception: pass T = TType() # target aka Mr. T aka "this" S = TType() # like T, but means grab stuff from Scope, not Target A = TType() # like S, but shorthand to assign target to scope T.__ops__ = (T,) S.__ops__ = (S,) A.__ops__ = (A,) _T_STAR = T.__star__() # helper constant for Path.from_text _T_STARSTAR = T.__starstar__() # helper constant for Path.from_text UP = make_sentinel('UP') ROOT = make_sentinel('ROOT') def _format_slice(x): if type(x) is not slice: return bbrepr(x) fmt = lambda v: "" if v is None else bbrepr(v) if x.step is None: return fmt(x.start) + ":" + fmt(x.stop) return fmt(x.start) + ":" + fmt(x.stop) + ":" + fmt(x.step) def _format_t(path, root=T): prepr = [{T: 'T', S: 'S', A: 'A'}[root]] i = 0 while i < len(path): op, arg = path[i], path[i + 1] if op == '.': prepr.append('.' 
+ arg) elif op == '[': if type(arg) is tuple: index = ", ".join([_format_slice(x) for x in arg]) else: index = _format_slice(arg) prepr.append("[%s]" % (index,)) elif op == '(': args, kwargs = arg prepr.append(format_invocation(args=args, kwargs=kwargs, repr=bbrepr)) elif op == 'P': return _format_path(path) elif op == 'x': prepr.append(".__star__()") elif op == 'X': prepr.append(".__starstar__()") elif op in ('_', '~'): # unary arithmetic operators if any([o in path[:i] for o in '+-/%:&|^~_']): prepr = ['('] + prepr + [')'] prepr = ['-' if op == '_' else op] + prepr else: # binary arithmetic operators formatted_arg = bbrepr(arg) if type(arg) is TType: arg_path = arg.__ops__ if any([o in arg_path for o in '+-/%:&|^~_']): formatted_arg = '(' + formatted_arg + ')' prepr.append(' ' + ('**' if op == ':' else op) + ' ') prepr.append(formatted_arg) i += 2 return "".join(prepr)
class Val(object):
    """Val objects are specs which evaluate to the wrapped *value*.

    >>> target = {'a': {'b': 'c'}}
    >>> spec = {'a': 'a.b', 'readability': Val('counts')}
    >>> pprint(glom(target, spec))
    {'a': 'c', 'readability': 'counts'}

    Instead of accessing ``'counts'`` as a key like it did with
    ``'a.b'``, :func:`~glom.glom` just unwrapped the Val and included
    the value.

    :class:`~glom.Val` takes one argument, the value to be returned.

    .. note::

       :class:`Val` was named ``Literal`` in versions of glom before
       20.7.0. An alias has been preserved for backwards compatibility,
       but reprs have changed.

    """
    def __init__(self, value):
        self.value = value

    def glomit(self, target, scope):
        return self.value

    def __repr__(self):
        cn = self.__class__.__name__
        return '%s(%s)' % (cn, bbrepr(self.value))

Literal = Val  # backwards compat for pre-20.7.0


class ScopeVars(object):
    """This is the runtime partner of :class:`Vars` -- this is what
    actually lives in the scope and stores runtime values.

    While not part of the importable API of glom, it's half expected
    that some folks may write specs to populate and export scopes, at
    which point this type makes it easy to access values by attribute
    access or by converting to a dict.
    """
    def __init__(self, base, defaults):
        self.__dict__ = dict(base)
        self.__dict__.update(defaults)

    def __iter__(self):
        return iter(self.__dict__.items())

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, bbrepr(self.__dict__))

class Vars(object):
    """:class:`Vars` is a helper that can be used with **S** in order to
    store shared mutable state.

    Takes the same arguments as :class:`dict()`.

    Arguments here should be thought of the same way as default
    arguments to a function. Each time the spec is evaluated, the same
    arguments will be referenced; so, think carefully about mutable
    data structures.
    """
    def __init__(self, base=(), **kw):
        dict(base)  # ensure it is a dict-compatible first arg
        self.base = base
        self.defaults = kw

    def glomit(self, target, spec):
        return ScopeVars(self.base, self.defaults)

    def __repr__(self):
        ret = format_invocation(self.__class__.__name__,
                                args=(self.base,) if self.base else (),
                                kwargs=self.defaults,
                                repr=bbrepr)
        return ret

class Let(object): """ Deprecated, kept for backwards compat. Use S(x='y') instead. >>> target = {'data': {'val': 9}} >>> spec = (Let(value=T['data']['val']), {'val': S['value']}) >>> glom(target, spec) {'val': 9} """ def __init__(self, **kw): if not kw: raise TypeError('expected at least one keyword argument') self._binding = kw def glomit(self, target, scope): scope.update({ k: scope[glom](target, v, scope) for k, v in self._binding.items()}) return target def __repr__(self): cn = self.__class__.__name__ return format_invocation(cn, kwargs=self._binding, repr=bbrepr) class Auto(object): """ Switch to Auto mode (the default) TODO: this seems like it should be a sub-class of class Spec() -- if Spec() could help define the interface for new "modes" or dialects that would also help make match mode feel less duct-taped on """ def __init__(self, spec=None): self.spec = spec def glomit(self, target, scope): scope[MODE] = AUTO return scope[glom](target, self.spec, scope) def __repr__(self): cn = self.__class__.__name__ rpr = '' if self.spec is None else bbrepr(self.spec) return '%s(%s)' % (cn, rpr) class _AbstractIterable(_AbstractIterableBase): __metaclass__ = ABCMeta @classmethod def __subclasshook__(cls, C): if C in (str, bytes): return False return callable(getattr(C, "__iter__", None)) class _ObjStyleKeysMeta(type): def __instancecheck__(cls, C): return hasattr(C, "__dict__") and hasattr(C.__dict__, "keys") class _ObjStyleKeys(_ObjStyleKeysMeta('_AbstractKeys', (object,), {})): __metaclass__ = _ObjStyleKeysMeta @staticmethod def get_keys(obj): ret = obj.__dict__.keys() return ret def _get_sequence_item(target, index): return target[int(index)] # handlers are 3-arg callables, with args (spec, target, scope) # spec is the first argument for convenience in the case # that the handler is a method of the spec type def _handle_dict(target, spec, scope): ret = type(spec)() # TODO: works for dict + ordereddict, but sufficient for all? for field, subspec in spec.items(): val = scope[glom](target, subspec, scope) if val is SKIP: continue if type(field) in (Spec, TType): field = scope[glom](target, field, scope) ret[field] = val return ret def _handle_list(target, spec, scope): subspec = spec[0] iterate = scope[TargetRegistry].get_handler('iterate', target, path=scope[Path]) try: iterator = iterate(target) except Exception as e: raise TypeError('failed to iterate on instance of type %r at %r (got %r)' % (target.__class__.__name__, Path(*scope[Path]), e)) ret = [] base_path = scope[Path] for i, t in enumerate(iterator): scope[Path] = base_path + [i] val = scope[glom](t, subspec, scope) if val is SKIP: continue if val is STOP: break ret.append(val) return ret def _handle_tuple(target, spec, scope): res = target for subspec in spec: scope = chain_child(scope) nxt = scope[glom](res, subspec, scope) if nxt is SKIP: continue if nxt is STOP: break res = nxt if not isinstance(subspec, list): scope[Path] += [getattr(subspec, '__name__', subspec)] return res class Pipe(object): """Evaluate specs one after the other, passing the result of the previous evaluation in as the target of the next spec: >>> glom({'a': {'b': -5}}, Pipe('a', 'b', abs)) 5 Same behavior as ``Auto(tuple(steps))``, but useful for explicit usage in other modes. 
""" def __init__(self, *steps): self.steps = steps def glomit(self, target, scope): return _handle_tuple(target, self.steps, scope) def __repr__(self): return self.__class__.__name__ + bbrepr(self.steps) class TargetRegistry(object): ''' responsible for registration of target types for iteration and attribute walking ''' def __init__(self, register_default_types=True): self._op_type_map = {} self._op_type_tree = {} # see _register_fuzzy_type for details self._type_cache = {} self._op_auto_map = OrderedDict() # op name to function that returns handler function self._register_builtin_ops() if register_default_types: self._register_default_types() return def get_handler(self, op, obj, path=None, raise_exc=True): """for an operation and object **instance**, obj, return the closest-matching handler function, raising UnregisteredTarget if no handler can be found for *obj* (or False if raise_exc=False) """ ret = False obj_type = type(obj) cache_key = (obj_type, op) if cache_key not in self._type_cache: type_map = self.get_type_map(op) if type_map: try: ret = type_map[obj_type] except KeyError: type_tree = self._op_type_tree.get(op, {}) closest = self._get_closest_type(obj, type_tree=type_tree) if closest is None: ret = False else: ret = type_map[closest] if ret is False and raise_exc: raise UnregisteredTarget(op, obj_type, type_map=type_map, path=path) self._type_cache[cache_key] = ret return self._type_cache[cache_key] def get_type_map(self, op): try: return self._op_type_map[op] except KeyError: return OrderedDict() def _get_closest_type(self, obj, type_tree): default = None for cur_type, sub_tree in type_tree.items(): if isinstance(obj, cur_type): sub_type = self._get_closest_type(obj, type_tree=sub_tree) ret = cur_type if sub_type is None else sub_type return ret return default def _register_default_types(self): self.register(object) self.register(dict, get=operator.getitem) self.register(dict, keys=dict.keys) self.register(list, get=_get_sequence_item) self.register(tuple, get=_get_sequence_item) self.register(OrderedDict, get=operator.getitem) self.register(OrderedDict, keys=OrderedDict.keys) self.register(_AbstractIterable, iterate=iter) self.register(_ObjStyleKeys, keys=_ObjStyleKeys.get_keys) def _register_fuzzy_type(self, op, new_type, _type_tree=None): """Build a "type tree", an OrderedDict mapping registered types to their subtypes The type tree's invariant is that a key in the mapping is a valid parent type of all its children. Order is preserved such that non-overlapping parts of the subtree take precedence by which was most recently added. 
""" if _type_tree is None: try: _type_tree = self._op_type_tree[op] except KeyError: _type_tree = self._op_type_tree[op] = OrderedDict() registered = False for cur_type, sub_tree in list(_type_tree.items()): if issubclass(cur_type, new_type): sub_tree = _type_tree.pop(cur_type) # mutation for recursion brevity try: _type_tree[new_type][cur_type] = sub_tree except KeyError: _type_tree[new_type] = OrderedDict({cur_type: sub_tree}) registered = True elif issubclass(new_type, cur_type): _type_tree[cur_type] = self._register_fuzzy_type(op, new_type, _type_tree=sub_tree) registered = True if not registered: _type_tree[new_type] = OrderedDict() return _type_tree def register(self, target_type, **kwargs): if not isinstance(target_type, type): raise TypeError('register expected a type, not an instance: %r' % (target_type,)) exact = kwargs.pop('exact', None) new_op_map = dict(kwargs) for op_name in sorted(set(self._op_auto_map.keys()) | set(new_op_map.keys())): cur_type_map = self._op_type_map.setdefault(op_name, OrderedDict()) if op_name in new_op_map: handler = new_op_map[op_name] elif target_type in cur_type_map: handler = cur_type_map[target_type] else: try: handler = self._op_auto_map[op_name](target_type) except Exception as e: raise TypeError('error while determining support for operation' ' "%s" on target type: %s (got %r)' % (op_name, target_type.__name__, e)) if handler is not False and not callable(handler): raise TypeError('expected handler for op "%s" to be' ' callable or False, not: %r' % (op_name, handler)) new_op_map[op_name] = handler for op_name, handler in new_op_map.items(): self._op_type_map[op_name][target_type] = handler if not exact: for op_name in new_op_map: self._register_fuzzy_type(op_name, target_type) self._type_cache = {} # reset type cache return def register_op(self, op_name, auto_func=None, exact=False): """add operations beyond the builtins ('get' and 'iterate' at the time of writing). auto_func is a function that when passed a type, returns a handler associated with op_name if it's supported, or False if it's not. See glom.core.register_op() for the global version used by extensions. 
""" if not isinstance(op_name, basestring): raise TypeError('expected op_name to be a text name, not: %r' % (op_name,)) if auto_func is None: auto_func = lambda t: False elif not callable(auto_func): raise TypeError('expected auto_func to be callable, not: %r' % (auto_func,)) # determine support for any previously known types known_types = set(sum([list(m.keys()) for m in self._op_type_map.values()], [])) type_map = self._op_type_map.get(op_name, OrderedDict()) type_tree = self._op_type_tree.get(op_name, OrderedDict()) for t in sorted(known_types, key=lambda t: t.__name__): if t in type_map: continue try: handler = auto_func(t) except Exception as e: raise TypeError('error while determining support for operation' ' "%s" on target type: %s (got %r)' % (op_name, t.__name__, e)) if handler is not False and not callable(handler): raise TypeError('expected handler for op "%s" to be' ' callable or False, not: %r' % (op_name, handler)) type_map[t] = handler if not exact: for t in known_types: self._register_fuzzy_type(op_name, t, _type_tree=type_tree) self._op_type_map[op_name] = type_map self._op_type_tree[op_name] = type_tree self._op_auto_map[op_name] = auto_func def _register_builtin_ops(self): def _get_iterable_handler(type_obj): return iter if callable(getattr(type_obj, '__iter__', None)) else False self.register_op('iterate', _get_iterable_handler) self.register_op('get', lambda _: getattr) _DEFAULT_SCOPE = ChainMap({})
def glom(target, spec, **kwargs):
    """Access or construct a value from a given *target* based on the
    specification declared by *spec*.

    Accessing nested data, aka deep-get:

    >>> target = {'a': {'b': 'c'}}
    >>> glom(target, 'a.b')
    'c'

    Here the *spec* was just a string denoting a path, ``'a.b'``. As
    simple as it should be. You can also use :mod:`glob`-like wildcard
    selectors:

    >>> target = {'a': [{'k': 'v1'}, {'k': 'v2'}]}
    >>> glom(target, 'a.*.k')
    ['v1', 'v2']

    In addition to ``*``, you can also use ``**`` for recursive access:

    >>> target = {'a': [{'k': 'v3'}, {'k': 'v4'}], 'k': 'v0'}
    >>> glom(target, '**.k')
    ['v0', 'v3', 'v4']

    The next example shows how to use nested data to access many fields
    at once, and make a new nested structure.

    Constructing, or restructuring more-complicated nested data:

    >>> target = {'a': {'b': 'c', 'd': 'e'}, 'f': 'g', 'h': [0, 1, 2]}
    >>> spec = {'a': 'a.b', 'd': 'a.d', 'h': ('h', [lambda x: x * 2])}
    >>> output = glom(target, spec)
    >>> pprint(output)
    {'a': 'c', 'd': 'e', 'h': [0, 2, 4]}

    ``glom`` also takes a keyword-argument, *default*. When set, if a
    ``glom`` operation fails with a :exc:`GlomError`, the *default*
    will be returned, very much like :meth:`dict.get()`:

    >>> glom(target, 'a.xx', default='nada')
    'nada'

    The *skip_exc* keyword argument controls which errors should be
    ignored.

    >>> glom({}, lambda x: 100.0 / len(x), default=0.0, skip_exc=ZeroDivisionError)
    0.0

    Args:
       target (object): the object on which the glom will operate.
       spec (object): Specification of the output object in the form
         of a dict, list, tuple, string, other glom construct, or
         any composition of these.
       default (object): An optional default to return in the case
         an exception, specified by *skip_exc*, is raised.
       skip_exc (Exception): An optional exception or tuple of
         exceptions to ignore and return *default* (None if
         omitted). If *skip_exc* and *default* are both not set,
         glom raises errors through.
       scope (dict): Additional data that can be accessed
         via S inside the glom-spec. Read more: :ref:`scope`.

    It's a small API with big functionality, and glom's power is
    only surpassed by its intuitiveness. Give it a whirl!

    """
    # TODO: check spec up front
    default = kwargs.pop('default', None if 'skip_exc' in kwargs else _MISSING)
    skip_exc = kwargs.pop('skip_exc', () if default is _MISSING else GlomError)
    glom_debug = kwargs.pop('glom_debug', GLOM_DEBUG)
    scope = _DEFAULT_SCOPE.new_child({
        Path: kwargs.pop('path', []),
        Inspect: kwargs.pop('inspector', None),
        MODE: AUTO,
        MIN_MODE: None,
        CHILD_ERRORS: [],
        'globals': ScopeVars({}, {}),
    })
    scope[UP] = scope
    scope[ROOT] = scope
    scope[T] = target
    scope.update(kwargs.pop('scope', {}))
    err = None
    if kwargs:
        raise TypeError('unexpected keyword args: %r' % sorted(kwargs.keys()))
    try:
        try:
            ret = _glom(target, spec, scope)
        except skip_exc:
            if default is _MISSING:
                raise
            ret = default  # should this also be arg_val'd?
    except Exception as e:
        if glom_debug:
            raise
        if isinstance(e, GlomError):
            # need to change id or else py3 seems to not let us truncate the
            # stack trace with the explicit "raise err" below
            err = copy.copy(e)
            err._set_wrapped(e)
        else:
            err = GlomError.wrap(e)
        if isinstance(err, GlomError):
            err._finalize(scope[LAST_CHILD_SCOPE])
        else:  # wrapping failed, fall back to default behavior
            raise
    if err:
        raise err
    return ret
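# Editorial sketch of the default/skip_exc interplay documented above
# (illustration only, not part of the module):
#
#   glom({'a': 1}, 'a.b.c', default='missing')                    # GlomError caught -> 'missing'
#   glom({}, lambda t: t['x'], default=None, skip_exc=KeyError)   # KeyError caught  -> None
#   glom({}, lambda t: t['x'], default=None)                      # KeyError is not a GlomError -> raises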
def chain_child(scope):
    """
    used for specs like Auto(tuple), Switch(), etc that want to chain
    their child scopes together

    returns a new scope that can be passed to the next recursive glom call,
    e.g. scope[glom](target, spec, chain_child(scope))
    """
    if LAST_CHILD_SCOPE not in scope.maps[0]:
        return scope  # no children yet, nothing to do
    # NOTE: an option here is to drill down on LAST_CHILD_SCOPE;
    # this would have some interesting consequences for scoping
    # of tuples
    nxt_in_chain = scope[LAST_CHILD_SCOPE]
    nxt_in_chain.maps[0][NO_PYFRAME] = True
    # previous failed branches are forgiven as the
    # scope is re-wired into a new stack
    del nxt_in_chain.maps[0][CHILD_ERRORS][:]
    return nxt_in_chain


unbound_methods = set([type(str.__len__)])  # , type(Ref.glomit)])


def _has_callable_glomit(obj):
    glomit = getattr(obj, 'glomit', None)
    return callable(glomit) and not isinstance(obj, type)


def _glom(target, spec, scope):
    parent = scope
    pmap = parent.maps[0]
    scope = scope.new_child({
        T: target,
        Spec: spec,
        UP: parent,
        CHILD_ERRORS: [],
        MODE: pmap[MODE],
        MIN_MODE: pmap[MIN_MODE],
    })
    pmap[LAST_CHILD_SCOPE] = scope

    try:
        if type(spec) is TType:  # must go first, due to callability
            scope[MIN_MODE] = None  # None is tombstone
            return _t_eval(target, spec, scope)
        elif _has_callable_glomit(spec):
            scope[MIN_MODE] = None
            return spec.glomit(target, scope)

        return (scope.maps[0][MIN_MODE] or scope.maps[0][MODE])(target, spec, scope)
    except Exception as e:
        scope.maps[1][CHILD_ERRORS].append(scope)
        scope.maps[0][CUR_ERROR] = e
        if NO_PYFRAME in scope.maps[1]:
            cur_scope = scope[UP]
            while NO_PYFRAME in cur_scope.maps[0]:
                cur_scope.maps[1][CHILD_ERRORS].append(cur_scope)
                cur_scope.maps[0][CUR_ERROR] = e
                cur_scope = cur_scope[UP]
        raise


def AUTO(target, spec, scope):
    if type(spec) is str:  # shortcut to make deep-get use case faster
        return _t_eval(target, Path.from_text(spec).path_t, scope)
    if isinstance(spec, dict):
        return _handle_dict(target, spec, scope)
    elif isinstance(spec, list):
        return _handle_list(target, spec, scope)
    elif isinstance(spec, tuple):
        return _handle_tuple(target, spec, scope)
    elif isinstance(spec, basestring):
        return Path.from_text(spec).glomit(target, scope)
    elif callable(spec):
        return spec(target)

    raise TypeError('expected spec to be dict, list, tuple, callable, string,'
                    ' or other Spec-like type, not: %r' % (spec,))


_DEFAULT_SCOPE.update({
    glom: _glom,
    TargetRegistry: TargetRegistry(register_default_types=True),
})
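# Editorial sketch of the AUTO dispatch above (illustration only):
#
#   glom(range(3), [lambda x: x + 1])    # list spec: map over the iterable -> [1, 2, 3]
#   glom({'a': {'b': 1}}, ('a', 'b'))    # tuple spec: chain left-to-right   -> 1
#   glom({'a': 1}, len)                  # bare callable: applied to target  -> 1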
def register(target_type, **kwargs):
    """Register *target_type* so :meth:`~Glommer.glom()` will know how to
    handle instances of that type as targets.

    Here's an example of adding basic iterable support for Django's ORM:

    .. code-block:: python

        import glom
        import django.db.models

        glom.register(django.db.models.Manager, iterate=lambda m: m.all())
        glom.register(django.db.models.QuerySet, iterate=lambda qs: qs.all())


    Args:
        target_type (type): A type expected to appear in a glom()
            call target
        get (callable): A function which takes a target object and
            a name, acting as a default accessor. Defaults to
            :func:`getattr`.
        iterate (callable): A function which takes a target object
            and returns an iterator. Defaults to :func:`iter` if
            *target_type* appears to be iterable.
        exact (bool): Whether to match only *target_type* itself,
            excluding its subtypes. Defaults to False, meaning
            subtypes are matched as well.

    .. note::

       The module-level :func:`register()` function affects the
       module-level :func:`glom()` function's behavior. If this
       global effect is undesirable for your application, or you're
       implementing a library, consider instantiating a
       :class:`Glommer` instance, and using the
       :meth:`~Glommer.register()` and :meth:`Glommer.glom()`
       methods instead.

    """
    _DEFAULT_SCOPE[TargetRegistry].register(target_type, **kwargs)
    return
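# Editorial sketch of registering a custom accessor (illustration only;
# the Wrapper type below is invented for this example):
#
#   class Wrapper(object):
#       def __init__(self, data):
#           self.data = data
#
#   register(Wrapper, get=lambda w, name: w.data[name])
#   glom(Wrapper({'a': 1}), 'a')   # -> 1, path access now goes through the custom getter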
def register_op(op_name, **kwargs):
    """For extension authors needing to add operations beyond the builtin
    'get', 'iterate', 'keys', 'assign', and 'delete' to the default scope.
    See TargetRegistry for more details.
    """
    _DEFAULT_SCOPE[TargetRegistry].register_op(op_name, **kwargs)
    return
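# Editorial sketch of declaring a hypothetical new op named 'length'
# (illustration only; auto_func maps a type to a handler, or False when
# the type is unsupported):
#
#   register_op('length', auto_func=lambda t: len if hasattr(t, '__len__') else False)
#   _DEFAULT_SCOPE[TargetRegistry].get_handler('length', [1, 2, 3])([1, 2, 3])   # -> 3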
class Glommer(object):
    """The :class:`Glommer` type mostly serves to encapsulate type
    registration context so that advanced uses of glom don't need to
    worry about stepping on each other.

    Glommer objects are lightweight and, once instantiated, provide
    a :func:`glom()` method:

    >>> glommer = Glommer()
    >>> glommer.glom({}, 'a.b.c', default='d')
    'd'
    >>> Glommer().glom({'vals': list(range(3))}, ('vals', len))
    3

    Instances also provide a :meth:`~Glommer.register()` method for
    localized control over type handling.

    Args:
       register_default_types (bool): Whether or not to enable the
          handling behaviors of the default :func:`glom()`. These
          default actions include dict access, list and iterable
          iteration, and generic object attribute access. Defaults to
          True.

    """
    def __init__(self, **kwargs):
        register_default_types = kwargs.pop('register_default_types', True)
        scope = kwargs.pop('scope', _DEFAULT_SCOPE)

        # this "freezes" the scope in at the time of construction
        self.scope = ChainMap(dict(scope))
        self.scope[TargetRegistry] = TargetRegistry(register_default_types=register_default_types)

    def register(self, target_type, **kwargs):
        """Register *target_type* so :meth:`~Glommer.glom()` will know how to
        handle instances of that type as targets.

        Args:
            target_type (type): A type expected to appear in a glom()
                call target
            get (callable): A function which takes a target object and
                a name, acting as a default accessor. Defaults to
                :func:`getattr`.
            iterate (callable): A function which takes a target object
                and returns an iterator. Defaults to :func:`iter` if
                *target_type* appears to be iterable.
            exact (bool): Whether to match only *target_type* itself,
                excluding its subtypes. Defaults to False, meaning
                subtypes are matched as well.

        .. note::

           The module-level :func:`register()` function affects the
           module-level :func:`glom()` function's behavior. If this
           global effect is undesirable for your application, or you're
           implementing a library, consider instantiating a
           :class:`Glommer` instance, and using the
           :meth:`~Glommer.register()` and :meth:`Glommer.glom()`
           methods instead.

        """
        exact = kwargs.pop('exact', False)
        self.scope[TargetRegistry].register(target_type, exact=exact, **kwargs)
        return

    def glom(self, target, spec, **kwargs):
        return glom(target, spec, scope=self.scope, **kwargs)
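# Editorial sketch of localized registration via Glommer (illustration only;
# the Pages type is invented for this example):
#
#   class Pages(object):
#       def __init__(self, items):
#           self.items = items
#
#   glommer = Glommer()
#   glommer.register(Pages, iterate=lambda p: iter(p.items))
#   glommer.glom(Pages([1, 2, 3]), [lambda x: x * 10])   # -> [10, 20, 30]
#   # the module-level glom() remains unaffected by this registration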
class Fill(object):
    """A specifier type which switches glom into "fill-mode". For the
    spec contained within the Fill, glom will only interpret explicit
    specifier types (including T objects).

    Whereas the default mode has special interpretations for each of
    these builtins, fill-mode takes a lighter touch, making Fill great
    for "filling out" Python literals, like tuples, dicts, sets, and
    lists.

    >>> target = {'data': [0, 2, 4]}
    >>> spec = Fill((T['data'][2], T['data'][0]))
    >>> glom(target, spec)
    (4, 0)

    As you can see, glom's usual built-in tuple item chaining behavior
    has switched into a simple tuple constructor.

    (Sidenote for Lisp fans: Fill is like glom's quasi-quoting.)
    """
    def __init__(self, spec=None):
        self.spec = spec

    def glomit(self, target, scope):
        scope[MODE] = FILL
        return scope[glom](target, self.spec, scope)

    def fill(self, target):
        return glom(target, self)

    def __repr__(self):
        cn = self.__class__.__name__
        rpr = '' if self.spec is None else bbrepr(self.spec)
        return '%s(%s)' % (cn, rpr)


def FILL(target, spec, scope):
    # TODO: register an operator or two for the following to allow
    # extension. This operator can probably be shared with the
    # upcoming traversal/remap feature.
    recurse = lambda val: scope[glom](target, val, scope)
    if type(spec) is dict:
        return {recurse(key): recurse(val) for key, val in spec.items()}
    if type(spec) in (list, tuple, set, frozenset):
        result = [recurse(val) for val in spec]
        if type(spec) is list:
            return result
        return type(spec)(result)
    if callable(spec):
        return spec(target)
    return spec


class _ArgValuator(object):
    def __init__(self):
        self.cache = {}

    def mode(self, target, spec, scope):
        """
        similar to FILL, but without function calling; useful for
        default, scope assignment, call/invoke, etc
        """
        recur = lambda val: scope[glom](target, val, scope)
        result = spec
        if type(spec) in (list, dict):  # can contain themselves
            if id(spec) in self.cache:
                return self.cache[id(spec)]
            result = self.cache[id(spec)] = type(spec)()
            if type(spec) is dict:
                result.update({recur(key): recur(val) for key, val in spec.items()})
            else:
                result.extend([recur(val) for val in spec])
        if type(spec) in (tuple, set, frozenset):  # cannot contain themselves
            result = type(spec)([recur(val) for val in spec])
        return result


def arg_val(target, arg, scope):
    """
    evaluate an argument to find its value
    (arg_val phonetically similar to "eval" -- evaluate as an arg)
    """
    mode = scope[MIN_MODE]
    scope[MIN_MODE] = _ArgValuator().mode
    result = scope[glom](target, arg, scope)
    scope[MIN_MODE] = mode
    return result
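# Editorial sketch of fill-mode applied to nested literals (illustration only):
#
#   glom({'a': 1, 'b': 2}, Fill({'x': T['a'], 'y': {T['b'], 3}}))
#   # -> {'x': 1, 'y': {2, 3}}: the dict and set literals are rebuilt with
#   # the T expressions evaluated, rather than interpreted as specs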