Using Parrot AirSDK missions with Olympe#

Olympe integrates with the Parrot AirSDK and enables you to install AirSDK “missions” (i.e. Parrot and Parrot partners applications) onto a remote drone connected to Olympe.

Once installed onto the done, Olympe is able to exchange mission specific messages with the drone.

The example below illustrate this installation process and some basic interaction with the Air SDK “Hello, Drone!” mission.

 1import logging
 2import olympe
 3import olympe.log
 4import os
 5import pprint
 6from olympe.controller import Disconnected
 7from olympe.messages.common.Common import Reboot
 8from olympe.messages import mission
 9
10
11DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
12HELLO_MISSION_URL = os.environ.get("HELLO_MISSION_URL", "com.parrot.missions.samples.hello.tar.gz")
13
14olympe.log.update_config({"loggers": {"olympe": {"level": "INFO"}}})
15logger = logging.getLogger("olympe")
16
17
18def test_hello_mission():
19    drone = olympe.Drone(DRONE_IP)
20    with drone.mission.from_path(HELLO_MISSION_URL).open() as hello:
21        # Mission messages modules are now available from the Python path
22        from olympe.airsdk.messages.parrot.missions.samples.hello.Command import Say, Hold
23        from olympe.airsdk.messages.parrot.missions.samples.hello.Event import count
24
25        # Mission messages are also available under the Mission.messages dictionary
26        assert hello.messages["parrot.missions.samples.hello"].Command.Say is Say
27        assert hello.messages["parrot.missions.samples.hello"].Event.count is count
28
29        # Install the 'hello' mission and reboot the drone
30        assert drone.connect()
31        assert hello.install(allow_overwrite=True)
32        logger.info("Mission list: " + pprint.pformat(drone.mission.list_remote()))
33        assert drone(Reboot() >> Disconnected()).wait()
34
35        # Connect to the drone after reboot, load and activate the 'hello' mission
36        assert drone.connect(retry=5)
37        assert hello.wait_ready(5)
38
39        # wait for the current mission to be activated
40        mission_activated = drone(mission.state(state="active"))
41        assert mission_activated.wait(), mission_activated.explain()
42
43        logger.info("Mission list: " + pprint.pformat(drone.mission.list_remote()))
44
45        # load and activate the hello mission
46        mission_activated = drone(
47            mission.load(uid=hello.uid)
48            >> mission.activate(uid=hello.uid)
49        )
50        assert mission_activated.wait(), mission_activated.explain()
51
52        # Make the drone say hello (nod its gimbal)
53        assert drone(Say()).wait()
54        counter = None
55
56        # Wait for 3 nod of the drone gimbal
57        for i in range(3):
58            if counter is None:
59                expectation = drone(count(_policy="wait")).wait(_timeout=10)
60                assert expectation, expectation.explain()
61                counter = expectation.received_events().last().args["value"]
62            else:
63                counter += 1
64                expectation = drone(count(value=counter)).wait(_timeout=10)
65                assert expectation, expectation.explain()
66
67        # Stop and disconnect
68        assert drone(Hold()).wait()
69        expectation = drone(count(_policy="wait", _timeout=15)).wait()
70        assert not expectation, expectation.explain()
71        assert drone(count(value=counter, _policy="check"))
72        assert drone.disconnect()
73
74
75if __name__ == "__main__":
76    test_hello_mission()