This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for celery.utils.nodenames

"""Worker name utilities."""
from __future__ import annotations

import os
import socket
from functools import partial

from kombu.entity import Exchange, Queue

from .functional import memoize
from .text import simple_format

#: Exchange for worker direct queues.
WORKER_DIRECT_EXCHANGE = Exchange('C.dq2')

#: Format for worker direct queue names.
WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2'

#: Separator for worker node name and hostname.
NODENAME_SEP = '@'

NODENAME_DEFAULT = 'celery'

gethostname = memoize(1, Cache=dict)(socket.gethostname)

__all__ = (
    'worker_direct',
    'gethostname',
    'nodename',
    'anon_nodename',
    'nodesplit',
    'default_nodename',
    'node_format',
    'host_format',
)


[docs] def worker_direct(hostname: str | Queue) -> Queue: """Return the :class:`kombu.Queue` being a direct route to a worker. Arguments: hostname (str, ~kombu.Queue): The fully qualified node name of a worker (e.g., ``w1@example.com``). If passed a :class:`kombu.Queue` instance it will simply return that instead. """ if isinstance(hostname, Queue): return hostname return Queue( WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname), WORKER_DIRECT_EXCHANGE, hostname, )
[docs] def nodename(name: str, hostname: str) -> str: """Create node name from name/hostname pair.""" return NODENAME_SEP.join((name, hostname))
[docs] def anon_nodename(hostname: str | None = None, prefix: str = 'gen') -> str: """Return the nodename for this process (not a worker). This is used for e.g. the origin task message field. """ return nodename(''.join([prefix, str(os.getpid())]), hostname or gethostname())
[docs] def nodesplit(name: str) -> tuple[None, str] | list[str]: """Split node name into tuple of name/hostname.""" parts = name.split(NODENAME_SEP, 1) if len(parts) == 1: return None, parts[0] return parts
[docs] def default_nodename(hostname: str) -> str: """Return the default nodename for this process.""" name, host = nodesplit(hostname or '') return nodename(name or NODENAME_DEFAULT, host or gethostname())
[docs] def node_format(s: str, name: str, **extra: dict) -> str: """Format worker node name (name@host.com).""" shortname, host = nodesplit(name) return host_format(s, host, shortname or NODENAME_DEFAULT, p=name, **extra)
def _fmt_process_index(prefix: str = '', default: str = '0') -> str: from .log import current_process_index index = current_process_index() return f'{prefix}{index}' if index else default _fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
[docs] def host_format(s: str, host: str | None = None, name: str | None = None, **extra: dict) -> str: """Format host %x abbreviations.""" host = host or gethostname() hname, _, domain = host.partition('.') name = name or hname keys = dict( { 'h': host, 'n': name, 'd': domain, 'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix, }, **extra, ) return simple_format(s, keys)