This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for celery.loaders.base

"""Loader base class."""
import importlib
import os
import re
import sys
from datetime import datetime, timezone

from kombu.utils import json
from kombu.utils.objects import cached_property

from celery import signals
from celery.exceptions import reraise
from celery.utils.collections import DictAttribute, force_mapping
from celery.utils.functional import maybe_list
from celery.utils.imports import NotAPackage, find_module, import_from_cwd, symbol_by_name

__all__ = ('BaseLoader',)

_RACE_PROTECTION = False

CONFIG_INVALID_NAME = """\
Error: Module '{module}' doesn't exist, or it's not a valid \
Python module name.
"""

CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
Did you mean '{suggest}'?
"""

unconfigured = object()


[docs] class BaseLoader: """Base class for loaders. Loaders handles, * Reading celery client/worker configurations. * What happens when a task starts? See :meth:`on_task_init`. * What happens when the worker starts? See :meth:`on_worker_init`. * What happens when the worker shuts down? See :meth:`on_worker_shutdown`. * What modules are imported to find tasks? """ builtin_modules = frozenset() configured = False override_backends = {} worker_initialized = False _conf = unconfigured def __init__(self, app, **kwargs): self.app = app self.task_modules = set()
[docs] def now(self, utc=True): if utc: return datetime.now(timezone.utc) return datetime.now()
[docs] def on_task_init(self, task_id, task): """Called before a task is executed."""
[docs] def on_process_cleanup(self): """Called after a task is executed."""
[docs] def on_worker_init(self): """Called when the worker (:program:`celery worker`) starts."""
[docs] def on_worker_shutdown(self): """Called when the worker (:program:`celery worker`) shuts down."""
[docs] def on_worker_process_init(self): """Called when a child process starts."""
[docs] def import_task_module(self, module): self.task_modules.add(module) return self.import_from_cwd(module)
[docs] def import_module(self, module, package=None): return importlib.import_module(module, package=package)
[docs] def import_from_cwd(self, module, imp=None, package=None): return import_from_cwd( module, self.import_module if imp is None else imp, package=package, )
[docs] def import_default_modules(self): responses = signals.import_modules.send(sender=self.app) # Prior to this point loggers are not yet set up properly, need to # check responses manually and reraised exceptions if any, otherwise # they'll be silenced, making it incredibly difficult to debug. for _, response in responses: if isinstance(response, Exception): raise response return [self.import_task_module(m) for m in self.default_modules]
[docs] def init_worker(self): if not self.worker_initialized: self.worker_initialized = True self.import_default_modules() self.on_worker_init()
[docs] def shutdown_worker(self): self.on_worker_shutdown()
[docs] def init_worker_process(self): self.on_worker_process_init()
[docs] def config_from_object(self, obj, silent=False): if isinstance(obj, str): try: obj = self._smart_import(obj, imp=self.import_from_cwd) except (ImportError, AttributeError): if silent: return False raise self._conf = force_mapping(obj) if self._conf.get('override_backends') is not None: self.override_backends = self._conf['override_backends'] return True
def _smart_import(self, path, imp=None): imp = self.import_module if imp is None else imp if ':' in path: # Path includes attribute so can just jump # here (e.g., ``os.path:abspath``). return symbol_by_name(path, imp=imp) # Not sure if path is just a module name or if it includes an # attribute name (e.g., ``os.path``, vs, ``os.path.abspath``). try: return imp(path) except ImportError: # Not a module name, so try module + attribute. return symbol_by_name(path, imp=imp) def _import_config_module(self, name): try: self.find_module(name) except NotAPackage as exc: if name.endswith('.py'): reraise(NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX.format( module=name, suggest=name[:-3])), sys.exc_info()[2]) raise NotAPackage(CONFIG_INVALID_NAME.format(module=name)) from exc else: return self.import_from_cwd(name)
[docs] def find_module(self, module): return find_module(module)
[docs] def cmdline_config_parser(self, args, namespace='celery', re_type=re.compile(r'\((\w+)\)'), extra_types=None, override_types=None): extra_types = extra_types if extra_types else {'json': json.loads} override_types = override_types if override_types else { 'tuple': 'json', 'list': 'json', 'dict': 'json' } from celery.app.defaults import NAMESPACES, Option namespace = namespace and namespace.lower() typemap = dict(Option.typemap, **extra_types) def getarg(arg): """Parse single configuration from command-line.""" # ## find key/value # ns.key=value|ns_key=value (case insensitive) key, value = arg.split('=', 1) key = key.lower().replace('.', '_') # ## find name-space. # .key=value|_key=value expands to default name-space. if key[0] == '_': ns, key = namespace, key[1:] else: # find name-space part of key ns, key = key.split('_', 1) ns_key = (ns and ns + '_' or '') + key # (type)value makes cast to custom type. cast = re_type.match(value) if cast: type_ = cast.groups()[0] type_ = override_types.get(type_, type_) value = value[len(cast.group()):] value = typemap[type_](value) else: try: value = NAMESPACES[ns.lower()][key].to_python(value) except ValueError as exc: # display key name in error message. raise ValueError(f'{ns_key!r}: {exc}') return ns_key, value return dict(getarg(arg) for arg in args)
[docs] def read_configuration(self, env='CELERY_CONFIG_MODULE'): try: custom_config = os.environ[env] except KeyError: pass else: if custom_config: usercfg = self._import_config_module(custom_config) return DictAttribute(usercfg)
[docs] def autodiscover_tasks(self, packages, related_name='tasks'): self.task_modules.update( mod.__name__ for mod in autodiscover_tasks(packages or (), related_name) if mod)
@cached_property def default_modules(self): return ( tuple(self.builtin_modules) + tuple(maybe_list(self.app.conf.imports)) + tuple(maybe_list(self.app.conf.include)) ) @property def conf(self): """Loader configuration.""" if self._conf is unconfigured: self._conf = self.read_configuration() return self._conf
def autodiscover_tasks(packages, related_name='tasks'): global _RACE_PROTECTION if _RACE_PROTECTION: return () _RACE_PROTECTION = True try: return [find_related_module(pkg, related_name) for pkg in packages] finally: _RACE_PROTECTION = False def find_related_module(package, related_name): """Find module in package.""" # Django 1.7 allows for specifying a class name in INSTALLED_APPS. # (Issue #2248). try: # Return package itself when no related_name. module = importlib.import_module(package) if not related_name and module: return module except ModuleNotFoundError: # On import error, try to walk package up one level. package, _, _ = package.rpartition('.') if not package: raise module_name = f'{package}.{related_name}' try: # Try to find related_name under package. return importlib.import_module(module_name) except ModuleNotFoundError as e: import_exc_name = getattr(e, 'name', None) # If candidate does not exist, then return None. if import_exc_name and module_name == import_exc_name: return # Otherwise, raise because error probably originated from a nested import. raise e