bkh.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. from CBD.Core import CBD
  2. from CBD.lib.std import TimeBlock, GenericBlock
  3. from CBD.lib.endpoints import SignalCollectorBlock
  4. class SinGen(CBD):
  5. def __init__(self, name="SinGen"):
  6. CBD.__init__(self, name, input_ports=[], output_ports=[])
  7. # Create the blocks
  8. self.addBlock(TimeBlock("time"))
  9. self.addBlock(GenericBlock("sin", block_operator="sin"))
  10. self.addBlock(SignalCollectorBlock("collector"))
  11. # Connect the blocks
  12. self.addConnection("time", "sin")
  13. self.addConnection("sin", "collector")
  14. sinGen = SinGen("SinGen")
  15. from CBD.realtime.plotting import PlotManager, Backend, LinePlot, follow
  16. from CBD.simulator import Simulator
  17. from bokeh.plotting import figure, curdoc
  18. from bokeh.client import push_session
  19. fig = figure(plot_width=500, plot_height=500, y_range=(-1, 1))
  20. curdoc().add_root(fig)
  21. manager = PlotManager(Backend.BOKEH)
  22. manager.register("sin", sinGen.findBlock('collector')[0], fig, LinePlot(color='red'))
  23. def set_xlim(limits):
  24. lower, upper = limits
  25. fig.x_range.start = lower
  26. fig.x_range.end = upper
  27. manager.connect('sin', 'update', lambda d: set_xlim(follow(d[0], 10.0, lower_bound=0.0)))
  28. session = push_session(curdoc())
  29. session.show()
  30. import time
  31. time.sleep(2)
  32. sim = Simulator(sinGen)
  33. sim.setRealTime()
  34. sim.setDeltaT(0.1)
  35. sim.run(20.0)
  36. # NOTE: currently, there can be 'flickering' of the plot
  37. while manager.is_opened():
  38. session.push()
  39. time.sleep(0.1)