# Source code for robotpy_ext.misc.simple_watchdog

import wpilib
import logging
from typing import List, Tuple

# Module-level logger; SimpleWatchdog.printIfExpired() reports overruns and
# epoch timings through it.
logger = logging.getLogger("simple_watchdog")

# Names exported by a star-import of this module.
__all__ = ["SimpleWatchdog"]


class SimpleWatchdog:
    """A simple, polled wrapper around a watchdog timer.

    Nothing runs in the background: expiry is only evaluated when
    :meth:`isExpired` or :meth:`printIfExpired` is called. When the timer
    has expired, :meth:`printIfExpired` logs a warning together with the
    recorded epochs.

    The watchdog is initialized disabled, so the user needs to call
    enable() before use. Until :meth:`enable`, :meth:`reset` or
    :meth:`setTimeout` has been called the expiration time is 0.

    .. note:: This is a simpler replacement for the :class:`wpilib.Watchdog`,
              and should function mostly the same (except that this watchdog
              will not detect infinite loops).

    .. warning:: This watchdog is not threadsafe
    """

    # Used for timeout print rate-limiting
    kMinPrintPeriod = 1000000  # us

    def __init__(self, timeout: float):
        """Watchdog constructor.

        :param timeout: The watchdog's timeout in seconds with microsecond
                        resolution.
        """
        # Clock source; returns the current FPGA time in microseconds.
        self._get_time = wpilib.RobotController.getFPGATime

        self._startTime = 0  # us
        self._timeout = self._to_micros(timeout)  # us
        self._expirationTime = 0  # us
        # Not read anywhere in this class; kept so the attribute set is
        # unchanged for any external code that inspects it.
        self._lastTimeoutPrintTime = 0  # us
        self._lastEpochsPrintTime = 0  # us
        # (epoch name, timestamp in us) pairs, in the order they were added.
        self._epochs: List[Tuple[str, int]] = []

    @staticmethod
    def _to_micros(seconds: float) -> int:
        """Convert seconds to whole microseconds, rounding to nearest.

        Rounding (rather than truncating) keeps float representation error
        from silently dropping a microsecond off the requested timeout.
        """
        return int(round(seconds * 1e6))

    def _restart(self) -> None:
        """Drop recorded epochs and restart the countdown from now."""
        self._epochs.clear()
        self._startTime = self._get_time()
        self._expirationTime = self._startTime + self._timeout

    def getTime(self) -> float:
        """Returns the time in seconds since the watchdog was last fed."""
        return (self._get_time() - self._startTime) / 1e6

    def setTimeout(self, timeout: float) -> None:
        """Sets the watchdog's timeout.

        This also clears the recorded epochs and restarts the countdown
        from the current time.

        :param timeout: The watchdog's timeout in seconds with microsecond
                        resolution.
        """
        self._timeout = self._to_micros(timeout)
        self._restart()

    def getTimeout(self) -> float:
        """Returns the watchdog's timeout in seconds."""
        return self._timeout / 1e6

    def isExpired(self) -> bool:
        """Returns true if the watchdog timer has expired.

        The timer counts as expired once the current time is strictly past
        the expiration time.
        """
        return self._get_time() > self._expirationTime

    def addEpoch(self, epochName: str) -> None:
        """
        Adds time since last epoch to the list printed by printIfExpired().

        Epochs are a way to partition the time elapsed so that when
        overruns occur, one can determine which parts of an operation
        consumed the most time.

        :param epochName: The name to associate with the epoch.
        """
        self._epochs.append((epochName, self._get_time()))

    def printIfExpired(self) -> None:
        """Logs the overrun and the epochs added so far, if expired.

        Does nothing unless the watchdog has expired. Output is rate-limited:
        nothing is logged unless more than ``kMinPrintPeriod`` microseconds
        have passed since the previous report.
        """
        now = self._get_time()
        if now <= self._expirationTime:
            return
        if now - self._lastEpochsPrintTime <= self.kMinPrintPeriod:
            return

        self._lastEpochsPrintTime = now
        prev = self._startTime
        logger.warning("Watchdog not fed after %.6fs", (now - prev) / 1e6)

        # Each epoch is reported as the time elapsed since the previous
        # epoch (or since the watchdog was started, for the first one).
        epoch_logs = []
        for name, stamp in self._epochs:
            elapsed = (stamp - prev) / 1e6
            epoch_logs.append(f"\t{name}: {elapsed:.6f}")
            prev = stamp
        logger.info("Epochs:\n%s", "\n".join(epoch_logs))

    def reset(self) -> None:
        """Resets the watchdog timer.

        This also enables the timer if it was previously disabled.
        """
        self.enable()

    def enable(self) -> None:
        """Enables the watchdog timer.

        Clears the recorded epochs and restarts the countdown from the
        current time using the configured timeout.
        """
        self._restart()

    def disable(self) -> None:
        """Disables the watchdog timer.

        This is a no-op: no state is changed, so isExpired() and
        printIfExpired() keep reporting against the expiration time set by
        the last enable(), reset() or setTimeout() call.
        """
        # .. this doesn't do anything