rmq_publisher.py 4.89 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, Vector3
from std_msgs.msg import Bool
from sensor_msgs.msg import LaserScan
from ackermann_msgs.msg import AckermannDrive
import datetime
import json
import base64
import pika

from msg_utils.msg_conversion import msg_to_dict

# import msg_utils as util


class RabbitMQPublisher(Node):
    def __init__(self):
        super().__init__("rabbitmq_topic_forwarder")

        self.init_rabbitmq()

        self.config = {"prefix_topic_name": True, "group_values_by_topic": True}

        self.forward_this = {
            "cmd_simple_control": {"type": Vector3, "fields": ["x", "y", "z"]},
            "cmd_vel": {"type": Twist, "fields": ["linear.y", "angular.z"]},
            "em_stop": {"type": Bool, "fields": ["data"]},
            "scan": {"type": LaserScan, "fields": ["ranges"]},
            "move_cmd": {"type": AckermannDrive, "fields": ["steering_angle", "speed"]},
        }

        self.topic_subscriptions_ = {}
        for topic_name, topic_config in self.forward_this.items():
            topic_msg_type = topic_config["type"]
            field_list = topic_config["fields"]

            self.topic_subscriptions_[topic_name] = self.create_subscription(
                topic_msg_type,
                topic_name,
                callback=self.generate_cb(topic_name, field_list),
                qos_profile=10
            )
            self.rmq_channel.queue_declare(queue=topic_name)
            self.get_logger().info(f"Initialized forwarding of topic '{topic_name}'")

    def generate_cb(self, topic_name, fields_to_forward):
        return lambda msg: self.on_msg_received(msg, topic_name, fields_to_forward)

    def get_current_time(self):
        now = datetime.datetime.now(datetime.timezone.utc)
        now_rounded = now.replace(microsecond=int(now.microsecond / 100000) * 100000)

        return now_rounded

    def on_msg_received(self, msg, topic_name, fields_to_forward):
        msg_dict = msg_to_dict(msg)
        # msg_dict = util.msg_conversion.msg_to_dict(msg)

        values_to_forward = {}
        for field_name in fields_to_forward:
            field_value = None

            # Extract field value from dict
            if "." in field_name:  # dot indicates nested field
                segments = field_name.split(".")
                field_value = msg_dict
                for segment in segments:
                    field_value = field_value[segment]
            else:
                field_value = msg_dict[field_name]

            # Construct and publish json message with field data
            if field_value is not None:
                values_to_forward[field_name] = field_value

        time = self.get_current_time()
        timestamp = time.isoformat()

        if self.config["prefix_topic_name"]:
            values_to_forward = {
                topic_name + "." + fname: fvalue
                for fname, fvalue in values_to_forward.items()
            }

        if len(values_to_forward) > 0:
            if self.config["group_values_by_topic"]:
                message = {fname: fvalue for fname, fvalue in values_to_forward.items()}
                message["time"] = timestamp
                message_json = json.dumps(message)

                print(f"sending message {message}")
                self.publish_to_rmq(message_json)
            else:
                for fname, fvalue in values_to_forward.items():
                    message = {"time": timestamp, fname: fvalue}
                    message_json = json.dumps(message)

                    print(f"sending message {message}")
                    self.publish_to_rmq(message_json)

    def publish_to_rmq(self, msg):
        self.rmq_channel.basic_publish(
            exchange="fmi_digital_twin", routing_key="desktoprobotti", body=msg
        )

    def init_rabbitmq(self):
        try:
            self.rmq_connection = pika.BlockingConnection(
                pika.ConnectionParameters("localhost")
            )
            self.rmq_channel = self.rmq_connection.channel()

            print("Declaring exchange")
            self.rmq_channel.exchange_declare(
                exchange="fmi_digital_twin", exchange_type="direct"
            )

            print("Creating queue")
            result = self.rmq_channel.queue_declare(queue="", exclusive=True)
            queue_name = result.method.queue

            # print("Queue name:")
            # print(queue_name)

            self.rmq_channel.queue_bind(
                exchange="fmi_digital_twin",
                queue=queue_name,
                routing_key="desktoprobotti",
            )
        except Exception:
            print("RabbitMQ Initialization failed. Is rabbitmq running?")
            print("Exiting")
            exit()


def main(args=None):
    rclpy.init(args=args)
    rmq_node = RabbitMQPublisher()
    rclpy.spin(rmq_node)
    rclpy.shutdown()


if __name__ == "__main__":
    main()