Files
PX4-Autopilot/msg/tools/uorb_rtps_classifier.py
T
TSC21 c478e2985a microRTPS: simplify the attribution of the RTPS IDs by making it automatic
1. The RTPS IDs are now automatically assigned to the topics
2. Only the topics that get defined to be sent or received in the urtps_bridge_topics.yaml (renamed, since now it doesn't contain IDs) receive the IDs
3. Any addition or removal in the urtps_bridge_topics.yaml file might change the topic IDs - this requires the agent and the client ID lists to be kept in sync. It will further require making the way we check the IDs and the message definitions when starting the bridge more robust.
2021-08-12 08:44:53 +02:00

187 lines
8.3 KiB
Python

#!/usr/bin/env python3
################################################################################
#
# Copyright (c) 2018-2021 PX4 Development Team. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
################################################################################
import argparse
import difflib
import errno
import os
from typing import Dict, List, Tuple
import yaml
class Classifier():
    """Sort the topics listed in a bridge YAML file into send/receive groups.

    The YAML document must provide a top-level ``rtps`` list whose entries are
    mappings with a ``msg`` name and optional ``send``, ``receive`` and
    ``base`` keys. An entry with a ``base`` key is treated as an alias
    (multi-topic) of the message named by ``base``.

    Attributes populated by the constructor:
        msg_folder: folder used to build the ``.msg`` file paths.
        msg_map: the parsed YAML document.
        msgs_to_send / msgs_to_receive: non-alias message names mapped to 0.
        alias_msgs_to_send / alias_msgs_to_receive: ``(alias, base)`` tuples.
        msg_id_map: message names in the order they appear in the YAML file.
        msg_files_send / msg_files_receive: ``.msg`` paths of the non-alias
            messages to send / receive.
    """

    def __init__(self, yaml_file, msg_folder) -> None:
        """Parse ``yaml_file``, validate it and build the topic groups.

        Raises:
            OSError: if ``yaml_file`` cannot be opened.
            ValueError: if the document has no top-level ``rtps`` list.
            AssertionError: if an alias names a base type that is not listed.
        """
        self.msg_folder = msg_folder
        self.msg_map = self.parse_yaml_msgs_file(yaml_file)

        # Fail early when an alias refers to a base type that is not listed.
        self.check_base_type()

        self.msgs_to_send: Dict[str, int] = {}
        self.msgs_to_receive: Dict[str, int] = {}
        self.alias_msgs_to_send: List[Tuple[str, str]] = []
        self.alias_msgs_to_receive: List[Tuple[str, str]] = []
        self.msg_id_map: List[str] = []

        self.setup_msg_map()

        self.msg_files_send = self.set_msg_files_send()
        self.msg_files_receive = self.set_msg_files_receive()

    def setup_msg_map(self) -> None:
        """Fill the send/receive dictionaries, alias lists and ``msg_id_map``.

        A topic may carry both ``send`` and ``receive`` and is then registered
        in both groups. Non-alias messages are stored with the value 0.
        """
        for topic in self.msg_map['rtps']:
            name = topic['msg']
            is_alias = 'base' in topic

            if 'send' in topic:
                if is_alias:
                    self.alias_msgs_to_send.append((name, topic['base']))
                else:
                    self.msgs_to_send[name] = 0

            if 'receive' in topic:
                if is_alias:
                    self.alias_msgs_to_receive.append((name, topic['base']))
                else:
                    self.msgs_to_receive[name] = 0

            # NOTE(review): the nesting of this statement was lost with the
            # file's indentation; recording every topic once, in file order,
            # is assumed here - confirm against the upstream source.
            self.msg_id_map.append(name)

    def set_msg_files_send(self) -> list:
        """Return the ``.msg`` file paths of the non-alias messages to send."""
        return [os.path.join(self.msg_folder, msg + ".msg")
                for msg in self.msgs_to_send]

    def set_msg_files_receive(self) -> list:
        """Return the ``.msg`` file paths of the non-alias messages to receive."""
        return [os.path.join(self.msg_folder, msg + ".msg")
                for msg in self.msgs_to_receive]

    def check_base_type(self) -> None:
        """Check that every alias message names an existing base type.

        Base types are the ``msg`` names of the entries without a ``base``
        key. Each unknown base is reported once, in order of first appearance
        in the YAML file, so the error text is identical from run to run.

        Raises:
            AssertionError: listing every unknown base type, with the closest
                known name as a suggestion when difflib finds one.
        """
        topics = self.msg_map['rtps']
        base_types = [topic['msg'] for topic in topics if 'base' not in topic]
        known_base_types = set(base_types)

        # dict.fromkeys() de-duplicates while keeping the file order.
        referenced_bases = dict.fromkeys(
            topic['base'] for topic in topics if 'base' in topic)
        unknown_bases = [base for base in referenced_bases
                         if base not in known_base_types]
        if not unknown_bases:
            return

        report = []
        for base in unknown_bases:
            closest = difflib.get_close_matches(
                base, base_types, n=1, cutoff=0.6)
            hint = ' Did you mean \'{}\'?'.format(closest[0]) if closest else ''
            report.append(
                '\t- The multi-topic message base type \'{}\' does not exist.{}'.format(base, hint))
        raise AssertionError('\n' + '\n'.join(report))

    @staticmethod
    def parse_yaml_msgs_file(yaml_file) -> dict:
        """Parse the bridge topics YAML file into a dict.

        Raises:
            OSError: if the file cannot be opened (a missing file is reported
                together with its path).
            ValueError: if the document is not a mapping with a top-level
                ``rtps`` list, e.g. when the file is empty.
        """
        try:
            with open(yaml_file, 'r', encoding='utf-8') as file:
                content = yaml.safe_load(file)
        except OSError as err:
            if err.errno == errno.ENOENT:
                raise IOError(errno.ENOENT, os.strerror(
                    errno.ENOENT), yaml_file) from err
            raise

        # safe_load() returns None for an empty document; reject that here
        # instead of failing later with an opaque TypeError or KeyError.
        if not isinstance(content, dict) or not isinstance(content.get('rtps'), list):
            raise ValueError(
                "'{}' must contain a top-level 'rtps' list of topics".format(yaml_file))
        return content
def _topic_names_line(msgs, alias_msgs, with_alias):
    """Build the line printed for -s/-r when -p is not given.

    Args:
        msgs: iterable of non-alias message names; printed sorted.
        alias_msgs: list of ``(alias, base)`` tuples; only the alias name
            (first element) is printed.
        with_alias: whether -a/--alias was requested.

    Returns:
        The comma-separated message names. With ``with_alias`` the alias names
        follow after ' alias ' (when there are any) and a trailing newline is
        appended, so the output matches the historical format exactly.
    """
    line = ', '.join(str(msg) for msg in sorted(msgs))
    if not with_alias:
        return line
    if alias_msgs:
        line += ' alias ' + ', '.join(alias[0] for alias in alias_msgs)
    return line + '\n'


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--send", dest='send',
                        action="store_true", help="Get topics to be sent")
    parser.add_argument("-a", "--alias", dest='alias',
                        action="store_true", help="Get alias topics")
    parser.add_argument("-r", "--receive", dest='receive',
                        action="store_true", help="Get topics to be received")
    # NOTE: -i/--ignore is accepted but nothing below reads it; it is kept so
    # that existing command lines passing the flag keep working.
    parser.add_argument("-i", "--ignore", dest='ignore',
                        action="store_true", help="Get topics to be ignored")
    parser.add_argument("-p", "--path", dest='path',
                        action="store_true", help="Get msgs and its paths")
    parser.add_argument("-m", "--topic-msg-dir", dest='msgdir', type=str,
                        help="Topics message dir, by default msg/", default="msg")
    parser.add_argument("-y", "--rtps-ids-file", dest='yaml_file', type=str,
                        help="RTPS msg IDs definition file absolute path, by default use relative path to msg, tools/urtps_bridge_topics.yaml",
                        default='tools/urtps_bridge_topics.yaml')

    # Parse arguments
    args = parser.parse_args()

    if args.msgdir == 'msg':
        # Default value: use the directory two levels above this script.
        msg_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    else:
        msg_dir = os.path.abspath(args.msgdir)

    # A relative YAML path is resolved against the message directory.
    if os.path.isabs(args.yaml_file):
        yaml_path = os.path.abspath(args.yaml_file)
    else:
        yaml_path = os.path.join(msg_dir, args.yaml_file)
    classifier = Classifier(yaml_path, msg_dir)

    if args.send:
        if args.path:
            print('send files: ' + ', '.join(
                str(msg_file) for msg_file in classifier.msg_files_send) + '\n')
        else:
            print(_topic_names_line(classifier.msgs_to_send,
                                    classifier.alias_msgs_to_send,
                                    args.alias))

    if args.receive:
        if args.path:
            print('receive files: ' + ', '.join(
                str(msg_file) for msg_file in classifier.msg_files_receive) + '\n')
        else:
            print(_topic_names_line(classifier.msgs_to_receive,
                                    classifier.alias_msgs_to_receive,
                                    args.alias))