msg_conversion.py 2.71 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

ros_primitive_types = [
    "bool",
    "byte",
    "char",
    "int",
    "uint",
    "float",
    "double",
    "int8",
    "int16",
    "int32",
    "int64",
    "uint8",
    "uint16",
    "uint32",
    "uint64",
    "float32",
    "float64",
    "string",
]

def is_primitive(field_type):
    return (field_type in ros_primitive_types)

def is_array(field_type):
    return field_type.startswith("sequence<")

def is_nested(field_type):
    return ("/" in field_type)

def msg_to_dict(msg):
    msg_structure = msg._fields_and_field_types
    print(f"ros msg to dict: Processing message structure {str(msg_structure)}")

    msg_dict = {}
    for field_name, field_type in msg_structure.items():
        # print(f"Processing field '{field_name}', of type '{field_type}'")

        field_value = getattr(msg, field_name)
        # print(f"Field has value '{field_value}'")

        if is_primitive(field_type):
            msg_dict[field_name] = field_value

        elif is_array(field_type):
            arr_type = field_type[len("sequence<") : -1]
            if is_primitive(arr_type):
                msg_dict[field_name] = [value for value in field_value]
            else:
                # NOT TESTED!
                msg_dict[field_name] = [msg_to_dict(value) for value in field_value]

        elif is_nested(field_type):
            msg_dict[field_name] = msg_to_dict(field_value)

    print(f"ros msg to dict: Message dict is {msg_dict}")
    return msg_dict


import importlib


def dict_to_msg(msg_dict, msg_type):
    msg_structure = msg_type._fields_and_field_types
    print(f"dict to ROS msg: Processing message structure {str(msg_structure)}")

    kwargs_dict = {}

    for field_name, field_type in msg_structure.items():
        if is_primitive(field_type):
            kwargs_dict[field_name] = msg_dict[field_name]

        elif is_array(field_type):
            arr_type = field_type[len("sequence<") : -1]
            if is_primitive(arr_type):
                kwargs_dict[field_name] = [value for value in msg_dict[field_name]]
            else:
                # NOT TESTED!
                kwargs_dict[field_name] = [dict_to_msg(value, arr_type) for value in msg_dict[field_name]]

        elif is_nested(field_type):
            print(f"Field is another message type. Importing type")
            typestr = field_type.split("/")

            module_name, member = typestr[0], typestr[1]
            module = importlib.import_module(module_name)
            module_msg = getattr(module, "msg")
            type_ = getattr(module_msg, member)

            kwargs_dict[field_name] = dict_to_msg(msg_dict[field_name], type_)

    instance = msg_type(**kwargs_dict)

    print(f"dict to ROS msg: instance is {str(instance)}")
    return instance