read_colorsensor.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 23 Jan 2022
  4. @author: Edward Hage
  5. """
  6. import numpy as np
  7. from dds import Listener, DomainParticipant, Topic, QosProvider, Qos
  8. from dds import PresentationQosPolicy, PartitionQosPolicy, DDSPresentationAccessScopeKind, DurabilityQosPolicy,DDSDurabilityKind
  9. from idlpython.std_msgs.Float32_.std_msgs.msg.dds_ import Float32___TypeSupport
  10. import time
  11. class ColorReader():
  12. class Float32Listener(Listener):
  13. def __init__(self):
  14. Listener.__init__(self)
  15. def on_data_available(self, entity):
  16. l = entity.take(10)
  17. for (sd, si) in l:
  18. #sd.print_vars()
  19. print("red sensor value: " + str(sd.data_))
  20. listener = Float32Listener()
  21. def __init__(self, topicname):
  22. print("in init")
  23. qp = QosProvider('file://DDS_ROSQoS.xml', 'DDS ROSQosProfile')
  24. dp = DomainParticipant(qos = qp.get_participant_qos())
  25. sub = dp.create_subscriber(Qos([PresentationQosPolicy(DDSPresentationAccessScopeKind.INSTANCE, False, False),
  26. PartitionQosPolicy(['']),
  27. DurabilityQosPolicy(DDSDurabilityKind.VOLATILE)]))
  28. topic = Topic(dp, topicname, Float32___TypeSupport(), qp.get_topic_qos(), None)
  29. self.reader = sub.create_datareader(topic, qp.get_reader_qos(), self.listener)
  30. reader = ColorReader("rt/redsensor")
  31. input("Press a key to stop ...")