This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for celery.utils.functional

"""Functional-style utilities."""
import inspect
from collections import UserList
from functools import partial
from itertools import islice, tee, zip_longest
from typing import Any, Callable

from kombu.utils.functional import LRUCache, dictfilter, is_list, lazy, maybe_evaluate, maybe_list, memoize
from vine import promise

from celery.utils.log import get_logger

logger = get_logger(__name__)

__all__ = (
    'LRUCache', 'is_list', 'maybe_list', 'memoize', 'mlazy', 'noop',
    'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',
    'regen', 'dictfilter', 'lazy', 'maybe_evaluate', 'head_from_fun',
    'maybe', 'fun_accepts_kwargs',
)

FUNHEAD_TEMPLATE = """
def {fun_name}({fun_args}):
    return {fun_value}
"""


class DummyContext:

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        pass


[docs] class mlazy(lazy): """Memoized lazy evaluation. The function is only evaluated once, every subsequent access will return the same value. """ #: Set to :const:`True` after the object has been evaluated. evaluated = False _value = None
[docs] def evaluate(self): if not self.evaluated: self._value = super().evaluate() self.evaluated = True return self._value
[docs] def noop(*args, **kwargs): """No operation. Takes any arguments/keyword arguments and does nothing. """
def pass1(arg, *args, **kwargs): """Return the first positional argument.""" return arg def evaluate_promises(it): for value in it: if isinstance(value, promise): value = value() yield value
[docs] def first(predicate, it): """Return the first element in ``it`` that ``predicate`` accepts. If ``predicate`` is None it will return the first item that's not :const:`None`. """ return next( (v for v in evaluate_promises(it) if ( predicate(v) if predicate is not None else v is not None)), None, )
[docs] def firstmethod(method, on_call=None): """Multiple dispatch. Return a function that with a list of instances, finds the first instance that gives a value for the given method. The list can also contain lazy instances (:class:`~kombu.utils.functional.lazy`.) """ def _matcher(it, *args, **kwargs): for obj in it: try: meth = getattr(maybe_evaluate(obj), method) reply = (on_call(meth, *args, **kwargs) if on_call else meth(*args, **kwargs)) except AttributeError: pass else: if reply is not None: return reply return _matcher
[docs] def chunks(it, n): """Split an iterator into chunks with `n` elements each. Warning: ``it`` must be an actual iterator, if you pass this a concrete sequence will get you repeating elements. So ``chunks(iter(range(1000)), 10)`` is fine, but ``chunks(range(1000), 10)`` is not. Example: # n == 2 >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2) >>> list(x) [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]] # n == 3 >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3) >>> list(x) [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]] """ for item in it: yield [item] + list(islice(it, n - 1))
[docs] def padlist(container, size, default=None): """Pad list with default elements. Example: >>> first, last, city = padlist(['George', 'Costanza', 'NYC'], 3) ('George', 'Costanza', 'NYC') >>> first, last, city = padlist(['George', 'Costanza'], 3) ('George', 'Costanza', None) >>> first, last, city, planet = padlist( ... ['George', 'Costanza', 'NYC'], 4, default='Earth', ... ) ('George', 'Costanza', 'NYC', 'Earth') """ return list(container)[:size] + [default] * (size - len(container))
[docs] def mattrgetter(*attrs): """Get attributes, ignoring attribute errors. Like :func:`operator.itemgetter` but return :const:`None` on missing attributes instead of raising :exc:`AttributeError`. """ return lambda obj: {attr: getattr(obj, attr, None) for attr in attrs}
[docs] def uniq(it): """Return all unique elements in ``it``, preserving order.""" seen = set() return (seen.add(obj) or obj for obj in it if obj not in seen)
def lookahead(it): """Yield pairs of (current, next) items in `it`. `next` is None if `current` is the last item. Example: >>> list(lookahead(x for x in range(6))) [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, None)] """ a, b = tee(it) next(b, None) return zip_longest(a, b)
[docs] def regen(it): """Convert iterator to an object that can be consumed multiple times. ``Regen`` takes any iterable, and if the object is an generator it will cache the evaluated list on first access, so that the generator can be "consumed" multiple times. """ if isinstance(it, (list, tuple)): return it return _regen(it)
class _regen(UserList, list): # must be subclass of list so that json can encode. def __init__(self, it): # pylint: disable=super-init-not-called # UserList creates a new list and sets .data, so we don't # want to call init here. self.__it = it self.__consumed = [] self.__done = False def __reduce__(self): return list, (self.data,) def map(self, func): self.__consumed = [func(el) for el in self.__consumed] self.__it = map(func, self.__it) def __length_hint__(self): return self.__it.__length_hint__() def __lookahead_consume(self, limit=None): if not self.__done and (limit is None or limit > 0): it = iter(self.__it) try: now = next(it) except StopIteration: return self.__consumed.append(now) # Maintain a single look-ahead to ensure we set `__done` when the # underlying iterator gets exhausted while not self.__done: try: next_ = next(it) self.__consumed.append(next_) except StopIteration: self.__done = True break finally: yield now now = next_ # We can break out when `limit` is exhausted if limit is not None: limit -= 1 if limit <= 0: break def __iter__(self): yield from self.__consumed yield from self.__lookahead_consume() def __getitem__(self, index): if index < 0: return self.data[index] # Consume elements up to the desired index prior to attempting to # access it from within `__consumed` consume_count = index - len(self.__consumed) + 1 for _ in self.__lookahead_consume(limit=consume_count): pass return self.__consumed[index] def __bool__(self): if len(self.__consumed): return True try: next(iter(self)) except StopIteration: return False else: return True @property def data(self): if not self.__done: self.__consumed.extend(self.__it) self.__done = True return self.__consumed def __repr__(self): return "<{}: [{}{}]>".format( self.__class__.__name__, ", ".join(repr(e) for e in self.__consumed), "..." if not self.__done else "", ) def _argsfromspec(spec, replace_defaults=True): if spec.defaults: split = len(spec.defaults) defaults = (list(range(len(spec.defaults))) if replace_defaults else spec.defaults) positional = spec.args[:-split] optional = list(zip(spec.args[-split:], defaults)) else: positional, optional = spec.args, [] varargs = spec.varargs varkw = spec.varkw if spec.kwonlydefaults: kwonlyargs = set(spec.kwonlyargs) - set(spec.kwonlydefaults.keys()) if replace_defaults: kwonlyargs_optional = [ (kw, i) for i, kw in enumerate(spec.kwonlydefaults.keys()) ] else: kwonlyargs_optional = list(spec.kwonlydefaults.items()) else: kwonlyargs, kwonlyargs_optional = spec.kwonlyargs, [] return ', '.join(filter(None, [ ', '.join(positional), ', '.join(f'{k}={v}' for k, v in optional), f'*{varargs}' if varargs else None, '*' if (kwonlyargs or kwonlyargs_optional) and not varargs else None, ', '.join(kwonlyargs) if kwonlyargs else None, ', '.join(f'{k}="{v}"' for k, v in kwonlyargs_optional), f'**{varkw}' if varkw else None, ]))
[docs] def head_from_fun(fun: Callable[..., Any], bound: bool = False) -> str: """Generate signature function from actual function.""" # we could use inspect.Signature here, but that implementation # is very slow since it implements the argument checking # in pure-Python. Instead we use exec to create a new function # with an empty body, meaning it has the same performance as # as just calling a function. is_function = inspect.isfunction(fun) is_callable = callable(fun) is_cython = fun.__class__.__name__ == 'cython_function_or_method' is_method = inspect.ismethod(fun) if not is_function and is_callable and not is_method and not is_cython: name, fun = fun.__class__.__name__, fun.__call__ else: name = fun.__name__ definition = FUNHEAD_TEMPLATE.format( fun_name=name, fun_args=_argsfromspec(inspect.getfullargspec(fun)), fun_value=1, ) logger.debug(definition) namespace = {'__name__': fun.__module__} # pylint: disable=exec-used # Tasks are rarely, if ever, created at runtime - exec here is fine. exec(definition, namespace) result = namespace[name] result._source = definition if bound: return partial(result, object()) return result
def arity_greater(fun, n): argspec = inspect.getfullargspec(fun) return argspec.varargs or len(argspec.args) > n def fun_takes_argument(name, fun, position=None): spec = inspect.getfullargspec(fun) return ( spec.varkw or spec.varargs or (len(spec.args) >= position if position else name in spec.args) )
[docs] def fun_accepts_kwargs(fun): """Return true if function accepts arbitrary keyword arguments.""" return any( p for p in inspect.signature(fun).parameters.values() if p.kind == p.VAR_KEYWORD )
[docs] def maybe(typ, val): """Call typ on value if val is defined.""" return typ(val) if val is not None else val
def seq_concat_item(seq, item): """Return copy of sequence seq with item added. Returns: Sequence: if seq is a tuple, the result will be a tuple, otherwise it depends on the implementation of ``__add__``. """ return seq + (item,) if isinstance(seq, tuple) else seq + [item] def seq_concat_seq(a, b): """Concatenate two sequences: ``a + b``. Returns: Sequence: The return value will depend on the largest sequence - if b is larger and is a tuple, the return value will be a tuple. - if a is larger and is a list, the return value will be a list, """ # find the type of the largest sequence prefer = type(max([a, b], key=len)) # convert the smallest list to the type of the largest sequence. if not isinstance(a, prefer): a = prefer(a) if not isinstance(b, prefer): b = prefer(b) return a + b def is_numeric_value(value): return isinstance(value, (int, float)) and not isinstance(value, bool)