Flight supervisor API#
This page describes the API for adding custom states to Flight Supervisor.
fsup.genstate#
- genstate.guidance_modes(args)#
State class decorator to declare mandatory guidance modes.
ARGS is a list of unique guidance mode names (strings).
This allows to declare a set of guidance modes that are necessary for the state to be considered valid, before running the state-machine. The set of modes is compared to the actual set of available modes in guidance when activating a state machine.
The decoration applies to the decorated class and all its subclasses: in other words, the set of mandatory guidance modes is inherited.
It is still possible to request a change to a non-declared mode when executing the code of the state machine, but in that case the guidance mode may not exist in the system.
@guidance_modes('descent', 'fixed') class Rth(State): pass @guidance_modes('emergency_descent') class EmergencyLanding(State): pass
- genstate.State : public object
Base State class.
All new states must inherit this.
Public Functions
- can_enter(self, msg)#
Block transition if this returns False on the target state.
The transition message is given as an argument.
When implementing this, make sure to check the transition message in order not to block critical behavior (emergency landing etc.).
These methods are only checked for the lowest substates of the source and target states.
For instance, a transition from hovering.manual.gotofix to flying.rth.waypoint_path will only check gotofix.can_exit() and waypoint_path.can_enter().
Due to how transitions are done, only implement this for stages or level 1 states (flying.manual, landing.normal etc.).
Note
This can be used to make conditional transitions with fallbacks. If a transition fails because of can_enter/exit(), the state machine will try to apply the next transition in the list.
- can_exit(self, msg)#
Block transition if this returns False on the source state.
The transition message is given as an argument.
When implementing this, make sure to check the transition message in order not to block critical behavior (emergency landing etc.).
These methods are only checked for the lowest substates of the source and target states.
For instance, a transition from hovering.manual.gotofix to flying.rth.waypoint_path will only check gotofix.can_exit() and waypoint_path.can_enter().
Note
This can be used to make conditional transitions with fallbacks. If a transition fails because of can_enter/exit(), the state machine will try to apply the next transition in the list.
- enter(self, msg)#
Called when the state is entered, with the message causing the transition.
- exit(self, msg)#
Called when the state is exited, with the message causing the transition.
- get_config(self, keys)#
Return a subset of the full configuration with only the given keys.
Doing this in the state init ensures that the config keys are effectively available instead of raising an exception while running.
- on_activate(self)#
Called when the parent state machine is being activated.
States are activated in no particular order.
- on_deactivate(self)#
Called when the parent state machine is being deactivated.
States are deactivated in no particular order.
- set_guidance_mode(self, mode_name, config=None, camera_config_override=[])#
Helper function to set guidance modes.
This automatically uses
Any.Pack()
on theconfig
parameter.
- set_timeout(self, seconds, reason='undefined')#
Trigger State.step() with a time_out(reason) message after X seconds.
- step(self, msg)#
Called when a message is received while the state is active.
It is always called once after State.enter().
- wakeup_after(self, seconds, reason='wakeup')#
Trigger State.step() with a wake_up(reason) message after X seconds.
- wakeup_periodically(self, seconds, reason='tick')#
Trigger State.step() with a wake_up(reason) message every X seconds.
Attributes
- loop#
Opaque pomp loop object, can be used with libpomp. The Python API is automatically generated from the C headers.
import libpomp # Create timer self.alert_pomp_cb = libpomp.pomp_timer_cb_t(lambda t, d: self.alerts.process()) self.battery_alert_timer = libpomp.pomp_timer_new(self.loop, self.alert_pomp_cb, None) # Start timer libpomp.pomp_timer_set_periodic(self.battery_alert_timer, 1000, 1000) # Clear timer libpomp.pomp_timer_clear(self.battery_alert_timer) # Destroy timer object libpomp.pomp_timer_destroy(self.battery_alert_timer)
- log: logging.Logger#
Standard Logger object with the current state name already set.
self.log.debug("X = ...") self.log.info("Recalculating trajectory") self.log.warning("Inconsistent value for sensor ...") self.log.error("Unable to ...")
- mc: MessageCenter#
Global message_center.events instance. This can be used to either end events or observe them to trigger callbacks.
- parent: State#
Parent state/stage object. This can be used to store objects or call methods mutual to substates.
class Rth(State): def do_something(): pass class RthFlightplan(State): def step(): self.parent.do_something() class RthAscent(State): def step(): self.parent.do_something()
- name: str#
Full state name, including parents (ex: “flying.rth.ascent”)
- statuses: dict#
Dictionary containing various drone statuses for sensors and position. It contains the following values, or None when the value is unknown:
Key
Type
battery_critical
bool
battery_power_level
defective_motor_failure_type
defective_motor_id
defective_sensors
{v._name_: bool for v in
fsup.enums.Sensor
}fcam_acquisition_sensor_ok
bool
fcam_axes_stabilization_ok
bool
fcam_ok
bool
freefall_detected
bool
fstcam_acquisition_sensor_ok
bool
fstcam_axes_stabilization_ok
bool
fstcam_luminosity_ok
bool
fstcam_ok
bool
fstcam_stereo_calibration_ok
bool
full_steady
bool
gps_navigation_available
bool
gps_satellites
int
gps_sensor_ok
bool
ground_distance_defective
bool
horizontal_steady
bool
linear_velocity_reliable
bool
low_earth_field
bool
magneto_disturbance
bool
magneto_ok
bool
min_altitude_constrained
bool
motion_state
moving_ground_state
phome_in_progress
bool
precise_home_enabled
bool
rth_battery_capacity
too_much_angle
bool
too_much_angle_at_low_altitude
bool
vcam_led_on
bool
vertical_steady
bool
vibration_level
wind_status
yaw_steady
bool
if any(self.statuses['defective_sensors']): self.log.warning("Invalid sensor detected") gps_status = self.statuses['gps_navigation_available']
- settings: SettingServer#
Drone shared settings server, implements a wrapper of libshsettings. Drone settings can also be inspected and modified in command-line with shs-cli.
See Settings for an exhaustive list of available settings.
- settings.subscribe(setting, callback)#
Register a callback to be called when the setting is updated. The callback arguments are the following: event_type, name, previous_value, new_value.
- settings.get(setting, fallback=None) float | int | str | bool #
Get the current value of given setting. The fallback is returned if the setting is not set or doesn’t exist.
- settings.set(setting, value)#
Change setting value.
Warning
This method is type-sensitive, make sure to pass int or float accordingly when assigning numerical values.
def on_rotation_speed_changed(event_type, name, previous_value, new_value): pass self.setting.subscribe("autopilot.rotation_speed", self.on_rotation_speed_changed) self.setting.set("autopilot.rotation_speed", 90.0)
- home: HomePositionManager#
Home position manager
- home.enable_home_update(enable)#
Enable or disable home update (includes automatic first_fix position).
- home.set_custom_position(coord_geo) libparrot_physics_binding.CoordGeo3 #
Set a custom home position of type libparrot_physics_binding.CoordGeo3
- home.get_current_home() libparrot_physics_binding.CoordGeo3 #
Get current home coordinates array (latitude, longitude, altitude_ato).
- home.get_current_home_type() fsup.enums.HomeType #
Get current home type.
- home.get(home_type)#
Get current saved position for given fsup.enums.HomeType.
from fsup.enums import HomeType # Get takeoff position takeoff = self.home.get(HomeType.TAKEOFF) self.log.info("Takeoff position: {0:.6f}, {0:.6f}".format( takeoff.latitude, takeoff.longitude))
- tlm_dctl: TlmSection#
DroneController telemetry consumer instance.
self.tlm_dctl.fetch_sample()
current_lat = self.tlm_dctl["position_absolute.latitude"]
current_lon = self.tlm_dctl["position_absolute.longitude"]
fsup.genmission#
- genmission::AbstractMission.get_manager(self, class_type, create_if_needed=True)
Get the manager instance of type class_type registered for this mission. If no current manager of this class is currently registered, and if create_if_needed is True, a new one is created and registered automatically (otherwise, the return value is None).
- genmission::AbstractMission.require_manager(self, class_type)
Like get_manager, but only registers it if necessary
fsup.message_center#
- class Supervisor#
The following services and channels are registered by default:
Channel
Service
Messages
intl_channel
intl
dctl_channel
dctl
autopilot_channel
autopilot
gdnc_channel
gdnc
gdnc_channel
gdnc_axes_camctrl
gdnc_channel
gdnc_zoom_camctrl
heimdall_channel
heimdall
[heimdall.messages_pb2]
[heimdall.messages_pb2]: add link when published on dpc-dev
Public Functions
- make_message_center(self)#
Message Center#
Warning
doxygenclass: Cannot find class “message_center::message_center::MessageCenter” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
Service#
Warning
doxygenclass: Cannot find class “message_center::service::Service” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
Service Pair#
Warning
doxygenclass: Cannot find class “message_center::service::ServicePair” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
Observer#
Warning
doxygenclass: Cannot find class “message_center::observer::Observer” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
Warning
doxygenclass: Cannot find class “message_center::observer::Notifier” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
from fsup.genstate import State
class MyState(State):
def enter(self, msg):
self.obs = self.mc.observe({
"DroneController.ground_close": lambda *_: self.on_ground_close(True),
"DroneController.ground_not_close": lambda *_: self.on_ground_close(False),
"DroneController.target_reached":
lambda _, msg: self.on_target_reached(msg.target_reached),
})
def exit(self, msg):
self.obs.unobserve()
def on_ground_close(self, is_close):
self.log.info("Ground is close = {}".format(is_close))
def on_target_reached(self, msg):
if msg.all_reached:
self.log.info("All targets reached!")
else:
pass # ...
import logging
import fsup.events as events
import my_channel_pb2 as pbuf_my_channel # Custom protobuf
class MyMission:
def __init__(self, mission_environment):
self.log = logging.getLogger("MyMission")
def on_activate(self):
# Listen to custom events while the mission is active.
# These events can only be used for transitions when the channel is active.
self.my_channel = self.mc.start_client_channel('unix:@my-channel')
self.my_channel_svc = self.mc.attach_client_service_pair(
self.my_channel, pbuf_my_channel, forward_events=False)
self.obs = self.my_channel_svc.observe({
events.Service.ACTIVE : self._enable_some_feature(True),
events.Service.INACTIVE : self._enable_some_feature(False),
})
def on_deactivate(self):
self.obs.unobserve()
self.my_channel_svc = None # Destroy service
def _enable_some_feature(self, enable):
self.log.info("Feature enabled = %s", enable)
Events#
The message_center.events module contains special events that can be listened to with observers.
Warning
doxygenclass: Cannot find class “message_center::events::Channel” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
Warning
doxygenclass: Cannot find class “message_center::events::Service” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
Warning
doxygenclass: Cannot find class “message_center::events::StateMachine” in doxygen xml output for project “flight-supervisor-api” from directory: /data/jenkins/jobs/sphinx-doc-release/workspace/out/doc-sdk/doc/flight-supervisor/xml
fsup.timers#
- class Timer#
The
fsup.timers
module defines aTimer
class on top of timer methods from libpomp (i.e.pomp_timer_new
, etc.).
For example, assuming there is a loop pomp-loop object and a logger log in the current scope:
def tick(timer):
log.info("tick")
t = Timer(loop, 1, tick, repeat=True)
t.start()
fsup.cache#
- class Cache#
Dictionary with notifications on value change.
A cache is initialized with a dictionary of initial key/values.
Clients may call observe() to attach callback to value changes (the callback are called when the value is different from the previous value, as defined by != operator).
IMPORTANT: observe() returns a CacheObserver instance; it is necessary to call its request_notifications() method once after creation to be notified of any prior state for the values being observed (doing so will trigger all the callbacks associated with the observer). In some cases it is sufficient for an object that manages the lifecycle of observers to call notify_all_observers() on the Cache instance.
Public Functions
- get(self, key)#
- make_observer(self, bindings)#
Make an instance of CacheObserver for the given key/values bindings.
No event is observed until a call to observe() is made on the resulting object.
- notify_all_observers(self)#
Notify all observers with current cache values.
- notify_observer(self, observer)#
- observe(self, bindings)#
Observe changes for a subset of keys in cache.
- Parameters:
bindings – dictionary mapping cache keys to functions accepting 2 arguments: the key and the value associated with it in the cache.
- Returns:
a CacheObserver instance
- reset(self, notify=True)#
Reset to initial cache values.
- Parameters:
notify=True – also notify all observers for all keys
- set(self, key, value)#
- set_multiple(self, kwargs)#
Set multiple cache entries at the same time.
For example, the following:
self.set(‘a’, 1) self.set(‘b’, 2)
Can be written:
self.set_multiple(a=1, b=2)
The main difference is that all the observers are notified after all entries are changed. In the original code, some observer could see a cache with ‘a’ being set to 1 while ‘b’ being still set to its previous value. This might not be desirable if certain entries have to always be consistent with each other (alternatively, a similar effect can be achieved by storing a tuple of values under a single key).
fsup.enums#
- class fsup.enums.DefectiveMotorFailureType#
- NONE#
- MOTOR_STALLED#
- PROPELLER_SECURITY#
- RC_EMERGENCY_STOP#
- DEFECTIVE_PROP#
libparrot_physics_binding#
- class libparrot_physics_binding.CoordGeo3#
Represents a 3D geographic coordinate.
Methods
- __init__(self, latitude, longitude, altitude)#
Construct a new CoordGeo3 object.
- is_valid(self)#
Check if the CoordGeo3 object is a valid geographic position.
- distance(self, other)#
Compute the distance to another CoordGeo3 object.
- to_ned(self, ned_origin)#
Convert to a CoordNed3 object. ned_origin is a CoordGeo3 object representing the NED origin.
Attributes
- latitude#
- longitude#
- altitude#
- class libparrot_physics_binding.CoordNed3#
Represents a 3D NED coordinate.
Methods
- __init__(self, x, y, z)#
Construct a new CoordNed3 object
- distance(self, other)#
Compute the distance to another CoordNed3 object.
- to_geo(self, ned_origin)#
Convert to a CoordGeo3 object. ned_origin is a CoordGeo3 object representing the NED origin.
Attributes
- x#
- y#
- z#
Managers#
Core#
event_cache#
- features.event_cache.EventCache : public Feature
Cache drone events end forward them as autopilot events.
The event cache keep track of different drone states (also known as statuses), when receiving events from various processes: Flight supervisor itself (internal messages), Drone Controller, Guidance, and others.
It also behaves like an fsup.cache.Cache object, thereby allowing client code to build observers for a set of keys in the cache, associated with callbacks on value change.
The only API this manager exposes are the methods delegating to the inner Cache object (e.g. observe(), get(), set(), …)
Public Functions
- get(self, key)#
Get current value associated with key.
- observe(self, bindings)#
Observe changes for a subset of keys in cache.
- Parameters:
bindings – dictionary mapping cache keys to functions accepting 2 arguments: the key and the value associated with it in the cache.
- Returns:
an observer as defined in cache.py
- set(self, key, value)#
Set current value for key (it notifies observers).
geofence_manager#
- features.geofence_manager.GeofenceManager : public Feature
Public Functions
- overwrite_user_setting(self, enable)#
Overwrite user setting to enable.
- restore_user_setting(self)#
Restore user setting.
- set_geofence_center_to_current_position(self)#
Set the geofence’s center to the current position.
Public Members
- statuses#
oa_manager#
- features.oa_manager.OaDroneState : public IntEnum
Public Functions
- is_active(self)#
- features.oa_manager.OaDroneAvailability : public IntEnum
Public Functions
- is_available(self)#
- features.oa_manager.OaDesiredMode : public IntEnum
Public Functions
- is_desired(self)#
- features.oa_manager.ObstacleAvoidanceManager : public Feature
Public Functions
- is_obstacle_avoidance_drone_available(self)#
Return True if obstacle avoidance is currently available in the drone, independently from the current guidance mode.
- overwrite_user_setting(self, desired_mode)#
Overwrite user setting for desired mode.
- restore_user_setting(self)#
Restore user setting for desired mode.
- set_frozen_alert(self, status)#
Drone frozen.
- set_stuck_alert(self, status)#
Force stuck alert.
Public Members
- statuses#
takeoff_readyness_manager#
- features.takeoff_readyness_manager.TakeOffReadynessManager : public Feature
Public Functions
- get_ready(self)#
video_manager#
- features.video_manager.VideoManager : public Feature
List of video modes with a high CPU usage.
To be used to prioritize recording over other CPU intensive tasks.
Public Functions
- check_current_config(self, removed_camera_modes=None, removed_recording_capabilities=None, removed_photo_capabilities=None)#
Check the current video configuration and return True if it is compatible with some potential given removed capabilities.
See VideoManager.remove_capabilities() for parameter description.
- configure(self, config)#
Reconfigure the video pipeline.
config
is a dictionnary with the fields of the arsdk.camera2.Config protobuf message.selected_fields
will then be automatically be filled.from arsdk import camera2_pb2 as pbuf_camera2 self.supervisor.video_manager.configure(config={ 'camera_mode': pbuf_camera2.CAMERA_MODE_RECORDING, 'video_recording_mode': pbuf_camera2.VIDEO_RECORDING_MODE_STANDARD, 'video_recording_resolution': pbuf_camera2.VIDEO_RESOLUTION_1080P, 'video_recording_framerate': pbuf_camera2.FRAMERATE_120, 'video_recording_dynamic_range': pbuf_camera2.DYNAMIC_RANGE_STANDARD, })
- get_config(self)#
Get the current configuration as a dictionnary with the fields of the arsdk.camera2.Config protobuf message.
- is_high_perf(self)#
Check if the current video configuration needs the CPUs to be available to be running without skips.
- lock_config(self)#
Lock the current configuration and prevent external application to modify it.
- remove_capabilities(self, camera_modes=None, recording_capabilities=None, photo_capabilities=None)#
Ask video config to remove some capabilities temporarily.
camera_modes
is an array of arsdk.camera2_pb2.CameraMode.recording_capabilities
is an array of tuples containing:resolution (ex arsdk.camera2_pb2.VIDEO_RESOLUTION_DCI_4K)
framerate (ex arsdk.camera2_pb2.FRAMERATE_30)
dynamic_range (ex arsdk.camera2_pb2.DYNAMIC_RANGE_HDR10)
photo_capabilities
is an array of tuples containing:resolution (ex arsdk.camera2_pb2.)
format (ex arsdk.camera2_pb2.)
file_format (ex arsdk.camera2_pb2.)
If None or an empty array is given it will restore the video capabilities to the default.
from arsdk import camera2_pb2 as pbuf_camera2 self.supervisor.video_manager.remove_capabilities( recording_capabilities=[ ( pbuf_camera2.VIDEO_RESOLUTION_DCI_4K, pbuf_camera2.FRAMERATE_30, pbuf_camera2.DYNAMIC_RANGE_HDR10, )], )
- restore_default_capabilities(self)#
Restore video capabilities to their default values, without any restrictions.
- unlock_config(self)#
Unlock the current configuration and allow modifications.