# Source code for runtime.buffer

"""Peripheral data storage and sharing.

Buffers are the primary interprocess communication (IPC) mechanism within Runtime. Each
buffer is a C-style structure created with the :mod:`ctypes` foreign function library
and possibly backed by shared memory. Each buffer represents one Smart Device or other
peripheral and contains parameters that can be read or written to.

For extensibility reasons, this library contains no hardcoded information about specific
peripherals. Instead, this library parses each peripheral's parameter and memory layout
specifications from a config file and generates the buffer types dynamically.

Consumers should poll buffers for changes. There is no mechanism for event-based
notifications.

This library is synchronous. Since many operations acquire a mutex, buffer operations
should run in an executor, so as not to block the event loop.
"""

import collections.abc
import contextlib
import ctypes
import functools
import operator
import re
import time
import types
import typing
import warnings
from dataclasses import dataclass, field
from multiprocessing.shared_memory import SharedMemory
from pathlib import Path
from typing import (
    Any,
    Callable,
    Collection,
    ContextManager,
    Iterator,
    Mapping,
    MutableMapping,
    NamedTuple,
    Optional,
    TypeVar,
    Union,
)

from .exception import RuntimeBaseException
from .messaging import Message, MessageError, MessageType, ParameterMap
from .sync import Mutex

# Names exported by a star-import of this module. Helper types defined further
# down in this file (e.g. ``BaseStructure``, ``ParameterBlock``) are
# intentionally not listed here.
__all__ = [
    'Buffer',
    'BufferStore',
    'Catalog',
    'DeviceBuffer',
    'DeviceBufferError',
    'DeviceUID',
    'NullDevice',
    'Parameter',
]

# The fundamental (scalar) ctypes data types. ``Parameter.default`` consults
# this set to decide whether a parameter's type has a scalar default value;
# anything outside the set (for example an array type) is rejected there.
# List: https://docs.python.org/3/library/ctypes.html#fundamental-data-types
_SIMPLE_TYPES: frozenset[type] = frozenset(
    {
        ctypes.c_bool,
        ctypes.c_char,
        ctypes.c_wchar,
        ctypes.c_ubyte,
        ctypes.c_byte,
        ctypes.c_short,
        ctypes.c_ushort,
        ctypes.c_int,
        ctypes.c_uint,
        ctypes.c_long,
        ctypes.c_ulong,
        ctypes.c_longlong,
        ctypes.c_ulonglong,
        ctypes.c_size_t,
        ctypes.c_ssize_t,
        ctypes.c_float,
        ctypes.c_double,
        ctypes.c_longdouble,
        ctypes.c_char_p,
        ctypes.c_wchar_p,
        ctypes.c_void_p,
    }
)


class DeviceBufferError(RuntimeBaseException):
    """Error when interfacing with a buffer.

    Throughout this module it is raised with a human-readable message plus
    keyword arguments describing the failing operation (for example
    ``param=...`` or ``name=...``).
    """
class Parameter(NamedTuple):
    """A parameter descriptor.

    Parameters:
        name: The parameter name.
        ctype: A :mod:`ctypes` type.
        id: The parameter ID, a nonnegative integer.
        lower: This parameter's minimum valid value. For numeric parameters only.
        upper: This parameter's maximum valid value. For numeric parameters only.
        readable: Whether this parameter should be readable with
            :meth:`Buffer.get`.
        writeable: Whether this parameter should be writeable with
            :meth:`Buffer.write`.
        subscribed: Whether this parameter should be subscribed to, by default.
            Only applicable for Smart Devices.
    """

    name: str
    ctype: type
    id: int
    lower: float = float('-inf')
    upper: float = float('inf')
    readable: bool = True
    writeable: bool = False
    subscribed: bool = True

    @property
    def platform_type(self, /) -> type:
        """A C type that has the correct width to hold this parameter's values.

        The type widths of Runtime's platform and the peripheral's platform may
        not match. This method performs the necessary conversion to allocate the
        correct amount of buffer space.

        Returns:
            A type exactly as wide as the values (bytes) emitted by the
            peripheral. Only a scalar ``c_double`` is narrowed (to ``c_float``);
            every other type is returned unchanged.
        """
        return ctypes.c_float if self.ctype is ctypes.c_double else self.ctype

    @staticmethod
    def parse_ctype(type_specifier: str) -> type:
        """Parse a type specifier into the corresponding C type.

        Parameters:
            type_specifier: A description of the parameter's type. Can be the
                name of any simple :mod:`ctypes` data type without the ``c_``
                prefix. A vector type of `n` elements can also be specified by
                appending ``[<n>]`` to an existing type (resembling C array
                declaration syntax). Pointer types are not supported.
                Leading and trailing whitespace is ignored.

        Returns:
            A :mod:`ctypes`-compatible data type.

        Raises:
            AttributeError: If the base type name is not an attribute of
                :mod:`ctypes`.

        Example:
            >>> assert Parameter.parse_ctype('bool') is ctypes.c_bool
            >>> assert Parameter.parse_ctype('int32[64]') is ctypes.c_int32 * 64
            >>> assert Parameter.parse_ctype('uint64[8][8]') is ctypes.c_uint64 * 8 * 8
        """
        # Anchor with ``\Z`` rather than ``$``: ``$`` also matches just before a
        # trailing newline, which used to produce a match whose text could not
        # be stripped from the end of the string, so the loop never terminated.
        pattern = re.compile(r'\[(\d+)\]\Z')
        base_name, dimensions = type_specifier.strip(), []
        while match := pattern.search(base_name):
            dimensions.append(int(match.group(1)))
            # Slice at the match instead of stripping text, so each iteration is
            # guaranteed to shorten the specifier.
            base_name = base_name[: match.start()]
        ctype = getattr(ctypes, f'c_{base_name}')
        # Suffixes were collected right-to-left; apply them left-to-right so
        # that each ``[n]`` wraps the type written before it.
        for dim in reversed(dimensions):
            ctype *= dim
        return typing.cast(type, ctype)

    def clamp(self, value: float, /) -> float:
        """Ensure a real value is between this parameter's lower and upper limits.

        If the value is not within the bounds, this method will emit a warning.

        Parameters:
            value: Candidate value.

        Returns:
            A value between :attr:`Parameter.lower` and :attr:`Parameter.upper`.
            A value that exceeds the minimum or maximum will be clipped to that
            bound.

        Example:
            >>> param = Parameter('param', ctypes.c_double, 0, lower=-1, upper=1)
            >>> param.clamp(-1.1)
            -1
            >>> param.clamp(1.1)
            1
            >>> param.clamp(0)
            0
        """
        if value < self.lower:
            warnings.warn(f'{self.name} exceeded lowerbound ({value} < {self.lower})')
        if value > self.upper:
            warnings.warn(f'{self.name} exceeded upperbound ({value} > {self.upper})')
        # NOTE(review): NaN compares false against both bounds, so it triggers no
        # warning and the expression below evaluates to ``upper`` when
        # ``lower <= upper``. Confirm whether NaN should be rejected instead.
        return max(self.lower, min(self.upper, value))

    @property
    def default(self, /) -> Any:
        """The default value when initializing this parameter.

        Returns:
            The default value of :attr:`Parameter.platform_type`.

        Raises:
            DeviceBufferError: If this parameter is not a simple :mod:`ctype`.

        Example:
            >>> Parameter('param', ctypes.c_double, 0).default
            0.0
            >>> Parameter('param', ctypes.c_int8, 0).default
            0
            >>> Parameter('param', ctypes.c_bool, 0).default
            False
            >>> Parameter('param', ctypes.c_bool * 3, 0).default
            Traceback (most recent call last):
              ...
            runtime.buffer.DeviceBufferError: vector parameters have no default
        """
        ctype = self.platform_type
        if ctype not in _SIMPLE_TYPES:
            raise DeviceBufferError(
                'vector parameters have no default',
                name=self.name,
                index=self.id,
            )
        # A freshly constructed ctypes scalar holds its zero value.
        return ctype().value
# Python buffer objects that a structure may be mapped onto.
WriteableBuffer = TypeVar('WriteableBuffer', memoryview, bytearray)


class BaseStructure(ctypes.LittleEndianStructure):
    """Common base class of every structure in this module.

    Deriving from a single little-endian base gives all buffers the same byte
    order. That byte order should match the Smart Device platform's.
    """

    @classmethod
    def get_view(cls, /, base: WriteableBuffer, field_name: str) -> memoryview:
        """Return a view of one field within the structure's backing buffer.

        Parameters:
            base: The buffer backing the structure.
            field_name: The name of the field view to extract.

        Returns:
            A slice of ``base`` covering exactly the bytes of the named field.
        """
        descriptor = getattr(cls, field_name)
        start = descriptor.offset
        stop = start + descriptor.size
        return memoryview(base)[start:stop]


@functools.lru_cache(maxsize=64)
def make_bitmask(bits: int) -> int:
    """Build an integer whose lowest ``bits`` bits are all set.

    Raises:
        ValueError: If ``bits`` is not positive.

    Example:
        >>> make_bitmask(0)
        Traceback (most recent call last):
          ...
        ValueError: must provide a positive number of bits
        >>> bin(make_bitmask(1))
        '0b1'
        >>> bin(make_bitmask(5))
        '0b11111'
    """
    if bits > 0:
        return (1 << bits) - 1
    raise ValueError('must provide a positive number of bits')
class DeviceUID(BaseStructure):
    """A unique device identifier.

    Attributes:
        device_id (int): The device's type ID. Consult the catalog for the full
            list.
        year (int): The year in which the device was created. ``0x00``
            corresponds to the spring 2016 season, and increments for each
            season afterwards.
        random (int): The UID's random bits to distinguish otherwise identical
            devices.

    Examples:
        >>> uid = DeviceUID(0xffff, 0xee, 0xc0debeef_deadbeef)
        >>> assert int(uid) == 0xffff_ee_c0debeef_deadbeef
        >>> uid = DeviceUID.from_int(0xffff_ee_c0debeef_deadbeef)
        >>> hex(uid.device_id)
        '0xffff'
        >>> hex(uid.year)
        '0xee'
        >>> hex(uid.random)
        '0xc0debeefdeadbeef'
    """

    _fields_ = [
        ('device_id', ctypes.c_uint16),
        ('year', ctypes.c_uint8),
        ('random', ctypes.c_uint64),
    ]

    def __int__(self, /) -> int:
        """Pack the fields into one integer, ``device_id`` most significant."""
        # Field widths come from the ctypes field descriptors (size in bytes).
        uid = int(self.device_id)
        uid = (uid << 8 * type(self).year.size) | self.year
        uid = (uid << 8 * type(self).random.size) | self.random
        return uid

    @classmethod
    def from_int(cls, /, uid: int) -> 'DeviceUID':
        """Parse a device UID in integer format into its constituent fields.

        The integer is consumed from its least significant end: first the
        ``random`` bits, then ``year``, then ``device_id``. Any bits above
        ``device_id`` are discarded.

        Returns:
            An instance of ``cls`` (so subclasses get an instance of their own
            type) holding the decoded fields.
        """
        rand, uid = uid & make_bitmask(8 * cls.random.size), uid >> 8 * cls.random.size
        year, uid = uid & make_bitmask(8 * cls.year.size), uid >> 8 * cls.year.size
        device_id = uid & make_bitmask(8 * cls.device_id.size)
        return cls(device_id, year, rand)
class DeviceMetricsBlock(BaseStructure):
    """A special structure for storing Smart Device statistics."""

    _counter_type = ctypes.c_uint64
    # One counter slot per possible parameter, for each direction.
    _fields_ = [
        ('send', _counter_type * Message.MAX_PARAMS),
        ('recv', _counter_type * Message.MAX_PARAMS),
    ]


class DeviceControlBlock(BaseStructure):
    """A special structure for bookkeeping Smart Device control state."""

    _param_map_type = ctypes.c_uint16
    _timestamp_type = ctypes.c_double
    # ``subscription``, ``read``, ``write`` and ``update`` are parameter
    # bitmaps; ``last_write`` and ``last_update`` hold timestamps.
    _fields_ = [
        ('uid', DeviceUID),
        ('subscription', _param_map_type),
        ('interval', ctypes.c_uint16),
        ('read', _param_map_type),
        ('write', _param_map_type),
        ('update', _param_map_type),
        ('last_write', _timestamp_type),
        ('last_update', _timestamp_type),
    ]
    # Descriptor used only for its ``clamp`` bounds (40 to 250); the ID of -1
    # marks it as not being a real device parameter.
    interval_spec: Parameter = Parameter('interval', ctypes.c_uint16, -1, 40, 250)


class ParameterBlock(BaseStructure):
    """A structure for holding parameters."""

    @classmethod
    def make_type(cls, /, name: str, params: list[Parameter]) -> type['ParameterBlock']:
        """Make a structure with the given parameters as fields.

        Parameters:
            name: The name of the generated class.
            params: The parameters to lay out, in field order. The list is also
                stored on the generated class as ``params``.
        """
        fields = [(param.name, param.platform_type) for param in params]
        return type(name, (cls,), {'_fields_': fields, 'params': params})


RT = TypeVar('RT')


def with_transaction(wrapped: Callable[..., RT]) -> Callable[..., RT]:
    """Decorator applied to :class:`Buffer` methods to begin a transaction.

    Ordinary methods are simply executed inside ``self.transaction()``.

    Generator methods need special care: calling a generator function only
    creates the generator object, so returning it from inside the ``with``
    block would release the transaction before any of the body has run. For
    those methods the generator is exhausted while the transaction is still
    held, and an iterator over the collected items is returned instead.
    """
    # Imported locally so this decorator does not depend on the module's
    # import block.
    import inspect

    if inspect.isgeneratorfunction(wrapped):

        @functools.wraps(wrapped)
        def eager_wrapper(self: 'Buffer', /, *args: Any, **kwargs: Any) -> Any:
            with self.transaction():
                # Run the whole generator body under the transaction.
                items = list(typing.cast(Any, wrapped(self, *args, **kwargs)))
            return iter(items)

        return typing.cast(Callable[..., RT], eager_wrapper)

    @functools.wraps(wrapped)
    def wrapper(self: 'Buffer', /, *args: Any, **kwargs: Any) -> RT:
        with self.transaction():
            return wrapped(self, *args, **kwargs)

    return wrapper


BufferType = TypeVar('BufferType', bound='Buffer')
DeviceBufferType = TypeVar('DeviceBufferType', bound='DeviceBuffer')
class Buffer(BaseStructure):
    """A structure for storing peripheral data.

    A buffer consists of two substructures: an *update block* for storing
    current parameter values (as read from the peripheral), and a *write block*
    for parameter values to be written to the peripheral.

    Attributes:
        params: Maps param names to their descriptors.
    """

    params: Mapping[str, Parameter]
    # Default for buffers that are not backed by shared memory; ``open``
    # replaces it with a real mutex.
    mutex: ContextManager[None] = contextlib.nullcontext()

    @classmethod
    def attach(
        cls: type[BufferType],
        buf: Optional[WriteableBuffer] = None,
        /,
    ) -> BufferType:
        """Create a new buffer.

        Parameters:
            buf: A writeable Python buffer (not to be confused with
                :class:`Buffer`). If not provided, a :class:`bytearray` with the
                correct size is allocated.

        Returns:
            A structure mapped onto ``buf``. The backing object is kept on the
            instance as ``buf``.
        """
        if buf is None:
            return cls.attach(bytearray(ctypes.sizeof(cls)))
        structure = cls.from_buffer(buf)
        structure.buf = buf
        return structure

    @classmethod
    def make_type(
        cls: type[BufferType],
        /,
        name: str,
        params: list[Parameter],
        *extra_fields: tuple[str, type],
        **attrs: Any,
    ) -> type[BufferType]:
        """Create a new buffer type (subclass).

        Parameters:
            name: Type name prefix (preferably kebab case).
            params: Parameter list.
            extra_fields: Extra :mod:`ctypes` fields (name and types).
            attrs: Additional class attributes.
        """
        normalized_name = name.title().replace('-', '')
        # Readable parameters live in the update block, writeable ones in the
        # write block; a parameter may appear in both.
        update_block_type = ParameterBlock.make_type(
            f'{normalized_name}UpdateBlock',
            [param for param in params if param.readable],
        )
        write_block_type = ParameterBlock.make_type(
            f'{normalized_name}WriteBlock',
            [param for param in params if param.writeable],
        )
        attrs |= {
            '_fields_': [
                ('valid_flag', ctypes.c_bool),
                ('update_block', update_block_type),
                ('write_block', write_block_type),
                *extra_fields,
            ],
            'params': {param.name: param for param in params},
        }
        return type(normalized_name, (cls,), attrs)

    @classmethod
    @contextlib.contextmanager
    def open(cls, name: str, /, *, create: bool = True) -> Iterator['Buffer']:
        """Open a new buffer backed by shared memory.

        Parameters:
            name: The shared memory object's name.
            create: Whether to attempt to create a new shared memory object. If
                ``True`` but the object already exists, :meth:`open` silently
                attaches to the existing object.

        Returns:
            A context manager that automatically closes the shared memory object
            when the exit handler runs. The object is *not* unlinked, meaning
            other processes may still access the object. To finally destroy the
            object, you must call :meth:`Buffer.unlink` on this object's name.
            (NOTE(review): no ``unlink`` method is visible on this class;
            confirm where it is defined.)

        Raises:
            DeviceBufferError: If ``create=False``, but the object does not
                exist.

        Note:
            When two processes attempt to create this buffer simultaneously,
            there is a small chance that the buffer that loses out yields its
            view before the other process initializes the mutex. This behavior
            is OK, since attempting to acquire an uninitialized mutex should
            raise a ``SyncError`` with EINVAL.
        """
        # Layout of the shared memory object: the mutex first, then the buffer.
        size = Mutex.SIZE + ctypes.sizeof(cls)
        try:
            shm, create_success = SharedMemory(name, create=create, size=size), True
        except FileNotFoundError as exc:
            raise DeviceBufferError(
                'cannot attach to nonexistent shared memory',
                name=name,
                create=create,
                type=cls.__name__,
            ) from exc
        except FileExistsError:
            shm, create_success = SharedMemory(name), False
        # NOTE(review): if anything between here and the ``try`` below raises,
        # ``shm.close()`` is never reached. The teardown order is delicate, so
        # this is flagged rather than restructured.
        buffer = cls.attach(shm.buf[Mutex.SIZE :])
        mutex = Mutex(shm.buf[: Mutex.SIZE], shared=True, recursive=True)
        # Only the process that actually created the object initializes the
        # mutex stored in it.
        if create_success:
            mutex.initialize()
        buffer.mutex = mutex
        if create:
            buffer.valid = True
        try:
            yield buffer
        finally:
            if create:
                buffer.valid = False
            # Drop every view of the shared memory before closing it.
            buffer.buf.release()
            # pylint: disable=protected-access; no good alternative solution
            if isinstance(buffer._objects, dict):  # pragma: no cover
                buffer._objects.clear()
            shm.close()

    @contextlib.contextmanager
    def transaction(self, /) -> Iterator[None]:
        """Acquire the buffer's mutex and check its valid bit.

        All methods already use this reentrant context manager to guarantee
        consistency, but this context manager may also be used to group
        transactions into a larger atomic transaction. This avoids acquiring and
        releasing the mutex repeatedly.

        Raises:
            DeviceBufferError: If the buffer is marked as invalid.
        """
        with self.mutex:
            if not self.valid_flag:
                raise DeviceBufferError('device does not exist (marked as invalid)')
            yield

    @with_transaction
    def get(self, param: str, /) -> Any:
        """Read a parameter from the update block.

        Parameters:
            param: The parameter name.

        Returns:
            The parameter value. Consult the catalog for each device's
            parameters and their types.

        Raises:
            DeviceBufferError: If the parameter does not exist or is not
                readable.
        """
        # Check against the declared parameters first: the block also exposes
        # non-parameter attributes (``params``, ``_fields_``, ...) that a plain
        # ``getattr`` would happily return.
        if param not in self.params:
            raise DeviceBufferError(
                'parameter does not exist or is not readable',
                param=param,
            )
        try:
            return getattr(self.update_block, param)
        except AttributeError as exc:
            raise DeviceBufferError(
                'parameter does not exist or is not readable',
                param=param,
            ) from exc

    @with_transaction
    def set(self, param: str, value: Any, /) -> None:
        """Set a parameter value in the update block.

        Parameters:
            param: The parameter name.
            value: The parameter value. Consult the catalog for each device's
                parameters and their types.

        Raises:
            DeviceBufferError: If the parameter does not exist or is not
                writeable.
            TypeError: If the value is not the correct type.
        """
        self._set(self.update_block, param, value)

    @with_transaction
    def write(self, param: str, value: Any, /) -> None:
        """Request writing a parameter to the device.

        Because the buffer is polled and the writes are batched during each
        cycle, it's possible to overwrite a pending parameter write. This is a
        feature to reduce excessive writes.

        Parameters:
            param: The parameter name.
            value: The parameter value. Consult the catalog for each device's
                parameters and their types.

        Raises:
            DeviceBufferError: If the parameter does not exist or is not
                writeable.
            TypeError: If the value is not the correct type.
        """
        self._set(self.write_block, param, value)

    def _set(self, block: ParameterBlock, param_name: str, value: Any, /) -> None:
        """Store ``value`` into ``block``, clamping floating-point parameters.

        Raises:
            DeviceBufferError: If ``param_name`` is not a declared parameter or
                is not a field of ``block``.
        """
        # ``hasattr`` alone is not enough: it is also true for non-parameter
        # attributes of the block, which would then fail the ``self.params``
        # lookup below with a bare ``KeyError``.
        if param_name not in self.params or not hasattr(block, param_name):
            raise DeviceBufferError(
                'parameter does not exist or is not writeable',
                param=param_name,
            )
        param = self.params[param_name]
        # Only floating-point parameters are clamped to their bounds.
        if param.platform_type in (ctypes.c_float, ctypes.c_double):
            value = param.clamp(value)
        setattr(block, param_name, value)

    @property
    def valid(self, /) -> bool:
        """Whether this buffer represents an active device."""
        # Deliberately bypasses ``transaction``, which would raise on an
        # invalid buffer.
        with self.mutex:
            return self.valid_flag

    @valid.setter
    def valid(self, flag: bool, /) -> None:
        with self.mutex:
            self.valid_flag = flag
class DeviceBuffer(Buffer):
    """A special buffer representing a Smart Device.

    Attributes:
        sub_interval: The default subscription interval in seconds.
        write_interval: The duration in seconds between device writes.
        heartbeat_interval: The duration in seconds between heartbeat requests.
    """

    sub_interval: float = 0.04
    write_interval: float = 0.04
    heartbeat_interval: float = 1

    @staticmethod
    def _make_param_map(block: ParameterBlock, /) -> ParameterMap:
        """Register the address and size of every parameter field in ``block``."""
        param_map = ParameterMap()
        base, block_type = ctypes.addressof(block), type(block)
        for param in block.params:
            struct_field = getattr(block_type, param.name, None)
            if struct_field is not None:  # pragma: no cover; should always exist
                offset = base + struct_field.offset
                param_map.set_param(param.id, offset, struct_field.size)
        return param_map

    @functools.cached_property
    def _update_param_map(self, /) -> ParameterMap:
        return self._make_param_map(self.update_block)

    @functools.cached_property
    def _write_param_map(self, /) -> ParameterMap:
        return self._make_param_map(self.write_block)

    @classmethod
    def make_type(
        cls: type[DeviceBufferType],
        name: str,
        params: list[Parameter],
        *extra_fields: tuple[str, type],
        **attrs: Any,
    ) -> type[DeviceBufferType]:
        """Create a Smart Device buffer type with control and metrics blocks.

        Parameters:
            name: Type name prefix (preferably kebab case).
            params: Parameter list.
            extra_fields: Extra :mod:`ctypes` fields (name and types).
            attrs: Additional class attributes.

        Raises:
            ValueError: If there are more than ``Message.MAX_PARAMS``
                parameters, or a parameter ID is outside
                ``[0, Message.MAX_PARAMS)``.
        """
        if len(params) > Message.MAX_PARAMS:
            raise ValueError(
                f'Smart Devices may only have up to {Message.MAX_PARAMS} params'
            )
        # Parameter IDs are used as bit positions in the control bitmaps, so
        # reject out-of-range IDs here rather than failing later (a negative ID
        # would raise on the first bit shift).
        for param in params:
            if not 0 <= param.id < Message.MAX_PARAMS:
                raise ValueError(
                    f'Smart Device param IDs must be between 0 and '
                    f'{Message.MAX_PARAMS - 1} (got {param.id} for {param.name!r})'
                )
        return super().make_type(
            name,
            params,
            ('control', DeviceControlBlock),
            ('metrics', DeviceMetricsBlock),
            *extra_fields,
            **attrs,
        )

    @classmethod
    def _from_bitmap(cls, bitmap: int, /) -> frozenset[Parameter]:
        """Return the parameters whose ID bit is set in ``bitmap``."""
        params = cls.params.values()
        return frozenset(param for param in params if (bitmap >> param.id) & 0b1)

    @classmethod
    def _to_bitmap(cls, params: Collection[str], /) -> int:
        """Return a bitmap with one bit set per named parameter.

        Raises:
            KeyError: If a name is not a parameter of this device type.
        """
        masks = (1 << cls.params[param].id for param in params)
        return functools.reduce(operator.or_, masks, Message.NO_PARAMS)

    @functools.cached_property
    def _read_mask(self, /) -> int:
        params = self.params.values()
        return self._to_bitmap([param.name for param in params if param.readable])

    @with_transaction
    def set(self, param: str, value: Any, /) -> None:
        """Set a value in the update block and mark the parameter as updated."""
        super().set(param, value)
        self.control.update |= 1 << self.params[param].id
        self.control.last_update = time.time()

    @with_transaction
    def write(self, param: str, value: Any, /) -> None:
        """Set a value in the write block and mark the parameter as written."""
        super().write(param, value)
        self.control.write |= 1 << self.params[param].id
        self.control.last_write = time.time()

    @with_transaction
    def read(self, /, params: Optional[Collection[str]] = None) -> None:
        """Request the device to return values for some parameters.

        Parameters:
            params: A list of parameter names to return values for. Passing
                ``None`` specifies all parameters of this device type.

        Raises:
            KeyError: If an invalid parameter name is provided.
        """
        if params is None:
            params = {param.name for param in self.params.values() if param.readable}
        # Bits of non-readable parameters are masked out.
        self.control.read |= self._to_bitmap(params) & self._read_mask

    def _emit(
        self,
        make_msg: Callable[[int, ParameterMap], Message],
        bitmap: int,
        param_map: ParameterMap,
    ) -> Iterator[Message]:
        """Yield one message for ``bitmap``, or one per parameter on failure.

        If building a single message raises ``MessageError``, fall back to one
        message per parameter, in ascending parameter ID order.
        """
        try:
            yield make_msg(bitmap, param_map)
        except MessageError:
            # Sort so the fallback order does not depend on set iteration order.
            ordered = sorted(self._from_bitmap(bitmap), key=operator.attrgetter('id'))
            for param in ordered:
                yield make_msg(1 << param.id, param_map)

    @with_transaction
    def emit_dev_rw(self, /) -> Iterator[Message]:
        """Make messages for reading and writing parameters.

        Calling this method will clear the pending parameter maps.

        Yields:
            runtime.messaging.Message: Zero or more messages, depending on how
            many parameters the consumer requested with :meth:`read` or
            :meth:`Buffer.write`. If no parameters have been requested since the
            last read, this method may yield no messages.
        """
        if self.control.write:
            yield from self._emit(
                Message.make_dev_write,
                self.control.write,
                self._write_param_map,
            )
            # Parameters that were just written are dropped from the pending
            # read bitmap.
            self.control.read &= Message.ALL_PARAMS ^ self.control.write
            self.control.write = Message.NO_PARAMS
        if self.control.read:
            yield Message.make_dev_read(self.control.read)
            self.control.read = Message.NO_PARAMS

    @with_transaction
    def emit_dev_data(self, /) -> Iterator[Message]:
        """Make messages containing device data for recently updated parameters.

        Calling this method will mark all parameters as not recently updated.

        Yields:
            runtime.messaging.Message: Zero or more messages containing device
            data.
        """
        # Subscribed parameters will be read soon anyway.
        # This optimization deduplicates updates.
        self.control.update &= Message.ALL_PARAMS ^ self.control.subscription
        if self.control.update:
            yield from self._emit(
                Message.make_dev_data,
                self.control.update,
                self._update_param_map,
            )
            self.control.update = Message.NO_PARAMS

    @with_transaction
    def emit_subscription(self, /) -> Iterator[Message]:
        """Make messages containing device data for subscribed parameters.

        Yields:
            runtime.messaging.Message: Zero or more messages containing device
            data.
        """
        yield from self._emit(
            Message.make_dev_data,
            self.control.subscription,
            self._update_param_map,
        )

    @with_transaction
    def make_sub_req(
        self,
        /,
        params: Optional[Collection[str]] = None,
        interval: Optional[float] = None,
    ) -> Message:
        """Make a subscription request for some parameters.

        Parameters:
            params: A list of parameter names to subscribe to. Passing ``None``
                specifies all parameters of this device type that are marked as
                subscribed by default.
            interval: The duration between updates in seconds. Defaults to
                :attr:`sub_interval`.

        Returns:
            The subscription request message.
        """
        if params is None:
            params = {param.name for param in self.params.values() if param.subscribed}
        if interval is None:
            interval = self.sub_interval
        # The message carries the interval as whole milliseconds (truncated).
        return Message.make_sub_req(self._to_bitmap(params), int(1000 * interval))

    @with_transaction
    def make_sub_res(self, /) -> Message:
        """Make a subscription response for the parameters subscribed to.

        Returns:
            The subscription response message.
        """
        return Message.make_sub_res(
            self.control.subscription,
            self.control.interval,
            self.control.uid.device_id,
            self.control.uid.year,
            self.control.uid.random,
        )

    @with_transaction
    def get_read(self, /) -> frozenset[str]:
        """Get the parameters that are to be read.

        Calling this method will clear the pending read map.

        Returns:
            A collection of parameter names.
        """
        params = frozenset(param.name for param in self._from_bitmap(self.control.read))
        self.control.read = Message.NO_PARAMS
        return params

    @with_transaction
    def get_write(self, /) -> dict[str, Any]:
        """Get the parameters that were written.

        Calling this method will clear the pending write map.

        Returns:
            A map of parameter names to their corresponding values.
        """
        params = self._from_bitmap(self.control.write)
        params = frozenset(param for param in params if param.writeable)
        self.control.write = Message.NO_PARAMS
        return {param.name: getattr(self.write_block, param.name) for param in params}

    @with_transaction
    def get_update(self, /) -> dict[str, Any]:
        """Get the parameters that were updated.

        Calling this method will clear the pending update map.

        Returns:
            A map of parameter names to their corresponding values.
        """
        params = self._from_bitmap(self.control.update)
        self.control.update = Message.NO_PARAMS
        return {param.name: self.get(param.name) for param in params}

    @with_transaction
    def update(self, message: Message, /) -> None:
        """Digest a Smart Device message and update this buffer's state accordingly.

        The message's bitmaps and parameter values are copied into this buffer.
        Message types other than those handled below are ignored.
        """
        if message.type is MessageType.DEV_DATA:
            self.control.update |= message.read_dev_data(self._update_param_map)
            self.control.last_update = time.time()
        elif message.type is MessageType.DEV_READ:
            self.control.read |= message.read_dev_read() & self._read_mask
        elif message.type is MessageType.DEV_WRITE:
            self.control.write |= message.read_dev_write(self._write_param_map)
            self.control.last_write = time.time()
        elif message.type is MessageType.SUB_REQ:
            subscription, self.control.interval = message.read_sub_req()
            self.control.subscription = subscription & self._read_mask
            # A zero interval is stored as-is; only positive intervals are
            # clamped to the supported range.
            if self.control.interval > 0:
                interval = self.control.interval_spec.clamp(self.control.interval)
                self.control.interval = interval
        elif message.type is MessageType.SUB_RES:
            (
                self.control.subscription,
                self.control.interval,
                uid,
            ) = message.read_sub_res()
            self.control.uid = DeviceUID(*uid)

    @property
    def uid(self) -> int:
        """Smart Device unique identifier."""
        with self.transaction():
            return int(self.control.uid)

    @uid.setter
    def uid(self, uid: int, /) -> None:
        with self.transaction():
            uid_parts = DeviceUID.from_int(uid)
            self.control.uid.device_id = uid_parts.device_id
            self.control.uid.year = uid_parts.year
            self.control.uid.random = uid_parts.random

    @property
    def subscription(self) -> frozenset[str]:
        """The names of parameters subscribed to."""
        with self.transaction():
            params = self._from_bitmap(self.control.subscription)
            return frozenset(param.name for param in params)

    @property
    def last_write(self) -> float:
        """The timestamp given by :func:`time.time` of the last write."""
        with self.transaction():
            return float(self.control.last_write)

    @property
    def last_update(self) -> float:
        """The timestamp given by :func:`time.time` of the last update."""
        with self.transaction():
            return float(self.control.last_update)

    @property
    def interval(self) -> float:
        """The subscription interval in seconds."""
        with self.transaction():
            # Stored in milliseconds.
            return float(self.control.interval) / 1000
NullDevice = DeviceBuffer.make_type('null-device', [])
"""A buffer type with no parameters.

Useful for creating a non-null placeholder where a buffer is required.
"""

# A buffer is addressed either by a bare Smart Device UID or by a
# ``(type name, UID)`` pair.
BufferKey = Union[int, tuple[str, int]]

# Maps each device type name to its generated buffer type.
Catalog = Mapping[str, type[Buffer]]
@dataclass
class BufferStore(collections.abc.Mapping[BufferKey, Buffer]):
    """Manage the lifecycle of a collection of buffers.

    Every device type has a unique name and every device has an integer
    identifier (UID) unique among its type. A type name and UID pair uniquely
    identifies a device globally. For Smart Devices, the UID encodes the type,
    so its UID alone specifies a buffer.

    A :class:`BufferStore` maps type name and UID pairs to buffer instances. The
    store implements the context manager protocol for automatically closing all
    open buffers.

    Parameters:
        catalog: A device catalog mapping unique type names to buffer types. The
            buffer types should be generated with the :meth:`Buffer.make_type`
            factory.
        buffers: A mapping from type name and UID pairs to buffer instances.
        stack: An exit stack for closing buffers.
        shared: Whether buffers should be created in shared memory.
    """

    catalog: Catalog
    buffers: MutableMapping[tuple[str, int], Buffer] = field(default_factory=dict)
    stack: contextlib.ExitStack = field(default_factory=contextlib.ExitStack)
    shared: bool = True
    # Derived from ``catalog``: maps Smart Device type IDs to type names.
    device_ids: Mapping[int, str] = field(init=False, repr=False)

    def __post_init__(self, /) -> None:
        self.device_ids = self._make_device_ids()

    def _make_device_ids(self, /) -> Mapping[int, str]:
        """Index catalog entries that define ``device_id`` by that ID.

        Raises:
            DeviceBufferError: If two catalog entries share a device ID.
        """
        device_ids = {}
        for type_name, buf_type in self.catalog.items():
            device_id = getattr(buf_type, 'device_id', None)
            if device_id is not None:
                if device_id in device_ids:
                    raise DeviceBufferError(
                        'duplicate Smart Device ID',
                        device_id=device_id,
                    )
                device_ids[device_id] = type_name
        return device_ids

    def __enter__(self, /) -> 'BufferStore':
        self.stack.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        traceback: Optional[types.TracebackType],
        /,
    ) -> Optional[bool]:
        return self.stack.__exit__(exc_type, exc, traceback)

    def __getitem__(self, key: BufferKey, /) -> Buffer:
        # NOTE(review): a missing buffer surfaces as ``DeviceBufferError`` here.
        # Unless that type derives from ``KeyError``, the ``Mapping`` mixins
        # (``in``, ``.get()``) will propagate it instead of treating the key as
        # absent. Confirm this is intended.
        return self.get_or_open(key, create=False)

    def __iter__(self, /) -> Iterator[BufferKey]:
        return iter(self.buffers)

    def __len__(self, /) -> int:
        return len(self.buffers)

    def normalize_key(self, key: BufferKey, /) -> tuple[str, int]:
        """Convert a Smart Device UID into the type name and UID format.

        Returns:
            The type name and UID pair. A key that is already a pair is returned
            unchanged.

        Raises:
            KeyError: If the UID's device ID is not in the catalog.
        """
        if isinstance(key, int):
            uid = DeviceUID.from_int(key)
            return self.device_ids[uid.device_id], key
        return key

    @typing.overload
    def get_or_open(self, key: int, /, *, create: bool = ...) -> DeviceBuffer:
        ...

    @typing.overload
    def get_or_open(self, key: tuple[str, int], /, *, create: bool = ...) -> Buffer:
        ...

    def get_or_open(self, key: BufferKey, /, *, create: bool = True) -> Buffer:
        """Get a buffer, opening a new one if necessary.

        Parameters:
            key: A buffer identifier.
            create: Whether to create a new buffer if one does not already
                exist.

        Raises:
            KeyError: The device ID or type name does not exist.
            DeviceBufferError: If ``create=False``, but the buffer does not
                exist.
        """
        type_name, uid = key = self.normalize_key(key)
        buffer = self.buffers.get(key)
        if buffer is None:
            buf_type = self.catalog[type_name]
            if self.shared:
                name = f'rt-{type_name}-{uid}'
                buffer = self.stack.enter_context(buf_type.open(name, create=create))
            else:
                if not create:
                    raise DeviceBufferError(
                        'local buffer not found',
                        type=type_name,
                        uid=str(uid),
                    )
                buffer = buf_type.attach()
                buffer.valid = True
            self.buffers[key] = buffer
            # Registered after the buffer itself, so on exit the entry is
            # removed from the store before the buffer is closed.
            self.stack.callback(self.buffers.pop, key)
        return buffer

    @staticmethod
    def _make_params(attrs: dict[str, Any]) -> Iterator[Parameter]:
        """Build descriptors from ``attrs['params']``, removing that key.

        ``'params'`` is popped from ``attrs`` so the remaining items can be
        passed on as class attributes. Each individual parameter spec is copied
        first, so the caller's spec dictionaries are left untouched.
        """
        for index, raw_param in enumerate(attrs.pop('params', [])):
            param = dict(raw_param)
            # The position in the list is the default parameter ID.
            param.setdefault('id', index)
            param['ctype'] = Parameter.parse_ctype(param.pop('type'))
            yield Parameter(**param)

    @classmethod
    def make_catalog(cls, catalog: dict[str, dict[str, Any]]) -> Catalog:
        """Generate buffer types from the raw peripheral specification.

        ``catalog`` maps each type name to a raw peripheral specification of
        this form::

            {
                "device_id": <int>,             # For Smart Devices only
                "sub_interval": <float>,        # Optional
                "write_interval": <float>,      # Optional
                "heartbeat_interval": <float>,  # Optional
                "params": [                     # Keyword arguments of ``Parameter``
                    {"name": <str>, "type": <str>}
                ]
            }

        The input is not modified, so the same raw catalog can be passed in
        more than once.
        """
        catalog_types = {}
        for type_name, raw_attrs in catalog.items():
            # Work on a shallow copy: ``_make_params`` pops ``'params'``.
            attrs = dict(raw_attrs)
            params = list(cls._make_params(attrs))
            base_type = DeviceBuffer if 'device_id' in attrs else Buffer
            catalog_types[type_name] = base_type.make_type(type_name, params, **attrs)
        return catalog_types