runtime.remote
¶
Remote (procedure) calls.
Much like asyncio
’s transports and protocols, this module is divided into
low-level and high-level APIs:
The low-level API,
Node
and its implementations, deal with transporting discrete binary messages and managing the underlying transport.The high-level API,
Endpoint
and its implementations, implement request/response semantics. Most consumers should use the high-level API.
This remote call message format is based on MessagePack-RPC, except this module uses
cbor2
for serialization.
Every application (process) typically creates a single Handler
bound to one or
more Service
instances. The handler encapsulates the application’s business
logic and state, while each service exposes the handler’s methods to a different
transport.
- exception runtime.remote.RemoteCallError(message, /, **context)[source]¶
Bases:
runtime.exception.RuntimeBaseException
Error produced by executing a remote call.
- Parameters
message (str) – A human-readable description of the exception.
context (Any) – Machine-readable data.
- class runtime.remote.Client(node, concurrency=1, logger=<factory>, requests=<factory>)[source]¶
Bases:
runtime.remote.Endpoint
An endpoint for issuing remote calls.
A request is matched to its response with a message ID, a 32-bit integer unique among in-flight requests at any given time.
- Parameters
requests (runtime.remote.RequestTracker[Any]) – Stores in-flight message IDs to futures. Each future represents the outcome of a call (a result or an exception).
node (runtime.remote.Node) – A node for transporting messages.
concurrency (int) – The number of workers for processing responses.
- async issue_call(method, /, *args, address=None, notification=False, timeout=5)[source]¶
Issue a remote procedure call and possibly wait for the result.
- Parameters
method (str) – Method name.
args (Any) – Method arguments.
address (Optional[Any]) – A transport-dependent address.
notification (bool) – False iff this call requires a response. Has no effect for nodes that cannot receive data, which can only send notifications.
timeout (float) – Maximum duration (in seconds) to wait for a response.
- Raises
asyncio.TimeoutError – The request was successfully sent, but the response never arrived in time.
ValueError – If the request tracker could not generate a unique message ID.
RemoteCallError – If the service returned an error.
cbor2.CBOREncodeError – If the arguments were not serializable.
Note
Notification calls will not raise an exception client-side if the server fails, even if the node supports duplex communication.
- property call: runtime.remote.CallFactory¶
Syntactic sugar for issuing remote procedure calls.
Instead of:
await client.issue_call('add', 1, 2)
Replace with either of:
await client.call.add(1, 2) await client.call['add'](1, 2)
- class runtime.remote.DatagramNode(host='', port=8000, bind=True, options=frozenset({}))[source]¶
Bases:
runtime.remote.Node
,asyncio.protocols.DatagramProtocol
A wrapper around
asyncio
’s datagram support.- Parameters
host (str) – Hostname.
port (int) – Port number.
bind (bool) – Whether to bind the socket to a local address or connect to a remote one.
options (collections.abc.Collection[tuple[int, int, typing.Union[int, bytes]]]) – Socket options in the form (level, option, value) passed to
socket.socket.setsockopt()
.
- classmethod from_address(address, *, bind=True, options=frozenset({}))[source]¶
Build a datagram node from an address.
- Parameters
address (str) – The address to parse, in the form
udp://[hostname[:port]]
.bind (bool) – True if this is a local address (socket is bound). False for a remote address (socket connects).
options (collections.abc.Collection[tuple[int, int, typing.Union[int, bytes]]]) – Socket options passed to
DatagramNode
.
- Raises
ValueError – If the address is not a valid UDP address.
- class runtime.remote.Endpoint(node, concurrency=1, logger=<factory>)[source]¶
Bases:
abc.ABC
A source or destination of messages.
An
Endpoint
has a number of workers (instances ofasyncio.Task
) that listen for and process incoming messages. This allows for request pipelining. Once all workers are busy processing messages, the node wrapped by the endpoint buffers any additional messages.- Parameters
node (runtime.remote.Node) – The message transceiver. Not all node/endpoint pairs are compatible.
concurrency (int) – The number of workers.
logger (structlog.stdlib.AsyncBoundLogger) – A logger instance.
- abstract async handle_message(address, message_type, *message)[source]¶
Process a message.
- Parameters
address (Any) – The address of the message’s sender, if available. The semantics depend on the node. Pass this argument directly to
Node.send()
.message_type (runtime.remote.MessageType) – The message type.
message (Any) – Other message parts. The message type determines the format.
- Raises
ValueError – If the endpoint could not unpack part of the message.
RemoteCallError – If the endpoint could not otherwise process the message.
- class runtime.remote.Handler[source]¶
Bases:
object
An object whose bound methods are exposed to remote callers.
Define a handler by subclassing
Handler
and applying theroute()
decorator:>>> class CustomHandler(Handler): ... @route ... async def method1(self, arg: int) -> int: ... ... ... @route('non-python-identifier') ... def method2(self): ... ...
- async dispatch(method, *args, timeout=30)[source]¶
Dispatch a remote procedure call.
If the method is synchronous (possibly blocking), the default executor performs the call.
- Parameters
method (str) – The procedure name.
args (Any) – Positional arguments for the procedure.
- Returns
The procedure’s result, which must be CBOR-serializable.
- Raises
RemoteCallError – The procedure call does not exist, timed out, or raised an exception.
- class runtime.remote.MessageType(value)[source]¶
Bases:
enum.IntEnum
The message type ID.
- REQUEST¶
Denotes a request message sent by clients. Requires a response.
- RESPONSE¶
Denotes a response message sent by services.
- NOTIFICATION¶
Denotes a notification message sent by clients. Does not require a response. Unlike the synchronous request-response pattern, notifications may be pipelined (i.e., multiple notifications in-flight simultaneously) for increased throughput.
- class runtime.remote.Node[source]¶
Bases:
abc.ABC
A transceiver of discrete binary messages.
A node wraps an underlying transport, such as a UDP endpoint, that it can repeatedly open, close, and reopen.
Node
supports the async context manager protocol (reusable) for automatically managing the transport.When the transport is open, a node can send to and receive messages from one or more peers concurrently.
State Diagram:
start [-> closed]? -> open [[-> close -> open]? [-> send]? [-> recv]? [-> closed?]]* -> close -> end
The data segments and address a
Node
sends and receives are opaque to the node. Their format and semantics depend on the transport andEndpoint
the node works with.- async recv()[source]¶
Receive a message.
- Returns
Zero or more data segments and an address, which are transport-dependent.
- Raises
RemoteCallError – If the transport cannot receive a message.
Note
Asking the transport directly for messages may be problematic if it is reopened or does not support concurrent waiters.
- abstract async send(parts, /, *, address=None)[source]¶
Send a message.
- Parameters
- Raises
RemoteCallError – If the transport cannot send the message. May reopen the internal transport.
- class runtime.remote.RequestTracker(futures=<factory>, lower=0, upper=4294967295)[source]¶
Bases:
Generic
[runtime.remote.ResponseType
]Track outstanding requests and their results.
Every request is associated with a unique request ID (an integer, or an object serializable as an integer).
- Parameters
futures (collections.abc.MutableMapping[int, _asyncio.Future]) – A mapping from request IDs to futures representing responses.
lower (int) – Minimum valid request ID.
upper (int) – Maximum valid request ID.
- generate_uid(*, attempts=10)[source]¶
Generate a unique request ID.
- Parameters
attempts (int) – The maximum number of times to try to generate an ID.
- Raises
ValueError – If the tracker could not generate a unique ID. If the ID space is sufficiently large, this error is exceedingly rare. Increasing the number of attempts or decreasing the number of in-flight requests should increase the probability of a unique ID.
- new_request(request_id=None)[source]¶
Register a new request.
- Parameters
request_id (Optional[int]) – A unique request identifier. If not provided, a request ID is randomly generated.
- Returns
The request ID and a future representing the outcome of the request.
- register_response(request_id, result)[source]¶
Register a request’s response.
- Parameters
request_id (int) – The request identifier returned from
new_request()
.result (Union[BaseException, runtime.remote.ResponseType]) – The response or exception.
- class runtime.remote.Router(frontend, backend)[source]¶
Bases:
object
Routes messages between :class:`Client`s and :class:`Service`s that use sockets.
Routers are stateless, duplex, and symmetric (i.e., require the same format and exhibit the same behavior on both ends).
Routers have no error handling and may silently drop messages if the destination is unreachable. Clients must rely on timeouts to determine when to consider a request failed.
The payloads themselves are opaque to the router and are not deserialized.
- Parameters
frontend (runtime.remote.SocketNode) – A
ROUTER
socket clients connect to.backend (runtime.remote.SocketNode) – A
ROUTER
socket services connect to.route_task – The background task performing the routing.
Router
implements the async context manager protocol, which automatically schedules and cancels this task.
- class runtime.remote.Service(node, concurrency=1, logger=<factory>, handler=<factory>, timeout=30)[source]¶
Bases:
runtime.remote.Endpoint
Responds to RPC requests.
- Parameters
handler (runtime.remote.Handler) – The object whose bound methods this service will call.
timeout (float) – Maximum duration (in seconds) to execute methods for.
- class runtime.remote.SocketNode(socket_type=5, options=<factory>, bindings=frozenset({}), connections=frozenset({}), subscriptions=<factory>)[source]¶
Bases:
runtime.remote.Node
A wrapper around
zmq.asyncio.Socket
for handling timeouts.When the underlying socket of a
SocketNode
times out, the socket is closed and rebuilt to reset socket’s internal state. For example, aREQ
socket may become stuck in the “listening” state indefinitely if the message it sent gets lost.- Parameters
socket_type (int) – The socket type (a constant defined under
zmq
).subscriptions (set[str]) – A set of topics to subscribe to (for
SUB
sockets only).options (dict[int, typing.Union[int, bytes]]) – A mapping of ZMQ socket option symbols to their values.
connections (frozenset[str]) – A set of addresses to connect to.
- subscribe(topic='')[source]¶
Subscribe to a topic (for
zmq.SUB
sockets only).- Parameters
topic (str) – The topic to subscribe to.
- runtime.remote.route(method_or_name: str, /) collections.abc.Callable[collections.abc.Callable[..., Any], runtime.remote.RemoteMethod] [source]¶
- runtime.remote.route(method_or_name: collections.abc.Callable[..., Any], /) runtime.remote.RemoteMethod
Decorator for marking a bound method as an RPC target.
- Parameters
method_or_name – Either the method to be registered or the name it should be registered under. If the former, the method name is exposed to the
Handler
. The latter is useful for exposing a name that is not a valid Python identifier.- Returns
Either an identity decorator (if a name was provided) or the method provided.