import functools
import inspect
import logging
from typing import Callable, Optional
import ntcore
import wpilib
logger = logging.getLogger("autonomous")
# use this to track ordering of functions, so that we can display them
# properly in the tuning widget on the dashboard
__global_cnt_serial = [0]
def _get_state_serial():
__global_cnt_serial[0] = __global_cnt_serial[0] + 1
return __global_cnt_serial[0]
class _State:
def __init__(self, f: Callable, first: bool):
# inspect the args, provide a correct call implementation
allowed_args = "self", "tm", "state_tm", "initial_call"
sig = inspect.signature(f)
name = f.__name__
args = []
invalid_args = []
for i, arg in enumerate(sig.parameters.values()):
if i == 0 and arg.name != "self":
raise ValueError(f"First argument to {name} must be 'self'")
if arg.kind is arg.VAR_POSITIONAL:
raise ValueError(f"Cannot use *args in signature for function {name}")
if arg.kind is arg.VAR_KEYWORD:
raise ValueError(
f"Cannot use **kwargs in signature for function {name}"
)
if arg.kind is arg.KEYWORD_ONLY:
raise ValueError(
"Cannot use keyword-only parameters for function %s" % name
)
if arg.name in allowed_args:
args.append(arg.name)
else:
invalid_args.append(arg.name)
if invalid_args:
raise ValueError(
"Invalid parameter names in %s: %s" % (name, ",".join(invalid_args))
)
functools.update_wrapper(self, f)
# store state variables here
self._func = f
self.name = name
self.description = f.__doc__
self.ran = False
self.first = first
self.expires = 0xFFFFFFFF
self.serial = _get_state_serial()
varlist = {"f": f}
args_code = ",".join(args)
wrapper_code = f"lambda self, tm, state_tm, initial_call: f({args_code})"
self.run = eval(wrapper_code, varlist, varlist)
def __call__(self, *args, **kwargs):
self._func(*args, **kwargs)
def __set_name__(self, owner: type, name: str) -> None:
if not issubclass(owner, StatefulAutonomous):
raise TypeError(
f"StatefulAutonomous state {name} defined in non-StatefulAutonomous"
)
#
# Decorators:
#
# state
# timed_state
#
[docs]
def timed_state(
f: Optional[Callable] = None,
duration: float = None,
next_state: str = None,
first: bool = False,
):
"""
If this decorator is applied to a function in an object that inherits
from :class:`.StatefulAutonomous`, it indicates that the function
is a state that will run for a set amount of time unless interrupted
The decorated function can have the following arguments in any order:
- ``tm`` - The number of seconds since autonomous has started
- ``state_tm`` - The number of seconds since this state has been active
(note: it may not start at zero!)
- ``initial_call`` - Set to True when the state is initially called,
False otherwise. If the state is switched to multiple times, this
will be set to True at the start of each state.
:param duration: The length of time to run the state before progressing
to the next state
:param next_state: The name of the next state. If not specified, then
this will be the last state executed if time expires
:param first: If True, this state will be ran first
"""
if f is None:
return functools.partial(
timed_state, duration=duration, next_state=next_state, first=first
)
if duration is None:
raise ValueError("timed_state functions must specify a duration")
wrapper = _State(f, first)
wrapper.next_state = next_state
wrapper.duration = duration
return wrapper
[docs]
def state(f: Optional[Callable] = None, first: bool = False):
"""
If this decorator is applied to a function in an object that inherits
from :class:`.StatefulAutonomous`, it indicates that the function
is a state. The state will continue to be executed until the
``next_state`` function is executed.
The decorated function can have the following arguments in any order:
- ``tm`` - The number of seconds since autonomous has started
- ``state_tm`` - The number of seconds since this state has been active
(note: it may not start at zero!)
- ``initial_call`` - Set to True when the state is initially called,
False otherwise. If the state is switched to multiple times, this
will be set to True at the start of each state.
:param first: If True, this state will be ran first
"""
if f is None:
return functools.partial(state, first=first)
return _State(f, first)
[docs]
class StatefulAutonomous:
"""
This object is designed to be used to implement autonomous modes that
can be used with the :class:`.AutonomousModeSelector` object to select
an appropriate autonomous mode. However, you don't have to.
This object is designed to meet the following goals:
- Supports simple built-in tuning of autonomous mode parameters via
SmartDashboard
- Easy to create autonomous modes that support state machine or
time-based operation
- Autonomous modes that are easy to read and understand
You use this by defining a class that inherits from ``StatefulAutonomous``.
To define each state, you use the :func:`timed_state` decorator on a
function. When each state is run, the decorated function will be
called. Decorated functions can receive the following parameters:
- ``tm`` - The number of seconds since autonomous has started
- ``state_tm`` - The number of seconds since this state has been active
(note: it may not start at zero!)
- ``initial_call`` - Set to True when the state is initially called,
False otherwise. If the state is switched to multiple times, this
will be set to True at the start of each state.
An example autonomous mode that drives the robot forward for 5 seconds
might look something like this::
from robotpy_ext.autonomous import StatefulAutonomous
class DriveForward(StatefulAutonomous):
MODE_NAME = 'Drive Forward'
def initialize(self):
pass
@timed_state(duration=0.5, next_state='drive_forward', first=True)
def drive_wait(self):
pass
@timed_state(duration=5)
def drive_forward(self):
self.drive.move(0, 1, 0)
Note that in this example, it is assumed that the DriveForward object
is initialized with a dictionary with a value 'drive' that contains
an object that has a move function::
components = {'drive': SomeObject() }
mode = DriveForward(components)
If you use this object with :class:`.AutonomousModeSelector`, make sure
to initialize it with the dictionary, and it will be passed to this
autonomous mode object when initialized.
.. seealso:: Check out the samples in our github repository that show
some basic usage of ``AutonomousModeSelector``.
"""
__built = False
__done = False
def __init__(self, components=None):
"""
:param components: A dictionary of values that will be assigned
as attributes to this object, using the key
names in the dictionary
:type components: dict
"""
if not hasattr(self, "MODE_NAME"):
raise ValueError("Must define MODE_NAME class variable")
if components:
for k, v in components.items():
setattr(self, k, v)
NetworkTables = ntcore.NetworkTableInstance.getDefault()
self.__table = NetworkTables.getTable("SmartDashboard")
self.__sd_args = []
self.__build_states()
self.__tunables = []
if hasattr(self, "initialize"):
self.initialize()
[docs]
def register_sd_var(self, name, default, add_prefix=True, vmin=-1, vmax=1):
"""
Register a variable that is tunable via NetworkTables/SmartDashboard
When this autonomous mode is enabled, all of the SmartDashboard
settings will be read and stored as attributes of this object. For
example, to register a variable 'foo' with a default value of 1::
self.register_sd_var('foo', 1)
This value will show up on NetworkTables as the key ``MODE_NAME\\foo``
if add_prefix is specified, otherwise as ``foo``.
:param name: Name of variable to display to user, cannot have a
space in it.
:param default: Default value of variable
:param add_prefix: Prefix this setting with the mode name
:type add_prefix: bool
:param vmin: For tuning: minimum value of this variable
:param vmax: For tuning: maximum value of this variable
"""
is_number = self.__register_sd_var_internal(name, default, add_prefix, True)
if not add_prefix:
return
# communicate the min/max value for numbers to the dashboard
if is_number:
name = "%s|%0.3f|%0.3f" % (name, vmin, vmax)
self.__tunables.append(name)
self.__table.putStringArray(self.MODE_NAME + "_tunables", self.__tunables)
def __register_sd_var_internal(self, name, default, add_prefix, readback):
if " " in name:
raise ValueError(
"ERROR: Cannot use spaces in a tunable variable name (%s)" % name
)
is_number = False
sd_name = name
if add_prefix:
sd_name = "%s\\%s" % (self.MODE_NAME, name)
if isinstance(default, bool):
self.__table.putBoolean(sd_name, default)
args = (name, sd_name, self.__table.getBoolean, default)
elif isinstance(default, int) or isinstance(default, float):
self.__table.putNumber(sd_name, default)
args = (name, sd_name, self.__table.getNumber, default)
is_number = True
elif isinstance(default, str):
self.__table.putString(sd_name, default)
args = (name, sd_name, self.__table.getString, default)
else:
raise ValueError("Invalid default value")
if readback:
self.__sd_args.append(args)
return is_number
def __build_states(self):
has_first = False
states = {}
cls = type(self)
# for each state function:
for name in dir(cls):
state = getattr(cls, name)
if not isinstance(state, _State):
continue
# is this the first state to execute?
if state.first:
if has_first:
raise ValueError(
"Multiple states were specified as the first state!"
)
self.__first = name
has_first = True
# problem: how do we expire old entries?
# -> what if we just use json? more flexible, but then we can't tune it
# via SmartDashboard
# make the time tunable
if hasattr(state, "duration"):
self.__register_sd_var_internal(
state.name + "_duration", state.duration, True, True
)
description = ""
if state.description is not None:
description = state.description
states[state.serial] = (state.name, description)
# problem: the user interface won't know which entries are the
# current variables being used by the robot. So, we setup
# an array with the names, and the dashboard uses that
# to determine the ordering too
sorted_states = sorted(states.items())
self.__table.putStringArray(
self.MODE_NAME + "_durations", [name for _, (name, desc) in sorted_states]
)
self.__table.putStringArray(
self.MODE_NAME + "_descriptions",
[desc for _, (name, desc) in sorted_states],
)
if not has_first:
raise ValueError(
"Starting state not defined! Use first=True on a state decorator"
)
self.__built = True
def _validate(self):
# TODO: make sure the state machine can be executed
# - run at robot time? Probably not. Run this as part of a unit test
pass
# how long does introspection take? do this in the constructor?
# can do things like add all of the timed states, and project how long
# it will take to execute it (don't forget about cycles!)
[docs]
def on_enable(self):
"""
Called when autonomous mode is enabled, and initializes the
state machine internals.
If you override this function, be sure to call it from your
customized ``on_enable`` function::
super().on_enable()
"""
if not self.__built:
raise ValueError("super().__init__(components) was never called!")
# print out the details of this autonomous mode, and any tunables
self.battery_voltage = wpilib.DriverStation.getBatteryVoltage()
logger.info("Battery voltage: %.02fv", self.battery_voltage)
logger.info("Tunable values:")
# read smart dashboard values, print them
for name, sd_name, fn, default in self.__sd_args:
val = fn(sd_name, default)
setattr(self, name, val)
logger.info("-> %25s: %s" % (name, val))
# set the starting state
self.next_state(self.__first)
self.__done = False
[docs]
def on_disable(self):
"""Called when the autonomous mode is disabled"""
pass
[docs]
def done(self):
"""Call this function to indicate that no more states should be called"""
self.next_state(None)
[docs]
def next_state(self, name):
"""Call this function to transition to the next state
:param name: Name of the state to transition to
"""
if name is not None:
self.__state = getattr(self.__class__, name)
else:
self.__state = None
if self.__state is None:
return
self.__state.ran = False
[docs]
def on_iteration(self, tm):
"""This function is called by the autonomous mode switcher, should
not be called by enduser code. It is called once per control
loop iteration."""
# if you get an error here, then you probably overrode on_enable,
# but didn't call super().on_enable(). Don't do that.
try:
state = self.__state
except AttributeError:
raise ValueError("super().on_enable was never called!")
# we adjust this so that if we have states chained together,
# then the total time it runs is the amount of time of the
# states. Otherwise, the time drifts.
new_state_start = tm
# determine if the time has passed to execute the next state
if state is not None and state.expires < tm:
self.next_state(state.next_state)
new_state_start = state.expires
state = self.__state
if state is None:
if not self.__done:
logger.info("%.3fs: Done with autonomous mode", tm)
self.__done = True
return
# is this the first time this was executed?
initial_call = not state.ran
if initial_call:
state.ran = True
state.start_time = new_state_start
state.expires = state.start_time + getattr(
self, state.name + "_duration", 0xFFFFFFFF
)
logger.info("%.3fs: Entering state: %s", tm, state.name)
# execute the state function, passing it the arguments
state.run(self, tm, tm - state.start_time, initial_call)