Usage#

This section describes how to use the SDK.

Please complete all the setup steps given in Setup if you haven’t done so.

Examples#

Example scripts to demonstrate the use of our SDK.

Single Upstream (one-to-one)#

This is the simplest scenario where you are only interested in messages from a single upstream process.

Run the following steps for a demo:

  • Method 1:

    1. In a terminal, run sender_health

    2. In another terminal, run receiver_health

  • Method 2:

    1. In a terminal, run python3 -m akkits.scripts.examples.sender_health

    2. In another terminal, run python3 -m akkits.scripts.examples.receiver_health

Tip

You can also run

  • sender_tracklet + receiver_tracklet

  • sender_point_cloud + receiver_point_cloud

Multiple Upstreams (many-to-one)#

This is a scenario where you are interested in messages from multiple upstream processes (tracklets, health status, etc).

Here, we use a zmq.Poller so that we can effectively receive data from multiple sockets / publishers without resorting to using a sleep function which might lead to busy-loop. See this and this for more explanations.

Run the following steps for a demo:

  • Method 1:

    1. In a terminal, run sender_health

    2. In another terminal, run sender_tracklet

    3. In another terminal, run receiver_tracklet_health

  • Method 2:

    1. In a terminal, run python3 -m akkits.scripts.examples.sender_health

    2. In another terminal, run python3 -m akkits.scripts.examples.sender_tracklet

    3. In another terminal, run python3 -m akkits.scripts.examples.receiver_tracklet_health

Mocking Fusion Box Error Codes#

Scripts that allow you to mock various faults and send out error codes. Each of these scripts will continue to run forever until a KeyboardInterrupt (Ctrl+C) is issued or it is killed.

Faulty Fusion Controller#

You can send health status messages with specific status codes for DeviceStatus, AlgoStatus, SensorStatus by running:

  • Method 1:

    $ mock_faulty_fusion_controller --device_status <code> --algo_status <code> --sensor_status <code>   
    
  • Method 2:

    $ python3 src/akkits/scripts/mock/faulty_fusion_controller.py --device_status <code> --algo_status <code> --sensor_status <code>   
    

The script will send out a health status heartbeat approximately every one second that contains the specified status codes.

For example:

  • All Normal (default):

    $ mock_faulty_fusion_controller
    
  • DeviceStatus.OutOfStorage, AlgoStatus.Error, SensorStatus.None_:

    $ mock_faulty_fusion_controller --device_status 2 --algo_status 3 --sensor_status 0   
    

Faulty LiDAR Sensor#

You can simulate a faulty sensor connection by running:

  • Method 1 (If you have installed akkits package):

    $ mock_faulty_lidar
    
  • Method 2 (If you have clone the repository):

    $ python3 src/akkits/scripts/mock/faulty_lidar.py
    

The script will send out a health status heartbeat approximately every one second.

  • Upon starting the script, heartbeats will be sent out with SensorStatus.Normal. At the same time, the user is prompted for an input in the terminal.

  • If any input is received, the heartbeats will switch to SensorStatus.LostConnection, and the user will be prompted again.

  • If any input is received, the heartbeats will switch back to SensorStatus.Normal, and the user will be prompted again.

  • This process is repeated.

Output Tracklets Messages#

You can simulate fusion box tracklet message output by running:

  • Method 1 (If you have installed akkits package):

    $ mock_tracklet_message
    
  • Method 2 (If you have clone the repository):

    $ python3 src/akkits/scripts/mock/replay_tracklets.py
    

Tools#

Recording Point Cloud#

You can record the point cloud message from fusion box by running:

  • Method 1 (If you have installed akkits package):

    $ record_point_cloud --ip_addr <ip/address/of/fusion/box> --ext <.pkl/or/.bin> --output_dir <path/to/store/recorded/files>
    
  • Method 2 (If you have clone the repository):

    $ python3 src/akkits/scripts/tools/record_point_cloud.py --ip_addr <ip/address/of/fusion/box> --ext <.pkl/or/.bin> --output_dir <path/to/store/recorded/files>
    

If ext is set to .pkl, it stores the whole point cloud messages within a single .pkl file at directory path specified in output_dir argument.

If ext is set to .bin, it only stores the point_cloud fields which is an array of floats in a point cloud message as a binary file at directory path specified in output_dir argument. Multiple point cloud messages are stored into separate binary files with filenames recorded as the unix timestamp of the point cloud message. It also generates a metadata files with <unix_timstampe>_metadata.txt. The contains the number of attributes (excluding x,y,z) and the number of columns to reshape the binary into shape (-1, column_count).

Recording Tracklet#

You can record the tracklet message from fusion box by running:

  • Method 1 (If you have installed akkits package):

    $ record_tracklet --ip_addr <ip/address/of/fusion/box> --output_dir <path/to/store/recorded/file>
    
  • Method 2 (If you have clone the repository):

    $ python3 src/akkits/scripts/tools/record_tracklet.py --ip_addr <ip/address/of/fusion/box> --output_dir <path/to/store/recorded/file>
    

Replay Tracklet#

You can replay the recorded tracklet message from fusion box by running:

  • Method 1 (If you have installed akkits package):

    $ mock_tracklet_message --ip_addr <targeted/ip/address> --output_dir <path/storing/recorded/file>
    
  • Method 2 (If you have clone the repository):

    $ python3 src/akkits/scripts/tools/replay_tracklets.py --ip_addr <targeted/ip/address> --output_dir <path/to/storing/recorded/file>