"""Logging configuration.
This module largely wraps the :mod:`structlog` framework to provide flexible structured
logging for Runtime services. A chain of "processors" (callables) filters or transforms
events produced by log statements.
To the greatest extent possible, this module favors native :mod:`structlog`
functionality over integration with the standard :mod:`logging` module for `performance
reasons <https://www.structlog.org/en/stable/performance.html>`_. :mod:`structlog` also
provides an async interface that :mod:`logging` does not.
Note:
:mod:`structlog` also has a notion of *bound* and *unbound* loggers. An *unbound*
logger is a proxy that borrows its configuration from the global configuration set
by :func:`runtime.log.configure`. Once a logger is bound by calling
:meth:`structlog.BoundLogggerBase.bind`, the global configuration is copied into the
logger's local state and frozen. Further changes in the global configuration no
longer affect the bound logger.
Because unbound loggers are proxies that perform some introspection on every call,
prefer bound loggers, which are much more performant because their methods are
concrete.
"""
import asyncio
import contextlib
import functools
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Literal, MutableMapping, NoReturn, Optional, Union
import orjson as json
import structlog
import structlog.processors
from structlog.stdlib import AsyncBoundLogger as AsyncLogger
from structlog.stdlib import BoundLogger as Logger
from . import remote
from .exception import RuntimeBaseException
__all__ = [
'AsyncLogger',
'LEVELS',
'LogPublisher',
'Logger',
'configure',
'get_level_num',
'get_logger',
'get_null_logger',
]
Event = MutableMapping[str, Any]
ProcessorReturnType = Union[Event, str, bytes]
Processor = Callable[[Any, str, Event], ProcessorReturnType]
LEVELS: list[str] = ['debug', 'info', 'warn', 'error', 'critical']
"""Log severity levels, in ascending order of severity.
These levels correspond to those used by the built-in :mod:`logging` library:
============ ================================= =========================================
Level Description Example
============ ================================= =========================================
``debug`` Frequent, low-level tracing. A Smart Device message is received.
``info`` Normal operation (default level). A Smart Device connects.
``warn`` Unusual or anomalous events. Referencing a nonexistent device.
``error`` Failure mode. A subprocess returns a nonzero exit code.
``critical`` Cannot continue running. Emergency stop.
============ ================================= =========================================
"""
def drop(_logger: AsyncLogger, _method: str, _event: Event, /) -> NoReturn:
"""A simple :mod:`structlog` processor to drop all events."""
raise structlog.DropEvent
get_logger = remote.get_logger
"""Get an unbound async-compatible logger.
Parameters:
factory_args: Positional arguments passed to the logger factory.
context: Contextual variables added to every event produced by this logger.
"""
[docs]def get_null_logger() -> AsyncLogger:
"""Get an async-compatible logger that drops all events unconditionally.
Useful for objects that emit unimportant or noisy log events. :class:`LogPublisher`
also uses a null logger internally, since using a real logger would result in a
feedback loop.
"""
return get_logger(processors=[drop])
[docs]@dataclass
class LogPublisher(remote.Client):
"""A client for publishing log events over a network.
A :class:`LogPublisher` instance is a threadsafe :mod:`structlog` processor
(callable). The processor feeds incoming events into an internal queue, which a
worker task drains. For each event, the worker issues a notification call, where the
method name is the log level and the only argument is the event dictionary.
The worker and queue are not initialized until the async context is entered. The
worker is attached to the context's running loop.
Parameters:
send_queue_capacity: The maximum size of the event queue. A nonpositive capacity
indicates the queue size should be unbounded. When the queue is full, any
additional events are dropped.
"""
send_queue_capacity: int = 512
send_queue: Optional[asyncio.Queue[tuple[str, Event]]] = field(
default=None,
init=False,
repr=False,
)
loop: Optional[asyncio.AbstractEventLoop] = field(
default=None,
init=False,
repr=False,
)
def __post_init__(self, /) -> None:
super().__post_init__()
self.logger = get_null_logger()
def __call__(self, _logger: AsyncLogger, method: str, event: Event, /) -> Event:
if self.loop and self.send_queue and event.get('transmit', True):
# Since later processors can mutate the event dictionary, we feed a copy
# into the queue.
self.loop.call_soon_threadsafe(
self.send_queue.put_nowait,
(method, dict(event)),
)
return event
async def __aenter__(self, /) -> 'LogPublisher':
result = await super().__aenter__()
self.loop = asyncio.get_running_loop()
self.send_queue = asyncio.Queue(self.send_queue_capacity)
worker = asyncio.create_task(self._send_forever(), name='log-publish')
self.stack.callback(worker.cancel, 'log publisher worker cancelled')
return result
async def _send_forever(self, /) -> NoReturn:
if not self.send_queue: # pragma: no cover; always initialized by `__aenter__`
raise ValueError('queue is not initialized')
while True:
method, event = await self.send_queue.get()
with contextlib.suppress(asyncio.TimeoutError):
await self.call[method](event, notification=True)
[docs]@functools.lru_cache(maxsize=16)
def get_level_num(level_name: str, /, *, default: int = logging.DEBUG) -> int:
"""Translate a :mod:`logging` level name into its numeric value.
Parameters:
level_name: A case-insensitive name, such as ``'DEBUG'``.
default: The numeric level to return if the name is invalid.
Example:
>>> get_level_num('INFO')
20
>>> assert get_level_num('DNE') == logging.DEBUG == 10
"""
level = getattr(logging, level_name.upper(), None)
return level if isinstance(level, int) else default
def _filter_by_level(level: str, /) -> Processor:
"""Build a :mod:`structlog` processor to filter events by log level (severity)."""
min_level = get_level_num(level)
def processor(
_logger: AsyncLogger,
method: str,
event: ProcessorReturnType,
/,
) -> ProcessorReturnType:
if get_level_num(method) < min_level:
raise structlog.DropEvent
return event
return processor
def _add_exc_context(_logger: AsyncLogger, _method: str, event: Event, /) -> Event:
"""A processor to add the context of a :class:`RuntimeBaseException` to the event.
When the keys of the exception context clash with those of the event, the event's
entries take priority.
"""
exception = event.get('exc_info')
if isinstance(exception, RuntimeBaseException):
event = exception.context | event
return event