Usage#
This section describes how to use the SDK.
Please complete all the setup steps given in Setup if you haven’t done so.
Examples#
Example scripts to demonstrate the use of our SDK.
Single Upstream (one-to-one)#
This is the simplest scenario where you are only interested in messages from a single upstream process.
Run the following steps for a demo:
Method 1:
In a terminal, run
sender_health
In another terminal, run
receiver_health
Method 2:
In a terminal, run
python3 -m akkits.scripts.examples.sender_health
In another terminal, run
python3 -m akkits.scripts.examples.receiver_health
Tip
You can also run sender_tracklet with receiver_tracklet, or sender_point_cloud with receiver_point_cloud.
Multiple Upstreams (many-to-one)#
This is a scenario where you are interested in messages from multiple upstream processes (tracklets, health status, etc.).
Here, we use a zmq.Poller so that we can efficiently receive data from multiple sockets / publishers. The poller blocks until at least one socket has data, so there is no need for a receive loop padded with a sleep function, which can degrade into a busy loop. See this and this for more explanations.
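The polling pattern described above can be sketched as follows. This is a minimal illustration built on pyzmq, not the SDK's actual receiver code: the helper names make_subscriber, build_poller and poll_once are hypothetical, and the endpoints you pass in depend on your own setup.

```python
import zmq


def make_subscriber(context, endpoint, topic=b""):
    """Create a SUB socket connected to one publisher endpoint (hypothetical helper)."""
    sock = context.socket(zmq.SUB)
    sock.connect(endpoint)
    sock.setsockopt(zmq.SUBSCRIBE, topic)  # empty topic = receive every message
    return sock


def build_poller(sockets):
    """Register every socket for read events on a single zmq.Poller."""
    poller = zmq.Poller()
    for sock in sockets:
        poller.register(sock, zmq.POLLIN)
    return poller


def poll_once(poller, timeout_ms=1000):
    """Block until a registered socket is readable or the timeout expires.

    Returns a dict mapping each readable socket to its list of message frames.
    """
    ready = dict(poller.poll(timeout_ms))
    return {
        sock: sock.recv_multipart()
        for sock, events in ready.items()
        if events & zmq.POLLIN
    }
```

A receiver would create one subscriber per upstream, pass them all to build_poller, and call poll_once in a loop, dispatching on which socket each message came from. Because poll blocks inside ZeroMQ until data arrives, the loop consumes no CPU while idle.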
Run the following steps for a demo:
Method 1:
In a terminal, run
sender_health
In another terminal, run
sender_tracklet
In another terminal, run
receiver_tracklet_health
Method 2:
In a terminal, run
python3 -m akkits.scripts.examples.sender_health
In another terminal, run
python3 -m akkits.scripts.examples.sender_tracklet
In another terminal, run
python3 -m akkits.scripts.examples.receiver_tracklet_health
Mocking Fusion Box Error Codes#
Scripts that allow you to mock various faults and send out error codes. Each of these scripts runs until a KeyboardInterrupt (Ctrl+C) is issued or the process is killed.
Faulty Fusion Controller#
You can send health status messages with specific status codes for DeviceStatus, AlgoStatus, and SensorStatus by running:
Method 1:
$ mock_faulty_fusion_controller --device_status <code> --algo_status <code> --sensor_status <code>
Method 2:
$ python3 src/akkits/scripts/mock/faulty_fusion_controller.py --device_status <code> --algo_status <code> --sensor_status <code>
The script sends out a health status heartbeat approximately once per second, containing the specified status codes.
For example:
All Normal (default):
$ mock_faulty_fusion_controller
DeviceStatus.OutOfStorage, AlgoStatus.Error, SensorStatus.None_:
$ mock_faulty_fusion_controller --device_status 2 --algo_status 3 --sensor_status 0
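The heartbeat behaviour of these mock scripts (send roughly once per second, stop on Ctrl+C) can be sketched as below. This is an illustration under assumptions, not the script's real implementation: run_heartbeat, the send callback and the payload field names are all hypothetical, and the real scripts publish SDK message objects rather than plain dicts.

```python
import time


def run_heartbeat(send, payload, interval_s=1.0, max_beats=None):
    """Call send(payload) every interval_s seconds.

    Runs until Ctrl+C (KeyboardInterrupt) or, if max_beats is given,
    until that many heartbeats have been sent. Returns the number sent.
    """
    sent = 0
    try:
        while max_beats is None or sent < max_beats:
            send(payload)
            sent += 1
            time.sleep(interval_s)
    except KeyboardInterrupt:
        pass  # Ctrl+C ends the loop cleanly
    return sent


# Hypothetical payload mirroring the second example above:
# --device_status 2 --algo_status 3 --sensor_status 0
EXAMPLE_PAYLOAD = {"device_status": 2, "algo_status": 3, "sensor_status": 0}
```

For instance, run_heartbeat(print, EXAMPLE_PAYLOAD) would print the payload once per second until interrupted. The max_beats parameter exists only to make the loop easy to exercise in tests.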
Faulty LiDAR Sensor#
You can simulate a faulty sensor connection by running:
Method 1 (if you have installed the akkits package):
$ mock_faulty_lidar
Method 2 (if you have cloned the repository):
$ python3 src/akkits/scripts/mock/faulty_lidar.py
The script sends out a health status heartbeat approximately once per second.
Upon starting the script, heartbeats are sent out with SensorStatus.Normal. At the same time, the user is prompted for input in the terminal.
If any input is received, the heartbeats switch to SensorStatus.LostConnection, and the user is prompted again.
If any input is received, the heartbeats switch back to SensorStatus.Normal, and the user is prompted again.
This process repeats.
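The toggle-on-input behaviour described above can be sketched with a small thread-safe state holder. This is a hedged illustration rather than the script's actual code: SensorStatusToggle and watch_input are hypothetical names, and plain strings stand in for the SDK's SensorStatus enum members.

```python
import threading


class SensorStatusToggle:
    """Thread-safe flag that alternates between two sensor states."""

    # Strings stand in for SensorStatus.Normal / SensorStatus.LostConnection.
    STATES = ("Normal", "LostConnection")

    def __init__(self):
        self._index = 0  # start in "Normal", as the script does
        self._lock = threading.Lock()

    @property
    def current(self):
        with self._lock:
            return self.STATES[self._index]

    def toggle(self):
        """Switch to the other state and return its name."""
        with self._lock:
            self._index = 1 - self._index
            return self.STATES[self._index]


def watch_input(toggle, read_line=input):
    """Flip the state each time a line of input arrives; stop on EOF."""
    while True:
        try:
            read_line("Press Enter to toggle the sensor status: ")
        except EOFError:
            return
        toggle.toggle()
```

One way to combine the two pieces is to run watch_input in a daemon thread (threading.Thread(target=watch_input, args=(toggle,), daemon=True)) while the main thread keeps sending heartbeats that read toggle.current, so that prompting never delays a heartbeat.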
Output Tracklets Messages#
You can simulate fusion box tracklet message output by running:
Method 1 (if you have installed the akkits package):
$ mock_tracklet_message
Method 2 (if you have cloned the repository):
$ python3 src/akkits/scripts/mock/replay_tracklets.py
Tools#
Recording Point Cloud#
You can record point cloud messages from the fusion box by running:
Method 1 (if you have installed the akkits package):
$ record_point_cloud --ip_addr <ip/address/of/fusion/box> --ext <.pkl/or/.bin> --output_dir <path/to/store/recorded/files>
Method 2 (if you have cloned the repository):
$ python3 src/akkits/scripts/tools/record_point_cloud.py --ip_addr <ip/address/of/fusion/box> --ext <.pkl/or/.bin> --output_dir <path/to/store/recorded/files>
If ext is set to .pkl, the complete point cloud messages are stored in a single .pkl file in the directory specified by the output_dir argument.
If ext is set to .bin, only the point_cloud field of each message, which is an array of floats, is stored as a binary file in the directory specified by the output_dir argument. Each point cloud message is stored in its own binary file, named with the Unix timestamp of that message. The tool also generates a metadata file named <unix_timestamp>_metadata.txt. It contains the number of attributes (excluding x, y, z) and the number of columns needed to reshape the binary data into shape (-1, column_count).
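Reading a recorded .bin file back could look like the sketch below. It rests on two assumptions the text does not confirm: that the floats are stored as 32-bit values (the dtype parameter lets you change this), and that you read column_count from the metadata file yourself, since its exact layout is not described here. The function name load_point_cloud_bin is hypothetical.

```python
import numpy as np


def load_point_cloud_bin(bin_path, column_count, dtype=np.float32):
    """Load a flat binary point cloud and reshape it to (-1, column_count).

    dtype=float32 is an assumption; adjust it if the recorder uses another width.
    """
    flat = np.fromfile(bin_path, dtype=dtype)
    if column_count <= 0 or flat.size % column_count != 0:
        raise ValueError(
            f"{flat.size} values cannot be reshaped into rows of {column_count}"
        )
    return flat.reshape(-1, column_count)
```

Each row of the returned array then holds one point: x, y, z followed by the additional attributes counted in the metadata file. The size check catches a mismatched column_count or dtype early instead of failing inside reshape with a less specific message.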
Recording Tracklet#
You can record tracklet messages from the fusion box by running:
Method 1 (if you have installed the akkits package):
$ record_tracklet --ip_addr <ip/address/of/fusion/box> --output_dir <path/to/store/recorded/file>
Method 2 (if you have cloned the repository):
$ python3 src/akkits/scripts/tools/record_tracklet.py --ip_addr <ip/address/of/fusion/box> --output_dir <path/to/store/recorded/file>
Replay Tracklet#
You can replay recorded tracklet messages from the fusion box by running:
Method 1 (if you have installed the akkits package):
$ mock_tracklet_message --ip_addr <targeted/ip/address> --output_dir <path/storing/recorded/file>
Method 2 (if you have cloned the repository):
$ python3 src/akkits/scripts/tools/replay_tracklets.py --ip_addr <targeted/ip/address> --output_dir <path/to/storing/recorded/file>