|
|
@@ -0,0 +1,357 @@
|
|
|
+"""
|
|
|
+Image and matrix manipulation library blocks for
|
|
|
+OpenCV, Numpy and an Intel Realsense D455 Camera
|
|
|
+"""
|
|
|
+import pyrealsense2 as rs
|
|
|
+import numpy as np
|
|
|
+import cv2
|
|
|
+
|
|
|
+from CBD.CBD import BaseBlock
|
|
|
+
|
|
|
class Pipeline(BaseBlock):
    """Source block streaming aligned color and depth frames from an
    Intel RealSense D455 camera.

    Outputs:
        image:  BGR color frame (height x width x 3 numpy array).
        depth:  depth frame aligned onto the color frame (height x width).
        dscale: the sensor's depth scale (meters per raw depth unit).
    """

    def __init__(self, block_name, width=640, height=480, fps=30):
        BaseBlock.__init__(self, block_name, [], ["image", "depth", "dscale"])
        self.size = width, height
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        self.config.enable_stream(rs.stream.depth, width, height, rs.format.z16, fps)
        self.config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)

        self.profile = self.pipeline.start(self.config)

        # Getting the depth sensor's depth scale (see rs-align example for explanation)
        depth_sensor = self.profile.get_device().first_depth_sensor()
        self.depth_scale = depth_sensor.get_depth_scale()

        # Align depth frames onto the color stream so pixels correspond 1:1.
        align_to = rs.stream.color
        self.align = rs.align(align_to)

        # Warmup and focus: discard initial frames while auto-exposure settles.
        for i in range(10):
            self.pipeline.wait_for_frames()

    def compute(self, curIteration):
        frames = self.pipeline.wait_for_frames()
        aligned_frames = self.align.process(frames)

        aligned_depth_frame = aligned_frames.get_depth_frame()
        color_frame = aligned_frames.get_color_frame()

        depth_image = np.asanyarray(aligned_depth_frame.get_data())
        color_image = np.asanyarray(color_frame.get_data())

        self.appendToSignal(color_image, "image")
        self.appendToSignal(depth_image, "depth")
        # BUG FIX: the "dscale" output port was declared but never produced,
        # leaving any connected block without a signal.
        self.appendToSignal(self.depth_scale, "dscale")
|
|
|
+
|
|
|
+
|
|
|
class ClippingFilter(BaseBlock):
    """Masks depth pixels outside the [close, far] range.

    `close` and `far` are real-world distances; dividing by `scale` (the
    camera's depth scale) converts them to raw sensor units so they can be
    compared against the raw depth image. Emits a 3-channel image that is
    white where the depth is inside the range and black elsewhere.
    """

    def __init__(self, block_name, close, far, scale):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        # Convert real-world distances into raw depth units.
        self.close = close / scale
        self.far = far / scale

    def compute(self, curIteration):
        depth_image = self.getInputSignal(curIteration).value
        # Depth image is 1 channel; stack to 3 channels to match color images.
        depth_image_3d = np.dstack((depth_image, depth_image, depth_image))
        # BUG FIX: removed the duplicated "white = white = ..." assignment typo.
        white = 255 * np.ones(depth_image_3d.shape, np.uint8)
        bg_removed = np.where((depth_image_3d > self.far) | (depth_image_3d < self.close), 0, white)

        self.appendToSignal(bg_removed)
|
|
|
+
|
|
|
+
|
|
|
class Blur(BaseBlock):
    """Applies a normalized box filter (cv2.blur) with a fixed kernel size."""

    def __init__(self, block_name, ksize):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        # Kernel size as expected by cv2.blur, e.g. (5, 5).
        self.ksize = ksize

    def compute(self, curIteration):
        source = self.getInputSignal(curIteration).value
        self.appendToSignal(cv2.blur(source, self.ksize))
|
|
|
+
|
|
|
+
|
|
|
class Dilate(BaseBlock):
    """Morphologically dilates the input image with an all-ones kernel."""

    def __init__(self, block_name, ksize, iterations):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        self.iterations = iterations
        # Structuring element: a ksize-shaped block of ones.
        self.kernel = np.ones(ksize, np.uint8)

    def compute(self, curIteration):
        source = self.getInputSignal(curIteration).value
        result = cv2.dilate(source, self.kernel, iterations=self.iterations)
        self.appendToSignal(result)
|
|
|
+
|
|
|
+
|
|
|
class Erode(BaseBlock):
    """Morphologically erodes the input image with an all-ones kernel."""

    def __init__(self, block_name, ksize, iterations):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        self.iterations = iterations
        # Structuring element: a ksize-shaped block of ones.
        self.kernel = np.ones(ksize, np.uint8)

    def compute(self, curIteration):
        source = self.getInputSignal(curIteration).value
        result = cv2.erode(source, self.kernel, iterations=self.iterations)
        self.appendToSignal(result)
|
|
|
+
|
|
|
+
|
|
|
class ColorConvert(BaseBlock):
    """Converts the input image between color spaces using cv2.cvtColor.

    Args:
        code: an OpenCV color-conversion code, e.g. cv2.COLOR_BGR2GRAY.
    """

    def __init__(self, block_name, code):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])
        self.code = code

    def compute(self, curIteration):
        img = self.getInputSignal(curIteration).value
        # BUG FIX: compute() referenced self.mode, but __init__ stores the
        # conversion code as self.code -> AttributeError at runtime.
        img = cv2.cvtColor(img, self.code)
        self.appendToSignal(img)
|
|
|
+
|
|
|
+
|
|
|
class FindContours(BaseBlock):
    """Wraps cv2.findContours and emits the contour list plus the hierarchy."""

    def __init__(self, block_name, mode, method):
        BaseBlock.__init__(self, block_name, ["IN1"], ["contours", "hierarchy"])
        # Retrieval mode (e.g. cv2.RETR_EXTERNAL) and approximation method
        # (e.g. cv2.CHAIN_APPROX_SIMPLE), passed straight through to OpenCV.
        self.mode = mode
        self.method = method

    def compute(self, curIteration):
        binary = self.getInputSignal(curIteration).value
        found, tree = cv2.findContours(binary, self.mode, self.method)
        self.appendToSignal(found, "contours")
        self.appendToSignal(tree, "hierarchy")
|
|
|
+
|
|
|
+
|
|
|
class MergeContours(BaseBlock):
    """Merges a list of contours into a single well-ordered contour.

    All contour points are flattened, the center of their bounding box is
    computed, and the points are sorted clockwise around that center.
    Outputs the merged contour ("merged") and the center point ("center").
    """

    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["merged", "center"])

    def compute(self, curIteration):
        contours = self.getInputSignal(curIteration).value

        # Flatten every contour into one list of (x, y) points.
        points = []
        for idx, c in enumerate(contours):
            for pt in c:
                points.append(pt[0])
        points = np.array(points)

        # Center of the axis-aligned bounding box over all points.
        tl = min(points[:, 0]), min(points[:, 1])
        br = max(points[:, 0]), max(points[:, 1])
        center = tl[0] + (br[0] - tl[0]) // 2, tl[1] + (br[1] - tl[1]) // 2

        # Sort points clockwise around the center to form a single contour.
        ctrs = sorted(points, key=lambda x, o=center: self.clockwiseangle_and_distance(x, o))
        ctr = np.array(ctrs).reshape((-1, 1, 2)).astype(np.int32)

        self.appendToSignal(ctr, "merged")
        self.appendToSignal(np.asarray(center), "center")

    @staticmethod
    def clockwiseangle_and_distance(point, origin):
        """Sort key: (clockwise angle from the up vector, distance to origin).

        BUG FIX: this helper was declared with (point, origin) but no `self`,
        while being called as a bound method self.clockwiseangle_and_distance(x, o),
        which passed the instance as `point` and raised TypeError. Declaring
        it @staticmethod makes the existing call site correct.
        """
        # Based on: https://stackoverflow.com/a/41856340
        refvec = np.array([0, 1])
        # Vector between point and the origin: v = p - o
        vector = point - origin
        # Length of vector: ||v||
        lenvector = np.linalg.norm(vector)
        # If length is zero there is no angle
        if lenvector == 0:
            return -np.pi, 0
        # Normalize vector: v/||v||
        normalized = vector / lenvector
        dotprod = np.dot(normalized, refvec)
        diffprod = refvec[1]*normalized[0] - refvec[0]*normalized[1]  # x1*y2 - y1*x2
        angle = np.arctan2(diffprod, dotprod)
        # Negative angles represent counter-clockwise angles so we need to
        # subtract them from 2*pi (360 degrees)
        if angle < 0:
            return 2 * np.pi + angle, lenvector
        # Angle first (primary sorting criterion); ties broken by distance,
        # so shorter vectors at the same angle come first.
        return angle, lenvector
|
|
|
+
|
|
|
+
|
|
|
class DrawPoint(BaseBlock):
    """Draws a filled circle on the input image at the given position.

    The image is modified in place; this block has no outputs.
    """

    def __init__(self, block_name, color=(0, 0, 0), size=3):
        BaseBlock.__init__(self, block_name, ["image", "position"], [])
        self.size = size
        self.color = color

    def compute(self, curIteration):
        canvas = self.getInputSignal(curIteration, "image").value
        where = self.getInputSignal(curIteration, "position").value
        # Negative thickness (-1) fills the circle.
        cv2.circle(canvas, (where[0], where[1]), self.size, self.color, -1)
|
|
|
+
|
|
|
+
|
|
|
class UnitVector(BaseBlock):
    """Computes the (integer-truncated) unit vector from IN2 to IN1 and its angle.

    Outputs:
        vector: the normalized difference p1 - p2, cast to np.intp
                (the untouched raw difference when the points coincide).
        angle:  arctan2 of the truncated vector; 0.0 when the points coincide.
    """

    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1", "IN2"], ["vector", "angle"])

    def compute(self, curIteration):
        p1 = self.getInputSignal(curIteration, "IN1").value
        p2 = self.getInputSignal(curIteration, "IN2").value

        v = (p1 - p2)
        d = np.linalg.norm(v)
        if d > 0:
            # BUG FIX: was `v /= d`, an in-place true division that raises a
            # casting error when p1/p2 are integer arrays (the usual case for
            # pixel coordinates). Use out-of-place division instead.
            v = v / d
            v = np.intp(v)
            self.appendToSignal(v, "vector")
            self.appendToSignal(np.arctan2(v[1], v[0]), "angle")
        else:
            self.appendToSignal(v, "vector")
            self.appendToSignal(0.0, "angle")
|
|
|
+
|
|
|
+
|
|
|
class UnitVectorFromAngle(BaseBlock):
    """Turns an angle in radians into the corresponding 2D unit vector."""

    def __init__(self, block_name):
        BaseBlock.__init__(self, block_name, ["IN1"], ["OUT1"])

    def compute(self, curIteration):
        angle = self.getInputSignal(curIteration).value
        # (cos, sin) is the unit vector at `angle` on the unit circle.
        self.appendToSignal(np.asarray((np.cos(angle), np.sin(angle))))
|
|
|
+
|
|
|
+
|
|
|
class DrawScaledUnitVector(BaseBlock):
    """Draws an arrow from `point` along `vector`, scaled by `length`.

    The image is modified in place; this block has no outputs.
    """

    def __init__(self, block_name, color=(0, 0, 0), length=10, width=1, tip=0.3):
        BaseBlock.__init__(self, block_name, ["image", "point", "vector"], [])
        self.length = length
        self.color = color
        self.width = width
        self.tip = tip

    def compute(self, curIteration):
        img = self.getInputSignal(curIteration, "image").value
        pnt = self.getInputSignal(curIteration, "point").value
        vec = self.getInputSignal(curIteration, "vector").value

        # BUG FIX: was `target = (pnt + vec) * self.length`, which also scales
        # the start point and draws the arrow far away from `pnt`. Only the
        # direction vector should be scaled.
        target = pnt + vec * self.length
        cv2.arrowedLine(img, (pnt[0], pnt[1]), (target[0], target[1]),
                        self.color, self.width, tipLength=self.tip)
|
|
|
+
|
|
|
+
|
|
|
class DrawBestFitContours(BaseBlock):
    """Approximates a contour with cv2.approxPolyDP and draws the polygon.

    The image is modified in place; this block has no outputs.
    """

    def __init__(self, block_name, color=(0, 0, 0), width=1, perc=0.01):
        BaseBlock.__init__(self, block_name, ["image", "contour"], [])
        self.color = color
        self.width = width
        # Approximation tolerance as a fraction of the contour perimeter.
        self.perc = perc

    def compute(self, curIteration):
        img = self.getInputSignal(curIteration, "image").value
        ctr = self.getInputSignal(curIteration, "contour").value

        epsilon = self.perc * cv2.arcLength(ctr, True)
        approx = cv2.approxPolyDP(ctr, epsilon, True)
        if approx is not None:
            # Walk the polygon edges; index i-1 wraps around and closes it.
            for i, c in enumerate(approx):
                a = approx[i - 1][0]
                b = c[0]
                cv2.line(img, (a[0], a[1]), (b[0], b[1]), self.color, self.width)
|
|
|
+
|
|
|
+
|
|
|
class DrawBestFitRotatedRect(BaseBlock):
    """Draws the minimum-area rotated rectangle around a contour.

    The image is modified in place; this block has no outputs.
    """

    def __init__(self, block_name, color=(0, 0, 0), width=1):
        BaseBlock.__init__(self, block_name, ["image", "contour"], [])
        self.color = color
        self.width = width

    def compute(self, curIteration):
        img = self.getInputSignal(curIteration, "image").value
        ctr = self.getInputSignal(curIteration, "contour").value

        # boxPoints returns float corners; drawContours needs integer points.
        corners = cv2.boxPoints(cv2.minAreaRect(ctr))
        cv2.drawContours(img, [np.intp(corners)], 0, self.color, self.width)
|
|
|
+
|
|
|
+
|
|
|
+from CBD.lib.std import DelayBlock
|
|
|
class ConditionalDelayBlock(DelayBlock):
    """A delay block whose stored value only updates when `condition` holds.

    Like a DelayBlock it emits a value from the past, but the remembered
    value is only replaced by the current input when
    condition(current, stored) is true — with the default condition this
    delays a running maximum of the input signal.
    """

    def __init__(self, block_name, condition=lambda x, v: x >= v):
        # condition(current, stored) -> bool; True means "accept current
        # as the new stored value".
        DelayBlock.__init__(self, block_name)
        self.condition = condition
        # Last accepted input value; first set on iteration 1.
        self.value = None

    def compute(self, curIteration):
        if curIteration == 0:
            # First iteration: emit the initial-condition input, as a
            # normal DelayBlock would.
            self.appendToSignal(self.getInputSignal(curIteration, "IC").value)
        elif curIteration == 1:
            # Second iteration: emit (and remember) the PREVIOUS
            # iteration's input — this establishes the one-step delay.
            self.value = self.getInputSignal(curIteration - 1, "IN1").value
            self.appendToSignal(self.value)
        else:
            # Emit the stored value first (the delayed output), THEN decide
            # whether the current input replaces it — the order matters.
            self.appendToSignal(self.value)
            cur = self.getInputSignal(curIteration, "IN1").value
            if self.condition(cur, self.value):
                self.value = cur
|
|
|
+
|
|
|
+
|
|
|
+from pykalman import KalmanFilter
|
|
|
class KalmanFilterBlock(BaseBlock):
    """Smooths a 2D position and heading with a constant-velocity Kalman filter.

    The state is (x, y, heading, vx, vy, vheading). Every compute() appends
    the newest measurement to the history and re-runs pykalman's smoother
    over the full sequence, emitting the smoothed last position and heading.
    """

    def __init__(self, block_name, initial=(0, 0, 0)):
        # BUG FIX: the outputs were declared as the single garbled name
        # "pred_position, pred_heading" (one string) instead of two ports.
        BaseBlock.__init__(self, block_name, ["position", "heading"],
                           ["pred_position", "pred_heading"])
        self.seen = np.asarray([initial, initial])
        # Negative measurements are masked, i.e. treated as "not observed".
        MarkedMeasure = np.ma.masked_less(self.seen, 0)
        # Constant-velocity model: each position component integrates its velocity.
        Transition_Matrix = [[1,0,0,1,0,0],[0,1,0,0,1,0],[0,0,1,0,0,1],[0,0,0,1,0,0],[0,0,0,0,1,0],[0,0,0,0,0,1]]
        # Only (x, y, heading) are observed.
        Observation_Matrix = [[1,0,0,0,0,0],[0,1,0,0,0,0],[0,0,1,0,0,0]]
        xinit = MarkedMeasure[0,0]
        yinit = MarkedMeasure[0,1]
        winit = MarkedMeasure[0,2]
        # Initial velocities estimated as first differences of the two seeds.
        vxinit = MarkedMeasure[1,0]-MarkedMeasure[0,0]
        vyinit = MarkedMeasure[1,1]-MarkedMeasure[0,1]
        vwinit = MarkedMeasure[1,2]-MarkedMeasure[0,2]
        initstate = [xinit, yinit, winit, vxinit, vyinit, vwinit]
        initcovariance = 1.0e-3 * np.eye(6)
        transistionCov = 1.0e-4 * np.eye(6)
        observationCov = 1.0e-1 * np.eye(3)
        # BUG FIX: the keyword was misspelled "observation_matrice " (with a
        # stray space); pykalman's KalmanFilter expects "observation_matrices",
        # so construction raised a TypeError.
        self.kf = KalmanFilter(transition_matrices=Transition_Matrix,
                               observation_matrices=Observation_Matrix,
                               initial_state_mean=initstate,
                               initial_state_covariance=initcovariance,
                               transition_covariance=transistionCov,
                               observation_covariance=observationCov)

    def compute(self, curIteration):
        pnt = self.getInputSignal(curIteration, "position").value
        hdn = self.getInputSignal(curIteration, "heading").value

        # NOTE: the history grows without bound and the smoother re-runs over
        # all of it each iteration — O(n) work per frame.
        self.seen = np.vstack((self.seen, np.asarray([pnt[0], pnt[1], hdn])))
        next_mean, _ = self.kf.smooth(np.asarray(self.seen))
        center = int(next_mean[-1, 0]), int(next_mean[-1, 1])
        eh = next_mean[-1, 2]

        # BUG FIX: results were appended under the *input* port names
        # ("position"/"heading"); emit them on the declared output ports.
        self.appendToSignal(center, "pred_position")
        self.appendToSignal(eh, "pred_heading")
|
|
|
+
|
|
|
+
|
|
|
class HStack(BaseBlock):
    """Horizontally concatenates its N input images into one image."""

    def __init__(self, block_name, N=2):
        BaseBlock.__init__(self, block_name, ["IN{}".format(n) for n in range(N)], ["OUT1"])
        self.N = N

    def compute(self, curIteration):
        frames = [self.getInputSignal(curIteration, "IN{}".format(i)).value
                  for i in range(self.N)]
        self.appendToSignal(np.hstack(tuple(frames)))
|
|
|
+
|
|
|
+
|
|
|
class VStack(BaseBlock):
    """Vertically concatenates its N input images into one image."""

    def __init__(self, block_name, N=2):
        BaseBlock.__init__(self, block_name, ["IN{}".format(n) for n in range(N)], ["OUT1"])
        self.N = N

    def compute(self, curIteration):
        frames = [self.getInputSignal(curIteration, "IN{}".format(i)).value
                  for i in range(self.N)]
        self.appendToSignal(np.vstack(tuple(frames)))
|
|
|
+
|
|
|
+
|
|
|
class Show(BaseBlock):
    """Displays the input image in a named OpenCV window every iteration."""

    def __init__(self, block_name, title):
        BaseBlock.__init__(self, block_name, ["IN1"], [])
        self.title = title

    def compute(self, curIteration):
        frame = self.getInputSignal(curIteration).value
        # namedWindow is idempotent, so re-creating it per frame is harmless.
        cv2.namedWindow(self.title, cv2.WINDOW_AUTOSIZE)
        cv2.imshow(self.title, frame)
|
|
|
+
|
|
|
+
|