runtime.process

Process and resource management.

Wrapping other low-level modules, this module provides a high-level interface for managing processes, endpoints, and other resources. This module is intended for consumption by service handlers and tools implementing Runtime’s business logic.

class runtime.process.Application(name, options, stack=<factory>, endpoints=<factory>, logger=<factory>)[source]

Bases: object

An application opens and closes resources created from command-line options.

Generally, you create one Application per asyncio.run main function, like so:

>>> async def main(**options):
...     async with Application('my-app', options) as app:
...         ...

An Application produces remote.Endpoint and remote.Router instances in common configurations. It also configures the asyncio loop and logging framework, which are needed to make the messaging components work.

Parameters
configure_loop()[source]

Configure the current asyncio loop and environment.

  • Sets the debug flag, default executor, and exception handler, which logs exceptions produced by event loop callbacks.

  • Set the current task and thread names.

  • If this method is called in the main thread, set signal handlers for SIGINT and SIGTERM that cancel the current task.

Note

This method assumes the current task is the main task run by asyncio.run().

make_buffer_manager(*, shared=True)[source]

Make a buffer manager.

Parameters

shared (bool) – True if the buffers should be backed by shared memory, False for non-shared (private) memory.

async make_client(node=None)[source]

Make and start a remote call client.

Parameters

node (Optional[runtime.remote.Node]) – The node the client should wrap. If not provided, this method constructs and uses a node backed by a DEALER socket and connected to the router frontend. By default, the socket’s identity is the app named suffixed by -client.

async make_control_client()[source]

Make a client for sending gamepad (control) inputs.

Since the message flow is unidirectional, notification messages are recommended.

async make_control_service(handler)[source]

Make a service for receiving gamepad (control) inputs.

Parameters

handler (runtime.remote.Handler) – A remote call handler with a bound method update_gamepads, which accepts a single positional argument (an update object) and returns nothing.

Note

The format of an update object is:

{"<uid>": {"<param-name>": <value>}}
async make_log_forwarder()[source]

Make a threaded device that forwards ZMQ PUB-SUB messages emitted by loggers.

The device is subscribed to all messages. Both sockets bind to fixed addresses.

async make_log_publisher()[source]

Make a client that connects to the log forwarder’s backend socket.

The publisher will be installed in the processor chain.

async make_log_subscriber(handler)[source]

Make a service that connects to the log forwarder’s frontend socket.

Parameters

handler (runtime.remote.Handler) – A remote call handler. The subscriber will call the handler’s methods when it receives a logged message. Each method should be named after the log level it handles (e.g., debug) and take a single positional argument: the event dictionary (see runtime.log.configure() for details on the format).

async make_router()[source]

Make a router for passing remote call requests and responses.

async make_service(handler, node=None, logger=None)[source]

Make and start a remote call service.

Parameters
  • handler (runtime.remote.Handler) – A remote call handler.

  • node (Optional[runtime.remote.Node]) – The node the service should wrap. If not provided, this method constructs and uses a node backed by a DEALER socket and connected to the router backend. By default, the socket’s identity is the app name suffixed by -service.

  • logger (Optional[structlog.stdlib.AsyncBoundLogger]) – The logger passed to the service.

async make_update_client()[source]

Make a client for publishing Smart Device updates over UDP/IP multicast.

Since the message flow is unidirectional, notification messages are recommended.

async make_update_service(handler)[source]

Make a service for receiving Smart Device updates over UDP/IP multicast.

Parameters

handler (runtime.remote.Handler) – A remote call handler with a bound method update, which accepts a single positional argument (an update object) and returns nothing.

Note

The format of an update object is:

{
    "<gamepad-index>": {
        "lx": <float: [-1, 1]>,
        "ly": <float: [-1, 1]>,
        "rx": <float: [-1, 1]>,
        "ry": <float: [-1, 1]>,
        "btn": int,
    },
    ...
}

The [lr][xy] keys represent joystick positions, where l and r stand for left and right, respectively, and x and y are Cartesian coordinates. The origin (0, 0) corresponds to the joystick in the resting position. Each joystick is constrained within the unit circle.

btn is a bitmask where a “1” bit indicates the corresponding button is pressed. Consult the device catalog for which buttons correspond to which bits. (The first parameters correspond to the lower-order bits.)

report_health()[source]

Schedule a task to periodically log the health of this process.

property executor: concurrent.futures.thread.ThreadPoolExecutor

A thread pool executor for running synchronous tasks.

class runtime.process.AsyncProcess(*args, **kwargs)[source]

Bases: multiprocessing.context.Process, runtime.process.AsyncProcessType

A subprocess with a callable as an entry point.

This class is the asynchronous version of multiprocessing.Process, meaning that the parent process can join the child process asynchronously. The implementation works with asyncio natively by watching a file descriptor that is ready to read once the process exits. This event-triggered approach does not need a helper task/thread dedicated to blocking on multiprocessing.Process.join() or polling multiprocessing.Process.is_alive().

Parameters
class runtime.process.AsyncProcessType(*args, **kwargs)[source]

Bases: Protocol

Abstract base type for subprocesses.

This interface contains a subset of asyncio.subprocess.Process and follows similar semantics.

abstract kill()[source]

Forcefully kill the child process.

The behavior is platform-dependent. On POSIX systems, this method sends the SIGKILL signal to the child process.

abstract terminate()[source]

Stop the child process.

The behavior is platform-dependent. On POSIX systems, this method sends the SIGTERM signal to the child process.

abstract async wait()[source]

Wait for the child process to terminate.

Returns

The exit code (AsyncProcessType.returncode).

Raises

ValueError – If the process is not yet started.

abstract property pid: Optional[int]

The process identifier (PID).

abstract property returncode: Optional[int]

The return (exit) code of a terminated process.

This attribute is None for processes that have not yet exited.

async runtime.process.run_process(process, *, terminate_timeout=2)[source]

Start and wait for a subprocess to exit.

If the task running this function is cancelled in the parent process while the child process has not yet exited, this function will attempt to terminate the child. If the child is not well-behaved and does not terminate by a timeout, this function kills the child, guaranteeing no orphan process left behind.

Once the child is dead, this function also inspects the child’s return code to determine if it raised an emergency stop. If so, this function re-raises the exception.

Parameters
Returns

The process exit code.

Raises

EmergencyStopException – If the subprocess raised an emergency stop.

async runtime.process.spin(func, /, *args, interval=1, **kwargs)[source]

Periodically execute an async callback.

Parameters
  • func (Callable[[...], Awaitable[Any]]) – Async callback.

  • args (Any) – Positonal arguments to the callback.

  • interval (float) – Duration (in seconds) between calls. The callback is allows to run for longer than the interval. The callback should implement any timeout logic if cancellation is desired.

  • kwargs (Any) – Keyword arguments to the callback.