"""glom's helpers for streaming use cases.
Specifier types which yield their results incrementally so that they
can be applied to targets which are themselves streaming (e.g. chunks
of rows from a database, lines from a file) without excessive memory
usage.
glom's streaming functionality revolves around a single :class:`Iter`
Specifier type, which has methods to transform the target stream.
"""
from itertools import islice, dropwhile, takewhile, chain
from functools import partial
try:
from itertools import imap, ifilter
except ImportError:
# py3
imap = map
ifilter = filter
from boltons.iterutils import split_iter, chunked_iter, windowed_iter, unique_iter, first
from boltons.funcutils import FunctionBuilder
from .core import glom, T, STOP, SKIP, _MISSING, Path, TargetRegistry, Call, Spec, Pipe, S, bbrepr, format_invocation
from .matching import Check
[docs]class Iter:
"""``Iter()`` is glom's counterpart to Python's built-in :func:`iter()`
function. Given an iterable target, ``Iter()`` yields the result
of applying the passed spec to each element of the target, similar
to the built-in ``[]`` spec, but streaming.
The following turns a list of strings into integers using Iter(),
before deduplicating and converting it to a tuple:
>>> glom(['1', '2', '1', '3'], (Iter(int), set, tuple))
(1, 2, 3)
``Iter()`` also has many useful methods which can be chained to
compose a stream processing pipeline. The above can also be
written as:
>>> glom(['1', '2', '1', '3'], (Iter().map(int).unique(), tuple))
(1, 2, 3)
``Iter()`` also respects glom's :data:`~glom.SKIP` and
:data:`~glom.STOP` singletons for filtering and breaking
iteration.
Args:
subspec: A subspec to be applied on each element from the iterable.
sentinel: Keyword-only argument, which, when found in the
iterable stream, causes the iteration to stop. Same as with the
built-in :func:`iter`.
"""
def __init__(self, subspec=T, **kwargs):
self.subspec = subspec
self._iter_stack = kwargs.pop('_iter_stack', [])
self.sentinel = kwargs.pop('sentinel', STOP)
if kwargs:
raise TypeError('unexpected keyword arguments: %r' % sorted(kwargs))
return
def __repr__(self):
base_args = ()
if self.subspec != T:
base_args = (self.subspec,)
base = format_invocation(self.__class__.__name__, base_args, repr=bbrepr)
chunks = [base]
for fname, args, _ in reversed(self._iter_stack):
meth = getattr(self, fname)
fb = FunctionBuilder.from_func(meth)
fb.args = fb.args[1:] # drop self
arg_names = fb.get_arg_names()
# TODO: something fancier with defaults:
kwargs = []
if len(args) > 1 and arg_names:
args, kwargs = (), zip(arg_names, args)
chunks.append('.' + format_invocation(fname, args, kwargs, repr=bbrepr))
return ''.join(chunks)
def glomit(self, target, scope):
iterator = self._iterate(target, scope)
for _, _, callback in reversed(self._iter_stack):
iterator = callback(iterator, scope)
return iter(iterator)
def _iterate(self, target, scope):
iterate = scope[TargetRegistry].get_handler('iterate', target, path=scope[Path])
try:
iterator = iterate(target)
except Exception as e:
raise TypeError('failed to iterate on instance of type %r at %r (got %r)'
% (target.__class__.__name__, Path(*scope[Path]), e))
base_path = scope[Path]
for i, t in enumerate(iterator):
scope[Path] = base_path + [i]
yld = (t if self.subspec is T else scope[glom](t, self.subspec, scope))
if yld is SKIP:
continue
elif yld is self.sentinel or yld is STOP:
# NB: sentinel defaults to STOP so I was torn whether
# to also check for STOP, and landed on the side of
# never letting STOP through.
return
yield yld
return
def _add_op(self, opname, args, callback):
return type(self)(subspec=self.subspec, _iter_stack=[(opname, args, callback)] + self._iter_stack)
[docs] def map(self, subspec):
"""Return a new :class:`Iter()` spec which will apply the provided
*subspec* to each element of the iterable.
>>> glom(range(5), Iter().map(lambda x: x * 2).all())
[0, 2, 4, 6, 8]
Because a spec can be a callable, :meth:`Iter.map()` does
everything the built-in :func:`map` does, but with the full
power of glom specs.
>>> glom(['a', 'B', 'C'], Iter().map(T.islower()).all())
[True, False, False]
"""
# whatever validation you want goes here
# TODO: DRY the self._add_op with a decorator?
return self._add_op(
'map',
(subspec,),
lambda iterable, scope: imap(
lambda t: scope[glom](t, subspec, scope), iterable))
[docs] def filter(self, key=T):
"""Return a new :class:`Iter()` spec which will include only elements matching the
given *key*.
>>> glom(range(6), Iter().filter(lambda x: x % 2).all())
[1, 3, 5]
Because a spec can be a callable, :meth:`Iter.filter()` does
everything the built-in :func:`filter` does, but with the full
power of glom specs. For even more power, combine,
:meth:`Iter.filter()` with :class:`Check()`.
>>> # PROTIP: Python's ints know how many binary digits they require, using the bit_length method
>>> glom(range(9), Iter().filter(Check(T.bit_length(), one_of=(2, 4), default=SKIP)).all())
[2, 3, 8]
"""
# NB: Check's validate function defaults to bool, and
# *default* is returned on access errors as well validation
# errors, so the lambda passed to ifilter below works fine.
check_spec = key if isinstance(key, Check) else Check(key, default=SKIP)
return self._add_op(
'filter',
(key,),
lambda iterable, scope: ifilter(
lambda t: scope[glom](t, check_spec, scope) is not SKIP, iterable))
[docs] def chunked(self, size, fill=_MISSING):
"""Return a new :class:`Iter()` spec which groups elements in the iterable
into lists of length *size*.
If the optional *fill* argument is provided, iterables not
evenly divisible by *size* will be padded out by the *fill*
constant. Otherwise, the final chunk will be shorter than *size*.
>>> list(glom(range(10), Iter().chunked(3)))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> list(glom(range(10), Iter().chunked(3, fill=None)))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]
"""
kw = {'size': size}
args = size,
if fill is not _MISSING:
kw['fill'] = fill
args += (fill,)
return self._add_op(
'chunked', args, lambda it, scope: chunked_iter(it, **kw))
def windowed(self, size):
"""Return a new :class:`Iter()` spec which will yield a sliding window of
adjacent elements in the iterable. Each tuple yielded will be
of length *size*.
Useful for getting adjacent pairs and triples.
>>> list(glom(range(4), Iter().windowed(2)))
[(0, 1), (1, 2), (2, 3)]
"""
return self._add_op(
'windowed', (size,), lambda it, scope: windowed_iter(it, size))
[docs] def split(self, sep=None, maxsplit=None):
"""Return a new :class:`Iter()` spec which will lazily split an iterable based
on a separator (or list of separators), *sep*. Like
:meth:`str.split()`, but for all iterables.
``split_iter()`` yields lists of non-separator values. A separator will
never appear in the output.
>>> target = [1, 2, None, None, 3, None, 4, None]
>>> list(glom(target, Iter().split()))
[[1, 2], [3], [4]]
Note that ``split_iter`` is based on :func:`str.split`, so if
*sep* is ``None``, ``split()`` **groups** separators. If empty lists
are desired between two contiguous ``None`` values, simply use
``sep=[None]``:
>>> list(glom(target, Iter().split(sep=[None])))
[[1, 2], [], [3], [4], []]
A max number of splits may also be set:
>>> list(glom(target, Iter().split(maxsplit=2)))
[[1, 2], [3], [4, None]]
"""
return self._add_op(
'split',
(sep, maxsplit),
lambda it, scope: split_iter(it, sep=sep, maxsplit=maxsplit))
[docs] def flatten(self):
"""Returns a new :class:`Iter()` instance which combines iterables into a
single iterable.
>>> target = [[1, 2], [3, 4], [5]]
>>> list(glom(target, Iter().flatten()))
[1, 2, 3, 4, 5]
"""
return self._add_op(
'flatten',
(),
lambda it, scope: chain.from_iterable(it))
[docs] def unique(self, key=T):
"""Return a new :class:`Iter()` spec which lazily filters out duplicate
values, i.e., only the first appearance of a value in a stream will
be yielded.
>>> target = list('gloMolIcious')
>>> out = list(glom(target, Iter().unique(T.lower())))
>>> print(''.join(out))
gloMIcus
"""
return self._add_op(
'unique',
(key,),
lambda it, scope: unique_iter(it, key=lambda t: scope[glom](t, key, scope)))
[docs] def slice(self, *args):
"""Returns a new :class:`Iter()` spec which trims iterables in the
same manner as :func:`itertools.islice`.
>>> target = [0, 1, 2, 3, 4, 5]
>>> glom(target, Iter().slice(3).all())
[0, 1, 2]
>>> glom(target, Iter().slice(2, 4).all())
[2, 3]
This method accepts only positional arguments.
"""
# TODO: make a kwarg-compatible version of this (islice takes no kwargs)
# TODO: also support slice syntax Iter()[::]
try:
islice([], *args)
except TypeError:
raise TypeError(f'invalid slice arguments: {args!r}')
return self._add_op('slice', args, lambda it, scope: islice(it, *args))
[docs] def limit(self, count):
"""A convenient alias for :meth:`~Iter.slice`, which takes a single
argument, *count*, the max number of items to yield.
"""
return self._add_op('limit', (count,), lambda it, scope: islice(it, count))
[docs] def takewhile(self, key=T):
"""Returns a new :class:`Iter()` spec which stops the stream once
*key* becomes falsy.
>>> glom([3, 2, 0, 1], Iter().takewhile().all())
[3, 2]
:func:`itertools.takewhile` for more details.
"""
return self._add_op(
'takewhile',
(key,),
lambda it, scope: takewhile(
lambda t: scope[glom](t, key, scope), it))
[docs] def dropwhile(self, key=T):
"""Returns a new :class:`Iter()` spec which drops stream items until
*key* becomes falsy.
>>> glom([0, 0, 3, 2, 0], Iter().dropwhile(lambda t: t < 1).all())
[3, 2, 0]
Note that while similar to :meth:`Iter.filter()`, the filter
only applies to the beginning of the stream. In a way,
:meth:`Iter.dropwhile` can be thought of as
:meth:`~str.lstrip()` for streams. See
:func:`itertools.dropwhile` for more details.
"""
return self._add_op(
'dropwhile',
(key,),
lambda it, scope: dropwhile(
lambda t: scope[glom](t, key, scope), it))
# Terminal methods follow
[docs] def all(self):
"""A convenience method which returns a new spec which turns an
iterable into a list.
>>> glom(range(5), Iter(lambda t: t * 2).all())
[0, 2, 4, 6, 8]
Note that this spec will always consume the whole iterable, and as
such, the spec returned is *not* an :class:`Iter()` instance.
"""
return Pipe(self, list)
[docs] def first(self, key=T, default=None):
"""A convenience method for lazily yielding a single truthy item from
an iterable.
>>> target = [False, 1, 2, 3]
>>> glom(target, Iter().first())
1
This method takes a condition, *key*, which can also be a
glomspec, as well as a *default*, in case nothing matches the
condition.
As this spec yields at most one item, and not an iterable, the
spec returned from this method is not an :class:`Iter()` instance.
"""
return (self, First(key=key, default=default))
class First:
"""Get the first element of an iterable which matches *key*, if there
is one, otherwise return *default* (``None`` if unset).
>>> is_odd = lambda x: x % 2
>>> glom([0, 1, 2, 3], First(is_odd))
1
>>> glom([0, 2, 4], First(is_odd, default=False))
False
"""
# The impl of this ain't pretty and basically just exists for a
# nicer-looking repr. (Iter(), First()) is the equivalent of doing
# (Iter().filter(spec), Call(first, args=(T,), kwargs={'default':
# default}))
__slots__ = ('_spec', '_default', '_first')
def __init__(self, key=T, default=None):
self._spec = key
self._default = default
spec_glom = Spec(Call(partial, args=(Spec(self._spec).glom,), kwargs={'scope': S}))
self._first = Call(first, args=(T,), kwargs={'default': default, 'key': spec_glom})
def glomit(self, target, scope):
return self._first.glomit(target, scope)
def __repr__(self):
cn = self.__class__.__name__
if self._default is None:
return f'{cn}({bbrepr(self._spec)})'
return f'{cn}({bbrepr(self._spec)}, default={bbrepr(self._default)})'