runtime.remote

Remote (procedure) calls.

Much like asyncio’s transports and protocols, this module is divided into low-level and high-level APIs:

  • The low-level API, Node and its implementations, deals with transporting discrete binary messages and managing the underlying transport.

  • The high-level API, Endpoint and its implementations, implements request/response semantics. Most consumers should use the high-level API.

The remote call message format is based on MessagePack-RPC, except that this module uses cbor2 for serialization.
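As a rough illustration of the message shapes, the sketch below builds the three MessagePack-RPC message kinds as plain Python lists. The numeric type IDs and field order follow the MessagePack-RPC specification; whether this module uses exactly the same values and layout is an assumption, and the class and helper names are hypothetical. Serialization is omitted.

```python
import enum
from typing import Any


class SketchMessageType(enum.IntEnum):
    # Type IDs from the MessagePack-RPC spec; assumed, not confirmed, for this module.
    REQUEST = 0
    RESPONSE = 1
    NOTIFICATION = 2


def make_request(message_id: int, method: str, *args: Any) -> list:
    """Build [type, message ID, method, params]."""
    return [SketchMessageType.REQUEST, message_id, method, list(args)]


def make_response(message_id: int, error: Any = None, result: Any = None) -> list:
    """Build [type, message ID, error, result]."""
    return [SketchMessageType.RESPONSE, message_id, error, result]


def make_notification(method: str, *args: Any) -> list:
    """Build [type, method, params]; notifications carry no message ID."""
    return [SketchMessageType.NOTIFICATION, method, list(args)]


request = make_request(7, 'add', 1, 2)
response = make_response(7, result=3)
notification = make_notification('log', 'hello')
```

The response reuses the request's message ID, which is what lets a client pair the two.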

Every application (process) typically creates a single Handler bound to one or more Service instances. The handler encapsulates the application’s business logic and state, while each service exposes the handler’s methods to a different transport.

exception runtime.remote.RemoteCallError(message, /, **context)[source]

Bases: runtime.exception.RuntimeBaseException

Error produced by executing a remote call.

Parameters
  • message (str) – A human-readable description of the exception.

  • context (Any) – Machine-readable data.

class runtime.remote.Client(node, concurrency=1, logger=<factory>, requests=<factory>)[source]

Bases: runtime.remote.Endpoint

An endpoint for issuing remote calls.

A request is matched to its response with a message ID, a 32-bit integer unique among in-flight requests at any given time.

Parameters
  • requests (runtime.remote.RequestTracker[Any]) – Maps in-flight message IDs to futures. Each future represents the outcome of a call (a result or an exception).

  • node (runtime.remote.Node) – A node for transporting messages.

  • concurrency (int) – The number of workers for processing responses.

async issue_call(method, /, *args, address=None, notification=False, timeout=5)[source]

Issue a remote procedure call and possibly wait for the result.

Parameters
  • method (str) – Method name.

  • args (Any) – Method arguments.

  • address (Optional[Any]) – A transport-dependent address.

  • notification (bool) – False iff this call requires a response. Has no effect for nodes that cannot receive data, which can only send notifications.

  • timeout (float) – Maximum duration (in seconds) to wait for a response.

Raises
  • asyncio.TimeoutError – If the request was successfully sent, but the response did not arrive in time.

  • ValueError – If the request tracker could not generate a unique message ID.

  • RemoteCallError – If the service returned an error.

  • cbor2.CBOREncodeError – If the arguments were not serializable.

Note

Notification calls will not raise an exception client-side if the server fails, even if the node supports duplex communication.

property call: runtime.remote.CallFactory

Syntactic sugar for issuing remote procedure calls.

Instead of:

await client.issue_call('add', 1, 2)

Use either of:

await client.call.add(1, 2)
await client.call['add'](1, 2)
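One way such sugar can be built is with `__getattr__` and `__getitem__`, as in the minimal sketch below. This is not the module's CallFactory; `CallFactorySketch` and `fake_issue_call` are hypothetical stand-ins so the example runs without a real client.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


class CallFactorySketch:
    """Hypothetical stand-in: turns attribute or item access into a call."""

    def __init__(self, issue_call: Callable[..., Awaitable[Any]]) -> None:
        self._issue_call = issue_call

    def __getattr__(self, method: str) -> Callable[..., Awaitable[Any]]:
        # Only invoked for names not found normally, e.g. `factory.add`.
        if method.startswith('_'):
            raise AttributeError(method)
        return functools.partial(self._issue_call, method)

    def __getitem__(self, method: str) -> Callable[..., Awaitable[Any]]:
        # Item access also works for names that are not valid identifiers.
        return functools.partial(self._issue_call, method)


async def fake_issue_call(method: str, *args: Any) -> Any:
    """Stand-in for Client.issue_call that evaluates 'add' locally."""
    if method == 'add':
        return sum(args)
    raise ValueError(f'unknown method: {method}')


async def demo() -> tuple:
    call = CallFactorySketch(fake_issue_call)
    return await call.add(1, 2), await call['add'](1, 2)


results = asyncio.run(demo())
```

Both spellings end up invoking the same underlying coroutine with the method name as the first argument.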

class runtime.remote.DatagramNode(host='', port=8000, bind=True, options=frozenset({}))[source]

Bases: runtime.remote.Node, asyncio.protocols.DatagramProtocol

A wrapper around asyncio’s datagram support.

classmethod from_address(address, *, bind=True, options=frozenset({}))[source]

Build a datagram node from an address.

Raises

ValueError – If the address is not a valid UDP address.

class runtime.remote.Endpoint(node, concurrency=1, logger=<factory>)[source]

Bases: abc.ABC

A source or destination of messages.

An Endpoint has a number of workers (instances of asyncio.Task) that listen for and process incoming messages. This allows for request pipelining. Once all workers are busy processing messages, the node wrapped by the endpoint buffers any additional messages.
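The worker arrangement described above can be pictured with plain asyncio primitives. In this sketch an `asyncio.Queue` plays the role of the node's buffer, and `worker` and `run_workers` are hypothetical names; the actual Endpoint reads from its Node rather than from a queue.

```python
import asyncio


async def worker(queue: asyncio.Queue, handled: list) -> None:
    """Repeatedly take one message from the buffer and process it."""
    while True:
        message = await queue.get()
        try:
            await asyncio.sleep(0)  # Stand-in for real message handling.
            handled.append(message)
        finally:
            queue.task_done()


async def run_workers(messages: list, concurrency: int = 2) -> list:
    queue: asyncio.Queue = asyncio.Queue()  # Buffers messages while workers are busy.
    handled: list = []
    workers = [asyncio.create_task(worker(queue, handled)) for _ in range(concurrency)]
    for message in messages:
        queue.put_nowait(message)
    await queue.join()  # Wait until every message has been processed.
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


handled = asyncio.run(run_workers(['a', 'b', 'c']))
```

With more messages than workers, the extra messages simply wait in the queue until a worker is free.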

abstract async handle_message(address, message_type, *message)[source]

Process a message.

Parameters
  • address (Any) – The address of the message’s sender, if available. The semantics depend on the node. Pass this argument directly to Node.send().

  • message_type (runtime.remote.MessageType) – The message type.

  • message (Any) – Other message parts. The message type determines the format.

Raises
  • ValueError – If the endpoint could not unpack part of the message.

  • RemoteCallError – If the endpoint could not otherwise process the message.

class runtime.remote.Handler[source]

Bases: object

An object whose bound methods are exposed to remote callers.

Define a handler by subclassing Handler and applying the route() decorator:

>>> class CustomHandler(Handler):
...     @route
...     async def method1(self, arg: int) -> int:
...         ...
...     @route('non-python-identifier')
...     def method2(self):
...         ...

async dispatch(method, *args, timeout=30)[source]

Dispatch a remote procedure call.

If the method is synchronous (possibly blocking), the default executor performs the call.

Parameters
  • method (str) – The procedure name.

  • args (Any) – Positional arguments for the procedure.

Returns

The procedure’s result, which must be CBOR-serializable.

Raises

RemoteCallError – If the procedure does not exist, timed out, or raised an exception.
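The dispatch behavior described above (coroutines awaited directly, synchronous callables sent to the default executor, all failures reported as one error type) could look roughly like the following sketch. It is an assumption-laden illustration, not the module's code: `DispatchError` stands in for RemoteCallError, and the method table is a plain dict rather than a Handler.

```python
import asyncio
import inspect
from typing import Any, Callable


class DispatchError(Exception):
    """Hypothetical stand-in for RemoteCallError."""


async def dispatch(methods: dict[str, Callable[..., Any]], method: str,
                   *args: Any, timeout: float = 30) -> Any:
    try:
        func = methods[method]
    except KeyError as exc:
        raise DispatchError(f'no such method: {method}') from exc
    if inspect.iscoroutinefunction(func):
        pending = func(*args)
    else:
        # Run possibly blocking code in the event loop's default executor.
        pending = asyncio.get_running_loop().run_in_executor(None, func, *args)
    try:
        return await asyncio.wait_for(pending, timeout)
    except asyncio.TimeoutError as exc:
        raise DispatchError(f'method timed out: {method}') from exc
    except Exception as exc:
        raise DispatchError(f'method failed: {method}') from exc


async def add(a: int, b: int) -> int:
    return a + b


def upper(text: str) -> str:
    return text.upper()


async def demo() -> tuple:
    methods = {'add': add, 'upper': upper}
    total = await dispatch(methods, 'add', 1, 2)
    shout = await dispatch(methods, 'upper', 'hi')
    try:
        await dispatch(methods, 'missing')
    except DispatchError as exc:
        error = str(exc)
    return total, shout, error


outcome = asyncio.run(demo())
```

Note that a timeout cancels the awaiting side only; a synchronous function already running in the executor keeps running until it returns.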

class runtime.remote.MessageType(value)[source]

Bases: enum.IntEnum

The message type ID.

REQUEST

Denotes a request message sent by clients. Requires a response.

RESPONSE

Denotes a response message sent by services.

NOTIFICATION

Denotes a notification message sent by clients. Does not require a response. Unlike the synchronous request-response pattern, notifications may be pipelined (i.e., multiple notifications in-flight simultaneously) for increased throughput.

class runtime.remote.Node[source]

Bases: abc.ABC

A transceiver of discrete binary messages.

A node wraps an underlying transport, such as a UDP endpoint, that it can repeatedly open, close, and reopen. Node supports the async context manager protocol (reusable) for automatically managing the transport.

When the transport is open, a node can send messages to and receive messages from one or more peers concurrently.

State Diagram:

start [-> closed]? -> open
    [[-> close -> open]? [-> send]? [-> recv]? [-> closed?]]*
-> close -> end

The data segments and address a Node sends and receives are opaque to the node. Their format and semantics depend on the transport and Endpoint the node works with.
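The reusable async context manager behavior can be sketched as follows. `NodeSketch` and `RecordingNode` are hypothetical classes that model only the open/close lifecycle; sending, receiving, and the real transport are left out.

```python
import abc
import asyncio


class NodeSketch(abc.ABC):
    """Hypothetical stand-in for Node showing only the open/close lifecycle."""

    @abc.abstractmethod
    async def open(self) -> None: ...

    @abc.abstractmethod
    def close(self) -> None: ...

    async def __aenter__(self) -> 'NodeSketch':
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, traceback) -> None:
        # Always close, and let any exception propagate.
        self.close()


class RecordingNode(NodeSketch):
    """Records lifecycle events instead of touching a real transport."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.closed = True

    async def open(self) -> None:
        self.closed = False
        self.events.append('open')

    def close(self) -> None:
        self.closed = True
        self.events.append('close')


async def demo() -> list:
    node = RecordingNode()
    for _ in range(2):  # The same node is entered twice: open, close, reopen, close.
        async with node:
            assert not node.closed
    return node.events


events = asyncio.run(demo())
```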

send_count

The number of messages sent since the transport was opened.

Type

int

recv_count

The number of messages received since the transport was opened.

Type

int

abstract close()[source]

Close the internal transport.

abstract async open()[source]

Open the internal transport.

async recv()[source]

Receive a message.

Returns

Zero or more data segments and an address, which are transport-dependent.

Raises

RemoteCallError – If the transport cannot receive a message.

Note

Asking the transport directly for messages may be problematic if it is reopened or does not support concurrent waiters.

abstract async send(parts, /, *, address=None)[source]

Send a message.

Parameters
  • parts (list[bytes]) – Zero or more data segments.

  • address (Optional[Any]) – The destination’s address.

Raises

RemoteCallError – If the transport cannot send the message. May reopen the internal transport.

abstract property can_recv: bool

Whether the transport can receive messages.

abstract property closed: bool

Whether the internal transport is closed.

class runtime.remote.RequestTracker(futures=<factory>, lower=0, upper=4294967295)[source]

Bases: Generic[runtime.remote.ResponseType]

Track outstanding requests and their results.

Every request is associated with a unique request ID (an integer, or an object serializable as an integer).

Parameters
  • futures (collections.abc.MutableMapping[int, _asyncio.Future]) – A mapping from request IDs to futures representing responses.

  • lower (int) – Minimum valid request ID.

  • upper (int) – Maximum valid request ID.

generate_uid(*, attempts=10)[source]

Generate a unique request ID.

Parameters

attempts (int) – The maximum number of times to try to generate an ID.

Raises

ValueError – If the tracker could not generate a unique ID. If the ID space is sufficiently large, this error is exceedingly rare. Increasing the number of attempts or decreasing the number of in-flight requests makes a collision less likely.

new_request(request_id=None)[source]

Register a new request.

Parameters

request_id (Optional[int]) – A unique request identifier. If not provided, a request ID is randomly generated.

Returns

The request ID and a future representing the outcome of the request.

register_response(request_id, result)[source]

Register a request’s response.

Parameters
  • request_id (int) – The request identifier returned from new_request().

  • result (Union[BaseException, runtime.remote.ResponseType]) – The response or exception.
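To make the tracker's role concrete, here is a minimal sketch of the same idea: random IDs drawn from a range, retried a bounded number of times, each mapped to a future. `TrackerSketch` is hypothetical; details such as ignoring unknown IDs in `register_response` and not checking caller-supplied IDs for duplicates are assumptions of this sketch, not documented behavior.

```python
import asyncio
import random
from typing import Any, Optional


class TrackerSketch:
    """Hypothetical, simplified stand-in for RequestTracker."""

    def __init__(self, lower: int = 0, upper: int = 2**32 - 1) -> None:
        self.futures: dict[int, Any] = {}
        self.lower, self.upper = lower, upper

    def generate_uid(self, *, attempts: int = 10) -> int:
        for _ in range(attempts):
            candidate = random.randint(self.lower, self.upper)
            if candidate not in self.futures:
                return candidate
        raise ValueError('unable to generate a unique request ID')

    def new_request(self, request_id: Optional[int] = None) -> tuple:
        if request_id is None:
            request_id = self.generate_uid()
        future = asyncio.get_running_loop().create_future()
        self.futures[request_id] = future
        return request_id, future

    def register_response(self, request_id: int, result: Any) -> None:
        future = self.futures.pop(request_id, None)
        if future is None or future.done():
            return  # Assumption: unknown or already settled IDs are ignored.
        if isinstance(result, BaseException):
            future.set_exception(result)
        else:
            future.set_result(result)


async def demo() -> tuple:
    tracker = TrackerSketch()
    ok_id, ok_future = tracker.new_request()
    bad_id, bad_future = tracker.new_request()
    tracker.register_response(ok_id, 'pong')
    tracker.register_response(bad_id, RuntimeError('boom'))
    try:
        await bad_future
    except RuntimeError as exc:
        error = str(exc)
    return await ok_future, error, len(tracker.futures)


outcome = asyncio.run(demo())
```

Because an ID is removed once its response is registered, IDs only need to be unique among in-flight requests.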

class runtime.remote.Router(frontend, backend)[source]

Bases: object

Routes messages between Clients and Services that use sockets.

Routers are stateless, duplex, and symmetric (i.e., require the same format and exhibit the same behavior on both ends).

Routers have no error handling and may silently drop messages if the destination is unreachable. Clients must rely on timeouts to determine when to consider a request failed.

The payloads themselves are opaque to the router and are not deserialized.
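Since a dropped message produces no error, the only signal a caller gets is the deadline passing. The sketch below illustrates this with bare futures and `asyncio.wait_for`; the futures and the `wait_for_response` helper are hypothetical and no router or socket is involved.

```python
import asyncio


async def wait_for_response(response: asyncio.Future, timeout: float) -> str:
    """Treat a missing response as a failure once the deadline passes."""
    try:
        return await asyncio.wait_for(response, timeout)
    except asyncio.TimeoutError:
        return 'failed: no response before the deadline'


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    dropped = loop.create_future()    # Never resolved, as if the message were dropped.
    delivered = loop.create_future()  # Resolved shortly after being created.
    loop.call_later(0.01, delivered.set_result, 'ok')
    first = await wait_for_response(dropped, timeout=0.05)
    second = await wait_for_response(delivered, timeout=1)
    return first, second


outcome = asyncio.run(demo())
```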

Parameters
  • frontend (runtime.remote.SocketNode) – A ROUTER socket clients connect to.

  • backend (runtime.remote.SocketNode) – A ROUTER socket services connect to.

  • route_task – The background task performing the routing. Router implements the async context manager protocol, which automatically schedules and cancels this task.

classmethod bind(frontend, backend, frontend_options=None, backend_options=None)[source]

Construct a Router bound to the provided addresses.

class runtime.remote.Service(node, concurrency=1, logger=<factory>, handler=<factory>, timeout=30)[source]

Bases: runtime.remote.Endpoint

Responds to RPC requests.

Parameters
  • handler (runtime.remote.Handler) – The object whose bound methods this service will call.

  • timeout (float) – Maximum duration (in seconds) to execute methods for.

class runtime.remote.SocketNode(socket_type=5, options=<factory>, bindings=frozenset({}), connections=frozenset({}), subscriptions=<factory>)[source]

Bases: runtime.remote.Node

A wrapper around zmq.asyncio.Socket for handling timeouts.

When the underlying socket of a SocketNode times out, the socket is closed and rebuilt to reset the socket’s internal state. For example, a REQ socket may become stuck in the “listening” state indefinitely if the message it sent gets lost.

Parameters
  • socket_type (int) – The socket type (a constant defined under zmq).

  • subscriptions (set[str]) – A set of topics to subscribe to (for SUB sockets only).

  • options (dict[int, typing.Union[int, bytes]]) – A mapping of ZMQ socket option symbols to their values.

  • connections (frozenset[str]) – A set of addresses to connect to.

  • bindings (frozenset[str]) – A set of addresses to bind to.

set_option(option, value, /)[source]

Set a socket option.

Parameters
  • option (int) – A socket option symbol defined by zmq.

  • value (Union[int, bytes]) – The option value (the type/format depends on the option).

subscribe(topic='')[source]

Subscribe to a topic (for zmq.SUB sockets only).

Parameters

topic (str) – The topic to subscribe to.

unsubscribe(topic='')[source]

Unsubscribe from a topic (for zmq.SUB sockets only).

Parameters

topic (str) – The topic to unsubscribe from.

property identity: bytes

The ZMQ identity of this socket.

runtime.remote.route(method_or_name: str, /) → collections.abc.Callable[collections.abc.Callable[..., Any], runtime.remote.RemoteMethod][source]
runtime.remote.route(method_or_name: collections.abc.Callable[..., Any], /) → runtime.remote.RemoteMethod

Decorator for marking a bound method as an RPC target.

Parameters

method_or_name – Either the method to be registered or the name it should be registered under. If the former, the method name is exposed to the Handler. The latter is useful for exposing a name that is not a valid Python identifier.

Returns

Either an identity decorator (if a name was provided) or the method provided.
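A decorator that accepts either a function or a name can be written as in the sketch below. This is only an illustration of the dual-use pattern: the `route` defined here is a local stand-in that tags functions with a hypothetical `remote_name` attribute, whereas the real decorator returns a RemoteMethod, and `exposed_methods` is likewise made up for the example.

```python
from typing import Any, Callable, Union


def route(method_or_name: Union[str, Callable[..., Any]]):
    """Sketch: tag a function with the name it should be exposed under."""
    if callable(method_or_name):
        # Bare usage (@route): expose the function under its own name.
        method_or_name.remote_name = method_or_name.__name__
        return method_or_name

    def decorator(method: Callable[..., Any]) -> Callable[..., Any]:
        # Parameterized usage (@route('some-name')): expose under the given name.
        method.remote_name = method_or_name
        return method

    return decorator


class CustomHandler:
    @route
    def method1(self, arg: int) -> int:
        return arg + 1

    @route('non-python-identifier')
    def method2(self) -> str:
        return 'ok'


def exposed_methods(handler: object) -> dict:
    """Collect bound methods tagged by the sketch decorator, keyed by exposed name."""
    table = {}
    for attribute in dir(handler):
        member = getattr(handler, attribute)
        name = getattr(member, 'remote_name', None)
        if name is not None:
            table[name] = member
    return table


table = exposed_methods(CustomHandler())
```

Looking up `table['non-python-identifier']` yields the bound `method2`, even though that name could never be used as an attribute.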