Source code for magicbot.magicrobot

import contextlib
import inspect
import logging
import sys
import types
import typing

from typing import Any, Callable, Dict, List, Tuple

import hal
import wpilib

from ntcore import NetworkTableInstance, NetworkTableEntry

# from wpilib.shuffleboard import Shuffleboard

from robotpy_ext.autonomous import AutonomousModeSelector
from robotpy_ext.misc import NotifierDelay
from robotpy_ext.misc.simple_watchdog import SimpleWatchdog

from .inject import get_injection_requests, find_injections
from .magic_tunable import setup_tunables, tunable, collect_feedbacks
from .magic_reset import collect_resets

__all__ = ["MagicRobot"]


class MagicInjectError(ValueError):
    pass


[docs] class MagicRobot(wpilib.RobotBase): """ Robots that use the MagicBot framework should use this as their base robot class. If you use this as your base, you must implement the following methods: - :meth:`createObjects` - :meth:`teleopPeriodic` MagicRobot uses the :class:`.AutonomousModeSelector` to allow you to define multiple autonomous modes and to select one of them via the SmartDashboard/Shuffleboard. MagicRobot will set the following NetworkTables variables automatically: - ``/robot/mode``: one of 'disabled', 'auto', 'teleop', or 'test' - ``/robot/is_simulation``: True/False - ``/robot/is_ds_attached``: True/False """ #: Amount of time each loop takes (default is 20ms) control_loop_wait_time = 0.020 #: Error report interval: when an FMS is attached, how often should #: uncaught exceptions be reported? error_report_interval = 0.5 #: A Python logging object that you can use to send messages to the log. #: It is recommended to use this instead of print statements. logger = logging.getLogger("robot") #: If True, teleopPeriodic will be called in autonomous mode use_teleop_in_autonomous = False def __init__(self) -> None: super().__init__() self._exclude_from_injection = ["logger"] self.__last_error_report = -10 self._components: List[Tuple[str, Any]] = [] self._feedbacks: List[Tuple[Callable[[], Any], NetworkTableEntry]] = [] self._reset_components: List[Tuple[Dict[str, Any], Any]] = [] self.__done = False # cache these self.__is_ds_attached = wpilib.DriverStation.isDSAttached self.__sd_update = wpilib.SmartDashboard.updateValues self.__lv_update = wpilib.LiveWindow.updateValues # self.__sf_update = Shuffleboard.update def _simulationInit(self) -> None: pass def _simulationPeriodic(self) -> None: pass def __simulationPeriodic(self) -> None: hal.simPeriodicBefore() self._simulationPeriodic() hal.simPeriodicAfter() def robotInit(self) -> None: """ .. warning:: Internal API, don't override; use :meth:`createObjects` instead """ # Create the user's objects and stuff here self.createObjects() # Load autonomous modes self._automodes = AutonomousModeSelector("autonomous") # Next, create the robot components and wire them together self._create_components() # cache these self.__is_ds_attached = wpilib.DriverStation.isDSAttached self.__sd_update = wpilib.SmartDashboard.updateValues self.__lv_update = wpilib.LiveWindow.updateValues # self.__sf_update = Shuffleboard.update self.__nt = NetworkTableInstance.getDefault().getTable("/robot") self.__nt_put_is_ds_attached = self.__nt.getEntry("is_ds_attached").setBoolean self.__nt_put_mode = self.__nt.getEntry("mode").setString self.__nt.putBoolean("is_simulation", self.isSimulation()) self.__nt_put_is_ds_attached(self.__is_ds_attached()) self.watchdog = SimpleWatchdog(self.control_loop_wait_time) self.__periodics: List[Tuple[Callable[[], None], str]] = [ (self.robotPeriodic, "robotPeriodic()"), ] if self.isSimulation(): self._simulationInit() self.__periodics.append((self.__simulationPeriodic, "simulationPeriodic()"))
[docs] def createObjects(self) -> None: """ You should override this and initialize all of your wpilib objects here (and not in your components, for example). This serves two purposes: - It puts all of your motor/sensor initialization in the same place, so that if you need to change a port/pin number it makes it really easy to find it. Additionally, if you want to create a simplified robot program to test a specific thing, it makes it really easy to copy/paste it elsewhere - It allows you to use the magic injection mechanism to share variables between components .. note:: Do not access your magic components in this function, as their instances have not been created yet. Do not create them either. """ raise NotImplementedError
[docs] def autonomousInit(self) -> None: """Initialization code for autonomous mode may go here. Users may override this method for initialization code which will be called each time the robot enters autonomous mode, regardless of the selected autonomous mode. This can be useful for code that must be run at the beginning of a match. .. note:: This method is called after every component's ``on_enable`` method, but before the selected autonomous mode's ``on_enable`` method. """ pass
[docs] def teleopInit(self) -> None: """ Initialization code for teleop control code may go here. Users may override this method for initialization code which will be called each time the robot enters teleop mode. .. note:: The ``on_enable`` functions of all components are called before this function is called. """ pass
[docs] def teleopPeriodic(self): """ Periodic code for teleop mode should go here. Users should override this method for code which will be called periodically at a regular rate while the robot is in teleop mode. This code executes before the ``execute`` functions of all components are called. .. note:: If you want this function to be called in autonomous mode, set ``use_teleop_in_autonomous`` to True in your robot class. """ func = self.teleopPeriodic.__func__ if not hasattr(func, "firstRun"): self.logger.warning( "Default MagicRobot.teleopPeriodic() method... Override me!" ) func.firstRun = False
[docs] def disabledInit(self) -> None: """ Initialization code for disabled mode may go here. Users may override this method for initialization code which will be called each time the robot enters disabled mode. .. note:: The ``on_disable`` functions of all components are called before this function is called. """ pass
[docs] def disabledPeriodic(self): """ Periodic code for disabled mode should go here. Users should override this method for code which will be called periodically at a regular rate while the robot is in disabled mode. This code executes before the ``execute`` functions of all components are called. """ func = self.disabledPeriodic.__func__ if not hasattr(func, "firstRun"): self.logger.warning( "Default MagicRobot.disabledPeriodic() method... Override me!" ) func.firstRun = False
[docs] def testInit(self) -> None: """Initialization code for test mode should go here. Users should override this method for initialization code which will be called each time the robot enters disabled mode. """ pass
[docs] def testPeriodic(self) -> None: """Periodic code for test mode should go here.""" pass
[docs] def robotPeriodic(self) -> None: """ Periodic code for all modes should go here. Users must override this method to utilize it but it is not required. This function gets called last in each mode. You may use it for any code you need to run during all modes of the robot (e.g NetworkTables updates) The default implementation will update SmartDashboard, LiveWindow and Shuffleboard. """ watchdog = self.watchdog self.__sd_update() watchdog.addEpoch("SmartDashboard") self.__lv_update() watchdog.addEpoch("LiveWindow")
# self.__sf_update() # watchdog.addEpoch("Shuffleboard")
[docs] def onException(self, forceReport: bool = False) -> None: """ This function must *only* be called when an unexpected exception has occurred that would otherwise crash the robot code. Use this inside your :meth:`operatorActions` function. If the FMS is attached (eg, during a real competition match), this function will return without raising an error. However, it will try to report one-off errors to the Driver Station so that it will be recorded in the Driver Station Log Viewer. Repeated errors may not get logged. Example usage:: def teleopPeriodic(self): try: if self.joystick.getTrigger(): self.shooter.shoot() except: self.onException() try: if self.joystick.getRawButton(2): self.ball_intake.run() except: self.onException() # and so on... :param forceReport: Always report the exception to the DS. Don't set this to True """ # If the FMS is not attached, crash the robot program if not wpilib.DriverStation.isFMSAttached(): raise # Otherwise, if the FMS is attached then try to report the error via # the driver station console. Maybe. now = wpilib.Timer.getFPGATimestamp() try: if ( forceReport or (now - self.__last_error_report) > self.error_report_interval ): wpilib.reportError("Unexpected exception", True) except: pass # ok, can't do anything here self.__last_error_report = now
[docs] @contextlib.contextmanager def consumeExceptions(self, forceReport: bool = False): """ This returns a context manager which will consume any uncaught exceptions that might otherwise crash the robot. Example usage:: def teleopPeriodic(self): with self.consumeExceptions(): if self.joystick.getTrigger(): self.shooter.shoot() with self.consumeExceptions(): if self.joystick.getRawButton(2): self.ball_intake.run() # and so on... :param forceReport: Always report the exception to the DS. Don't set this to True .. seealso:: :meth:`onException` for more details """ try: yield except: self.onException(forceReport=forceReport)
# # Internal API #
[docs] def startCompetition(self) -> None: """ This runs the mode-switching loop. .. warning:: Internal API, don't override """ # TODO: usage reporting? self.robotInit() # Tell the DS the robot is ready to be enabled hal.observeUserProgramStarting() while not self.__done: isEnabled, isAutonomous, isTest = self.getControlState() if not isEnabled: self._disabled() elif isAutonomous: self.autonomous() elif isTest: self._test() else: self._operatorControl()
[docs] def endCompetition(self) -> None: self.__done = True self._automodes.endCompetition()
def autonomous(self) -> None: """ MagicRobot will do The Right Thing and automatically load all autonomous mode routines defined in the autonomous folder. .. warning:: Internal API, don't override """ self.__nt_put_mode("auto") self.__nt_put_is_ds_attached(self.__is_ds_attached()) self._on_mode_enable_components() try: self.autonomousInit() except: self.onException(forceReport=True) auto_functions: Tuple[Callable[[], None], ...] = (self._enabled_periodic,) if self.use_teleop_in_autonomous: auto_functions = (self.teleopPeriodic,) + auto_functions self._automodes.run( self.control_loop_wait_time, auto_functions, self.onException, watchdog=self.watchdog, ) self._on_mode_disable_components() def _disabled(self) -> None: """ This function is called in disabled mode. You should not override this function; rather, you should override the :meth:`disabledPeriodic` function instead. .. warning:: Internal API, don't override """ watchdog = self.watchdog watchdog.reset() self.__nt_put_mode("disabled") ds_attached = None self._on_mode_disable_components() try: self.disabledInit() except: self.onException(forceReport=True) watchdog.addEpoch("disabledInit()") refreshData = wpilib.DriverStation.refreshData DSControlWord = wpilib.DSControlWord with NotifierDelay(self.control_loop_wait_time) as delay: while not self.__done: refreshData() cw = DSControlWord() if cw.isEnabled(): break if ds_attached != cw.isDSAttached(): ds_attached = not ds_attached self.__nt_put_is_ds_attached(ds_attached) hal.observeUserProgramDisabled() try: self.disabledPeriodic() except: self.onException() watchdog.addEpoch("disabledPeriodic()") self._do_periodics() # watchdog.disable() watchdog.printIfExpired() delay.wait() watchdog.reset() def _operatorControl(self) -> None: """ This function is called in teleoperated mode. You should not override this function; rather, you should override the :meth:`teleopPeriodics` function instead. .. warning:: Internal API, don't override """ watchdog = self.watchdog watchdog.reset() self.__nt_put_mode("teleop") # don't need to update this during teleop -- presumably will switch # modes when ds is no longer attached self.__nt_put_is_ds_attached(self.__is_ds_attached()) # initialize things self._on_mode_enable_components() try: self.teleopInit() except: self.onException(forceReport=True) watchdog.addEpoch("teleopInit()") observe = hal.observeUserProgramTeleop refreshData = wpilib.DriverStation.refreshData isTeleopEnabled = wpilib.DriverStation.isTeleopEnabled with NotifierDelay(self.control_loop_wait_time) as delay: while not self.__done: refreshData() if not isTeleopEnabled(): break observe() try: self.teleopPeriodic() except: self.onException() watchdog.addEpoch("teleopPeriodic()") self._enabled_periodic() # watchdog.disable() watchdog.printIfExpired() delay.wait() watchdog.reset() self._on_mode_disable_components() def _test(self) -> None: """Called when the robot is in test mode""" watchdog = self.watchdog watchdog.reset() self.__nt_put_mode("test") self.__nt_put_is_ds_attached(self.__is_ds_attached()) wpilib.LiveWindow.setEnabled(True) # Shuffleboard.enableActuatorWidgets() try: self.testInit() except: self.onException(forceReport=True) watchdog.addEpoch("testInit()") refreshData = wpilib.DriverStation.refreshData DSControlWord = wpilib.DSControlWord with NotifierDelay(self.control_loop_wait_time) as delay: while not self.__done: refreshData() cw = DSControlWord() if not (cw.isTest() and cw.isEnabled()): break hal.observeUserProgramTest() try: self.testPeriodic() except: self.onException() watchdog.addEpoch("testPeriodic()") self._do_periodics() # watchdog.disable() watchdog.printIfExpired() delay.wait() watchdog.reset() wpilib.LiveWindow.setEnabled(False) # Shuffleboard.disableActuatorWidgets() def _on_mode_enable_components(self) -> None: # initialize things for _, component in self._components: on_enable = getattr(component, "on_enable", None) if on_enable is not None: try: on_enable() except: self.onException(forceReport=True) def _on_mode_disable_components(self) -> None: # deinitialize things for _, component in self._components: on_disable = getattr(component, "on_disable", None) if on_disable is not None: try: on_disable() except: self.onException(forceReport=True) def _create_components(self) -> None: # # TODO: Will need to inject into any autonomous mode component # too, as they're a bit different # # TODO: Will need to order state machine components before # other components just in case components = [] self.logger.info("Creating magic components") # Identify all of the types, and create them cls = type(self) # - Iterate over class variables with type annotations # .. this hack is necessary for pybind11 based modules sys.modules["pybind11_builtins"] = types.SimpleNamespace() # type: ignore injectables = self._collect_injectables() for m, ctyp in typing.get_type_hints(cls).items(): # Ignore private variables if m.startswith("_"): continue # If the variable has been set, skip it if hasattr(self, m): continue # If the type is not actually a type, give a meaningful error if not isinstance(ctyp, type): raise TypeError( "%s has a non-type annotation on %s (%r); lone non-injection variable annotations are disallowed, did you want to assign a static variable?" % (cls.__name__, m, ctyp) ) component = self._create_component(m, ctyp, injectables) # Store for later components.append((m, component)) injectables[m] = component # For each new component, perform magic injection for cname, component in components: setup_tunables(component, cname, "components") self._setup_vars(cname, component, injectables) self._setup_reset_vars(component) # Do it for autonomous modes too for mode in self._automodes.modes.values(): mode.logger = logging.getLogger(mode.MODE_NAME) setup_tunables(mode, mode.MODE_NAME, "autonomous") self._setup_vars(mode.MODE_NAME, mode, injectables) # And for self too setup_tunables(self, "robot", None) self._feedbacks += collect_feedbacks(self, "robot", None) # Call setup functions for components for cname, component in components: setup = getattr(component, "setup", None) if setup is not None: setup() # ... and grab all the feedback methods self._feedbacks += collect_feedbacks(component, cname, "components") # Call setup functions for autonomous modes for mode in self._automodes.modes.values(): if hasattr(mode, "setup"): mode.setup() self._components = components def _collect_injectables(self) -> Dict[str, Any]: injectables = {} cls = type(self) for n in dir(self): if ( n.startswith("_") or n in self._exclude_from_injection or isinstance(getattr(cls, n, None), (property, tunable)) ): continue o = getattr(self, n) # Don't inject methods # TODO: This could actually be a cool capability.. if inspect.ismethod(o): continue injectables[n] = o return injectables def _create_component(self, name: str, ctyp: type, injectables: Dict[str, Any]): type_hints = typing.get_type_hints(ctyp.__init__) NoneType = type(None) init_return_type = type_hints.pop("return", NoneType) assert ( init_return_type is NoneType ), f"{ctyp!r} __init__ had an unexpected non-None return type hint" requests = get_injection_requests(type_hints, name) injections = find_injections(requests, injectables, name) # Create instance, set it on self component = ctyp(**injections) setattr(self, name, component) # Ensure that mandatory methods are there if not callable(getattr(component, "execute", None)): raise ValueError( "Component %s (%r) must have a method named 'execute'" % (name, component) ) # Automatically inject a logger object component.logger = logging.getLogger(name) self.logger.info("-> %s (class: %s)", name, ctyp.__name__) return component def _setup_vars(self, cname: str, component, injectables: Dict[str, Any]) -> None: self.logger.debug("Injecting magic variables into %s", cname) type_hints = typing.get_type_hints(type(component)) requests = get_injection_requests(type_hints, cname, component) injections = find_injections(requests, injectables, cname) component.__dict__.update(injections) def _setup_reset_vars(self, component) -> None: reset_dict = collect_resets(type(component)) if reset_dict: component.__dict__.update(reset_dict) self._reset_components.append((reset_dict, component)) def _do_periodics(self) -> None: """Run periodic methods which run in every mode.""" watchdog = self.watchdog for method, entry in self._feedbacks: try: value = method() except: self.onException() else: entry.setValue(value) watchdog.addEpoch("@magicbot.feedback") for periodic, name in self.__periodics: periodic() watchdog.addEpoch(name) def _enabled_periodic(self) -> None: """Run components and all periodic methods.""" watchdog = self.watchdog for name, component in self._components: try: component.execute() except: self.onException() watchdog.addEpoch(name) self._do_periodics() for reset_dict, component in self._reset_components: component.__dict__.update(reset_dict)