"""Student API for controlling the robot.
The student provides a single Python file containing a minimum of four functions:
>>> def autonomous_setup():
... ...
>>> def autonomous_main():
... ...
>>> def teleop_setup():
... ...
>>> def teleop_main():
... ...
The ``*_setup`` functions---``autonomous_setup`` and ``teleop_setup``---run once at the
start of the autonomous and teleop phases, respectively. After the setup function runs,
the corresponding ``*_main`` functions run periodically until the end of the phase. The
frequency of ``main`` calls is configurable, but the default interval is 0.1s.
These interfaces are largely thin wrappers around :class:`runtime.buffer.BufferStore`.
"""
import abc
import asyncio
import enum
import functools
import time
from dataclasses import dataclass, field
from typing import (
Any,
Awaitable,
Callable,
ClassVar,
Mapping,
Optional,
Protocol,
TypeVar,
Union,
)
from . import log
from .buffer import Buffer, BufferKey, BufferStore, DeviceBufferError
__all__ = [
'Actions',
'Alliance',
'BufferAPI',
'Field',
'Gamepad',
'Robot',
'StudentAPI',
'safe',
]
Action = Callable[..., Awaitable[None]]
[docs]@enum.unique
class Alliance(enum.IntEnum):
"""The alliances that compete in a match.
Attributes:
BLUE: The blue alliance.
GOLD: The gold alliance.
"""
BLUE = 0
GOLD = 1
[docs]class StudentAPI(abc.ABC):
"""Base type for all student-callable interfaces."""
[docs]class Actions(StudentAPI):
"""API for performing asynchronous execution.
An "action" is a special function defined with the ``async def`` keywords, like so:
>>> async def wave_hand():
... Robot.set(SERVO_UID, 'servo0', -0.5)
... await Actions.sleep(1)
... Robot.set(SERVO_UID, 'servo0', 0.5)
In this example, the robot's servo rotates 45 degrees counterclockwise from the
motor's center, waits one second, and then rotates 90 degrees clockwise. Actions are
useful for triggering time-sensitive events outside the ``autonomous_main`` or
``teleop_main`` functions, which fire at a fixed frequency.
To schedule an action to run, call :meth:`Actions.run` on the action's name from one
of the regular ``*_setup`` or ``*_main`` functions, like so:
>>> def autonomous_setup():
... Actions.run(wave_hand) # Correct
... wave_hand() # Incorrect: will not run
Do not call an action like you would call a regular function.
"""
[docs] @staticmethod
async def sleep(duration: float, /) -> None:
"""Pause the current action for some amount of time.
Parameters:
duration: The number of seconds to wait for. Must be a nonnegative number.
Note:
Remember to use the ``await`` keyword before :meth:`Actions.sleep`.
"""
await asyncio.sleep(duration)
[docs] @abc.abstractmethod
def run(
self,
action: Action,
/,
*args: Any,
timeout: float = 30,
periodic: bool = False,
) -> None:
"""Schedule an action to run outside of the ``*_main`` functions.
Parameters:
action: An action (coroutine function).
args: Positional arguments to pass to the action.
timeout: Maximum number of seconds the action should be allowed to run for.
Must be a nonnegative number.
periodic: Whether to run the action repeatedly or not. A periodic action
that completes before the timeout has elapsed is not rescheduled early.
"""
[docs] @abc.abstractmethod
def is_running(self, action: Action, /) -> Optional[bool]:
"""Check whether an action is already running.
Parameters:
action: An action (coroutine function).
"""
[docs]@dataclass
class BufferAPI(StudentAPI):
"""Base type for all APIs that access shared memory buffers.
Parameters:
buffers: Buffer store.
logger: Synchronous bound logger.
"""
buffers: BufferStore
logger: log.Logger
def _get_default(self, type_name: str, param: str) -> Any:
buf_type = self.buffers.catalog[type_name]
return buf_type.params[param].default
def _get(self, key: BufferKey, param: str) -> Any:
type_name, _ = key = self.buffers.normalize_key(key)
default = self._get_default(type_name, param)
context = {'type': type_name, 'param': param, 'default': default}
try:
buffer = self.buffers[key]
except (DeviceBufferError, KeyError) as exc:
self.logger.warn('Device does not exist', exc_info=exc, **context)
return default
try:
return buffer.get(param)
except DeviceBufferError as exc:
self.logger.warn('Unable to get parameter', exc_info=exc, **context)
return default
RT = TypeVar('RT')
[docs]def safe(method: Callable[..., RT]) -> Callable[..., Optional[RT]]:
"""A decorator that wraps API methods to catch and log any exceptions.
Parameters:
method: API method to be wrapped.
Returns:
The method wrapper. If the wrapped method raises an :class:`Exception`, the
wrapper's return value will be :data:`None`. :class:`BaseException` is too broad
to catch.
"""
@functools.wraps(method)
def wrapper(self: BufferAPI, /, *args: Any, **kwargs: Any) -> Optional[RT]:
try:
return method(self, *args, **kwargs)
except Exception as exc: # pylint: disable=broad-except; student-facing method
self.logger.error(f'{method.__name__}(...) raised an error', exc_info=exc)
return None
return wrapper
[docs]@dataclass
class Robot(BufferAPI):
"""API for accessing Smart Devices.
Parameters:
names: A mapping from human-readable device names (aliases) to UIDs that
students can configure.
"""
names: Mapping[str, int] = field(default_factory=dict)
def _translate_uid(self, uid: Union[str, int]) -> int:
if isinstance(uid, str):
uid = self.names.get(uid, uid)
try:
return int(uid)
except ValueError as exc:
raise DeviceBufferError(
'UID is neither an integer nor a name',
names=list(self.names),
) from exc
[docs] @safe
def get(self, uid: Union[str, int], param: str, /) -> Any:
"""Get a Smart Device parameter.
Parameters:
uid: Either a UID as an integer or a device name to be resolved into a UID.
param: Parameter name.
Returns:
The current parameter value. Because written parameters take time to
propogate to the device and the device must send an acknowledgement, the
current value may not immediately reflect a written value.
"""
return self._get(self._translate_uid(uid), param)
[docs] @safe
def write(self, uid: Union[str, int], param: str, value: Any, /) -> None:
"""Write a Smart Device parameter.
Parameters:
uid: Either a UID as an integer or a device name to be resolved into a UID.
param: Parameter name.
value: New parameter value.
"""
self.buffers[self._translate_uid(uid)].write(param, value)
[docs]@dataclass
class Gamepad(BufferAPI):
"""API for reading game controller inputs.
Parameters:
enabled: Whether gamepads are enabled. In autonomous mode, this parameter should
be set to :data:`False`.
"""
enabled: bool = True
TYPE_NAME: ClassVar[str] = 'gamepad'
[docs] @safe
def get(self, param: str, index: int = 0, /) -> Any:
"""Get a gamepad parameter.
Attempting to access a gamepad while it is disabled will emit a warning but
will still return a type-safe default value.
Parameters:
param: Parameter name.
index: Gamepad identifier (a nonnegative integer).
"""
if not self.enabled:
default = self._get_default(self.TYPE_NAME, param)
self.logger.error(
'Gamepad is not enabled in autonomous',
param=param,
index=index,
)
return default
return self._get((self.TYPE_NAME, index), param)
[docs]@dataclass
class Field(BufferAPI):
"""API for interacting with the field and other robots.
Parameters:
start: The UNIX timestamp (in seconds) of the start of the current
autonomous/teleop phase.
"""
start: float = field(default_factory=time.time)
@property
def _buffer(self, /) -> Buffer:
return self.buffers['field', 0]
@property # type: ignore[misc]
@safe
def alliance(self, /) -> Alliance:
"""The alliance this robot is a member of in this match."""
return Alliance(self._buffer.alliance)
[docs] def clock(self, /) -> float:
"""The number of seconds since autonomus or teleop started."""
return time.time() - self.start
[docs] @safe
def send(self, obj: Any, /) -> None:
"""Send a message to an allied robot."""
... # TODO
[docs] @safe
def recv(self, /) -> Any:
"""Receive a message from an allied robot."""
... # TODO
class StudentCodeModule(Protocol):
"""The API symbols made available to the student code module.
Note:
The :func:`print` function should also be listed, but Mypy does not yet support
replacing callables: https://github.com/python/mypy/issues/708
"""
Alliance: type[Alliance] = Alliance
Actions: Actions
Robot: Robot
Gamepad: Gamepad
Field: Field