runtime.buffer

Peripheral data storage and sharing.

Buffers are the primary interprocess communication (IPC) mechanism within Runtime. Each buffer is a C-style structure created with the ctypes foreign function library and possibly backed by shared memory. Each buffer represents one Smart Device or other peripheral and contains parameters that can be read or written to.

For extensibility reasons, this library contains no hardcoded information about specific peripherals. Instead, this library parses each peripheral’s parameter and memory layout specifications from a config file and generates the buffer types dynamically.

Consumers should poll buffers for changes. There is no mechanism for event-based notifications.

This library is synchronous. Since many operations acquire a mutex, buffer operations should run in an executor, so as not to block the event loop.

exception runtime.buffer.DeviceBufferError(message, /, **context)[source]

Bases: runtime.exception.RuntimeBaseException

Error when interfacing with a buffer.

class runtime.buffer.Buffer[source]

Bases: runtime.buffer.BaseStructure

A structure for storing peripheral data.

A buffer consists of two substructures: an update block for storing current parameter values (as read from the peripheral), and a write block for parameter values to be written to the peripheral.

params

Maps param names to their descriptors.

Type

Mapping[str, runtime.buffer.Parameter]

classmethod attach(buf=None, /)[source]

Create a new buffer.

Parameters

buf (Optional[runtime.buffer.WriteableBuffer]) – A writeable Python buffer (not to be confused with Buffer). If not provided, a bytearray with the correct size is allocated.

get(param, /)[source]

Read a parameter from the update block.

Parameters

param (str) – The parameter name.

Returns

The parameter value. Consult the catalog for each device’s parameters and their types.

Raises

DeviceBufferError – If the parameter does not exist or is not readable.

classmethod make_type(name, params, *extra_fields, **attrs)[source]

Create a new buffer type (subclass).

Parameters
classmethod open(name, /, *, create=True)[source]

Open a new buffer backed by shared memory.

Parameters
  • name (str) – The shared memory object’s name.

  • create (bool) – Whether to attempt to create a new shared memory object. If True but the object already exists, open() silently attaches to the existing object.

Returns

A context manager that automatically closes the shared memory object when the exit handler runs. The object is not unlinked, meaning other processes may still access the object. To finally destroy the object, you must call Buffer.unlink() on this object’s name.

Raises

DeviceBufferError – If create=False, but the object does not exist.

Note

When two processes attempt to create this buffer simultaneously, there is a small chance that the buffer that loses out yields its view before the other process initializes the mutex. This behavior is OK, since attempting to acquire an uninitialized mutex should raise a SyncError with EINVAL.

set(param, value, /)[source]

Set a parameter value in the update block.

Parameters
  • param (str) – The parameter name.

  • value (Any) – The parameter value. Consult the catalog for each device’s parameters and their types.

Raises
transaction()[source]

Acquire the buffer’s mutex and check its valid bit.

All methods already use this reentrant context manager to guarantee consistency, but this context manager may also be used to group transactions into a larger atomic transaction. This avoids acquiring and releasing the mutex repeatedly.

Destroy a shared memory object.

The exact behavior depends on the platform. See SharedMemory.unlink().

write(param, value, /)[source]

Request writing a parameter to the device.

Because the buffer is polled and the writes are batched during each cycle, it’s possible to overwrite a pending parameter write. This is a feature to reduce excessive writes.

Parameters
  • param (str) – The parameter name.

  • value (Any) – The parameter value. Consult the catalog for each device’s parameters and their types.

Raises
property valid: bool

Whether this buffer represents an active device.

class runtime.buffer.BufferStore(catalog, buffers=<factory>, stack=<factory>, shared=True)[source]

Bases: collections.abc.Mapping[Union[int, tuple[str, int]], runtime.buffer.Buffer]

Manage the lifecycle of a collection of buffers.

Every device type has a unique name and every device has an integer identifier (UID) unique among its type. A type name and UID pair uniquely identifies a device globally. For Smart Devices, the UID encodes the type, so its UID alone specifies a buffer.

A BufferStore maps type name and UID pairs to buffer instances. The store implements the context manager protocol for automatically closing all open buffers.

Parameters
get_or_open(key: int, /, *, create: bool = True) runtime.buffer.DeviceBuffer[source]
get_or_open(key: tuple[str, int], /, *, create: bool = True) runtime.buffer.Buffer

Get a buffer, opening a new one if necessary.

Parameters
  • key – A buffer identifier.

  • create – Whether to create a new buffer if one does not already exist.

Raises
  • KeyError – The device ID or type name does not exist.

  • DeviceBufferError – If create=False, but the buffer does not exist.

classmethod make_catalog(catalog)[source]

Generate buffer types from the raw peripheral specification.

The raw peripheral specification follows this form:

{
    "device_id": <int>,             # For Smart Devices only
    "sub_interval": <float>,          # Optional
    "write_interval": <float>,        # Optional
    "heartbeat_interval": <float>,    # Optional
    "params": [
        # Keyword arguments of ``Parameter``
        {"name": <str>, "type": <str>}
    ]
}
normalize_key(key, /)[source]

Convert a Smart Device UID into the type name and UID format.

Returns

The type name and UID pair.

Unlink all shared buffers owned by Runtime.

class runtime.buffer.DeviceBuffer[source]

Bases: runtime.buffer.Buffer

A special buffer representing a Smart Device.

sub_interval

The default subscription interval in seconds.

Type

float

write_interval

The duration in seconds between device writes.

Type

float

heartbeat_interval

The duration in seconds between heartbeat requests.

Type

float

emit_dev_data()[source]

Make messages for containing device data for recently updated parameters.

Calling this method will mark all parameters are not recently updated.

Yields

runtime.messaging.Message – Zero or more messages containing device data.

emit_dev_rw()[source]

Make messages for reading and writing parameters.

Calling this method will clear the pending parameter maps.

Yields

runtime.messaging.Message – Zero or more messages, depending on how many parameters the consumer requested with read() or Buffer.write(). If no parameters have been requested, since the last read, this method may yield no messages.

emit_subscription()[source]

Make messages containing device data for subscribed parameters.

Yields

runtime.messaging.Message – Zero or more messages containing device data.

get_read()[source]

Get the parameters that are to be read.

Calling this method will clear the pending read map.

Returns

A collection of parameter names.

get_update()[source]

Get the parameters that were updated.

Calling this method will clear the pending update map.

Returns

A map of parameter names to their corresponding values.

get_write()[source]

Get the parameters that were written.

Calling this method will clear the pending write map.

Returns

A map of parameter names to their corresponding values.

make_sub_req(params=None, interval=None)[source]

Make a subscription request for some parameters.

Parameters
  • params (Optional[Collection[str]]) – A list of parameter names to subscribe to. Passing None specifies all parameters of this device type.

  • interval (Optional[float]) – The duration between updates in seconds.

Returns

The subscription request message.

make_sub_res()[source]

Make a subscription response for the parameters subscribed to.

Returns

The subscription response message.

read(params=None)[source]

Request the device to return values for some parameters.

Parameters

params (Optional[Collection[str]]) – A list of parameter names to return values for. Passing None specifies all parameters of this device type.

Raises

KeyError – If an invalid parameter name is provided.

update(message, /)[source]

Digest a Smart Device message and update this buffer’s state accordingly.

The message’s bitmaps and parameter values are copied into this buffer.

property interval: float

The subscription interval in seconds.

property last_update: float

The timestamp given by time.time() of the last update.

property last_write: float

The timestamp given by time.time() of the last write.

property subscription: frozenset[str]

The names of parameters subscribed to.

property uid: int

Smart Device unique identifier.

class runtime.buffer.DeviceUID[source]

Bases: runtime.buffer.BaseStructure

A unique device identifier.

device_id

The device’s type ID. Consult the catalog for the full list.

Type

int

year

The year in which the device was created. 0x00 corresponds to the spring 2016 season, and increments for each season afterwards.

Type

int

random

The UID’s random bits to distinguish otherwise identical devices.

Type

int

Examples

>>> uid = DeviceUID(0xffff, 0xee, 0xc0debeef_deadbeef)
>>> assert int(uid) == 0xffff_ee_c0debeef_deadbeef
>>> uid = DeviceUID.from_int(0xffff_ee_c0debeef_deadbeef)
>>> hex(uid.device_id)
'0xffff'
>>> hex(uid.year)
'0xee'
>>> hex(uid.random)
'0xc0debeefdeadbeef'
classmethod from_int(uid)[source]

Parse a device UID in integer format into its constituent fields.

class runtime.buffer.NullDevice

Bases: runtime.buffer.DeviceBuffer

A buffer type with no parameters.

Useful for creating a non-null placeholder where a buffer is required.

class runtime.buffer.Parameter(name, ctype, id, lower=- inf, upper=inf, readable=True, writeable=False, subscribed=True)[source]

Bases: NamedTuple

A parameter descriptor.

Parameters
  • name (str) – The parameter name.

  • ctype (type) – A ctypes type.

  • id (int) – The parameter ID, a nonnegative integer.

  • lower (float) – This parameter’s minimum valid value. For numeric parameters only.

  • upper (float) – This parameter’s maximum valid value. For numeric parameters only.

  • readable (bool) – Whether this parameter should be readable with Buffer.get().

  • writeable (bool) – Whether this parameter should be writeable with Buffer.write().

  • subscribed (bool) – Whether this parameter should be subscribed to, by default. Only applicable for Smart Devices.

clamp(value, /)[source]

Ensure a real value is between this parameter’s lower and upper limits.

If the value is not within the bounds, this method will emit a warning.

Parameters

value (float) – Candidate value.

Returns

A value between Parameter.lower and Parameter.upper. A value that exceeds the minimum or maximum will be clipped to that bound.

Example

>>> param = Parameter('param', ctypes.c_double, 0, lower=-1, upper=1)
>>> param.clamp(-1.1)
-1
>>> param.clamp(1.1)
1
>>> param.clamp(0)
0
static parse_ctype(type_specifier)[source]

Parse a type specifier into the corresponding C type.

Parameters

type_specifier (str) – A description of the parameter’s type. Can be a the name of any simple ctypes data type without the c_ prefix. A vector type of n elements can also be specified by appending [<n>] to an existing type (resembling C array declaration syntax). Pointer types are not supported.

Returns

A ctypes-compatible data type.

Example

>>> assert Parameter.parse_ctype('bool') is ctypes.c_bool
>>> assert Parameter.parse_ctype('int32[64]') is ctypes.c_int32 * 64
>>> assert Parameter.parse_ctype('uint64[8][8]') is ctypes.c_uint64 * 8 * 8
property default: Any

The default value when initializing this parameter.

Returns

The default value of Parameter.platform_type.

Raises

DeviceBufferError – If this parameter is not a simple ctype.

Example

>>> Parameter('param', ctypes.c_double, 0).default
0.0
>>> Parameter('param', ctypes.c_int8, 0).default
0
>>> Parameter('param', ctypes.c_bool, 0).default
False
>>> Parameter('param', ctypes.c_bool * 3, 0).default
Traceback (most recent call last):
  ...
runtime.buffer.DeviceBufferError: vector parameters have no default
property platform_type: type

A C type that has the correct width to hold this parameter’s values.

The type widths of Runtime’s platform and the peripheral’s platform may not match. This method performs the necessary conversion to allocate the correct amount of buffer space.

Returns

A type exactly as wide as the values (bytes) emitted by the peripheral.