Flight supervisor API#

This page describes the API for adding custom states to Flight Supervisor.

fsup.genstate#

genstate::State.guidance_modes(cls)
genstate.State : public object

Base State class.

All new states must inherit this.

Public Functions

can_enter(self, msg)#

Block transition if this returns False on the target state.

The transition message is given as an argument.

When implementing this, make sure to check the transition message in order not to block critical behavior (emergency landing etc.).

These methods are only checked for the lowest substates of the source and target states.

For instance, a transition from hovering.manual.gotofix to flying.rth.waypoint_path will only check gotofix.can_exit() and waypoint_path.can_enter().

Due to how transitions are done, only implement this for stages or level 1 states (flying.manual, landing.normal etc.).

Note

This can be used to make conditional transitions with fallbacks. If a transition fails because of can_enter/exit(), the state machine will try to apply the next transition in the list.

can_exit(self, msg)#

Block transition if this returns False on the source state.

The transition message is given as an argument.

When implementing this, make sure to check the transition message in order not to block critical behavior (emergency landing etc.).

These methods are only checked for the lowest substates of the source and target states.

For instance, a transition from hovering.manual.gotofix to flying.rth.waypoint_path will only check gotofix.can_exit() and waypoint_path.can_enter().

Note

This can be used to make conditional transitions with fallbacks. If a transition fails because of can_enter/exit(), the state machine will try to apply the next transition in the list.

enter(self, msg)#

Called when the state is entered, with the message causing the transition.

exit(self, msg)#

Called when the state is exited, with the message causing the transition.

get_config(self, keys)#

Return a subset of the full configuration with only the given keys.

Doing this in the state init ensures that the config keys are effectively available instead of raising an exception while running.

on_activate(self)#

Called when the parent state machine is being activated.

States are activated in no particular order.

on_deactivate(self)#

Called when the parent state machine is being deactivated.

States are deactivated in no particular order.

set_timeout(self, seconds, reason='undefined')#

Trigger State.step() with a time_out(reason) message after X seconds.

step(self, msg)#

Called when a message is received while the state is active.

It is always called once after State.enter().

wakeup_after(self, seconds, reason='wakeup')#

Trigger State.step() with a wake_up(reason) message after X seconds.

wakeup_periodically(self, seconds, reason='tick')#

Trigger State.step() with a wake_up(reason) message every X seconds.

Public Static Attributes

set_guidance_mode = set_guidance_local_mode#

Attributes

loop#

Opaque pomp loop object, can be used with libpomp. The Python API is automatically generated from the C headers.

import libpomp

# Create timer
self.alert_pomp_cb = libpomp.pomp_timer_cb_t(lambda t, d: self.alerts.process())
self.battery_alert_timer = libpomp.pomp_timer_new(self.loop, self.alert_pomp_cb, None)

# Start timer
libpomp.pomp_timer_set_periodic(self.battery_alert_timer, 1000, 1000)

# Clear timer
libpomp.pomp_timer_clear(self.battery_alert_timer)

# Destroy timer object
libpomp.pomp_timer_destroy(self.battery_alert_timer)
log: logging.Logger#

Standard Logger object with the current state name already set.

self.log.debug("X = ...")
self.log.info("Recalculating trajectory")
self.log.warning("Inconsistent value for sensor ...")
self.log.error("Unable to ...")
mc: MessageCenter#

Global message_center.events instance. This can be used to either end events or observe them to trigger callbacks.

parent: State#

Parent state/stage object. This can be used to store objects or call methods mutual to substates.

class Rth(State):
    def do_something():
        pass

class RthFlightplan(State):
    def step():
        self.parent.do_something()

class RthAscent(State):
    def step():
        self.parent.do_something()
name: str#

Full state name, including parents (ex: “flying.rth.ascent”)

statuses: dict#

Dictionary containing various drone statuses for sensors and position. It contains the following values, or None when the value is unknown:

Key

Type

battery_critical

bool

battery_power_level

fsup.enums.BatteryAlertLevel

defective_motor_failure_type

fsup.enums.DefectiveMotorFailureType

defective_motor_id

fsup.enums.DefectiveMotorId

defective_sensors

{v._name_: bool for v in fsup.enums.Sensor}

fcam_acquisition_sensor_ok

bool

fcam_axes_stabilization_ok

bool

fcam_ok

bool

freefall_detected

bool

fstcam_acquisition_sensor_ok

bool

fstcam_axes_stabilization_ok

bool

fstcam_luminosity_ok

bool

fstcam_ok

bool

fstcam_stereo_calibration_ok

bool

full_steady

bool

gps_navigation_available

bool

gps_satellites

int

gps_sensor_ok

bool

ground_distance_defective

bool

horizontal_steady

bool

linear_velocity_reliable

bool

low_earth_field

bool

magneto_disturbance

bool

magneto_ok

bool

min_altitude_constrained

bool

motion_state

fsup.enums.MotionState

moving_ground_state

ColibryLite::Messages::EventDetectionState

phome_in_progress

bool

precise_home_enabled

bool

rth_battery_capacity

fsup.enums.RthBatteryCapacity

too_much_angle

bool

too_much_angle_at_low_altitude

bool

vcam_led_on

bool

vertical_steady

bool

vibration_level

fsup.enums.VibrationLevel

wind_status

fsup.enums.WindStatus

yaw_steady

bool

if any(self.statuses['defective_sensors']):
    self.log.warning("Invalid sensor detected")
gps_status = self.statuses['gps_navigation_available']
settings: SettingServer#

Drone shared settings server, implements a wrapper of libshsettings. Drone settings can also be inspected and modified in command-line with shs-cli.

See Settings for an exhaustive list of available settings.

settings.subscribe(setting, callback)#

Register a callback to be called when the setting is updated. The callback arguments are the following: event_type, name, previous_value, new_value.

settings.get(setting, fallback=None) float | int | str | bool#

Get the current value of given setting. The fallback is returned if the setting is not set or doesn’t exist.

settings.set(setting, value)#

Change setting value.

Warning

This method is type-sensitive, make sure to pass int or float accordingly when assigning numerical values.

def on_rotation_speed_changed(event_type, name, previous_value, new_value):
    pass
self.setting.subscribe("autopilot.rotation_speed", self.on_rotation_speed_changed)
self.setting.set("autopilot.rotation_speed", 90.0)
home: HomePositionManager#

Home position manager

home.enable_home_update(enable)#

Enable or disable home update (includes automatic first_fix position).

home.set_custom_position(coord_geo) libparrot_physics_binding.CoordGeo3#

Set a custom home position of type libparrot_physics_binding.CoordGeo3

home.get_current_home() libparrot_physics_binding.CoordGeo3#

Get current home coordinates array (latitude, longitude, altitude_ato).

home.get_current_home_type() fsup.enums.HomeType#

Get current home type.

home.get(home_type)#

Get current saved position for given fsup.enums.HomeType.

from fsup.enums import HomeType

# Get takeoff position
takeoff = self.home.get(HomeType.TAKEOFF)
self.log.info("Takeoff position: {0:.6f}, {0:.6f}".format(
    takeoff.latitude, takeoff.longitude))
tlm_dctl: TlmSection#

DroneController telemetry consumer instance.

self.tlm_dctl.fetch_sample()
current_lat = self.tlm_dctl["position_absolute.latitude"]
current_lon = self.tlm_dctl["position_absolute.longitude"]
State.mission: Mission#

Current mission object.

fsup.genmission#

genmission::AbstractMission.get_manager(self, class_type, create_if_needed=True)
Get the manager instance of type class_type registered for
this mission.

If no current manager of this class is currently registered, and if
create_if_needed is True, a new one is created and registered
automatically (otherwise, the return value is None).
genmission::AbstractMission.require_manager(self, class_type)
Like get_manager, but only registers it if necessary

fsup.message_center#

class Supervisor#

The following services and channels are registered by default:

Channel

Service

Messages

intl_channel

intl

flight_supervisor.internal_pb2

dctl_channel

dctl

drone_controller.drone_controller_pb2

autopilot_channel

autopilot

flight_supervisor.autopilot_dmip2pbuf

gdnc_channel

gdnc

guidance.guidance_pb2

gdnc_channel

gdnc_axes_camctrl

guidance.axes_cam_controller_pb2

gdnc_channel

gdnc_zoom_camctrl

guidance.zoom_cam_controller_pb2

gdnc_global_channel

gdnc_global

[guidance_global.messages_pb2]

heimdall_channel

heimdall

[heimdall.messages_pb2]

[heimdall.messages_pb2]: add link when published on dpc-dev

Public Functions

make_message_center(self)#

Message Center#

class MessageCenter#

This class is used to send and receive messages.

Public Functions

attach_client_service_pair(self, channel, pbuf_module, forward_events=True)#

Attach a client service pair to a channel.

A client service pair sends commands and receive events.

channel: channel on which messages are exchanged.

pbuf_module: protobuf module with Command/Event description.

forward_events: True to forward and notify received messages in the state machine. If False, only explicit observers will be notified.

The returned object is a service.ServicePair object.

import guidance.descent_mode_pb2 as descent_mode_pb

self.gdnc_fp_svc = self.mc.attach_client_service_pair(
    self.mc.gdnc_channel, descent_mode_pb, forward_events=True)

Note

When adding new protobuf modules, you need to use this method for the transitions to work in the state machine.

attach_internal_service_pair(self, channel, pbuf_module, forward_events=True)#

Attach an internal service pair.

Internal messages are used to trigger transitions inside the state machine from within the state machine itself.

channel: channel on which messages are exchanged. The supervisor has a dedicated builtin channel for that supervisor.intl_channel.

pbuf_module: protobuf module with Command/Event description.

forward_events: True to forward and notify received messages in the state machine. If False, only explicit observers will be notified.

The returned object is a service.ServicePair object.

attach_server_service_pair(self, channel, pbuf_module, forward_events=True)#

Attach a server service pair to a channel.

A server service pair sends commands and receive events.

channel: channel on which messages are exchanged.

pbuf_module: protobuf module with Command/Event description.

forward_events: True to forward and notify received messages in the state machine. If False, only explicit observers will be notified.

The returned object is a service.ServicePair object.

detach_service_pair(self, svc_pair)#

Detach the given service pair and stop notifications of events/messages.

For symmetry with attach methods, the following methods also exist:

  • detach_client_service_pair

  • detach_server_service_pair

  • detach_internal_service_pair

observe(self, bindings, src=None)#

Observe a set of events/messages notified in the message center.

‘bindings’ is a dictionnary of ‘event’: ‘callback’ pairs. See message_center.events for a list of possible events and the expected signature of the callback. To observe a specific message, ‘event’ can be the fully qualified name of the currently selected field in the oneof named ‘id’ (see by ‘msghub_utils.msg_id’).

if ‘src’ is given, only events/message matching the source are observed. ‘src’ can be a channel (for CONNECT/DISCONNECT) or a service pair (for INACTIVE/ACTIVE/MESSAGE).

The returned object shall be kept while the observation is required. Call its Observer.unobserve() method when done.

start_client_channel(self, addr_str)#

Start a client channel with given address in ‘pomp’ format.

Channels are reused, meaning they will share the same socket connection. The message center will internally track the usage to close the connection when the last channel is stopped. The returned channel can then be used to attach one or more msghub services.

start_internal_channel(self)#

Start an internal channel.

Such channel can be used to post message from one state to the state machine and then trigger some transitions.

start_server_channel(self, addr_str)#

Start a server channel with given address in ‘pomp’ format.

The returned channel can then be used to attach one or more msghub services.

stop_channel(self, channel)#

Stop the given channel.

As client channel are reused, only the last call to stop will actually close the socket connection.

Service#

class Service#

A service represent a set of protobuf messages grouped either as ‘Command’ or ‘Event’.

The root message description (Command/Event) shall contain a ‘oneof’ field named ‘id’ with all possibles messages that can be sent or received.

To send a message, there is 2 ways:

  • Call Service.send with the Command/Event message correctly filled. Example:

msg = svc.alloc()
msg.some_field = "some_value"
svc.send(msg)

  • Call the generated Service.sender.<name> with only the sub-message corresponding to the ‘oneof’ ‘id’ sub-field. Example (same message):

svc.sender.some_field("some_value")

Available attributes:

  • msg: message factory, each method self.msg.<field> returns an instance of the service class, such that the “id” field is set to the selected field. Messages that have no inner field (e.g. “Void” messages) are preallocated and identical from one call to another.

  • msg_class: class of protobuf messages

  • service_name(string): fully qualified name of protobuf message.

  • sender: catalog of sender methods. Each method builds a temporary message as-if by calling msg and directly calls the Service.send method of the associated Service.

Simple message from enum value:

import colibrylite.estimation_mode_pb2 as cbry_est

self.mc.dctl.cmd.sender.set_estimation_mode(cbry_est.FLYING)

Complex message from field structure:

self.manager.messages_svc.evt.sender.hover(delay=self.land_delay)

Public Functions

alloc(self, *args, **kwargs)#

Create a new instance of the protobuf message.

observe(self, bindings)#

Helper that observe some events/messages related to this service only.

The returned object shall be kept while the observation is required. Call its ‘unobserve’ method when done.

observe_messages(self, bindings)#

Similar to Service.observe method but the callback signature for messages is different.

Instead of having a full Command/Event messages, only the sub-message corresponding to the ‘oneof’ ‘id’ sub-field.

The returned object shall be kept while the observation is required. Call its ‘unobserve’ method when done.

send(self, pbuf_msg)#

Send a protobuf message.

It shall be an instance of Command or Event.

Service Pair#

class ServicePair#

Class holding a pair of msghub services found in a protobuf description the two services correspond to the special messages ‘Command’ and ‘Event’ and are stored in the respective attributes ‘cmd’ and ‘evt’.

For client pair, command can be sent and event received. For a server pair, command can be received and event sent. For an internal pair, only events are managed.

Kind : public Enum

CLIENT - command can be sent and event received.

SERVER - command can be received and event sent.

INTERNAL - only events are managed.

Observer#

class Observer#

This class is used to represent a set of events and their callbacks being observed in a notifier.

This instance is used as a handle when canceling from the notifier.

Public Functions

unobserve(self)#

Stop observing and sending notifications for this observer.

class Notifier#

Public Functions

notify(self, evt, *args, src=None, extra=None, observer=None)#

Manually trigger a notification with given event.

observe(self, bindings, src=None, cls=Observer)#

Observe events and trigger given callbacks when they occur.

Events are usually described as string for protobuf messages, but they can also be enum values from message_center.events.

unobserve(self, observer)#

Stop observing for given Observer object.

This has the same effect as Observer.unobserve().

Standard message observing#
from fsup.genstate import State

class MyState(State):
    def enter(self, msg):
        self.obs = self.mc.observe({
            "DroneController.ground_close": lambda *_: self.on_ground_close(True),
            "DroneController.ground_not_close": lambda *_: self.on_ground_close(False),
            "DroneController.target_reached":
                lambda _, msg: self.on_target_reached(msg.target_reached),
        })

    def exit(self, msg):
        self.obs.unobserve()

    def on_ground_close(self, is_close):
        self.log.info("Ground is close = {}".format(is_close))

    def on_target_reached(self, msg):
        if msg.all_reached:
            self.log.info("All targets reached!")
        else:
            pass  # ...
Observing custom channels#
import logging
import fsup.events as events
import my_channel_pb2 as pbuf_my_channel  # Custom protobuf

class MyMission:
    def __init__(self, mission_environment):
            self.log = logging.getLogger("MyMission")

    def on_activate(self):
        # Listen to custom events while the mission is active.
        # These events can only be used for transitions when the channel is active.
        self.my_channel = self.mc.start_client_channel('unix:@my-channel')
        self.my_channel_svc = self.mc.attach_client_service_pair(
            self.my_channel, pbuf_my_channel, forward_events=False)

        self.obs = self.my_channel_svc.observe({
            events.Service.ACTIVE : self._enable_some_feature(True),
            events.Service.INACTIVE : self._enable_some_feature(False),
        })

    def on_deactivate(self):
        self.obs.unobserve()
        self.my_channel_svc = None  # Destroy service

    def _enable_some_feature(self, enable):
        self.log.info("Feature enabled = %s", enable)

Events#

The message_center.events module contains special events that can be listened to with observers.

events.Channel : public Enum

CONNECTED (Parameters: channel, connection)

DISCONNECTED (Parameters: channel, connection)

Public Static Attributes

CONNECTED = auto()#
DISCONNECTED = auto()#
events.Service : public Enum

Service events.

INACTIVE - No peer in any channel handling/serving this service (Parameters: service)

ACTIVE - At least one peer connected to a channel handling/service this service (Parameters: service)

MESSAGE - Received a message for this service (Parameters: service, protobuf message)

Public Static Attributes

ACTIVE = auto()#
INACTIVE = auto()#
MESSAGE = auto()#
events.StateMachine : public Enum

State machine events.

STATE_CHANGED (Parameters: current state as a list of hierarchical states) Ex: [“landing”, “normal”, “descent”]

RESET - No parameter

EXIT - No parameter

Public Static Attributes

EXIT = auto()#
RESET = auto()#
STATE_CHANGED = auto()#

fsup.timers#

class Timer#

The fsup.timers module defines a Timer class on top of timer methods from libpomp (i.e.

pomp_timer_new, etc.).

Public Functions

cancel(self)#

Cancel the timer.

destroy(self)#

Free the allocated resources in current object.

This renders the Timer object unusable. This method can be called multiple times, and is called automatically by the class finalizer.

start(self)#

Activate the timer.

If the timer is already active, cancel it first. This method can be called multiple times during the life cycle of the Timer object, provided the timer is not destroyed.

For example, assuming there is a loop pomp-loop object and a logger log in the current scope:

def tick(timer):
    log.info("tick")

t = Timer(loop, 1, tick, repeat=True)
t.start()

fsup.cache#

class Cache#

Dictionary with notifications on value change.

A cache is initialized with a dictionary of initial key/values.

Clients may call observe() to attach callback to value changes (the callback are called when the value is different from the previous value, as defined by != operator).

IMPORTANT: observe() returns a CacheObserver instance; it is necessary to call its request_notifications() method once after creation to be notified of any prior state for the values being observed (doing so will trigger all the callbacks associated with the observer). In some cases it is sufficient for an object that manages the lifecycle of observers to call notify_all_observers() on the Cache instance.

Public Functions

get(self, key)#
make_observer(self, bindings)#

Make an instance of CacheObserver for the given key/values bindings.

No event is observed until a call to observe() is made on the resulting object.

notify_all_observers(self)#

Notify all observers with current cache values.

notify_observer(self, observer)#
observe(self, bindings)#

Observe changes for a subset of keys in cache.

Parameters:

bindings – dictionary mapping cache keys to functions accepting 2 arguments: the key and the value associated with it in the cache.

Returns:

a CacheObserver instance

reset(self, *notify=True)#

Reset to initial cache values.

Parameters:

notify=True – also notify all observers for all keys

set(self, key, value)#
set_multiple(self, **kwargs)#

Set multiple cache entries at the same time.

For example, the following:

self.set(‘a’, 1) self.set(‘b’, 2)

Can be written:

self.set_multiple(a=1, b=2)

The main difference is that all the observers are notified after all entries are changed. In the original code, some observer could see a cache with ‘a’ being set to 1 while ‘b’ being still set to its previous value. This might not be desirable if certain entries have to always be consistent with each other (alternatively, a similar effect can be achieved by storing a tuple of values under a single key).

fsup.enums#

class fsup.enums.BatteryAlert#
NONE#
WARNING#
CRITICAL#
class fsup.enums.BatteryAlertLevel#
SHUTDOWN#
CRITICAL#
LOW#
OK#
UNKNOWN#
class fsup.enums.BatteryDataRole#
NONE#
HOST#
DEVICE#
class fsup.enums.BatteryPowerRole#
NONE#
SINK#
SOURCE#
class fsup.enums.DefectiveMotorFailureType#
NONE#
MOTOR_STALLED#
PROPELLER_SECURITY#
RC_EMERGENCY_STOP#
DEFECTIVE_PROP#
class fsup.enums.DefectiveMotorId#
NONE#
FRONT_LEFT#
FRONT_RIGHT#
REAR_RIGHT#
REAR_LEFT#
class fsup.enums.MotionState#
STEADY#
MOVING#
class fsup.enums.MotorError#
NONE#
EEPROM#
MOTOR_STALLED#
PROPELLER_SECURITY#
COMM_LOST#
RC_EMERGENCY_STOP#
REAL_TIME#
MOTOR_SETTING#
TEMPERATURE#
BATTERY_VOLTAGE#
LIPO_CELLS#
MOSFETS#
BOOTLOADER#
ASSERT#
class fsup.enums.RthBatteryCapacity#
OK#
WARNING#
CRITICAL#
TOOLATE#
UNKNOWN#
class fsup.enums.Sensor#
IMU#
MAGNETOMETER#
BAROMETER#
GPS#
VERTICAL_RANGE_FINDER#
V_CAM_SPEED#
class fsup.enums.VibrationLevel#
OK#
WARNING#
CRITICAL#
class fsup.enums.WindStatus#
OK#
WARNING#
CRITICAL#

libparrot_physics_binding#

class libparrot_physics_binding.CoordGeo3#

Represents a 3D geographic coordinate.

Methods

__init__(self, latitude, longitude, altitude)#

Construct a new CoordGeo3 object.

is_valid(self)#

Check if the CoordGeo3 object is a valid geographic position.

distance(self, other)#

Compute the distance to another CoordGeo3 object.

to_ned(self, ned_origin)#

Convert to a CoordNed3 object. ned_origin is a CoordGeo3 object representing the NED origin.

Attributes

latitude#
longitude#
altitude#
class libparrot_physics_binding.CoordNed3#

Represents a 3D NED coordinate.

Methods

__init__(self, x, y, z)#

Construct a new CoordNed3 object

distance(self, other)#

Compute the distance to another CoordNed3 object.

to_geo(self, ned_origin)#

Convert to a CoordGeo3 object. ned_origin is a CoordGeo3 object representing the NED origin.

Attributes

x#
y#
z#

Managers#

Core#

event_cache#

features.event_cache.EventCache : public Feature

Cache drone events end forward them as autopilot events.

The event cache keep track of different drone states (also known as statuses), when receiving events from various processes: Flight supervisor itself (internal messages), Drone Controller, Guidance, and others.

It also behaves like an fsup.cache.Cache object, thereby allowing client code to build observers for a set of keys in the cache, associated with callbacks on value change.

The only API this manager exposes are the methods delegating to the inner Cache object (e.g. observe(), get(), set(), …)

Public Functions

get(self, key)#

Get current value associated with key.

observe(self, bindings)#

Observe changes for a subset of keys in cache.

Parameters:

bindings – dictionary mapping cache keys to functions accepting 2 arguments: the key and the value associated with it in the cache.

Returns:

an observer as defined in cache.py

set(self, key, value)#

Set current value for key (it notifies observers).

geofence_manager#

features.geofence_manager.GeofenceManager : public Feature

Public Functions

overwrite_user_setting(self, enable)#

Overwrite user setting to enable.

restore_user_setting(self)#

Restore user setting.

set_geofence_center_to_current_position(self)#

Set the geofence’s center to the current position.

Public Members

statuses#

oa_manager#

features.oa_manager.OaDroneState : public IntEnum

Public Functions

is_active(self)#

Public Static Attributes

ACTIVE = autopilot_enums.OA_STATE_ACTIVE#
INACTIVE = autopilot_enums.OA_STATE_INACTIVE#
features.oa_manager.OaDroneAvailability : public IntEnum

Public Functions

is_available(self)#

Public Static Attributes

AVAILABLE = autopilot_enums.OA_AVAILABILITY_AVAILABLE#
UNAVAILABLE = autopilot_enums.OA_AVAILABILITY_UNAVAILABLE#
features.oa_manager.OaDesiredMode : public IntEnum

Public Functions

is_desired(self)#

Public Static Attributes

DISABLED = autopilot_enums.OA_MODE_DISABLED#
STANDARD = autopilot_enums.OA_MODE_STANDARD#
features.oa_manager.ObstacleAvoidanceManager : public Feature

Public Functions

is_obstacle_avoidance_drone_available(self)#

Return True if obstacle avoidance is currently available in the drone, independently from the current guidance mode.

overwrite_user_setting(self, desired_mode)#

Overwrite user setting for desired mode.

restore_user_setting(self)#

Restore user setting for desired mode.

set_frozen_alert(self, status)#

Drone frozen.

set_stuck_alert(self, status)#

Force stuck alert.

Public Members

statuses#

takeoff_readyness_manager#

features.takeoff_readyness_manager.TakeOffReadynessManager : public Feature

Public Functions

get_ready(self)#

video_manager#

features.video_manager.VideoManager : public Feature

List of video modes with a high CPU usage.

To be used to prioritize recording over other CPU intensive tasks.

Public Functions

check_current_config(self, removed_camera_modes=None, removed_recording_capabilities=None, removed_photo_capabilities=None)#

Check the current video configuration and return True if it is compatible with some potential given removed capabilities.

See VideoManager.remove_capabilities() for parameter description.

configure(self, config)#

Reconfigure the video pipeline.

config is a dictionnary with the fields of the arsdk.camera2.Config protobuf message. selected_fields will then be automatically be filled.

from arsdk import camera2_pb2 as pbuf_camera2

self.supervisor.video_manager.configure(config={
    'camera_mode': pbuf_camera2.CAMERA_MODE_RECORDING,
    'video_recording_mode': pbuf_camera2.VIDEO_RECORDING_MODE_STANDARD,
    'video_recording_resolution': pbuf_camera2.VIDEO_RESOLUTION_1080P,
    'video_recording_framerate': pbuf_camera2.FRAMERATE_120,
    'video_recording_dynamic_range':
       pbuf_camera2.DYNAMIC_RANGE_STANDARD,
})
get_config(self)#

Get the current configuration as a dictionnary with the fields of the arsdk.camera2.Config protobuf message.

is_high_perf(self)#

Check if the current video configuration needs the CPUs to be available to be running without skips.

lock_config(self)#

Lock the current configuration and prevent external application to modify it.

remove_capabilities(self, camera_modes=None, recording_capabilities=None, photo_capabilities=None)#

Ask video config to remove some capabilities temporarily.

camera_modes is an array of arsdk.camera2_pb2.CameraMode.

recording_capabilities is an array of tuples containing:

  • resolution (ex arsdk.camera2_pb2.VIDEO_RESOLUTION_DCI_4K)

  • framerate (ex arsdk.camera2_pb2.FRAMERATE_30)

  • dynamic_range (ex arsdk.camera2_pb2.DYNAMIC_RANGE_HDR10)

photo_capabilities is an array of tuples containing:

  • resolution (ex arsdk.camera2_pb2.)

  • format (ex arsdk.camera2_pb2.)

  • file_format (ex arsdk.camera2_pb2.)

If None or an empty array is given it will restore the video capabilities to the default.

from arsdk import camera2_pb2 as pbuf_camera2

self.supervisor.video_manager.remove_capabilities(
    recording_capabilities=[
    (
pbuf_camera2.VIDEO_RESOLUTION_DCI_4K,
pbuf_camera2.FRAMERATE_30,
pbuf_camera2.DYNAMIC_RANGE_HDR10,
    )],
)
restore_default_capabilities(self)#

Restore video capabilities to their default values, without any restrictions.

unlock_config(self)#

Unlock the current configuration and allow modifications.