| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- # -*- coding: utf-8 -*-
- """
- Created on 23 Jan 2022
- @author: Edward Hage
- """
- import numpy as np
- import cv2
- import struct
- from dds import Listener, DomainParticipant, Topic, QosProvider, Qos
- from dds import PresentationQosPolicy, PartitionQosPolicy, DDSPresentationAccessScopeKind, DurabilityQosPolicy,DDSDurabilityKind
- from idlpython.sensor_msgs.Image_.sensor_msgs.msg.dds_ import Image___TypeSupport
- class ImageReader():
-
- class ImageListener(Listener):
-
- def __init__(self):
- Listener.__init__(self)
-
- def on_data_available(self, entity):
- l = entity.take(1)
- print("here")
-
- for (sd, si) in l:
- current_frame = self.convert(sd)
-
- clipmin = 0.3
- clipmax = 2.0
-
- new_frame = current_frame.copy()
- np.clip(new_frame, clipmin, clipmax, out=new_frame)
- new_frame -= clipmin
- new_frame /= (clipmax-clipmin)/255.0
- new_frame = new_frame.astype(dtype=np.uint8, copy=False)
- im_color = cv2.applyColorMap(new_frame, cv2.COLORMAP_AUTUMN)
-
- cv2.imshow("camera", current_frame) # of new_frame
- cv2.waitKey(1) # 1 for automatic update
-
- def convert(self, img_msg):
- # dtype, n_channels = "32F", 1 # hardcoded data from Unity
- # dtype = np.dtype(dtype)
- # dtype = dtype.newbyteorder('>' if img_msg.is_bigendian_ else '<')
- # print( dtype)
- # print("totsize = " + str(len(img_msg.data_)) + " height = " + str(img_msg.height_) + " width " + str(img_msg.width_))
-
- dt = np.dtype('float32')
- dt = dt.newbyteorder('<')
- #im = np.ndarray(shape=(img_msg.height_, img_msg.width_, 1), dtype = 'float32', buffer = bytearray(img_msg.data_))
-
- im = np.ndarray(shape=(img_msg.height_, img_msg.width_, 1), dtype = dt, buffer = bytearray(img_msg.data_))
- return im
- listener = ImageListener()
-
- def __init__(self, topicname):
- qp = QosProvider('file://DDS_ROSQoS.xml', 'DDS ROSQosProfile')
- dp = DomainParticipant(qos = qp.get_participant_qos())
- sub = dp.create_subscriber(Qos([PresentationQosPolicy(DDSPresentationAccessScopeKind.INSTANCE, False, False),
- PartitionQosPolicy(['']),
- DurabilityQosPolicy(DDSDurabilityKind.VOLATILE)]))
- topic = Topic(dp, topicname, Image___TypeSupport(), qp.get_topic_qos(), None)
- self.reader = sub.create_datareader(topic, qp.get_reader_qos(), self.listener)
- reader = ImageReader("rt/depth_processor")
- input("Press a key to stop ...")
|