User guide#

This guide will walk you through Olympe API using a series of examples that increasingly demonstrate more advanced usage.

Before continuing you should first read the Olympe installation procedure.

For your own safety and the safety of others, the following examples will use a simulated ANAFI drone but remember that Olympe is also capable of communicating with a physical drone. As far as Olympe is concerned, the main difference between a physical and a simulated drone is the drone IP address (192.168.42.1 for a physical drone and 10.202.0.1 for a simulated one).

At the end of each example, if you are using a simulated drone, remember to reset the simulation before getting into the next example. Each example assumes that the drone is landed with a fully charged battery. Just enter sphinx-cli action -m world fwman world_reset_all in a terminal to reset the current simulation.

The full code of each example can be found in the src/olympe/doc/examples/ folder.

Create a simulated drone#

First things first, you need a drone to connect to. For this example we will use Sphinx to create a simulated drone and then connect to it using Olympe before sending our first commands.

If you haven’t installed Sphinx yet, now is a good time to install it.

Then in a shell enter the following commands:

$ sudo systemctl start firmwared.service
$ sphinx "/opt/parrot-sphinx/usr/share/sphinx/drones/anafi_ai.drone"::firmware="ftp://<login>:<pass>@ftp2.parrot.biz/versions/anafi2/pc/%23latest/images/anafi2-pc.ext2.zip"

Where login is the one from your Parrot partner FTP account and pass is the associated password.

The core application is now waiting for an UE4 application to connect… In a second shell, do:

$ parrot-ue4-empty

The above commands start a simulation of an ANAFI Ai drone in an empty world. In the following examples, we will be using the virtual ethernet interface of the simulated drone, and reach it at 10.202.0.1.

For more information on Sphinx, please consult its comprehensive user documentation.

Set up your shell environment#

Don’t forget to set up your Python environment using the shell.

$ source ~/code/parrot-groundsdk/./products/olympe/linux/env/shell
(olympe-python3) $

ARSDK messages explained#

At its core, Olympe basically just send and receive ARSDK messages to control a drone. The following sequence diagram shows what is happening when an Olympe scripts sends a TakeOff() command to a drone.

blockdiag Olympe Drone TakeOff() FlyingStateChanged(state='motor_ramping') FlyingStateChanged(state='takingoff')

Take off command sequence#

When Olympe sends a command message like the TakeOff() message above, it then waits for a response from the drone, the FlyingStateChanged(state="takingoff") event message in this case.

Sometimes, the drone can also spontaneously notify its controller (Olympe) of a particular event. Olympe provides a way to monitor such events (or a combination of such events). The following sequence diagram illustrates this scenario with a GPSFixStateChanged(0) that informs Olympe that the GPS fix has been lost.

blockdiag Olympe Drone GPSFixStateChanged(0)

Losing the GPS fix#

As a user of Olympe, you might also be punctually interested in the current state of the drone without monitoring every received message from the drone. To do this, Olympe just remembers the last received event that provides this state information and expose this information through the olympe.Drone.get_state() method.

blockdiag Olympe Drone GPSFixStateChanged(0) get_state

Getting current GPS fix status#

As demonstrated in the following usage examples, Olympe provides a relatively simple API to perform the above actions (and much more) using the following olympe.Drone class methods:

ARSDK message Python types are available in the olympe.messages.<feature_name>[.<class_name>] modules. Likewise, ARSDK enum Python types are available in the olympe.enums.<feature_name>[.<class_name>] modules.

See the Messages Reference Documentation for more information.

Olympe’s basics#

Taking off - “Hello world” example#

The first thing you might want to do with Olympe is making your drone to take off. In this example we’ll write a simple Python script that will connect to the simulated drone we’ve just created and then send it a TakeOff() command.

Create the following Python takeoff.py script somewhere in your home directory:

 1import olympe
 2import os
 3import time
 4from olympe.messages.ardrone3.Piloting import TakeOff, Landing
 5
 6DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 7
 8
 9def test_takeoff():
10    drone = olympe.Drone(DRONE_IP)
11    drone.connect()
12    assert drone(TakeOff()).wait().success()
13    time.sleep(10)
14    assert drone(Landing()).wait().success()
15    drone.disconnect()
16
17
18if __name__ == "__main__":
19    test_takeoff()

First, this script imports the olympe Python package and then the TakeOff() command message from the Piloting feature module (one of the Ardrone3 features module). A “feature” is just a collection of related command and event messages that the drone exchanges its controller (FreeFlight, SkyController, Olympe, …).

Next, this script creates the drone object of the olympe.Drone class. This class constructor requires only one argument: the IP address of the drone. For a simulated drone, we can use 10.202.0.1 which is the default drone IP address over the virtual Ethernet interface. For a physical drone, it would be 192.168.42.1 which is the de default drone IP address over Wi-Fi. Finally, when connected to a SkyController over USB, the SkyController is reachable at 192.168.43.1.

olympe.Drone.connect() actually performs the connection to the drone. This would fail and return False if the drone is unreachable (or non-existent) for some reason.

Then, drone(TakeOff()).wait() sends the TakeOff() command message to the drone and then waits for the drone to acknowledge the command. When the wait() function returns, our simulated drone should be taking off. For now, we will always use the drone(....).wait() construct to send command message and will explain later what the wait() function does and what we could do differently with or without it.

Finally, olympe.Drone.disconnect() disconnect Olympe from the drone properly.

To execute this script and see your drone taking off, from the same shell/terminal you’ve just source’d the shell script:

(olympe-python3) $ python ./takeoff.py

Getting the current value of a drone state or setting#

In this example, we will be using the get_state() method to query the current value of the “maximum tilt” drone’s setting.

When the maximum tilt drone setting is changed by the controller (Olympe) with the MaxTilt() command message the drone sends the MaxTiltChanged() event message in response. Changing a drone setting will be demonstrated in the following example. Here, we’re just interested in getting the current drone setting.

When Olympe connects to a drone it also asks the drone to send back all its states and settings event messages. Olympe can later provide you with this information through the olympe.Drone.get_state() method. So if Olympe is connected to a drone, olympe.Drone.get_state() always returns the current drone state associated to an event message.

In this case, we will be passing the MaxTiltChanged() message to the olympe.Drone.get_state() method. This will return a dictionary of the MaxTiltChanged() event message which provides the following parameters:

MaxTiltChanged Parameters
  • current (float) – Current max tilt

  • min (float) – Range min of tilt

  • max (float) – Range max of tilt

Note: Don’t be confused here, the “min” and “max” parameters are actually the minimum and the maximum values for the “maximum tilt” setting. Here, we are only interested in the “current” value of this setting.

Let’s get down to some practice! First, reset the simulation (sphinx-cli action -m world fwman world_reset_all in a terminal).

Create the following Python maxtiltget.py script somewhere in your home directory:

 1import olympe
 2import os
 3from olympe.messages.ardrone3.PilotingSettingsState import MaxTiltChanged
 4
 5DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 6
 7
 8def test_maxtiltget():
 9    drone = olympe.Drone(DRONE_IP)
10    drone.connect()
11    print("Drone MaxTilt = ", drone.get_state(MaxTiltChanged)["current"])
12    drone.disconnect()
13
14
15if __name__ == "__main__":
16    test_maxtiltget()

To execute this script and see your drone taking off, from the same shell/terminal you’ve just source’d the shell script:

(olympe-python3) $ python ./maxtiltget.py

This should print the current maximum tilt value in your terminal. The following sequence diagram illustrate what is happening in this simple example.

blockdiag Olympe Drone also send all event messages get_state(MaxTiltChanged) connect connected disconnect disconnected

Getting the drone MaxTilt#

Changing a drone setting - Understand the “expectation” mechanism#

In this example, we will change the “maximum tilt” drone setting. This setting indirectly controls the maximum drone acceleration and speed. The more the drone can tilt, the more the drone gain speed.

The maximum tilt setting itself must be within a minimum and maximum range. A drone with a max tilt value of 0° is not particularly useful while a maximum tilt over 180° might only be useful for a racer drone. For ANAFI the maximum tilt setting must be within 1° and 40°.

You might be wondering:

  • What is happening when you send an invalid setting value (ex: 180°)?

  • How does the drone respond to that?

  • How do we catch this kind of error with Olympe?

Let’s see how this is done in the following example. Some important explanations will follow.

First, reset the simulation (sphinx-cli action -m world fwman world_reset_all in a terminal).

Create the following Python maxtilt.py script somewhere in your home directory:

 1import olympe
 2import os
 3from olympe.messages.ardrone3.PilotingSettings import MaxTilt
 4
 5DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 6
 7
 8def test_maxtilt():
 9    drone = olympe.Drone(DRONE_IP)
10    drone.connect()
11    maxTiltAction = drone(MaxTilt(10)).wait()
12    if maxTiltAction.success():
13        print("MaxTilt(10) success")
14    elif maxTiltAction.timedout():
15        print("MaxTilt(10) timedout")
16    else:
17        # If ".wait()" is called on the ``maxTiltAction`` this shouldn't happen
18        print("MaxTilt(10) is still in progress")
19    maxTiltAction = drone(MaxTilt(0)).wait()
20    if maxTiltAction.success():
21        print("MaxTilt(0) success")
22    elif maxTiltAction.timedout():
23        print("MaxTilt(0) timedout")
24    else:
25        # If ".wait()" is called on the ``maxTiltAction`` this shouldn't happen
26        print("MaxTilt(0) is still in progress")
27    drone.disconnect()
28
29
30if __name__ == "__main__":
31    test_maxtilt()

This time, the script starts by importing the MaxTilt() command from the ardrone3 feature. Then, it connects to the drone and sends two MaxTilt commands. The first one with a 10° tilt value, the second with a 0° tilt value.

Note that this time, we are assigning into the maxTiltAction variable the object returned by the .wait() method. For now, all you have to know is that you can call .success() -> bool on an “action” object (the more general term is “expectation” object) if you want to know if your command succeeded or not. The success() function just returns True in case of success and False otherwise.

You can also call .timedout() -> bool on an “action” object to know if your command message timed out. This .timedout() method is not particularly useful in this example because we always call .wait() on the action object, so the action is either successful or has timed out.

To execute this script, from the same shell/terminal you have source’d the shell script in:

(olympe-python3) $ python ./maxtilt.py

If all goes well, you should see the following output in your terminal:

MaxTilt(10) success
MaxTilt(0) timedout

Obviously, the 10° maximum tilt value is correct, so the first command succeeded while the second command failed to set an incorrect 0° maximum tilt value.

It is important to understand how Olympe knows if a particular command succeeded or not. When Olympe sends a command message, it usually implicitly expects an event message in return.

Up until now, we have only explicitly used command messages. Command messages and event messages are somewhat similar. They are both associated with an internal unique ID and eventually with some arguments (ex: the maximum tilt value) and they both travel from one source to a destination.

A command message travel from the controller (Olympe) to the drone while an event message travel the other way around.

Here, when Olympe sends the MaxTilt(10) command message it implicitly expects a MaxTiltChanged(10) event message in return. If the event is received in time, everything is fine: maxTiltAction.success() is True and maxTiltAction.timedout() is False. Otherwise, the maxTiltAction times out (maxTiltAction.success() is False and maxTiltAction.timedout() is True).

The following sequence diagram illustrates what is happening here. For the second maximum tilt command, when Olympe sends the MaxTilt(0) command message it receives a MaxTiltChanged(1) event message because 0° is an invalid setting value, so the drone just informs the controller that it has set the minimum setting value instead (1°). Olympe does not assume that this response means “No, I won’t do what you are asking”. Instead, it still waits for a MaxTiltChanged(0) event that will never come and the command message times out: (maxTiltAction.success() is False and maxTiltAction.timedout() is True). This behavior is identical for every command message: when Olympe sends a command message to a drone, it either result in a success or a timeout.

blockdiag Olympe Drone maxTiltAction pending maxTiltAction successful maxTiltAction pending maxTiltAction still pending maxTiltAction timedout connect connected MaxTilt(10) MaxTiltChanged(current=10., min=1., max=40.) MaxTilt(0) MaxTiltChanged(current=1., min=1., max=40.) disconnect disconnected

Setting the drone MaxTilt#

The arsdk protocol defined in arsdk-xml does not provide a way to report errors uniformly. This is why Olympe cannot detect errors like this one and just time out instead. Olympe associates to each command a default timeout that can be overridden with the _timeout message parameter. For example:

maxTiltAction = drone(MaxTilt(10, _timeout=1)).wait()

Moving around - Waiting for a ‘hovering’ flying state#

In this example, we will move our drone around using the moveBy() command message.

First, reset the simulation (sphinx-cli action -m world fwman world_reset_all in a terminal).

Create the following Python moveby.py script somewhere in your home directory:

 1import olympe
 2import os
 3from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing
 4
 5DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 6
 7
 8def test_moveby():
 9    drone = olympe.Drone(DRONE_IP)
10    drone.connect()
11    assert drone(TakeOff()).wait().success()
12    drone(moveBy(10, 0, 0, 0)).wait()
13    assert drone(Landing()).wait().success()
14    drone.disconnect()
15
16
17if __name__ == "__main__":
18    test_moveby()

This script starts by importing the olympe package and then the TakeOff(), moveBy() and Landing() command messages from the ardrone3.Piloting feature module. It then connects to the drone and send the TakeOff(), moveBy() and Landing() commands.

This script should work as-is, right? Let’s see.

To execute this script, from the same shell/terminal you’ve source’d the shell script:

(olympe-python3) $ python ./moveby.py

The output of this script should be:

moveBy timedout

Wait! The drone takes off and then eventually lands without performing the moveBy?! What happened?

When olympe sends a command message to the drone it expects an acknowledgment event message from the drone in return. In this script, drone(TakeOff()).wait() sends the TakeOff() command to the drone and then waits for the drone taking off event message as an acknowledgment from the drone. Olympe knows that after a TakeOff() command it should expect a FlyingStateChanged(state='takingoff') and automatically waits for that event for you.

The problem with the moveBy() command is that it is rejected by the drone as long as the drone is not in the “hovering” flying state. In this case it is rejected because after the TakeOff() command the drone is in takingoff flying state. So, to fix this script we will have to wait for the hovering state before sending the moveBy() command. The following sequence diagram illustrates what is happening with this first attempt to use the moveBy() command.

blockdiag Olympe Drone takeOffAction pending takeOffAction successful takeOffAction successful moveByAction pending silently rejected, moveBy is unavailable moveBy available moveByAction timedout connect connected TakeOff() FlyingStateChanged(state='motor_ramping') FlyingStateChanged(state='takingoff') moveBy() FlyingStateChanged(state='hovering') disconnect disconnected

Attempt to use the moveBy command#

Edit moveby.py with the following corrected script:

 1import olympe
 2import os
 3from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing
 4from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
 5
 6DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 7
 8
 9def test_moveby2():
10    drone = olympe.Drone(DRONE_IP)
11    drone.connect()
12    assert drone(
13        TakeOff()
14        >> FlyingStateChanged(state="hovering", _timeout=5)
15    ).wait().success()
16    assert drone(
17        moveBy(10, 0, 0, 0)
18        >> FlyingStateChanged(state="hovering", _timeout=5)
19    ).wait().success()
20    assert drone(Landing()).wait().success()
21    drone.disconnect()
22
23
24if __name__ == "__main__":
25    test_moveby2()

This new script will wait for the hovering state after each command sent to the drone. To do that, it imports the FlyingStateChanged() event message from the same ardrone3.PilotingState module feature.

Note: The expectations for each command message are defined in the arsdk-xml source repo along with the command and event messages themselves.

 9def test_moveby2():
10    drone = olympe.Drone(DRONE_IP)
11    drone.connect()
12    assert drone(

In this new example after the drone connection, the above code tells Olympe to:

  1. Send the TakeOff() command

  2. Then, to implicitly wait for the expectations of the TakeOff() command: FlyingStateChanged(state='takingoff')

  3. Then, to explicitly wait for the drone hovering flying state event: FlyingStateChanged(state='hovering')

Here, the drone() functor accepts more than just a command message. The drone() takes an expression that may be a combination of command and event messages to process. The “>>” operator is used to combine two expressions with an “and then” semantic. This example could be read as “Take off and then wait a maximum of 5 seconds for the ‘hovering’ flying state”).

The rest of the example should be easy to understand now. After the drone has taken off, this script waits for the drone “hovering” state and then sends the moveBy command, waits for the “hovering” state again and then sends the landing command. The following sequence diagram illustrates this second (successful) attempt to use the moveBy() command.

blockdiag Olympe Drone takeOffAction pending takeOffAction pending takeOffAction successful waiting for FlyingStateChanged(state='hovering') moveBy available moveByAction pending moveByAction successful connect connected TakeOff() FlyingStateChanged(state='motor_ramping') FlyingStateChanged(state='takingoff') FlyingStateChanged(state='hovering') moveBy() moveByEnd() disconnect disconnected

Using the moveBy command#

Let’s check everything works! Reset the simulation (sphinx-cli action -m world fwman world_reset_all in a terminal) and execute this script, from the same shell/terminal you have source’d the shell script:

(olympe-python3) $ python ./moveby.py

And it should work now! The drone should take off, perform a forward move by 10 meters and then land.

Explore available ARSDK commands#

If you followed this guide so far, you might want to explore the Messages Reference Documentation. If you are looking for a specific message or feature, you can also use the search box within the sidebar on the left.

Alternatively, you can also use Olympe in an interactive Python console with ipython and leverage the autocompletion and the help function to browse the available ARSDK messages.

(olympe-python3) $ ipython
In [1]: import olympe
In [2]: from olympe.messages.<TAB>
olympe.messages.animation       olympe.messages.camera          olympe.messages.debug           olympe.messages.generic         olympe.messages.mapper          olympe.messages.precise_home    olympe.messages.thermal
olympe.messages.ardrone3        olympe.messages.common          olympe.messages.drone_manager   olympe.messages.gimbal          olympe.messages.mediastore      olympe.messages.rth             olympe.messages.user_storage
olympe.messages.battery         olympe.messages.controller_info olympe.messages.follow_me       olympe.messages.leds            olympe.messages.powerup         olympe.messages.skyctrl         olympe.messages.wifi

In [3]: from olympe.messages.ardrone3.Piloting import <TAB>
AutoTakeOffMode  CancelMoveTo     Emergency        Landing          PCMD             StopPilotedPOI   UserTakeOff      moveTo
CancelMoveBy     Circle           FlatTrim         NavigateHome     StartPilotedPOI  TakeOff          moveBy

In [4]: from olympe.messages.ardrone3.Piloting import TakeOff
In [5]: help(TakeOff)
Help on ardrone3.Piloting.TakeOff object:

class ardrone3.Piloting.TakeOff(ArsdkMessage)
 |  Ardrone3.Piloting.TakeOff
 |
 |
 |  Ask the drone to take off.
 |
 |  Result: On the quadcopters: the drone takes off if its :py:func:`~olympe.messages.ardrone3.PilotingState.FlyingStateChanged`
 |  was landed. On the fixed wings, the landing process is aborted if the
 |  :py:func:`~olympe.messages.ardrone3.PilotingState.FlyingStateChanged` was landing. Then, event :py:func:`~olympe.messages.ardrone3.PilotingState.FlyingStateChanged`
 |  is triggered.

Using Parrot AirSDK missions with Olympe#

Olympe integrates with the Parrot AirSDK and enables you to install AirSDK “missions” (i.e. Parrot and Parrot partners applications) onto a remote drone connected to Olympe.

Once installed onto the done, Olympe is able to exchange mission specific messages with the drone.

The example below illustrate this installation process and some basic interaction with the Air SDK “Hello, Drone!” mission.

 1import logging
 2import olympe
 3import olympe.log
 4import os
 5import pprint
 6from olympe.controller import Disconnected
 7from olympe.messages.common.Common import Reboot
 8from olympe.messages import mission
 9
10
11DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
12HELLO_MISSION_URL = os.environ.get("HELLO_MISSION_URL", "com.parrot.missions.samples.hello.tar.gz")
13
14olympe.log.update_config({"loggers": {"olympe": {"level": "INFO"}}})
15logger = logging.getLogger("olympe")
16
17
18def test_hello_mission():
19    drone = olympe.Drone(DRONE_IP)
20    with drone.mission.from_path(HELLO_MISSION_URL).open() as hello:
21        # Mission messages modules are now available from the Python path
22        from olympe.airsdk.messages.parrot.missions.samples.hello.Command import Say, Hold
23        from olympe.airsdk.messages.parrot.missions.samples.hello.Event import count
24
25        # Mission messages are also available under the Mission.messages dictionary
26        assert hello.messages["parrot.missions.samples.hello"].Command.Say is Say
27        assert hello.messages["parrot.missions.samples.hello"].Event.count is count
28
29        # Install the 'hello' mission and reboot the drone
30        assert drone.connect()
31        assert hello.install(allow_overwrite=True)
32        logger.info("Mission list: " + pprint.pformat(drone.mission.list_remote()))
33        assert drone(Reboot() >> Disconnected()).wait()
34
35        # Connect to the drone after reboot, load and activate the 'hello' mission
36        assert drone.connect(retry=5)
37        assert hello.wait_ready(5)
38
39        # wait for the current mission to be activated
40        mission_activated = drone(mission.state(state="active"))
41        assert mission_activated.wait(), mission_activated.explain()
42
43        logger.info("Mission list: " + pprint.pformat(drone.mission.list_remote()))
44
45        # load and activate the hello mission
46        mission_activated = drone(
47            mission.load(uid=hello.uid)
48            >> mission.activate(uid=hello.uid)
49        )
50        assert mission_activated.wait(), mission_activated.explain()
51
52        # Make the drone say hello (nod its gimbal)
53        assert drone(Say()).wait()
54        counter = None
55
56        # Wait for 3 nod of the drone gimbal
57        for i in range(3):
58            if counter is None:
59                expectation = drone(count(_policy="wait")).wait(_timeout=10)
60                assert expectation, expectation.explain()
61                counter = expectation.received_events().last().args["value"]
62            else:
63                counter += 1
64                expectation = drone(count(value=counter)).wait(_timeout=10)
65                assert expectation, expectation.explain()
66
67        # Stop and disconnect
68        assert drone(Hold()).wait()
69        expectation = drone(count(_policy="wait", _timeout=15)).wait()
70        assert not expectation, expectation.explain()
71        assert drone(count(value=counter, _policy="check"))
72        assert drone.disconnect()
73
74
75if __name__ == "__main__":
76    test_hello_mission()

Olympe Expectation objects and the Olympe eDSL#

Before introducing more advanced feature, it seems important to take a moment to have a better understanding of the Olympe specific usage of Python operators to compose “Expectation” objects inside the drone() functor.

Olympe Expectation objects#

First, let’s explain what is an “Expectation” object. “Expectation” objects are a special kind of “Future”-like objects from the Python stdlib. People more familiar with Javascript might want to compare “Expectation” classes with the “Promise” class from the standard.

Olympe creates “Expectation” objects whenever a message object is “called”. For example, takeoff_expectation = TakeOff() creates a takeoff_expectation object from the ardrone3.Piloting.TakeOff command message we’ve been using in the previous example.

Simply creating an expectation object has no side effect, you must pass the expectation object to a drone object to “schedule” it. Continuing with our previous example drone(takeoff_expectation) will “schedule” the takeoff_expectation. Here “scheduling” this expectation actually means sending the TakeOff message to the drone and wait for the TakeOff message default expectations (FlyingStateChanged(state="takingoff")). Let us pause on that. This means that an expectation objects: - has a potential side effect when it is “scheduled” by a drone object (here we send the TakeOff message) - may have “sub-expectation(s)” (here the FlyingStateChanged(state="takingoff") event message)

For convenience, the drone() functor returns the expectation object it has received in parameter. This enables the possibility to create and schedule an expectation object in one expression, for example: takeoff_expectation = drone(TakeOff()).

Olympe Expectation eDSL#

Now that we know that one “Expectation” object can be comprised of other expectation objects, like this is the case of the takeoff_expectation in the previous paragraph, we might want to compose expectation objects ourselves.

Olympe supports the composition of expectation objects with 3 Python binary operators: | (“OR”), & (“AND”), and >> (“AND THEN”). This feature has been briefly introduced in the Moving around - Waiting for a 'hovering' flying state previous example where the >> “and then” operator is used to wait for the “hovering” flying state after a moveBy command.

expectation_object = drone(
    moveBy(10, 0, 0, 0)
    >> FlyingStateChanged(state="hovering", _timeout=5)
)

This specific syntax that makes use of the Python operator overloading feature is what is called an “embedded Domain Specific Language” and we might refer to it as the Olympe “Expectation eDSL”.

Here, the drone() functor accepts more than just one command message expectation. The drone() functor takes an expression that may be a combination of command and event messages to process. This expression actually results in the creation of a compound expectation object. The “>>” operator is used to combine two expressions with an “and then” semantic. This example could be read as “Take off and then wait a maximum of 5 seconds for the ‘hovering’ flying state”).

You can choose to schedule an Olympe eDSL expression with or without waiting for the end of its execution, just call .wait() on the expectation object to block the current thread until the expectation object is done (i.e. successful or timedout).

When a compound expectation fails (or times out) you might want to understand what went wrong. To that end, you can use the .explain() that returns a string representation of the compound expectation. The .explain() method highlights in green the part of the expectation that was successful and in red the part of the compound expectation that has failed.

Programmatic eDSL construct#

You also have the ability to construct a compound expectation object programmatically before scheduling it, for example:

expectation_object = (
    moveBy(10, 0, 0, 0)
    >> FlyingStateChanged(state="hovering", _timeout=5)
)
for i in range(3):
    expectation_object = expectation_object
        >> moveBy(10, 0, 0, 0)
        >> FlyingStateChanged(state="hovering", _timeout=5)
    )
drone(expectation_object).wait(60)
assert expectation_object.success()

Each expectation part of a compound expectation may be given a specific _timeout value in seconds that is independent of the global compound expectation timeout value that may be specified later to the .wait() method. When .wait() is called without a timeout value, this method blocks the current thread indefinitely until the expectation succeeds or until a blocking sub-expectation has timedout. In the example above, if any of the FlyingStateChanged expectations times out after 5 seconds, the call to drone(expectation_object).wait(60) returns and expectation_object.success() would return False. Likewise, if the drone takes more than 60 seconds to complete this moveBy, the expectation_object compound expectation times out and expectation_object.success() returns False even if no individual FlyingStateChanged expectation has timedout.

Olympe eDSL operators semantic#

To conclude this quick tour of the expectation eDSL, let’s focus on the specific semantic of each of the supported operator.

The >> “and then” operator is probably the most useful of them. When an “and then” compound expectation is scheduled, the left hand side expectation is scheduled and awaited. When the left hand side expectation is satisfied the right-hand side expectation is scheduled and awaited. If the left-hand side expectation times out, the left-hand side is never scheduled nor awaited and the compound expectation times out. The compound “and then” expectation is successful when the right-hand side is successful.

The & “and” operator schedules and awaits both the left-hand side and right-hand side expectations objects simultaneously. The compound “and” expectation is successful when both the left-hand side and the right hand side expectation are successful without any specific order requirement. The compound expectation times out if the left-hand side or the right-hand side of the expectation times out.

The | “or” operator schedules and awaits both the left-hand side and right-hand side expectations objects simultaneously. The compound “or” expectation is successful if one of the left-hand side and the right hand side expectation is successful (or both). The compound expectation times out if both the left-hand side and the right-hand side of the expectation times out.

You should now understand the basics of Olympe and should be able to write your own scripts. The rest of this guide will walk you through the most advanced (nevertheless important) features of Olympe.

Advanced usage examples#

Using Olympe asynchronously#

In the basic examples above we’ve always performed actions synchronously because we were always immediately waiting for an action to complete with the .wait() method.

In this example, we will send some flying commands to the drone asynchronously. While the drone executes those commands, we will start a video recording and change the gimbal velocity target along the pitch axis. After sending those camera-related commands, we will call the .wait() method on the “flying action” and then stop the recording.

Create the following python asyncaction.py script somewhere in your home directory:

 1import os
 2import olympe
 3from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing
 4from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
 5from olympe.messages.camera import start_recording, stop_recording
 6from olympe.messages import gimbal
 7
 8DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 9
10
11def test_asyncaction():
12    with olympe.Drone(DRONE_IP) as drone:
13        drone.connect()
14
15        # Start a flying action asynchronously
16        flyingAction = drone(
17            TakeOff()
18            >> FlyingStateChanged(state="hovering", _timeout=5)
19            >> moveBy(10, 0, 0, 0)
20            >> FlyingStateChanged(state="hovering", _timeout=5)
21            >> Landing()
22        )
23
24        # Start video recording while the drone is flying
25        if not drone(start_recording(cam_id=0)).wait().success():
26            assert False, "Cannot start video recording"
27
28        # Send a gimbal pitch velocity target while the drone is flying
29        cameraAction = drone(gimbal.set_target(
30            gimbal_id=0,
31            control_mode="velocity",
32            yaw_frame_of_reference="none",
33            yaw=0.0,
34            pitch_frame_of_reference="none",
35            pitch=0.1,
36            roll_frame_of_reference="none",
37            roll=0.0,
38        )).wait()
39
40        if not cameraAction.success():
41            assert False, "Cannot set gimbal velocity target"
42
43        # Wait for the end of the flying action
44        if not flyingAction.wait().success():
45            assert False, "Cannot complete the flying action"
46
47        # Stop video recording while the drone is flying
48        if not drone(stop_recording(cam_id=0)).wait().success():
49            assert False, "Cannot stop video recording"
50
51        # Leaving the with statement scope: implicit drone.disconnect() but that
52        # is still a good idea to perform the drone disconnection explicitly
53        drone.disconnect()
54
55
56if __name__ == "__main__":
57    test_asyncaction()

Reset the simulation (sphinx-cli action -m world fwman world_reset_all in a terminal) and execute this script, from the same shell/terminal you have source’d the shell script:

In this example, the olympe.Drone.__call__() functor process commands and events asynchronously so that multiple commands can be sent to the drone and processed concurrently. The events associated to asynchronous actions are interleaved in an undefined order. The following sequence diagram illustrates a possible sequence of event for this script.

blockdiag Olympe Drone flyingAction pending start record pending start record successful flyingAction pending cameraAction successful flyingAction pending waiting for FlyingStateChanged(state='hovering') flyingAction pending flyingAction pending flyingAction pending flyingAction pending waiting for FlyingStateChanged(state='hovering') flyingAction pending flyingAction pending flyingAction pending flyingAction successful stop record pending stop record successful connect connected TakeOff() VideoV2(record='start') VideoStateChangedV2(state='started') FlyingStateChanged(state='motor_ramping') set_target(control_mode='velocity', ...) FlyingStateChanged(state='takingoff') FlyingStateChanged(state='hovering') moveBy() FlyingStateChanged(state='flying') moveByEnd() FlyingStateChanged(state='hovering') Landing() FlyingStateChanged(state='landing') FlyingStateChanged(state='landed') VideoV2(record='stop') VideoStateChangedV2(state='stopped') disconnect disconnected

Asynchronous command examples#

Using Olympe enums types#

In addition to the ARSDK messages, Olympe also provides Python Enum and Bitfield types in the olympe.enums.<feature_name>[.<class_name>] modules.

Most of the time, you shouldn’t really need to import an enum type into your script because enum types are implicitly constructed from a string when you create a message object, so the following examples are roughly equivalent:

 1import olympe
 2import os
 3from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
 4from olympe.enums.ardrone3.PilotingState import FlyingStateChanged_State as FlyingState
 5
 6olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
 7
 8DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 9
10
11def test_enums():
12    drone = olympe.Drone(DRONE_IP)
13    drone.connect()
14    assert drone(FlyingStateChanged(state="landed")).wait(5).success()
15    # is equivalent to
16    assert drone(FlyingStateChanged(state=FlyingState.landed)).wait(5).success()
17    drone.disconnect()
18
19
20if __name__ == "__main__":
21    test_enums()

Olympe enum types should behave very much like Python 3 enum.Enum.

Bitfields types are associated to each ARSDK enums types and are occasionally used by ARSDK messages. A Bitfield type can hold any combination of its associated Enum type values.

Bitfield example:

 1import olympe
 2import os
 3from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
 4from olympe.enums.ardrone3.PilotingState import FlyingStateChanged_State as FlyingState
 5
 6olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
 7
 8DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 9
10
11def test_bitfield():
12    drone = olympe.Drone(DRONE_IP)
13    drone.connect()
14
15    flying_states = FlyingState._bitfield_type_("takingoff|hovering|flying")
16
17    if drone.get_state(FlyingStateChanged)["state"] in flying_states:
18        assert False, "The drone should not be flying"
19    else:
20        print("The drone is not in flight")
21    drone.disconnect()
22
23
24if __name__ == "__main__":
25    test_bitfield()

Additional usage examples are available in the unit tests of olympe.arsdkng.enums.

Using Olympe exptectation eDSL#

Before continuing with this Olympe example, you might want to read the Olympe eDSL section.

Sometimes it can be useful to send a command to the drone only if it is in a specific state. For example, if the drone is already hovering when you start an Olympe script, you might want to skip the usual taking off command. This can be useful if a previous execution of your script left your drone in a hovering state.

 1import olympe
 2import os
 3from olympe.messages.ardrone3.Piloting import TakeOff
 4from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
 5from olympe.enums.ardrone3.PilotingState import FlyingStateChanged_State
 6from olympe.messages.ardrone3.GPSSettingsState import GPSFixStateChanged
 7
 8DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 9
10
11def test_takeoff_if_necessary_1():
12    with olympe.Drone(DRONE_IP) as drone:
13        drone.connect()
14        print("Takeoff if necessary...")
15        if (drone.get_state(FlyingStateChanged)["state"] is not
16                FlyingStateChanged_State.hovering):
17            drone(GPSFixStateChanged(fixed=1, _timeout=10, _policy="check_wait")).wait()
18            drone(
19                TakeOff(_no_expect=True)
20                & FlyingStateChanged(state="takingoff", _policy="wait", _timeout=5)
21            ).wait()
22        drone.disconnect()
23
24
25if __name__ == "__main__":
26    test_takeoff_if_necessary_1()

Here olympe.Drone.get_state() is used to check the current flying state of the drone. If the drone is not in hovering, we check and eventually wait for a GPS fix. Note that “check_wait” is the default value for the _policy parameter. The possible values for the _policy parameter are:

  • “check”, to check the current state of the drone (i.e. match the last event message of this kind received from the drone).

  • “wait”, to wait for a new event message from the drone (even if the last event message of this kind that has been received would have matched).

  • “check_wait” (the default), to “check” the current state of the drone and if necessary “wait” for a matching event message.

In the above example we are using a compound expectation expression to send a taking off command when the drone has a GPS fix and when it is not already in the hovering state.

The default expectations for the TakeOff command are: FlyingStateChanged(state='motor_ramping', _policy='wait') & FlyingStateChanged(state='takingoff', _policy='wait') (see the TakeOff() command documentation). When the controller receives the “takingoff” flying state a few milliseconds after the TakeOff command has been sent, the drone has just climbed a few centimeters. Here, we don’t really care for this “takingoff” flying state and this is why we are disabling the default expectations of the TakeOff command. TakeOff(_no_expect=True) sends the takeoff command and does not wait for the default expectations for this command. Instead of the default expectations, we are directly expecting the “hovering” flying state. We are using the ‘&’ (“AND”) operator instead of ‘>>’ (“THEN”) to wait for this event while Olympe sends the TakeOff command concurrently. If the “>>” (“THEN”) operator were to be used instead, we might (theoretically) miss the FlyingStateChanged event drone response while Olympe sends the ‘TakeOff’ message.

As demonstrated below, this problem can also be solved without using any control flow statements:

 1import olympe
 2import os
 3from olympe.messages.ardrone3.Piloting import TakeOff
 4from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
 5from olympe.messages.ardrone3.GPSSettingsState import GPSFixStateChanged
 6
 7DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
 8
 9
10def test_takeoff_if_necessary_2():
11    with olympe.Drone(DRONE_IP) as drone:
12        drone.connect()
13        print("Takeoff if necessary...")
14        drone(
15            FlyingStateChanged(state="hovering", _policy="check")
16            | (
17                GPSFixStateChanged(fixed=1, _timeout=10)
18                >> (
19                    TakeOff(_no_expect=True)
20                    & FlyingStateChanged(state="takingoff", _policy="wait", _timeout=5)
21                )
22            )
23        ).wait()
24        drone.disconnect()
25
26
27if __name__ == "__main__":
28    test_takeoff_if_necessary_2()

Here, the ‘|’ (“OR”) operator is used to “check” if the current flying state is “hovering”. If not, we wait for a GPS fix if necessary with the implicit “check_wait” policy. Then (“>>”) we send the taking off command and override the default expectations to wait for the “hovering” flying state as before.

Capture the video streaming and its metadata#

Once you are connected to a drone with Olympe, to start the video streaming just call the olympe.Drone.streaming.start() function and the drone will start sending its video stream to Olympe. Call olympe.Drone.streaming.stop() the video streaming.

Realtime capture#

Before you start the video streaming, you can register some callback functions that will be called whenever Olympe receive/decode a new video frame. See set_streaming_callbacks().

Record the live/replayed video stream for a post-processing#

Before you start the video streaming, you can specify some output files that will be used by Olympe to record the video stream and its metadata. set_streaming_output_files().

Video streaming example#

The following example shows how to get the video stream from the drone using Olympe. Internally, Olympe leverages Parrot libpdraw to:

  • initialize the video streaming from the drone

  • decode the H.264 video stream

  • register user provided callback functions that are called for each (encoded or decoded) frame with its associated metadata

  • record the live video stream from the drone to the disk

When using Olympe to access the video stream you can’t use the PDrAW standalone executable to view the video stream (the drone only supports one video client at a time).

For this example, we first create a fixture class that will hold our olympe.Drone object and some H.264 statistics.

30class StreamingExample:
31    def __init__(self):
32        # Create the olympe.Drone object from its IP address
33        self.drone = olympe.Drone(DRONE_IP)
34        self.tempd = tempfile.mkdtemp(prefix="olympe_streaming_test_")
35        print(f"Olympe streaming example output dir: {self.tempd}")
36        self.h264_frame_stats = []
37        self.h264_stats_file = open(os.path.join(self.tempd, "h264_stats.csv"), "w+")
38        self.h264_stats_writer = csv.DictWriter(
39            self.h264_stats_file, ["fps", "bitrate"]
40        )
41        self.h264_stats_writer.writeheader()
42        self.frame_queue = queue.Queue()
43        self.processing_thread = threading.Thread(target=self.yuv_frame_processing)
44        self.renderer = None

Our objective is to start the video stream, fly the drone around, perform some live video processing, stop the video stream and finally perform some video postprocessing.

197def test_streaming():
198    streaming_example = StreamingExample()
199    # Start the video stream
200    streaming_example.start()
201    # Perform some live video processing while the drone is flying
202    streaming_example.fly()
203    # Stop the video stream
204    streaming_example.stop()
205    # Recorded video stream postprocessing
206    # streaming_example.replay_with_vlc()

Before we start the video streaming, we must connect to the drone and optionally register our callback functions and output files for the recorded video stream.

46    def start(self):
47        # Connect the the drone
48        assert self.drone.connect(retry=3)
49
50        if DRONE_RTSP_PORT is not None:
51            self.drone.streaming.server_addr = f"{DRONE_IP}:{DRONE_RTSP_PORT}"
52
53        # You can record the video stream from the drone if you plan to do some
54        # post processing.
55        self.drone.streaming.set_output_files(
56            video=os.path.join(self.tempd, "streaming.mp4"),
57            metadata=os.path.join(self.tempd, "streaming_metadata.json"),
58        )
59
60        # Setup your callback functions to do some live video processing
61        self.drone.streaming.set_callbacks(
62            raw_cb=self.yuv_frame_cb,
63            h264_cb=self.h264_frame_cb,
64            start_cb=self.start_cb,
65            end_cb=self.end_cb,
66            flush_raw_cb=self.flush_cb,
67        )
68        # Start video streaming
69        self.drone.streaming.start()
70        self.renderer = PdrawRenderer(pdraw=self.drone.streaming)
71        self.running = True
72        self.processing_thread.start()

The StreamingExample.yuv_frame_cb() and StreamingExample.h264_frame_cb() receives an olympe.VideoFrame() object in parameter that you can use to access a video frame data (see: olympe.VideoFrame.as_ndarray(), olympe.VideoFrame.as_ctypes_pointer()) and its metadata (see: olympe.VideoFrame.info() and olympe.VideoFrame.vmeta()).

The full code of this example can be found in src/olympe/doc/examples/streaming.py.

Post-processing a recorded video#

You can also use Olympe to perform some post-processing on an .MP4 file downloaded from the drone.

import argparse
import olympe
import os
import sys
import time
from olympe.video.pdraw import Pdraw, PdrawState
from olympe.video.renderer import PdrawRenderer


DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT", "554")


def yuv_frame_cb(yuv_frame):
    """
    This function will be called by Olympe for each decoded YUV frame.

        :type yuv_frame: olympe.VideoFrame
    """
    import cv2

    # the VideoFrame.info() dictionary contains some useful information
    # such as the video resolution
    info = yuv_frame.info()
    height, width = (  # noqa
        info["raw"]["frame"]["info"]["height"],
        info["raw"]["frame"]["info"]["width"],
    )

    # yuv_frame.vmeta() returns a dictionary that contains additional
    # metadata from the drone (GPS coordinates, battery percentage, ...)

    # convert pdraw YUV flag to OpenCV YUV flag
    cv2_cvt_color_flag = {
        olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
        olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
    }[yuv_frame.format()]

    # yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
    # i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame

    # Use OpenCV to convert the yuv frame to RGB
    cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)  # noqa


def main(argv):
    parser = argparse.ArgumentParser(description="Olympe Pdraw Example")
    parser.add_argument(
        "-u",
        "--url",
        default=f"rtsp://{DRONE_IP}:{DRONE_RTSP_PORT}/live",
        help=(
            "Media resource (rtsp:// or file://) URL.\n"
            "See olympe.Pdraw.play documentation"
        ),
    )
    parser.add_argument("-m", "--media-name", default="DefaultVideo")
    args = parser.parse_args(argv)
    pdraw = Pdraw()
    # Uncomment the following line, to test this OpenCV frame processing callback function
    # This function requires `pip3 install opencv-python`.
    # pdraw.set_callbacks(raw_cb=yuv_frame_cb)
    pdraw.play(url=args.url, media_name=args.media_name)
    renderer = PdrawRenderer(pdraw=pdraw)
    assert pdraw.wait(PdrawState.Playing, timeout=5)
    if args.url.endswith("/live"):
        # Let's see the live video streaming for 10 seconds
        time.sleep(10)
        pdraw.close()
        timeout = 5
    else:
        # When replaying a video, the pdraw stream will be closed automatically
        # at the end of the video
        # For this is example, this is the replayed video maximal duration:
        timeout = 90
    assert pdraw.wait(PdrawState.Closed, timeout=timeout)
    renderer.stop()
    pdraw.dispose()


def test_pdraw():
    main([])


if __name__ == "__main__":
    main(sys.argv[1:])

See the Pdraw documentation for more information.

Connect to a physical drone or to a SkyController#

Warning

DISCLAIMER You should really carefully validate your code before trying to control a physical drone through Olympe. Use at your own risk.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PARROT COMPANY BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Connect to a physical drone#

To connect olympe to a physical drone, you first need to connect to your linux box to a drone Wi-Fi access point. Once you are connected to your drone over Wi-Fi, you just need to specify the drone ip address on its Wi-Fi interface (“192.168.42.1”).

 1import olympe
 2import os
 3
 4DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
 5
 6
 7def test_physical_drone():
 8    drone = olympe.Drone(DRONE_IP)
 9    drone.connect()
10    drone.disconnect()
11
12
13if __name__ == "__main__":
14    test_physical_drone()

Connect to a SkyController#

To connect Olympe to a physical SkyController, you first need to connect to your Linux node to the SkyController 3 USB-C port. Then you should be able to connect to your SkyController with its RNDIS IP address (“192.168.53.1”).

 1import olympe
 2from olympe.messages.skyctrl.CoPiloting import setPilotingSource
 3
 4SKYCTRL_IP = "192.168.53.1"
 5
 6if __name__ == "__main__":
 7    drone = olympe.Drone(SKYCTRL_IP)
 8    drone.connect()
 9    drone(setPilotingSource(source="Controller")).wait()
10    drone.disconnect()

Pair a SkyController with a drone#

If your SkyController is not already connected to a drone, you may have to pair it first.

  1from collections import OrderedDict
  2import os
  3
  4import olympe
  5from olympe.messages.drone_manager import (
  6    discover_drones,
  7    connection_state,
  8    connect,
  9    forget,
 10)
 11
 12olympe.log.update_config({"loggers": {"olympe": {"level": "INFO"}}})
 13
 14SKYCTRL_IP = os.environ.get("SKYCTRL_IP", "192.168.53.1")
 15DRONE_SSID = os.environ.get("DRONE_SSID", "Anafi_PC_000000")
 16DRONE_SECURITY_KEY = os.environ.get("DRONE_SECURITY_KEY", "")
 17DRONE_SERIAL = os.environ.get("DRONE_SERIAL", "000000")
 18
 19
 20class SkyControllerExample:
 21    def __init__(self):
 22        self.skyctrl = olympe.SkyController(SKYCTRL_IP)
 23
 24    def skyctrl_connect(self):
 25        self.skyctrl.connect()
 26
 27    def update_drones(self):
 28        discover_results = self.skyctrl(discover_drones()).wait(_timeout=10)
 29        assert discover_results.success(), "Update drone discovery timedout"
 30        drone_list_items = discover_results.received_events()
 31        known_drones = OrderedDict()
 32        visible_drones = OrderedDict()
 33        active_drone = None
 34        for drone_list_item in drone_list_items:
 35            if drone_list_item.args["visible"] == 1:
 36                visible_drones[drone_list_item.args["serial"]] = drone_list_item.args
 37            if drone_list_item.args["active"] == 1:
 38                active_drone = drone_list_item.args["serial"]
 39            if drone_list_item.args["connection_order"] != 0:
 40                known_drones[drone_list_item.args["serial"]] = drone_list_item.args
 41
 42        self.active_drone = active_drone
 43        self.known_drones = known_drones
 44        self.visible_drones = visible_drones
 45
 46        print("Active drone: ", self.active_drone)
 47        print("Known drones: ", ", ".join(self.known_drones))
 48        print("Visible drones: ", ", ".join(self.visible_drones))
 49
 50    def connect_drone(self, drone_serial, drone_security_key=""):
 51        self.update_drones()
 52        if self.active_drone == drone_serial:
 53            print(f"SkyController is already connected to {drone_serial}")
 54            return True
 55        print(f"SkyController is not currently connected to {drone_serial}")
 56        if drone_serial in self.visible_drones:
 57            print(f"Connecting to {drone_serial}...")
 58            connection = self.skyctrl(
 59                connect(serial=drone_serial, key=drone_security_key)
 60                >> connection_state(state="connected", serial=drone_serial)
 61            ).wait(_timeout=10)
 62        elif drone_serial in self.known_drones:
 63            print(
 64                f"{drone_serial} is a known drone but is not currently visible"
 65            )
 66            return
 67        elif drone_serial is not None:
 68            print(
 69                f"{drone_serial} is an unknown drone and not currently visible"
 70            )
 71            return
 72        if connection.success():
 73            print(f"Connected to {drone_serial}")
 74            return True
 75        else:
 76            print(f"Failed to connect to {drone_serial}")
 77
 78    def forget_drone(self, drone_serial):
 79        if drone_serial == self.active_drone:
 80            print(f"Forgetting {drone_serial} ...")
 81            self.skyctrl(
 82                forget(serial=drone_serial)
 83                >> connection_state(state="disconnecting", serial=drone_serial)
 84            ).wait(_timeout=10)
 85        elif drone_serial in self.known_drones:
 86            print(f"Forgetting {drone_serial} ...")
 87            self.skyctrl(forget(serial=drone_serial)).wait(_timeout=10)
 88        else:
 89            print(f"{drone_serial} is an unknown drone")
 90
 91    def disconnect_skyctrl(self):
 92        self.skyctrl.disconnect()
 93
 94
 95def main():
 96    example = SkyControllerExample()
 97    print("@ Connection to SkyController")
 98    example.skyctrl_connect()
 99    example.update_drones()
100    print("@ Connection to a drone")
101    if example.connect_drone(DRONE_SERIAL, DRONE_SECURITY_KEY):
102        example.update_drones()
103        print("@ Forgetting a drone")
104        example.forget_drone(DRONE_SERIAL)
105        example.update_drones()
106    print("@ Disconnection from SkyController")
107    example.disconnect_skyctrl()
108
109
110def test_skyctrl_drone_pairing():
111    main()
112
113
114if __name__ == "__main__":
115    main()