experiment.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import logging
  2. import time
  3. import numpy
  4. from pydwf import DwfLibrary, DwfAnalogOutNode, DwfAnalogOutFunction, DwfAnalogInFilter, DwfAcquisitionMode, \
  5. DwfTriggerSource, DwfAnalogInTriggerType, DwfTriggerSlope, DwfEnumConfigInfo, DwfState, PyDwfError, AnalogOut, \
  6. AnalogIn
  7. from pydwf.utilities import openDwfDevice
  8. import data
  9. import context
  10. def single_experiment(test_frequency: float,
  11. test_amplitude: float,
  12. sampling_ratio: int) \
  13. -> [float, numpy.ndarray[float]]:
  14. logging.info("Performing experiment: f:" + str(test_frequency) + ", A:" + str(test_amplitude) + ", R:" + str(sampling_ratio))
  15. dwf: DwfLibrary = DwfLibrary()
  16. sampling_frequency: float = test_frequency * sampling_ratio
  17. def maximize_buffer(configuration_parameters):
  18. return configuration_parameters[DwfEnumConfigInfo.AnalogInBufferSize]
  19. try:
  20. with openDwfDevice(dwf, serial_number_filter=None, score_func=maximize_buffer) as device:
  21. configure_output(device.analogOut, test_frequency, test_amplitude)
  22. time.sleep(context.OUTPUT_STABILISATION_TIME)
  23. configure_input(device.analogIn, sampling_frequency, DwfAcquisitionMode.Record)
  24. samples: numpy.ndarray[float] = record_experiment(device.analogIn, sampling_ratio)
  25. trace_time: float = data.save_experiment_trace(samples, test_frequency, test_amplitude, sampling_ratio)
  26. except (PyDwfError, RuntimeError) as exception:
  27. logging.warning(exception)
  28. return single_experiment(test_frequency, test_amplitude, sampling_ratio)
  29. return trace_time, samples
  30. def configure_output(analog_output_device: AnalogOut,
  31. test_frequency: float,
  32. test_amplitude: float) \
  33. -> None:
  34. ch0: int = 0
  35. node = DwfAnalogOutNode.Carrier
  36. analog_output_device.reset(-1)
  37. analog_output_device.nodeEnableSet(ch0, node, True)
  38. analog_output_device.nodeFunctionSet(ch0, node, DwfAnalogOutFunction.Sine)
  39. analog_output_device.nodeFrequencySet(ch0, node, test_frequency)
  40. analog_output_device.nodeAmplitudeSet(ch0, node, test_amplitude)
  41. analog_output_device.nodeOffsetSet(ch0, node, 0)
  42. analog_output_device.configure(ch0, 1)
  43. def configure_input(analog_input_device: AnalogIn,
  44. sampling_frequency: float,
  45. acquisition_mode: DwfAcquisitionMode) \
  46. -> None:
  47. ch0: int = 0 # reads filter output
  48. ch1: int = 1 # reads filter input
  49. channels = (ch0, ch1)
  50. analog_input_device.reset()
  51. for channel in channels:
  52. analog_input_device.channelEnableSet(channel, True)
  53. analog_input_device.channelFilterSet(channel, DwfAnalogInFilter.Average)
  54. analog_input_device.acquisitionModeSet(acquisition_mode)
  55. analog_input_device.recordLengthSet(-1)
  56. analog_input_device.frequencySet(sampling_frequency)
  57. configure_input_trigger(analog_input_device, ch1)
  58. def configure_input_trigger(analog_input_device: AnalogIn,
  59. channel: int) \
  60. -> None:
  61. analog_input_device.triggerSourceSet(DwfTriggerSource.DetectorAnalogIn)
  62. analog_input_device.triggerChannelSet(channel)
  63. analog_input_device.triggerTypeSet(DwfAnalogInTriggerType.Edge)
  64. analog_input_device.triggerConditionSet(DwfTriggerSlope.Rise)
  65. analog_input_device.triggerPositionSet(0.0)
  66. analog_input_device.triggerLevelSet(0.0)
  67. analog_input_device.triggerHysteresisSet(0.001)
  68. def record_experiment(analog_input_device: AnalogIn,
  69. sampling_ratio: int) \
  70. -> numpy.ndarray[float]:
  71. num_samples: int = sampling_ratio * context.DATA_CYCLES
  72. acquired_samples: int = 0
  73. input_samples: list = []
  74. output_samples: list = []
  75. analog_input_device.configure(True, True)
  76. while True:
  77. status = analog_input_device.status(True)
  78. current_samples, current_samples_lost, current_samples_corrupted = analog_input_device.statusRecord()
  79. if current_samples != 0:
  80. output_samples.append(analog_input_device.statusData(0, current_samples))
  81. input_samples.append(analog_input_device.statusData(1, current_samples))
  82. logging.info("Acquired "+str(current_samples)+" samples")
  83. acquired_samples = acquired_samples + current_samples
  84. if current_samples_lost + current_samples_corrupted != 0:
  85. raise RuntimeError("Samples lost/corrupted: (" + str(current_samples_lost) + "," + str(current_samples_corrupted) + ")")
  86. if status == DwfState.Done or acquired_samples >= num_samples:
  87. break
  88. return numpy.array([numpy.concatenate(output_samples, axis=None)[:num_samples],
  89. numpy.concatenate(input_samples, axis=None)[:num_samples]])