import Vortex import rclpy from rclpy.node import Node from std_msgs.msg import Float64 import sys # create a ROS node class VortexRosSubscribingNode(Node): def __init__(self, nodeName): super().__init__(nodeName) pass # create a subscriber and register it to the given node class Subscriber(): def __init__(self, node): self.subscriber_1 = node.create_subscription(Float64, '/throttle', self.listener_callback_1, 10) self.subscriber_2 = node.create_subscription(Float64, '/steering', self.listener_callback_2, 10) self.subscriber_1 # prevent unused variable warning self.subscriber_2 # prevent unused variable warning self.throttle = 0.0 self.steering = 0.0 pass def listener_callback_1(self, msg): self.throttle = msg.data pass def listener_callback_2(self, msg): self.steering = msg.data pass # called once by the vortex dynamics module when the simulation is started def on_simulation_start(extension): rclpy.init(args=sys.argv) extension.node = VortexRosSubscribingNode("vortex_subscriber") extension.subscriber = Subscriber(extension.node) pass # called once by the vortex dynamics module when the simulation is stopped def on_simulation_stop(extension): extension.node.destroy_node() rclpy.shutdown() pass # called every simulation steps by the vortex dynamics module during simulation def pre_step(extension): rclpy.spin_once(extension.node, executor=None, timeout_sec=0.0) # timeout == 0.0 so that spin_once do not wait for a task to be ready # make sure that this script has two output fields of type float and # use the connection editor of the Vortex Editor to feed the subscribed value to a vortex property. if extension.getOutput("Throttle") is not None: extension.getOutput("Throttle").value = extension.subscriber.throttle if extension.getOutput("Steering") is not None: extension.getOutput("Steering").value = extension.subscriber.steering pass