# -*- coding: utf-8 -*- """ Created on 23 Jan 2022 @author: Edward Hage """ import numpy as np import cv2 import struct from dds import Listener, DomainParticipant, Topic, QosProvider, Qos from dds import PresentationQosPolicy, PartitionQosPolicy, DDSPresentationAccessScopeKind, DurabilityQosPolicy,DDSDurabilityKind from idlpython.sensor_msgs.Image_.sensor_msgs.msg.dds_ import Image___TypeSupport class ImageReader(): class ImageListener(Listener): def __init__(self): Listener.__init__(self) def on_data_available(self, entity): l = entity.take(1) print("here") for (sd, si) in l: current_frame = self.convert(sd) clipmin = 0.3 clipmax = 2.0 new_frame = current_frame.copy() np.clip(new_frame, clipmin, clipmax, out=new_frame) new_frame -= clipmin new_frame /= (clipmax-clipmin)/255.0 new_frame = new_frame.astype(dtype=np.uint8, copy=False) im_color = cv2.applyColorMap(new_frame, cv2.COLORMAP_AUTUMN) cv2.imshow("camera", current_frame) # of new_frame cv2.waitKey(1) # 1 for automatic update def convert(self, img_msg): # dtype, n_channels = "32F", 1 # hardcoded data from Unity # dtype = np.dtype(dtype) # dtype = dtype.newbyteorder('>' if img_msg.is_bigendian_ else '<') # print( dtype) # print("totsize = " + str(len(img_msg.data_)) + " height = " + str(img_msg.height_) + " width " + str(img_msg.width_)) dt = np.dtype('float32') dt = dt.newbyteorder('<') #im = np.ndarray(shape=(img_msg.height_, img_msg.width_, 1), dtype = 'float32', buffer = bytearray(img_msg.data_)) im = np.ndarray(shape=(img_msg.height_, img_msg.width_, 1), dtype = dt, buffer = bytearray(img_msg.data_)) return im listener = ImageListener() def __init__(self, topicname): qp = QosProvider('file://DDS_ROSQoS.xml', 'DDS ROSQosProfile') dp = DomainParticipant(qos = qp.get_participant_qos()) sub = dp.create_subscriber(Qos([PresentationQosPolicy(DDSPresentationAccessScopeKind.INSTANCE, False, False), PartitionQosPolicy(['']), DurabilityQosPolicy(DDSDurabilityKind.VOLATILE)])) topic = Topic(dp, topicname, Image___TypeSupport(), qp.get_topic_qos(), None) self.reader = sub.create_datareader(topic, qp.get_reader_qos(), self.listener) reader = ImageReader("rt/depth_processor") input("Press a key to stop ...")