runtime.buffer
Peripheral data storage and sharing.
Buffers are the primary interprocess communication (IPC) mechanism within Runtime. Each buffer is a C-style structure created with the ctypes foreign function library and possibly backed by shared memory. Each buffer represents one Smart Device or other peripheral and contains parameters that can be read from or written to.
For extensibility reasons, this library contains no hardcoded information about specific peripherals. Instead, this library parses each peripheral’s parameter and memory layout specifications from a config file and generates the buffer types dynamically.
Consumers should poll buffers for changes. There is no mechanism for event-based notifications.
This library is synchronous. Since many operations acquire a mutex, buffer operations should run in an executor, so as not to block the event loop.
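The polling and executor advice above can be sketched as follows. This is a minimal illustration, not Runtime's own code: `blocking_get`, `state`, and `poll` are hypothetical stand-ins for a mutex-guarded buffer read and a consumer that polls it from an asyncio event loop.

```python
import asyncio
import threading

lock = threading.Lock()
state = {'switch': False}  # hypothetical stand-in for a buffer's update block


def blocking_get(param):
    """Stand-in for a synchronous, mutex-guarded buffer read."""
    with lock:
        return state[param]


def flip():
    """Simulate a peripheral update arriving from another thread."""
    with lock:
        state['switch'] = True


async def poll(param, interval, attempts):
    """Poll a parameter until it changes or the attempts run out."""
    loop = asyncio.get_running_loop()
    # The blocking read runs in the default thread pool executor,
    # so the event loop stays free while the mutex is held.
    previous = await loop.run_in_executor(None, blocking_get, param)
    for _ in range(attempts):
        await asyncio.sleep(interval)
        current = await loop.run_in_executor(None, blocking_get, param)
        if current != previous:
            return current
    return previous


async def main():
    threading.Timer(0.02, flip).start()
    return await poll('switch', 0.01, 100)


result = asyncio.run(main())
```

The polling interval trades latency against CPU use; the values here are arbitrary.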
- exception runtime.buffer.DeviceBufferError(message, /, **context)[source]¶
Bases:
runtime.exception.RuntimeBaseException
Error when interfacing with a buffer.
- class runtime.buffer.Buffer[source]¶
Bases:
runtime.buffer.BaseStructure
A structure for storing peripheral data.
A buffer consists of two substructures: an update block for storing current parameter values (as read from the peripheral), and a write block for parameter values to be written to the peripheral.
- params¶
Maps param names to their descriptors.
- Type
Mapping[str, runtime.buffer.Parameter]
- get(param, /)[source]¶
Read a parameter from the update block.
- Parameters
param (str) – The parameter name.
- Returns
The parameter value. Consult the catalog for each device’s parameters and their types.
- Raises
DeviceBufferError – If the parameter does not exist or is not readable.
- classmethod make_type(name, params, *extra_fields, **attrs)[source]¶
Create a new buffer type (subclass).
- classmethod open(name, /, *, create=True)[source]¶
Open a new buffer backed by shared memory.
- Parameters
- Returns
A context manager that automatically closes the shared memory object when the exit handler runs. The object is not unlinked, meaning other processes may still access the object. To finally destroy the object, you must call Buffer.unlink() on this object’s name.
- Raises
DeviceBufferError – If create=False, but the object does not exist.
Note
When two processes attempt to create this buffer simultaneously, there is a small chance that the process that loses the race yields its view before the other process initializes the mutex. This behavior is acceptable, since attempting to acquire an uninitialized mutex should raise a SyncError with EINVAL.
- set(param, value, /)[source]¶
Set a parameter value in the update block.
- Parameters
param (str) – The parameter name.
value (Any) – The parameter value. Consult the catalog for each device’s parameters and their types.
- Raises
DeviceBufferError – If the parameter does not exist or is not writeable.
TypeError – If the value is not the correct type.
- transaction()[source]¶
Acquire the buffer’s mutex and check its valid bit.
All methods already use this reentrant context manager to guarantee consistency, but this context manager may also be used to group transactions into a larger atomic transaction. This avoids acquiring and releasing the mutex repeatedly.
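The grouping behavior can be sketched with a reentrant lock from the standard library. `ToyBuffer` and its `acquisitions` counter are hypothetical and exist only to make the effect observable; the real buffer also checks a valid bit, which this sketch omits.

```python
import contextlib
import threading


class ToyBuffer:
    """Hypothetical stand-in for a buffer with a reentrant transaction."""

    def __init__(self):
        self._lock = threading.RLock()
        self._depth = 0
        self._values = {}
        self.acquisitions = 0  # counts outermost acquisitions only

    @contextlib.contextmanager
    def transaction(self):
        with self._lock:
            if self._depth == 0:
                self.acquisitions += 1
            self._depth += 1
            try:
                yield
            finally:
                self._depth -= 1

    def set(self, param, value):
        with self.transaction():
            self._values[param] = value

    def get(self, param):
        with self.transaction():
            return self._values[param]


buf = ToyBuffer()
buf.set('x', 1)  # first outermost acquisition
buf.set('y', 2)  # second outermost acquisition

# Three operations grouped under a single outermost acquisition.
with buf.transaction():
    buf.set('x', 10)
    buf.set('y', buf.get('x') + 1)

grouped_total = buf.acquisitions
```

Inside the outer `with` block, the nested calls re-enter the lock already held by the same thread instead of releasing and reacquiring it.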
- classmethod unlink(name)[source]¶
Destroy a shared memory object.
The exact behavior depends on the platform. See SharedMemory.unlink().
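The difference between closing and unlinking can be seen with the standard library's `multiprocessing.shared_memory` directly. This sketch does not use `Buffer` at all, and the outcome shown assumes a POSIX platform, where a closed block persists until it is unlinked.

```python
from multiprocessing import shared_memory

# Create a named block, write one byte, and close this handle only.
first = shared_memory.SharedMemory(create=True, size=16)
name = first.name
first.buf[0] = 42
first.close()

# On POSIX the block still exists, so a second handle can attach by name.
second = shared_memory.SharedMemory(name=name)
value = second.buf[0]
second.close()
second.unlink()  # Destroy the underlying block

# After unlinking, attaching by name is expected to fail.
try:
    stale = shared_memory.SharedMemory(name=name)
except FileNotFoundError:
    reopened = False
else:
    stale.close()
    reopened = True
```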
- write(param, value, /)[source]¶
Request writing a parameter to the device.
Because the buffer is polled and the writes are batched during each cycle, it’s possible to overwrite a pending parameter write. This is a feature to reduce excessive writes.
- Parameters
param (str) – The parameter name.
value (Any) – The parameter value. Consult the catalog for each device’s parameters and their types.
- Raises
DeviceBufferError – If the parameter does not exist or is not writeable.
TypeError – If the value is not the correct type.
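The overwrite behavior of pending writes can be pictured as a map keyed by parameter name that is drained once per cycle. `PendingWrites` and the parameter names below are hypothetical; the sketch shows only the last-write-wins semantics, not how Runtime stores the write block.

```python
class PendingWrites:
    """Hypothetical model of a write block drained once per cycle."""

    def __init__(self):
        self._pending = {}

    def write(self, param, value):
        # A later write to the same parameter replaces the earlier one.
        self._pending[param] = value

    def flush(self):
        # Hand back the batch and start the next cycle with an empty map.
        batch, self._pending = self._pending, {}
        return batch


writes = PendingWrites()
writes.write('duty_cycle', 0.2)
writes.write('duty_cycle', 0.8)  # overwrites the pending 0.2
writes.write('enabled', True)
batch = writes.flush()
empty = writes.flush()
```

Only one value per parameter reaches the device in each cycle, which is the reduction in writes the description refers to.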
- class runtime.buffer.BufferStore(catalog, buffers=<factory>, stack=<factory>, shared=True)[source]¶
Bases:
collections.abc.Mapping[Union[int, tuple[str, int]], runtime.buffer.Buffer]
Manage the lifecycle of a collection of buffers.
Every device type has a unique name and every device has an integer identifier (UID) unique among its type. A type name and UID pair uniquely identifies a device globally. For Smart Devices, the UID encodes the type, so its UID alone specifies a buffer.
A BufferStore maps type name and UID pairs to buffer instances. The store implements the context manager protocol for automatically closing all open buffers.
- Parameters
catalog (Mapping[str, type[runtime.buffer.Buffer]]) – A device catalog mapping unique type names to buffer types. The buffer types should be generated with the Buffer.make_type() factory.
buffers (MutableMapping[tuple[str, int], runtime.buffer.Buffer]) – A mapping from type name and UID pairs to buffer instances.
stack (contextlib.ExitStack) – An exit stack for closing buffers.
shared (bool) – Whether buffers should be created in shared memory.
- get_or_open(key: int, /, *, create: bool = True) → runtime.buffer.DeviceBuffer[source]¶
- get_or_open(key: tuple[str, int], /, *, create: bool = True) → runtime.buffer.Buffer
Get a buffer, opening a new one if necessary.
- Parameters
key – A buffer identifier.
create – Whether to create a new buffer if one does not already exist.
- Raises
KeyError – The device ID or type name does not exist.
DeviceBufferError – If create=False, but the buffer does not exist.
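The two key forms can be reduced to a single type name and UID pair. The sketch below is an assumption-laden illustration: `CATALOG`, its type names, and `normalize_key` are hypothetical, and the shift of 72 bits assumes the UID layout implied by the DeviceUID example (a 16-bit device ID above an 8-bit year and a 64-bit random field).

```python
# Hypothetical catalog: type name -> device ID (None = not a Smart Device).
CATALOG = {'limit-switch': 0, 'servo': 7, 'camera': None}


def normalize_key(key, catalog=CATALOG):
    """Turn either key form into a (type name, UID) pair."""
    if isinstance(key, int):
        # Assumed layout: the device ID occupies the bits above bit 72.
        device_id = key >> 72
        for type_name, known_id in catalog.items():
            if known_id == device_id:
                return type_name, key
        raise KeyError(device_id)
    type_name, uid = key
    if type_name not in catalog:
        raise KeyError(type_name)
    return type_name, uid


servo_uid = (7 << 72) | 0x1234
servo_key = normalize_key(servo_uid)
camera_key = normalize_key(('camera', 0))
```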
- classmethod make_catalog(catalog)[source]¶
Generate buffer types from the raw peripheral specification.
The raw peripheral specification follows this form:
    {
        "device_id": <int>,             # For Smart Devices only
        "sub_interval": <float>,        # Optional
        "write_interval": <float>,      # Optional
        "heartbeat_interval": <float>,  # Optional
        "params": [                     # Keyword arguments of ``Parameter``
            {"name": <str>, "type": <str>}
        ]
    }
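Generating a structure type from such a specification can be sketched with `ctypes` and the built-in `type()` constructor. This is not the implementation of make_catalog: `make_struct`, the example spec, and its parameter names are hypothetical, and only simple scalar types are handled.

```python
import ctypes


def make_struct(name, spec):
    """Build a ctypes.Structure subclass from a raw peripheral spec."""
    fields = [
        # Map a type string such as 'double' to ctypes.c_double.
        (param['name'], getattr(ctypes, 'c_' + param['type']))
        for param in spec['params']
    ]
    return type(name, (ctypes.Structure,), {'_fields_': fields})


# Hypothetical spec in the form shown above.
spec = {
    'device_id': 0,
    'params': [
        {'name': 'switch0', 'type': 'bool'},
        {'name': 'position', 'type': 'double'},
    ],
}

LimitSwitch = make_struct('LimitSwitch', spec)
block = LimitSwitch()  # fields start zero-initialized
block.position = 1.5
field_names = [field[0] for field in LimitSwitch._fields_]
```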
- class runtime.buffer.DeviceBuffer[source]¶
Bases:
runtime.buffer.Buffer
A special buffer representing a Smart Device.
- emit_dev_data()[source]¶
Make messages containing device data for recently updated parameters.
Calling this method will mark all parameters as not recently updated.
- Yields
runtime.messaging.Message – Zero or more messages containing device data.
- emit_dev_rw()[source]¶
Make messages for reading and writing parameters.
Calling this method will clear the pending parameter maps.
- Yields
runtime.messaging.Message – Zero or more messages, depending on how many parameters the consumer requested with read() or Buffer.write(). If no parameters have been requested since the last read, this method may yield no messages.
- emit_subscription()[source]¶
Make messages containing device data for subscribed parameters.
- Yields
runtime.messaging.Message – Zero or more messages containing device data.
- get_read()[source]¶
Get the parameters that are to be read.
Calling this method will clear the pending read map.
- Returns
A collection of parameter names.
- get_update()[source]¶
Get the parameters that were updated.
Calling this method will clear the pending update map.
- Returns
A map of parameter names to their corresponding values.
- get_write()[source]¶
Get the parameters that were written.
Calling this method will clear the pending write map.
- Returns
A map of parameter names to their corresponding values.
- make_sub_res()[source]¶
Make a subscription response for the parameters subscribed to.
- Returns
The subscription response message.
- update(message, /)[source]¶
Digest a Smart Device message and update this buffer’s state accordingly.
The message’s bitmaps and parameter values are copied into this buffer.
- property last_update: float¶
The timestamp given by
time.time()
of the last update.
- property last_write: float¶
The timestamp given by
time.time()
of the last write.
- class runtime.buffer.DeviceUID[source]¶
Bases:
runtime.buffer.BaseStructure
A unique device identifier.
- year¶
The year in which the device was created. 0x00 corresponds to the spring 2016 season, and the value increments for each season afterwards.
- Type
Examples
>>> uid = DeviceUID(0xffff, 0xee, 0xc0debeef_deadbeef)
>>> assert int(uid) == 0xffff_ee_c0debeef_deadbeef
>>> uid = DeviceUID.from_int(0xffff_ee_c0debeef_deadbeef)
>>> hex(uid.device_id)
'0xffff'
>>> hex(uid.year)
'0xee'
>>> hex(uid.random)
'0xc0debeefdeadbeef'
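The field widths implied by the example (16 bits of device ID, 8 bits of year, 64 bits of random data) can be reproduced with plain integer arithmetic. `pack_uid` and `unpack_uid` are hypothetical helpers written for illustration; they are not part of DeviceUID.

```python
def pack_uid(device_id, year, random):
    """Combine the three fields into one 88-bit integer."""
    return ((device_id & 0xffff) << 72) | ((year & 0xff) << 64) | (
        random & 0xffff_ffff_ffff_ffff)


def unpack_uid(uid):
    """Split an 88-bit integer into (device_id, year, random)."""
    return (
        (uid >> 72) & 0xffff,
        (uid >> 64) & 0xff,
        uid & 0xffff_ffff_ffff_ffff,
    )


packed = pack_uid(0xffff, 0xee, 0xc0debeef_deadbeef)
fields = unpack_uid(packed)
```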
- class runtime.buffer.NullDevice¶
Bases:
runtime.buffer.DeviceBuffer
A buffer type with no parameters.
Useful for creating a non-null placeholder where a buffer is required.
- class runtime.buffer.Parameter(name, ctype, id, lower=-inf, upper=inf, readable=True, writeable=False, subscribed=True)[source]¶
Bases:
NamedTuple
A parameter descriptor.
- Parameters
name (str) – The parameter name.
id (int) – The parameter ID, a nonnegative integer.
lower (float) – This parameter’s minimum valid value. For numeric parameters only.
upper (float) – This parameter’s maximum valid value. For numeric parameters only.
readable (bool) – Whether this parameter should be readable with Buffer.get().
writeable (bool) – Whether this parameter should be writeable with Buffer.write().
subscribed (bool) – Whether this parameter should be subscribed to by default. Only applicable for Smart Devices.
- clamp(value, /)[source]¶
Ensure a real value is between this parameter’s lower and upper limits.
If the value is not within the bounds, this method will emit a warning.
- Parameters
value (float) – Candidate value.
- Returns
A value between Parameter.lower and Parameter.upper. A value that exceeds the minimum or maximum will be clipped to that bound.
Example
>>> param = Parameter('param', ctypes.c_double, 0, lower=-1, upper=1)
>>> param.clamp(-1.1)
-1
>>> param.clamp(1.1)
1
>>> param.clamp(0)
0
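A standalone function with the same clip-and-warn behavior might look like the sketch below. The function signature and the warning text are assumptions made for illustration; the source does not say which warning category or message Parameter.clamp() uses.

```python
import warnings


def clamp(value, lower=float('-inf'), upper=float('inf')):
    """Clip a real value to [lower, upper], warning when it is out of bounds."""
    if value < lower:
        warnings.warn(f'{value} is below the lower bound {lower}')
        return lower
    if value > upper:
        warnings.warn(f'{value} is above the upper bound {upper}')
        return upper
    return value


# Record the warnings so the out-of-bounds calls can be counted.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    low = clamp(-1.1, lower=-1, upper=1)
    high = clamp(1.1, lower=-1, upper=1)
    inside = clamp(0, lower=-1, upper=1)
```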
- static parse_ctype(type_specifier)[source]¶
Parse a type specifier into the corresponding C type.
- Parameters
type_specifier (str) – A description of the parameter’s type. Can be the name of any simple ctypes data type without the c_ prefix. A vector type of n elements can also be specified by appending [<n>] to an existing type (resembling C array declaration syntax). Pointer types are not supported.
- Returns
A ctypes-compatible data type.
Example
>>> assert Parameter.parse_ctype('bool') is ctypes.c_bool
>>> assert Parameter.parse_ctype('int32[64]') is ctypes.c_int32 * 64
>>> assert Parameter.parse_ctype('uint64[8][8]') is ctypes.c_uint64 * 8 * 8
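One way to parse such specifiers is a regular expression plus a lookup on the `ctypes` module. This is a sketch, not the library's parser: the `SPEC` pattern is an assumption, and so is the choice to apply dimensions in reverse so that `'int32[2][3]'` means two rows of three elements, as in C.

```python
import ctypes
import re

# Assumed grammar: a base name followed by zero or more [<n>] suffixes.
SPEC = re.compile(r'^(\w+)((?:\[\d+\])*)$')


def parse_ctype(type_specifier):
    """Parse a specifier such as 'uint64[8][8]' into a ctypes type."""
    match = SPEC.match(type_specifier)
    if match is None:
        raise ValueError(f'invalid type specifier: {type_specifier!r}')
    base, dims = match.groups()
    # Raises AttributeError for names ctypes does not define.
    ctype = getattr(ctypes, 'c_' + base)
    lengths = [int(n) for n in re.findall(r'\[(\d+)\]', dims)]
    # Innermost dimension first, mirroring C array declarations.
    for length in reversed(lengths):
        ctype = ctype * length
    return ctype


scalar = parse_ctype('bool')
matrix = parse_ctype('int32[2][3]')
square = parse_ctype('uint64[8][8]')
```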
- property default: Any¶
The default value when initializing this parameter.
- Returns
The default value of Parameter.platform_type.
- Raises
DeviceBufferError – If this parameter is not a simple ctype.
Example
>>> Parameter('param', ctypes.c_double, 0).default
0.0
>>> Parameter('param', ctypes.c_int8, 0).default
0
>>> Parameter('param', ctypes.c_bool, 0).default
False
>>> Parameter('param', ctypes.c_bool * 3, 0).default
Traceback (most recent call last):
  ...
runtime.buffer.DeviceBufferError: vector parameters have no default
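The defaults in the example match what `ctypes` produces when a simple type is instantiated with no arguments. A minimal sketch, assuming that is how the default is derived; `default_value` is a hypothetical helper, and `ValueError` stands in for DeviceBufferError.

```python
import ctypes


def default_value(ctype):
    """Return the zero value of a simple ctypes type."""
    if issubclass(ctype, ctypes.Array):
        # ValueError stands in for DeviceBufferError in this sketch.
        raise ValueError('vector parameters have no default')
    # Simple ctypes instances are zero-initialized.
    return ctype().value


double_default = default_value(ctypes.c_double)
int_default = default_value(ctypes.c_int8)
bool_default = default_value(ctypes.c_bool)

try:
    default_value(ctypes.c_bool * 3)
except ValueError as exc:
    vector_error = str(exc)
else:
    vector_error = None
```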
- property platform_type: type¶
A C type that has the correct width to hold this parameter’s values.
The type widths of Runtime’s platform and the peripheral’s platform may not match. This property performs the necessary conversion to allocate the correct amount of buffer space.
- Returns
A type exactly as wide as the values (bytes) emitted by the peripheral.