runtime.process
Process and resource management.
Wrapping other low-level modules, this module provides a high-level interface for managing processes, endpoints, and other resources. This module is intended for consumption by service handlers and tools implementing Runtime’s business logic.
- class runtime.process.Application(name, options, stack=<factory>, endpoints=<factory>, logger=<factory>)
Bases: object
An application opens and closes resources created from command-line options.
Generally, you create one Application per asyncio.run main function, like so:
>>> async def main(**options):
...     async with Application('my-app', options) as app:
...         ...
An Application produces remote.Endpoint and remote.Router instances in common configurations. It also configures the asyncio loop and logging framework, which are needed to make the messaging components work.
- Parameters
name (str) – The name of the application (preferably kebab case and unique across all applications).
options (Mapping[str, Any]) – A map of option names to their values.
stack (contextlib.AsyncExitStack) – The stack that the app’s resources are pushed on.
endpoints (dict[str, runtime.remote.SocketNode]) – A map of endpoint names to endpoints.
logger (structlog.stdlib.AsyncBoundLogger) – A logger instance (may not be bound).
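The open/close behavior described above can be illustrated with a minimal sketch. It assumes nothing about the real implementation: MiniApp, make_resource, and events are hypothetical names, and the only grounded detail is that resources are pushed on a contextlib.AsyncExitStack.

```python
import asyncio
import contextlib
from dataclasses import dataclass, field
from typing import Any, List, Mapping


@dataclass
class MiniApp:
    """Hypothetical stand-in for Application (not the runtime.process class)."""

    name: str
    options: Mapping[str, Any]
    stack: contextlib.AsyncExitStack = field(default_factory=contextlib.AsyncExitStack)
    events: List[str] = field(default_factory=list)

    async def __aenter__(self) -> "MiniApp":
        await self.stack.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Closing the stack closes every pushed resource in reverse order.
        return await self.stack.__aexit__(exc_type, exc, tb)

    async def make_resource(self, label: str) -> str:
        """Open a fake resource and push its cleanup onto the stack."""

        @contextlib.asynccontextmanager
        async def resource():
            self.events.append(f"open {label}")
            try:
                yield label
            finally:
                self.events.append(f"close {label}")

        return await self.stack.enter_async_context(resource())


async def main(**options: Any) -> List[str]:
    async with MiniApp("my-app", options) as app:
        await app.make_resource("client")
        await app.make_resource("service")
    return app.events
```

Running asyncio.run(main()) shows that resources close in the reverse of the order in which they were opened, which is the usual reason for keeping them on an exit stack.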
- configure_loop()
Configure the current asyncio loop and environment.
Set the debug flag, default executor, and exception handler, which logs exceptions produced by event loop callbacks.
Set the current task and thread names.
If this method is called in the main thread, set signal handlers for SIGINT and SIGTERM that cancel the current task.
Note
This method assumes the current task is the main task run by asyncio.run().
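As a rough illustration of the steps listed for configure_loop(), the sketch below applies the same kinds of settings using only standard asyncio calls. It is not the method's source: configure_loop_sketch, its parameters, and the choice to collect errors in a list instead of logging them are assumptions made for the example.

```python
import asyncio
import concurrent.futures
import signal
import threading
from typing import Any, Dict, List


def configure_loop_sketch(
    errors: List[Dict[str, Any]],
    debug: bool = False,
    max_workers: int = 4,
) -> None:
    """Configure the running loop. Call this from the main task."""
    loop = asyncio.get_running_loop()
    loop.set_debug(debug)
    loop.set_default_executor(
        concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    )
    # Record exceptions raised by loop callbacks (a real app would log them).
    loop.set_exception_handler(lambda _loop, context: errors.append(context))

    task = asyncio.current_task()
    task.set_name("main")
    threading.current_thread().name = "main-thread"

    # Signal handlers can only be installed from the main thread.
    if threading.current_thread() is threading.main_thread():
        for signum in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(signum, task.cancel)
            except NotImplementedError:
                pass  # Not supported by this event loop (e.g., on Windows).


async def demo() -> List[Dict[str, Any]]:
    errors: List[Dict[str, Any]] = []
    configure_loop_sketch(errors)
    # Schedule a callback that fails; the exception handler should see it.
    asyncio.get_running_loop().call_soon(lambda: 1 / 0)
    await asyncio.sleep(0.01)
    return errors
```

Because the handlers cancel the task captured at configuration time, calling the function from any task other than the one passed to asyncio.run() would cancel the wrong task, which is why the note above matters.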
- async make_client(node=None)
Make and start a remote call client.
- Parameters
node (Optional[runtime.remote.Node]) – The node the client should wrap. If not provided, this method constructs and uses a node backed by a DEALER socket and connected to the router frontend. By default, the socket’s identity is the app name suffixed by -client.
- async make_control_client()
Make a client for sending gamepad (control) inputs.
Since the message flow is unidirectional, notification messages are recommended.
- async make_control_service(handler)
Make a service for receiving gamepad (control) inputs.
- Parameters
handler (runtime.remote.Handler) – A remote call handler with a bound method update_gamepads, which accepts a single positional argument (an update object) and returns nothing.
Note
The format of an update object is:
{"<uid>": {"<param-name>": <value>}}
- async make_log_forwarder()
Make a threaded device that forwards ZMQ PUB-SUB messages emitted by loggers.
The device is subscribed to all messages. Both sockets bind to fixed addresses.
- async make_log_publisher()
Make a client that connects to the log forwarder’s backend socket.
The publisher will be installed in the processor chain.
- async make_log_subscriber(handler)
Make a service that connects to the log forwarder’s frontend socket.
- Parameters
handler (runtime.remote.Handler) – A remote call handler. The subscriber will call the handler’s methods when it receives a logged message. Each method should be named after the log level it handles (e.g., debug) and take a single positional argument: the event dictionary (see runtime.log.configure() for details on the format).
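The naming convention for log handler methods can be sketched as follows. This is an illustration only: CollectingHandler and dispatch are hypothetical, the handler is synchronous for brevity, and the "event" key is an assumption about the event dictionary, whose actual format is defined by runtime.log.configure().

```python
from typing import Any, Dict, List


class CollectingHandler:
    """Hypothetical handler with one method per log level of interest."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def debug(self, event: Dict[str, Any]) -> None:
        self.seen.append(f"debug: {event.get('event')}")

    def error(self, event: Dict[str, Any]) -> None:
        self.seen.append(f"error: {event.get('event')}")


def dispatch(handler: Any, level: str, event: Dict[str, Any]) -> bool:
    """Call the method named after the level, if the handler defines one."""
    method = getattr(handler, level, None)
    if method is None:
        return False  # The handler does not handle this level.
    method(event)
    return True


handler = CollectingHandler()
handled_debug = dispatch(handler, "debug", {"event": "started"})
handled_info = dispatch(handler, "info", {"event": "ignored"})
```

A handler written this way receives only the levels it defines methods for; in this sketch, messages at any other level are dropped.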
- async make_service(handler, node=None, logger=None)
Make and start a remote call service.
- Parameters
handler (runtime.remote.Handler) – A remote call handler.
node (Optional[runtime.remote.Node]) – The node the service should wrap. If not provided, this method constructs and uses a node backed by a DEALER socket and connected to the router backend. By default, the socket’s identity is the app name suffixed by -service.
logger (Optional[structlog.stdlib.AsyncBoundLogger]) – The logger passed to the service.
- async make_update_client()
Make a client for publishing Smart Device updates over UDP/IP multicast.
Since the message flow is unidirectional, notification messages are recommended.
- async make_update_service(handler)
Make a service for receiving Smart Device updates over UDP/IP multicast.
- Parameters
handler (runtime.remote.Handler) – A remote call handler with a bound method update, which accepts a single positional argument (an update object) and returns nothing.
Note
The format of an update object is:
{
    "<gamepad-index>": {
        "lx": <float: [-1, 1]>,
        "ly": <float: [-1, 1]>,
        "rx": <float: [-1, 1]>,
        "ry": <float: [-1, 1]>,
        "btn": int,
    },
    ...
}
The [lr][xy] keys represent joystick positions, where l and r stand for left and right, respectively, and x and y are Cartesian coordinates. The origin (0, 0) corresponds to the joystick in the resting position. Each joystick is constrained within the unit circle.
btn is a bitmask where a “1” bit indicates the corresponding button is pressed. Consult the device catalog for which buttons correspond to which bits. (The first parameters correspond to the lower-order bits.)
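A short sketch of how a consumer might interpret these fields is shown below. The button names and their order are placeholders (the real mapping comes from the device catalog), and pressed_buttons and in_unit_circle are hypothetical helpers, not part of this module.

```python
import math
from typing import List, Sequence

# Placeholder button order. Index 0 maps to the lowest-order bit.
BUTTONS = ["a", "b", "x", "y"]


def pressed_buttons(btn: int, names: Sequence[str] = BUTTONS) -> List[str]:
    """Return the names whose corresponding bit is set in the bitmask."""
    return [name for bit, name in enumerate(names) if (btn >> bit) & 1]


def in_unit_circle(x: float, y: float) -> bool:
    """Check that a joystick position lies within the unit circle."""
    return math.hypot(x, y) <= 1.0


update = {"0": {"lx": 0.5, "ly": 0.5, "rx": 0.0, "ry": 0.0, "btn": 0b0101}}
gamepad = update["0"]
pressed = pressed_buttons(gamepad["btn"])  # bits 0 and 2 are set
left_ok = in_unit_circle(gamepad["lx"], gamepad["ly"])
```

Note that a position such as (1, 1) satisfies the per-axis range [-1, 1] but falls outside the unit circle, so the two constraints are not equivalent.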
- property executor: concurrent.futures.thread.ThreadPoolExecutor
A thread pool executor for running synchronous tasks.
- class runtime.process.AsyncProcess(*args, **kwargs)
Bases: multiprocessing.context.Process, runtime.process.AsyncProcessType
A subprocess with a callable as an entry point.
This class is the asynchronous version of multiprocessing.Process, meaning that the parent process can join the child process asynchronously. The implementation works with asyncio natively by watching a file descriptor that is ready to read once the process exits. This event-triggered approach does not need a helper task/thread dedicated to blocking on multiprocessing.Process.join() or polling multiprocessing.Process.is_alive().
- Parameters
args (Any) – Positional arguments to multiprocessing.Process.
kwargs (Any) – Keyword arguments to multiprocessing.Process. By default, this is a daemon process.
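The file-descriptor technique can be sketched with the standard library alone. The example below is an assumption about how such a join could work, not the class's source: it relies on multiprocessing.Process.sentinel, which on POSIX systems is a file descriptor that becomes readable when the process ends, and on the "fork" start method being available. join_async and demo are hypothetical names.

```python
import asyncio
import multiprocessing
import multiprocessing.process
import time


async def join_async(process: multiprocessing.process.BaseProcess) -> int:
    """Wait for a started process to exit without blocking the event loop."""
    loop = asyncio.get_running_loop()
    exited = loop.create_future()
    fd = process.sentinel  # POSIX: a file descriptor, readable on exit.

    def on_readable() -> None:
        if not exited.done():
            exited.set_result(None)

    loop.add_reader(fd, on_readable)
    try:
        await exited
    finally:
        loop.remove_reader(fd)
    process.join()  # Returns promptly because the child has already exited.
    return process.exitcode


async def demo() -> int:
    # Assumption: a POSIX system where the "fork" start method exists.
    ctx = multiprocessing.get_context("fork")
    child = ctx.Process(target=time.sleep, args=(0.05,), daemon=True)
    child.start()
    return await join_async(child)
```

The event loop stays free to run other tasks while the child is alive, since nothing blocks on join() until the sentinel has already signaled the exit.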
- class runtime.process.AsyncProcessType(*args, **kwargs)
Bases: Protocol
Abstract base type for subprocesses.
This interface contains a subset of asyncio.subprocess.Process and follows similar semantics.
- abstract kill()
Forcefully kill the child process.
The behavior is platform-dependent. On POSIX systems, this method sends the SIGKILL signal to the child process.
- abstract terminate()
Stop the child process.
The behavior is platform-dependent. On POSIX systems, this method sends the SIGTERM signal to the child process.
- abstract async wait()
Wait for the child process to terminate.
- Returns
The exit code (AsyncProcessType.returncode).
- Raises
ValueError – If the process is not yet started.
- async runtime.process.run_process(process, *, terminate_timeout=2)
Start and wait for a subprocess to exit.
If the task running this function is cancelled in the parent process while the child process has not yet exited, this function will attempt to terminate the child. If the child is not well-behaved and does not terminate within terminate_timeout seconds, this function kills the child, guaranteeing that no orphan process is left behind.
Once the child is dead, this function also inspects the child’s return code to determine if it raised an emergency stop. If so, this function re-raises the exception.
- Parameters
process (runtime.process.AsyncProcessType) – The subprocess, which will be started if it is an instance of AsyncProcess and not yet alive.
terminate_timeout (float) – Maximum duration (in seconds) to wait for termination.
- Returns
The process exit code.
- Raises
EmergencyStopException – If the subprocess raised an emergency stop.
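The terminate-then-kill escalation described for run_process can be sketched with asyncio.subprocess.Process, whose interface AsyncProcessType mirrors. This is a simplified illustration under stated assumptions: wait_or_stop and the two demo coroutines are hypothetical, the child is assumed to be already started, and the emergency-stop check is omitted because the corresponding exit code is not documented here.

```python
import asyncio
import sys
from typing import Optional


async def wait_or_stop(
    process: asyncio.subprocess.Process,
    terminate_timeout: float = 2,
) -> int:
    """Wait for the child. On cancellation, terminate it, then kill it."""
    try:
        return await process.wait()
    except asyncio.CancelledError:
        process.terminate()  # Polite request (SIGTERM on POSIX).
        try:
            await asyncio.wait_for(process.wait(), terminate_timeout)
        except asyncio.TimeoutError:
            process.kill()  # Forceful stop (SIGKILL on POSIX).
            await process.wait()
        raise  # Propagate the cancellation once the child is gone.


async def demo_exit_code() -> int:
    child = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import sys; sys.exit(3)"
    )
    return await wait_or_stop(child)


async def demo_cancel() -> Optional[int]:
    child = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(30)"
    )
    waiter = asyncio.ensure_future(wait_or_stop(child))
    await asyncio.sleep(0.2)
    waiter.cancel()
    try:
        await waiter
    except asyncio.CancelledError:
        pass
    return child.returncode  # Set only once the child has been reaped.
```

Re-raising CancelledError after the cleanup keeps the cancellation visible to the caller while still ensuring the child has exited first.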
- async runtime.process.spin(func, /, *args, interval=1, **kwargs)
Periodically execute an async callback.
- Parameters
func (Callable[[...], Awaitable[Any]]) – Async callback.
args (Any) – Positional arguments to the callback.
interval (float) – Duration (in seconds) between calls. The callback is allowed to run for longer than the interval. The callback should implement any timeout logic if cancellation is desired.
kwargs (Any) – Keyword arguments to the callback.
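One plausible reading of these semantics is sketched below: each call is awaited to completion, and the next call starts once both the callback and the interval have finished. Whether spin schedules calls exactly this way is an assumption. spin_sketch and demo are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def spin_sketch(
    func: Callable[..., Awaitable[Any]],
    /,
    *args: Any,
    interval: float = 1,
    **kwargs: Any,
) -> None:
    """Call an async callback forever, at most once per interval."""
    while True:
        # A slow callback delays the next call instead of being cancelled.
        await asyncio.gather(func(*args, **kwargs), asyncio.sleep(interval))


async def demo() -> List[str]:
    calls: List[str] = []

    async def tick(label: str) -> None:
        calls.append(label)

    # The loop never returns on its own, so bound it from the outside.
    try:
        await asyncio.wait_for(spin_sketch(tick, "x", interval=0.01), timeout=0.2)
    except asyncio.TimeoutError:
        pass
    return calls
```

Since the loop itself never times out a call, a callback that may hang should wrap its own work in asyncio.wait_for, as the interval parameter description suggests.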