Using Parrot AirSDK missions with Olympe#
Olympe integrates with the Parrot AirSDK and enables you to install AirSDK “missions” (i.e. Parrot and Parrot partners applications) onto a remote drone connected to Olympe.
Once installed onto the done, Olympe is able to exchange mission specific messages with the drone.
The example below illustrate this installation process and some basic interaction with the Air SDK “Hello, Drone!” mission.
1import logging
2import olympe
3import olympe.log
4import os
5import pprint
6from olympe.controller import Disconnected
7from olympe.messages.common.Common import Reboot
8from olympe.messages import mission
9
10
11DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
12HELLO_MISSION_URL = os.environ.get("HELLO_MISSION_URL", "com.parrot.missions.samples.hello.tar.gz")
13
14olympe.log.update_config({"loggers": {"olympe": {"level": "INFO"}}})
15logger = logging.getLogger("olympe")
16
17
18def test_hello_mission():
19 drone = olympe.Drone(DRONE_IP)
20 with drone.mission.from_path(HELLO_MISSION_URL).open() as hello:
21 # Mission messages modules are now available from the Python path
22 from olympe.airsdk.messages.parrot.missions.samples.hello.Command import Say, Hold
23 from olympe.airsdk.messages.parrot.missions.samples.hello.Event import count
24
25 # Mission messages are also available under the Mission.messages dictionary
26 assert hello.messages["parrot.missions.samples.hello"].Command.Say is Say
27 assert hello.messages["parrot.missions.samples.hello"].Event.count is count
28
29 # Install the 'hello' mission and reboot the drone
30 assert drone.connect()
31 assert hello.install(allow_overwrite=True)
32 logger.info("Mission list: " + pprint.pformat(drone.mission.list_remote()))
33 assert drone(Reboot() >> Disconnected()).wait()
34
35 # Connect to the drone after reboot, load and activate the 'hello' mission
36 assert drone.connect(retry=5)
37 assert hello.wait_ready(5)
38
39 # wait for the current mission to be activated
40 mission_activated = drone(mission.state(state="active"))
41 assert mission_activated.wait(), mission_activated.explain()
42
43 logger.info("Mission list: " + pprint.pformat(drone.mission.list_remote()))
44
45 # load and activate the hello mission
46 mission_activated = drone(
47 mission.load(uid=hello.uid)
48 >> mission.activate(uid=hello.uid)
49 )
50 assert mission_activated.wait(), mission_activated.explain()
51
52 # Make the drone say hello (nod its gimbal)
53 assert drone(Say()).wait()
54 counter = None
55
56 # Wait for 3 nod of the drone gimbal
57 for i in range(3):
58 if counter is None:
59 expectation = drone(count(_policy="wait")).wait(_timeout=10)
60 assert expectation, expectation.explain()
61 counter = expectation.received_events().last().args["value"]
62 else:
63 counter += 1
64 expectation = drone(count(value=counter)).wait(_timeout=10)
65 assert expectation, expectation.explain()
66
67 # Stop and disconnect
68 assert drone(Hold()).wait()
69 expectation = drone(count(_policy="wait", _timeout=15)).wait()
70 assert not expectation, expectation.explain()
71 assert drone(count(value=counter, _policy="check"))
72 assert drone.disconnect()
73
74
75if __name__ == "__main__":
76 test_hello_mission()