Flight supervisor API#
This page describes the API for adding custom states to Flight Supervisor.
fsup.genstate#
- genstate::State.guidance_modes(cls)
- genstate.State : public object
Base State class.
All new states must inherit this.
Public Functions
- can_enter(self, msg)#
Block transition if this returns False on the target state.
The transition message is given as an argument.
When implementing this, make sure to check the transition message in order not to block critical behavior (emergency landing etc.).
These methods are only checked for the lowest substates of the source and target states.
For instance, a transition from hovering.manual.gotofix to flying.rth.waypoint_path will only check gotofix.can_exit() and waypoint_path.can_enter().
Due to how transitions are done, only implement this for stages or level 1 states (flying.manual, landing.normal etc.).
Note
This can be used to make conditional transitions with fallbacks. If a transition fails because of can_enter/exit(), the state machine will try to apply the next transition in the list.
- can_exit(self, msg)#
Block transition if this returns False on the source state.
The transition message is given as an argument.
When implementing this, make sure to check the transition message in order not to block critical behavior (emergency landing etc.).
These methods are only checked for the lowest substates of the source and target states.
For instance, a transition from hovering.manual.gotofix to flying.rth.waypoint_path will only check gotofix.can_exit() and waypoint_path.can_enter().
Note
This can be used to make conditional transitions with fallbacks. If a transition fails because of can_enter/exit(), the state machine will try to apply the next transition in the list.
- enter(self, msg)#
Called when the state is entered, with the message causing the transition.
- exit(self, msg)#
Called when the state is exited, with the message causing the transition.
- get_config(self, keys)#
Return a subset of the full configuration with only the given keys.
Doing this in the state init ensures that the config keys are effectively available instead of raising an exception while running.
- on_activate(self)#
Called when the parent state machine is being activated.
States are activated in no particular order.
- on_deactivate(self)#
Called when the parent state machine is being deactivated.
States are deactivated in no particular order.
- set_timeout(self, seconds, reason='undefined')#
Trigger State.step() with a time_out(reason) message after X seconds.
- step(self, msg)#
Called when a message is received while the state is active.
It is always called once after State.enter().
- wakeup_after(self, seconds, reason='wakeup')#
Trigger State.step() with a wake_up(reason) message after X seconds.
- wakeup_periodically(self, seconds, reason='tick')#
Trigger State.step() with a wake_up(reason) message every X seconds.
Public Static Attributes
- set_guidance_mode = set_guidance_local_mode#
Attributes
- loop#
Opaque pomp loop object, can be used with libpomp. The Python API is automatically generated from the C headers.
import libpomp # Create timer self.alert_pomp_cb = libpomp.pomp_timer_cb_t(lambda t, d: self.alerts.process()) self.battery_alert_timer = libpomp.pomp_timer_new(self.loop, self.alert_pomp_cb, None) # Start timer libpomp.pomp_timer_set_periodic(self.battery_alert_timer, 1000, 1000) # Clear timer libpomp.pomp_timer_clear(self.battery_alert_timer) # Destroy timer object libpomp.pomp_timer_destroy(self.battery_alert_timer)
- log: logging.Logger#
Standard Logger object with the current state name already set.
self.log.debug("X = ...") self.log.info("Recalculating trajectory") self.log.warning("Inconsistent value for sensor ...") self.log.error("Unable to ...")
- mc: MessageCenter#
Global message_center.events instance. This can be used to either end events or observe them to trigger callbacks.
- parent: State#
Parent state/stage object. This can be used to store objects or call methods mutual to substates.
class Rth(State): def do_something(): pass class RthFlightplan(State): def step(): self.parent.do_something() class RthAscent(State): def step(): self.parent.do_something()
- name: str#
Full state name, including parents (ex: “flying.rth.ascent”)
- statuses: dict#
Dictionary containing various drone statuses for sensors and position. It contains the following values, or None when the value is unknown:
Key
Type
battery_critical
bool
battery_power_level
defective_motor_failure_type
defective_motor_id
defective_sensors
{v._name_: bool for v in
fsup.enums.Sensor
}fcam_acquisition_sensor_ok
bool
fcam_axes_stabilization_ok
bool
fcam_ok
bool
freefall_detected
bool
fstcam_acquisition_sensor_ok
bool
fstcam_axes_stabilization_ok
bool
fstcam_luminosity_ok
bool
fstcam_ok
bool
fstcam_stereo_calibration_ok
bool
full_steady
bool
gps_navigation_available
bool
gps_satellites
int
gps_sensor_ok
bool
ground_distance_defective
bool
horizontal_steady
bool
linear_velocity_reliable
bool
low_earth_field
bool
magneto_disturbance
bool
magneto_ok
bool
min_altitude_constrained
bool
motion_state
moving_ground_state
phome_in_progress
bool
precise_home_enabled
bool
rth_battery_capacity
too_much_angle
bool
too_much_angle_at_low_altitude
bool
vcam_led_on
bool
vertical_steady
bool
vibration_level
wind_status
yaw_steady
bool
if any(self.statuses['defective_sensors']): self.log.warning("Invalid sensor detected") gps_status = self.statuses['gps_navigation_available']
- settings: SettingServer#
Drone shared settings server, implements a wrapper of libshsettings. Drone settings can also be inspected and modified in command-line with shs-cli.
See Settings for an exhaustive list of available settings.
- settings.subscribe(setting, callback)#
Register a callback to be called when the setting is updated. The callback arguments are the following: event_type, name, previous_value, new_value.
- settings.get(setting, fallback=None) float | int | str | bool #
Get the current value of given setting. The fallback is returned if the setting is not set or doesn’t exist.
- settings.set(setting, value)#
Change setting value.
Warning
This method is type-sensitive, make sure to pass int or float accordingly when assigning numerical values.
def on_rotation_speed_changed(event_type, name, previous_value, new_value): pass self.setting.subscribe("autopilot.rotation_speed", self.on_rotation_speed_changed) self.setting.set("autopilot.rotation_speed", 90.0)
- home: HomePositionManager#
Home position manager
- home.enable_home_update(enable)#
Enable or disable home update (includes automatic first_fix position).
- home.set_custom_position(coord_geo) libparrot_physics_binding.CoordGeo3 #
Set a custom home position of type libparrot_physics_binding.CoordGeo3
- home.get_current_home() libparrot_physics_binding.CoordGeo3 #
Get current home coordinates array (latitude, longitude, altitude_ato).
- home.get_current_home_type() fsup.enums.HomeType #
Get current home type.
- home.get(home_type)#
Get current saved position for given fsup.enums.HomeType.
from fsup.enums import HomeType # Get takeoff position takeoff = self.home.get(HomeType.TAKEOFF) self.log.info("Takeoff position: {0:.6f}, {0:.6f}".format( takeoff.latitude, takeoff.longitude))
- tlm_dctl: TlmSection#
DroneController telemetry consumer instance.
self.tlm_dctl.fetch_sample()
current_lat = self.tlm_dctl["position_absolute.latitude"]
current_lon = self.tlm_dctl["position_absolute.longitude"]
fsup.genmission#
- genmission::AbstractMission.get_manager(self, class_type, create_if_needed=True)
Get the manager instance of type class_type registered for this mission. If no current manager of this class is currently registered, and if create_if_needed is True, a new one is created and registered automatically (otherwise, the return value is None).
- genmission::AbstractMission.require_manager(self, class_type)
Like get_manager, but only registers it if necessary
fsup.message_center#
- class Supervisor#
The following services and channels are registered by default:
Channel
Service
Messages
intl_channel
intl
dctl_channel
dctl
autopilot_channel
autopilot
gdnc_channel
gdnc
gdnc_channel
gdnc_axes_camctrl
gdnc_channel
gdnc_zoom_camctrl
gdnc_global_channel
gdnc_global
[guidance_global.messages_pb2]
heimdall_channel
heimdall
[heimdall.messages_pb2]
[heimdall.messages_pb2]: add link when published on dpc-dev
Public Functions
- make_message_center(self)#
Message Center#
- class MessageCenter#
This class is used to send and receive messages.
Public Functions
- attach_client_service_pair(self, channel, pbuf_module, forward_events=True)#
Attach a client service pair to a channel.
A client service pair sends commands and receive events.
channel: channel on which messages are exchanged.
pbuf_module: protobuf module with Command/Event description.
forward_events: True to forward and notify received messages in the state machine. If False, only explicit observers will be notified.
The returned object is a service.ServicePair object.
import guidance.descent_mode_pb2 as descent_mode_pb self.gdnc_fp_svc = self.mc.attach_client_service_pair( self.mc.gdnc_channel, descent_mode_pb, forward_events=True)
Note
When adding new protobuf modules, you need to use this method for the transitions to work in the state machine.
- attach_internal_service_pair(self, channel, pbuf_module, forward_events=True)#
Attach an internal service pair.
Internal messages are used to trigger transitions inside the state machine from within the state machine itself.
channel: channel on which messages are exchanged. The supervisor has a dedicated builtin channel for that supervisor.intl_channel.
pbuf_module: protobuf module with Command/Event description.
forward_events: True to forward and notify received messages in the state machine. If False, only explicit observers will be notified.
The returned object is a service.ServicePair object.
- attach_server_service_pair(self, channel, pbuf_module, forward_events=True)#
Attach a server service pair to a channel.
A server service pair sends commands and receive events.
channel: channel on which messages are exchanged.
pbuf_module: protobuf module with Command/Event description.
forward_events: True to forward and notify received messages in the state machine. If False, only explicit observers will be notified.
The returned object is a service.ServicePair object.
- detach_service_pair(self, svc_pair)#
Detach the given service pair and stop notifications of events/messages.
For symmetry with attach methods, the following methods also exist:
detach_client_service_pair
detach_server_service_pair
detach_internal_service_pair
- observe(self, bindings, src=None)#
Observe a set of events/messages notified in the message center.
‘bindings’ is a dictionnary of ‘event’: ‘callback’ pairs. See message_center.events for a list of possible events and the expected signature of the callback. To observe a specific message, ‘event’ can be the fully qualified name of the currently selected field in the oneof named ‘id’ (see by ‘msghub_utils.msg_id’).
if ‘src’ is given, only events/message matching the source are observed. ‘src’ can be a channel (for CONNECT/DISCONNECT) or a service pair (for INACTIVE/ACTIVE/MESSAGE).
The returned object shall be kept while the observation is required. Call its Observer.unobserve() method when done.
- start_client_channel(self, addr_str)#
Start a client channel with given address in ‘pomp’ format.
Channels are reused, meaning they will share the same socket connection. The message center will internally track the usage to close the connection when the last channel is stopped. The returned channel can then be used to attach one or more msghub services.
- start_internal_channel(self)#
Start an internal channel.
Such channel can be used to post message from one state to the state machine and then trigger some transitions.
- start_server_channel(self, addr_str)#
Start a server channel with given address in ‘pomp’ format.
The returned channel can then be used to attach one or more msghub services.
- stop_channel(self, channel)#
Stop the given channel.
As client channel are reused, only the last call to stop will actually close the socket connection.
Service#
- class Service#
A service represent a set of protobuf messages grouped either as ‘Command’ or ‘Event’.
The root message description (Command/Event) shall contain a ‘oneof’ field named ‘id’ with all possibles messages that can be sent or received.
To send a message, there is 2 ways:
Call Service.send with the Command/Event message correctly filled. Example:
msg = svc.alloc() msg.some_field = "some_value" svc.send(msg)
Call the generated Service.sender.<name> with only the sub-message corresponding to the ‘oneof’ ‘id’ sub-field. Example (same message):
svc.sender.some_field("some_value")
Available attributes:
msg: message factory, each method self.msg.<field> returns an instance of the service class, such that the “id” field is set to the selected field. Messages that have no inner field (e.g. “Void” messages) are preallocated and identical from one call to another.
msg_class: class of protobuf messages
service_name(
string
): fully qualified name of protobuf message.sender: catalog of sender methods. Each method builds a temporary message as-if by calling
msg
and directly calls the Service.send method of the associated Service.
Simple message from enum value:
import colibrylite.estimation_mode_pb2 as cbry_est self.mc.dctl.cmd.sender.set_estimation_mode(cbry_est.FLYING)
Complex message from field structure:
self.manager.messages_svc.evt.sender.hover(delay=self.land_delay)
Public Functions
- alloc(self, *args, **kwargs)#
Create a new instance of the protobuf message.
- observe(self, bindings)#
Helper that observe some events/messages related to this service only.
The returned object shall be kept while the observation is required. Call its ‘unobserve’ method when done.
- observe_messages(self, bindings)#
Similar to Service.observe method but the callback signature for messages is different.
Instead of having a full Command/Event messages, only the sub-message corresponding to the ‘oneof’ ‘id’ sub-field.
The returned object shall be kept while the observation is required. Call its ‘unobserve’ method when done.
- send(self, pbuf_msg)#
Send a protobuf message.
It shall be an instance of Command or Event.
Service Pair#
- class ServicePair#
Class holding a pair of msghub services found in a protobuf description the two services correspond to the special messages ‘Command’ and ‘Event’ and are stored in the respective attributes ‘cmd’ and ‘evt’.
For client pair, command can be sent and event received. For a server pair, command can be received and event sent. For an internal pair, only events are managed.
- Kind : public Enum
CLIENT - command can be sent and event received.
SERVER - command can be received and event sent.
INTERNAL - only events are managed.
Observer#
- class Observer#
This class is used to represent a set of events and their callbacks being observed in a notifier.
This instance is used as a handle when canceling from the notifier.
Public Functions
- unobserve(self)#
Stop observing and sending notifications for this observer.
- class Notifier#
Public Functions
- notify(self, evt, *args, src=None, extra=None, observer=None)#
Manually trigger a notification with given event.
- observe(self, bindings, src=None, cls=Observer)#
Observe events and trigger given callbacks when they occur.
Events are usually described as string for protobuf messages, but they can also be enum values from message_center.events.
- unobserve(self, observer)#
Stop observing for given Observer object.
This has the same effect as Observer.unobserve().
from fsup.genstate import State
class MyState(State):
def enter(self, msg):
self.obs = self.mc.observe({
"DroneController.ground_close": lambda *_: self.on_ground_close(True),
"DroneController.ground_not_close": lambda *_: self.on_ground_close(False),
"DroneController.target_reached":
lambda _, msg: self.on_target_reached(msg.target_reached),
})
def exit(self, msg):
self.obs.unobserve()
def on_ground_close(self, is_close):
self.log.info("Ground is close = {}".format(is_close))
def on_target_reached(self, msg):
if msg.all_reached:
self.log.info("All targets reached!")
else:
pass # ...
import logging
import fsup.events as events
import my_channel_pb2 as pbuf_my_channel # Custom protobuf
class MyMission:
def __init__(self, mission_environment):
self.log = logging.getLogger("MyMission")
def on_activate(self):
# Listen to custom events while the mission is active.
# These events can only be used for transitions when the channel is active.
self.my_channel = self.mc.start_client_channel('unix:@my-channel')
self.my_channel_svc = self.mc.attach_client_service_pair(
self.my_channel, pbuf_my_channel, forward_events=False)
self.obs = self.my_channel_svc.observe({
events.Service.ACTIVE : self._enable_some_feature(True),
events.Service.INACTIVE : self._enable_some_feature(False),
})
def on_deactivate(self):
self.obs.unobserve()
self.my_channel_svc = None # Destroy service
def _enable_some_feature(self, enable):
self.log.info("Feature enabled = %s", enable)
Events#
The message_center.events module contains special events that can be listened to with observers.
- events.Channel : public Enum
CONNECTED (Parameters: channel, connection)
DISCONNECTED (Parameters: channel, connection)
- events.Service : public Enum
Service events.
INACTIVE - No peer in any channel handling/serving this service (Parameters: service)
ACTIVE - At least one peer connected to a channel handling/service this service (Parameters: service)
MESSAGE - Received a message for this service (Parameters: service, protobuf message)
- events.StateMachine : public Enum
State machine events.
STATE_CHANGED (Parameters: current state as a list of hierarchical states) Ex: [“landing”, “normal”, “descent”]
RESET - No parameter
EXIT - No parameter
fsup.timers#
- class Timer#
The
fsup.timers
module defines aTimer
class on top of timer methods from libpomp (i.e.pomp_timer_new
, etc.).
For example, assuming there is a loop pomp-loop object and a logger log in the current scope:
def tick(timer):
log.info("tick")
t = Timer(loop, 1, tick, repeat=True)
t.start()
fsup.cache#
- class Cache#
Dictionary with notifications on value change.
A cache is initialized with a dictionary of initial key/values.
Clients may call observe() to attach callback to value changes (the callback are called when the value is different from the previous value, as defined by != operator).
IMPORTANT: observe() returns a CacheObserver instance; it is necessary to call its request_notifications() method once after creation to be notified of any prior state for the values being observed (doing so will trigger all the callbacks associated with the observer). In some cases it is sufficient for an object that manages the lifecycle of observers to call notify_all_observers() on the Cache instance.
Public Functions
- get(self, key)#
- make_observer(self, bindings)#
Make an instance of CacheObserver for the given key/values bindings.
No event is observed until a call to observe() is made on the resulting object.
- notify_all_observers(self)#
Notify all observers with current cache values.
- notify_observer(self, observer)#
- observe(self, bindings)#
Observe changes for a subset of keys in cache.
- Parameters:
bindings – dictionary mapping cache keys to functions accepting 2 arguments: the key and the value associated with it in the cache.
- Returns:
a CacheObserver instance
- reset(self, *notify=True)#
Reset to initial cache values.
- Parameters:
notify=True – also notify all observers for all keys
- set(self, key, value)#
- set_multiple(self, **kwargs)#
Set multiple cache entries at the same time.
For example, the following:
self.set(‘a’, 1) self.set(‘b’, 2)
Can be written:
self.set_multiple(a=1, b=2)
The main difference is that all the observers are notified after all entries are changed. In the original code, some observer could see a cache with ‘a’ being set to 1 while ‘b’ being still set to its previous value. This might not be desirable if certain entries have to always be consistent with each other (alternatively, a similar effect can be achieved by storing a tuple of values under a single key).
fsup.enums#
- class fsup.enums.DefectiveMotorFailureType#
- NONE#
- MOTOR_STALLED#
- PROPELLER_SECURITY#
- RC_EMERGENCY_STOP#
- DEFECTIVE_PROP#
libparrot_physics_binding#
- class libparrot_physics_binding.CoordGeo3#
Represents a 3D geographic coordinate.
Methods
- __init__(self, latitude, longitude, altitude)#
Construct a new CoordGeo3 object.
- is_valid(self)#
Check if the CoordGeo3 object is a valid geographic position.
- distance(self, other)#
Compute the distance to another CoordGeo3 object.
- to_ned(self, ned_origin)#
Convert to a CoordNed3 object. ned_origin is a CoordGeo3 object representing the NED origin.
Attributes
- latitude#
- longitude#
- altitude#
- class libparrot_physics_binding.CoordNed3#
Represents a 3D NED coordinate.
Methods
- __init__(self, x, y, z)#
Construct a new CoordNed3 object
- distance(self, other)#
Compute the distance to another CoordNed3 object.
- to_geo(self, ned_origin)#
Convert to a CoordGeo3 object. ned_origin is a CoordGeo3 object representing the NED origin.
Attributes
- x#
- y#
- z#
Managers#
Core#
event_cache#
- features.event_cache.EventCache : public Feature
Cache drone events end forward them as autopilot events.
The event cache keep track of different drone states (also known as statuses), when receiving events from various processes: Flight supervisor itself (internal messages), Drone Controller, Guidance, and others.
It also behaves like an fsup.cache.Cache object, thereby allowing client code to build observers for a set of keys in the cache, associated with callbacks on value change.
The only API this manager exposes are the methods delegating to the inner Cache object (e.g. observe(), get(), set(), …)
Public Functions
- get(self, key)#
Get current value associated with key.
- observe(self, bindings)#
Observe changes for a subset of keys in cache.
- Parameters:
bindings – dictionary mapping cache keys to functions accepting 2 arguments: the key and the value associated with it in the cache.
- Returns:
an observer as defined in cache.py
- set(self, key, value)#
Set current value for key (it notifies observers).
geofence_manager#
- features.geofence_manager.GeofenceManager : public Feature
Public Functions
- overwrite_user_setting(self, enable)#
Overwrite user setting to enable.
- restore_user_setting(self)#
Restore user setting.
- set_geofence_center_to_current_position(self)#
Set the geofence’s center to the current position.
Public Members
- statuses#
oa_manager#
- features.oa_manager.OaDroneState : public IntEnum
Public Functions
- is_active(self)#
- features.oa_manager.OaDroneAvailability : public IntEnum
Public Functions
- is_available(self)#
- features.oa_manager.OaDesiredMode : public IntEnum
Public Functions
- is_desired(self)#
- features.oa_manager.ObstacleAvoidanceManager : public Feature
Public Functions
- is_obstacle_avoidance_drone_available(self)#
Return True if obstacle avoidance is currently available in the drone, independently from the current guidance mode.
- overwrite_user_setting(self, desired_mode)#
Overwrite user setting for desired mode.
- restore_user_setting(self)#
Restore user setting for desired mode.
- set_frozen_alert(self, status)#
Drone frozen.
- set_stuck_alert(self, status)#
Force stuck alert.
Public Members
- statuses#
takeoff_readyness_manager#
- features.takeoff_readyness_manager.TakeOffReadynessManager : public Feature
Public Functions
- get_ready(self)#
video_manager#
- features.video_manager.VideoManager : public Feature
List of video modes with a high CPU usage.
To be used to prioritize recording over other CPU intensive tasks.
Public Functions
- check_current_config(self, removed_camera_modes=None, removed_recording_capabilities=None, removed_photo_capabilities=None)#
Check the current video configuration and return True if it is compatible with some potential given removed capabilities.
See VideoManager.remove_capabilities() for parameter description.
- configure(self, config)#
Reconfigure the video pipeline.
config
is a dictionnary with the fields of the arsdk.camera2.Config protobuf message.selected_fields
will then be automatically be filled.from arsdk import camera2_pb2 as pbuf_camera2 self.supervisor.video_manager.configure(config={ 'camera_mode': pbuf_camera2.CAMERA_MODE_RECORDING, 'video_recording_mode': pbuf_camera2.VIDEO_RECORDING_MODE_STANDARD, 'video_recording_resolution': pbuf_camera2.VIDEO_RESOLUTION_1080P, 'video_recording_framerate': pbuf_camera2.FRAMERATE_120, 'video_recording_dynamic_range': pbuf_camera2.DYNAMIC_RANGE_STANDARD, })
- get_config(self)#
Get the current configuration as a dictionnary with the fields of the arsdk.camera2.Config protobuf message.
- is_high_perf(self)#
Check if the current video configuration needs the CPUs to be available to be running without skips.
- lock_config(self)#
Lock the current configuration and prevent external application to modify it.
- remove_capabilities(self, camera_modes=None, recording_capabilities=None, photo_capabilities=None)#
Ask video config to remove some capabilities temporarily.
camera_modes
is an array of arsdk.camera2_pb2.CameraMode.recording_capabilities
is an array of tuples containing:resolution (ex arsdk.camera2_pb2.VIDEO_RESOLUTION_DCI_4K)
framerate (ex arsdk.camera2_pb2.FRAMERATE_30)
dynamic_range (ex arsdk.camera2_pb2.DYNAMIC_RANGE_HDR10)
photo_capabilities
is an array of tuples containing:resolution (ex arsdk.camera2_pb2.)
format (ex arsdk.camera2_pb2.)
file_format (ex arsdk.camera2_pb2.)
If None or an empty array is given it will restore the video capabilities to the default.
from arsdk import camera2_pb2 as pbuf_camera2 self.supervisor.video_manager.remove_capabilities( recording_capabilities=[ ( pbuf_camera2.VIDEO_RESOLUTION_DCI_4K, pbuf_camera2.FRAMERATE_30, pbuf_camera2.DYNAMIC_RANGE_HDR10, )], )
- restore_default_capabilities(self)#
Restore video capabilities to their default values, without any restrictions.
- unlock_config(self)#
Unlock the current configuration and allow modifications.