| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- """
- Offline Tracking Simulator for the AGV.
- """
- # 1. Run the simulator normally until end of trace duration, using the csv-logs
- # - [TEST 1] Use the 'true' times for the values
- # - [TEST 2] Use the 'arrived_at' times for the values
- # - Passively obtain the system state (pseudo-checkpointing)
- # - PID component outputs
- # - odometry output [can be stored in a file for future reference]
- # 2. Whenever the error becomes too large (at time t)
- # - Terminate Simulator (termination function)
- # - Find last "good" system state
- # - store all past states (before the "good" state)
- # - Alter model variables w.r.t. last "good" system state (at time t-dt)
- # - Clear all collection units in the model
- # - Find new parameter values to ensure the correct position at time t
- # - Log/plot the parameter values
- # - Restart the simulation (back to 1.)
- import numpy as np
- import pandas as pd
- from CBD.simulator import Simulator
- from AGV import AGVVirtual
- import cv2
- from tqdm import tqdm
- import matplotlib.pyplot as plt
- EPS = 0.00001
- ERROR = 0.1
- WERR = np.pi / 2
- class TrackingSimulator:
- def __init__(self):
- self.model = None
- self.sim = None
- self.dt = 0.2
- self.setup()
- self.physical = pd.read_csv("trace_vidH.csv") # Trajectory of the recognized AGV
- self.physical["heading"] *= -1
- self.clean_path()
- self.set_state([self.physical["x"][0], self.physical["y"][0], self.physical["heading"][0]])
- self.path = list(zip(self.physical["x"], self.physical["y"], self.physical["heading"]))
- self.trace = []
- self.traces = []
- self.__offset_i = 1
- self.params = {0.0: (0.018, 0.211)}
- def setup(self):
- self.model = AGVVirtual("AGV", 0.018, 0.211, "obtained.csv", v=0.033, T=35, Kp=-0.01)
- self.sim = Simulator(self.model)
- self.sim.setDeltaT(self.dt)
- self.sim.setTerminationCondition(self.check)
- def check(self, model, curIt):
- x, y, w = self.get_state()
- time = self.sim.getTime()
- rtime = self.sim.getRelativeTime()
- off, a = self.offset2(time)
- if off > ERROR:
- return True
- if abs(round(rtime*1000) % round(self.dt*1000)) < EPS:
- self.trace.append((time, self.__offset_i, self.get_state(), off, self.get_params()))
- return False
- def run(self):
- TCP = self.model.getBlockByName("TCP")
- time = self.physical["time"][0]
- end_time = TCP.data[TCP.time_col][-1]
- try:
- back = None
- for i in tqdm(range(100)):
- if back is None:
- self.sim.run(end_time, time)
- back = self.sim.getTime()
- if len(self.traces) == 0 or len(self.traces[-1]) > 1:
- self.traces.append(self.trace[:])
- self.trace.clear()
- if back < end_time:
- # time, self.__offset_i, state, _, (r, d) = self.last_good_state()
- # if state is None: break
- now = back - 1.0
- current = self.get_state()
- if np.isnan(current[0]):
- print("WHY???!!!")
- target = self.get_target_state(now)
- self.setup()
- self.set_state(target)
- time = now
- # print(target[2])
- back = None
- # w_bad = current[2] - state[2]
- # w_good = target[2] - state[2]
- # if w_bad > w_good:
- # d *= 2
- # elif w_bad < w_good:
- # d /= 4
- # else:
- # v_bad = self.euclidean(state[0], state[1], current[0], current[1])
- # v_good = self.euclidean(state[0], state[1], target[0], target[1])
- # if v_bad < v_good:
- # r *= 2
- # else:
- # r /= 4
- # # TODO: what with "too slow" on a straight segment --> divide by zero!
- # # Given the trajectory, compute the wheel speeds
- # w = current[2] - state[2]
- # if np.isnan(w) or abs(w) < EPS:
- # v = self.euclidean(state[0], state[1], current[0], current[1])
- # else:
- # v = self.circle_radius(state, current) * abs(w)
- # phiL = (v - w * d / 2) / r
- # phiR = (v + w * d / 2) / r
- #
- # if phiL == 0 or phiR == 0 or phiL == -phiR:
- # print("HO! SHTAHP")
- # back = time
- # continue
- #
- # # Given the wheel speeds, compute r and d
- # w = target[2] - state[2]
- # if np.isnan(w) or abs(w) < EPS:
- # v = self.euclidean(state[0], state[1], target[0], target[1])
- # else:
- # v = self.circle_radius(state, target) * abs(w)
- # d2 = 2 * v / w * (phiR - phiL) / (phiR + phiL)
- # r2 = 1. / phiL * (v - w * d / 2)
- #
- # print(d2, r2)
- #
- # if np.isnan(w) or np.isnan(v) or np.isnan(r2) or np.isnan(d2) or np.isnan(phiL) or np.isnan(phiR) or np.isinf(d2) or np.isinf(r2):
- # print("HERE")
- # self.set_params(r, d)
- #
- # self.params[now] = r, d
- else:
- break
- finally:
- trd = self.params.items()
- t, r, d = [x for x, _ in trd], [x for _, (x, __) in trd], [x for _, (__, x) in trd]
- df = pd.DataFrame({"time": t, "r": r, "d": d})
- df.to_csv("tracking.csv")
- def clean_path(self):
- to_drop = []
- # dists = []
- for idx, row in self.physical.iterrows():
- subset = self.physical[self.physical["time"] <= row["time"] - 0.2]
- if len(subset) == 0: continue
- prev = subset.iloc[-1]
- dist = self.euclidean(prev["x"], prev["y"], row["x"], row["y"])
- # dists.append(dist)
- # REMOVE NOISE
- if dist > 0.0125:
- to_drop.append(idx)
- self.physical.drop(to_drop, inplace=True)
- # fig, ax = plt.subplots(1, 1)
- # r = ax.boxplot(dists)
- # fig.show()
- # print(sorted(dists))
- def get_state(self):
- dd = self.model.getBlockByName("plot").data
- if len(dd) == 0:
- x = self.model.findBlock("odo.init_x")[0].getValue()
- y = self.model.findBlock("odo.init_y")[0].getValue()
- heading = self.model.findBlock("odo.init_w")[0].getValue()
- else:
- x, y = self.model.getBlockByName("plot").data[-1]
- heading = self.model.getBlockByName("headingPlot").data_xy[1][-1]
- return x, y, heading
- def get_target_state(self, time):
- obj = self.physical[self.physical["time"] <= time].iloc[-1]
- return obj["x"], obj["y"], obj["heading"]
- def set_state(self, state):
- x, y, heading = state
- self.model.findBlock("odo.init_x")[0].setValue(x)
- self.model.findBlock("odo.init_y")[0].setValue(y)
- self.model.findBlock("odo.init_w")[0].setValue(heading)
- def get_params(self):
- d = self.model.findBlock("StaticPlant.WheelAxis")[0].getValue()
- r = 1. / self.model.findBlock("StaticPlant.WheelRadius")[0].getValue()
- return r, d
- def set_params(self, r, d):
- self.model.findBlock("StaticPlant.WheelAxis")[0].setValue(d)
- self.model.findBlock("StaticPlant.WheelRadius")[0].setValue(1./r)
- def last_good_state(self):
- if len(self.traces) >= 1:
- if len(self.traces[-1]) >= 1:
- if len(self.traces[-1]) > 20:
- return self.traces[-1][-20]
- elif len(self.traces[-1]) > 10:
- return self.traces[-1][-10]
- elif len(self.traces[-1]) > 5:
- return self.traces[-1][-5]
- else:
- self.traces.pop()
- return self.traces[-1][0]
- elif len(self.traces) >= 2 and len(self.traces[-2]) >= 1:
- return self.traces[-2][0]
- return 0.0, -1, None, 0, (0, 0)
- def offset(self):
- position = self.get_state()
- dist = float("inf")
- angles = float("-inf"), float("inf")
- self.__offset_i -= 1
- for p1 in self.path:
- p2 = self.path[self.__offset_i-1]
- d = self.distance_to_line(p2, p1, position)
- if d < dist:
- dist, angles = d, (p1[2], p2[2])
- if dist < ERROR:
- break
- self.__offset_i += 1
- self.__offset_i %= len(self.path)
- return dist, angles
- def offset2(self, ctime):
- moment = self.physical[self.physical["time"] <= ctime].iloc[-1]
- position = self.get_state()
- return self.euclidean(moment["x"], moment["y"], position[0], position[1]), \
- (moment["heading"] - np.pi/6, moment["heading"] + np.pi/6)
- def distance_to_line(self, p1, p2, p3):
- """
- Computes the distance from a point p3 to the line segment given by (p1, p2).
- Implementation based upon:
- https://stackoverflow.com/a/6853926
- """
- px = p2[0] - p1[0]
- py = p2[1] - p1[1]
- norm = px * px + py * py
- if norm == 0.0:
- ud = 0.0
- else:
- ud = float((p3[0] - p1[0]) * px + (p3[1] - p1[1]) * py) / norm
- ud = min(max(ud, 0.0), 1.0) # limits line segment to endpoints
- xd = p1[0] + ud * px
- yd = p1[1] + ud * py
- return self.euclidean(p3[0], p3[1], xd, yd)
- @staticmethod
- def euclidean(x1, y1, x2, y2):
- dx = x2 - x1
- dy = y2 - y1
- return ((dx * dx) + (dy * dy)) ** 0.5
- def circle_radius(self, A, B):
- cot = lambda x: 1. / np.tan(x)
- x = (B[1] - A[1] + cot(B[2]) * B[0] - cot(A[2]) * A[0]) / (cot(B[2]) - cot(A[2]))
- y = -cot(A[2]) * (x - A[0]) + A[1]
- return self.euclidean(x, y, A[0], A[1])
- if __name__ == '__main__':
- ts = TrackingSimulator()
- ts.run()
- fig = plt.figure()
- ax = fig.add_subplot(111, projection='3d')
- ax.set_xlabel("X (m)")
- ax.set_ylabel("Y (m)")
- ax.set_zlabel("Time (s)")
- ax.plot(ts.physical["x"], ts.physical["y"], ts.physical["time"], label="physical", ls=':')
- for trace in ts.traces:
- if len(trace) < 5: continue
- r, d = trace[-1][4]
- times = np.asarray(trace)[:,0]
- ax.plot([x[2][0] for x in trace], [x[2][1] for x in trace], times, label=f"r = {r:.2f}; d = {d:.2f}")
- # ax.legend()
- plt.tight_layout()
- plt.savefig("tracking.png")
- plt.show()
- # fig, axs = plt.subplots(3, 3)
- # axs[0].set_xlabel("X (m)")
- # axs[0].set_ylabel("Y (m)")
- # axs[1].set_xlabel("Time (s)")
- # # ax.plot(ts.physical["x"], ts.physical["y"], ts.physical["time"], label="physical", ls=':')
- # # for trace in ts.traces:
- # # if len(trace) < 5: continue
- # # r, d = trace[-1][4]
- # # times = np.asarray(trace)[:,0]
- # # ax.plot([x[2][0] for x in trace], [x[2][1] for x in trace], times, label=f"r = {r:.2f}; d = {d:.2f}")
- # # # ax.legend()
- # # plt.savefig("tracking.png")
- # plt.show()
|