123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- import logging
- import time
- import numpy
- from pydwf import DwfLibrary, DwfAnalogOutNode, DwfAnalogOutFunction, DwfAnalogInFilter, DwfAcquisitionMode, \
- DwfTriggerSource, DwfAnalogInTriggerType, DwfTriggerSlope, DwfEnumConfigInfo, DwfState, PyDwfError, AnalogOut, \
- AnalogIn
- from pydwf.utilities import openDwfDevice
- import data
- import context
- def single_experiment(test_frequency: float,
- test_amplitude: float,
- sampling_ratio: int) \
- -> [float, numpy.ndarray[float]]:
- logging.info("Performing experiment: f:" + str(test_frequency) + ", A:" + str(test_amplitude) + ", R:" + str(sampling_ratio))
- dwf: DwfLibrary = DwfLibrary()
- sampling_frequency: float = test_frequency * sampling_ratio
- def maximize_buffer(configuration_parameters):
- return configuration_parameters[DwfEnumConfigInfo.AnalogInBufferSize]
- try:
- with openDwfDevice(dwf, serial_number_filter=None, score_func=maximize_buffer) as device:
- configure_output(device.analogOut, test_frequency, test_amplitude)
- time.sleep(context.OUTPUT_STABILISATION_TIME)
- configure_input(device.analogIn, sampling_frequency, DwfAcquisitionMode.Record)
- samples: numpy.ndarray[float] = record_experiment(device.analogIn, sampling_ratio)
- trace_time: float = data.save_experiment_trace(samples, test_frequency, test_amplitude, sampling_ratio)
- except (PyDwfError, RuntimeError) as exception:
- logging.warning(exception)
- return single_experiment(test_frequency, test_amplitude, sampling_ratio)
- return trace_time, samples
- def configure_output(analog_output_device: AnalogOut,
- test_frequency: float,
- test_amplitude: float) \
- -> None:
- ch0: int = 0
- node = DwfAnalogOutNode.Carrier
- analog_output_device.reset(-1)
- analog_output_device.nodeEnableSet(ch0, node, True)
- analog_output_device.nodeFunctionSet(ch0, node, DwfAnalogOutFunction.Sine)
- analog_output_device.nodeFrequencySet(ch0, node, test_frequency)
- analog_output_device.nodeAmplitudeSet(ch0, node, test_amplitude)
- analog_output_device.nodeOffsetSet(ch0, node, 0)
- analog_output_device.configure(ch0, 1)
- def configure_input(analog_input_device: AnalogIn,
- sampling_frequency: float,
- acquisition_mode: DwfAcquisitionMode) \
- -> None:
- ch0: int = 0 # reads filter output
- ch1: int = 1 # reads filter input
- channels = (ch0, ch1)
- analog_input_device.reset()
- for channel in channels:
- analog_input_device.channelEnableSet(channel, True)
- analog_input_device.channelFilterSet(channel, DwfAnalogInFilter.Average)
- analog_input_device.acquisitionModeSet(acquisition_mode)
- analog_input_device.recordLengthSet(-1)
- analog_input_device.frequencySet(sampling_frequency)
- configure_input_trigger(analog_input_device, ch1)
- def configure_input_trigger(analog_input_device: AnalogIn,
- channel: int) \
- -> None:
- analog_input_device.triggerSourceSet(DwfTriggerSource.DetectorAnalogIn)
- analog_input_device.triggerChannelSet(channel)
- analog_input_device.triggerTypeSet(DwfAnalogInTriggerType.Edge)
- analog_input_device.triggerConditionSet(DwfTriggerSlope.Rise)
- analog_input_device.triggerPositionSet(0.0)
- analog_input_device.triggerLevelSet(0.0)
- analog_input_device.triggerHysteresisSet(0.001)
- def record_experiment(analog_input_device: AnalogIn,
- sampling_ratio: int) \
- -> numpy.ndarray[float]:
- num_samples: int = sampling_ratio * context.DATA_CYCLES
- acquired_samples: int = 0
- input_samples: list = []
- output_samples: list = []
- analog_input_device.configure(True, True)
- while True:
- status = analog_input_device.status(True)
- current_samples, current_samples_lost, current_samples_corrupted = analog_input_device.statusRecord()
- if current_samples != 0:
- output_samples.append(analog_input_device.statusData(0, current_samples))
- input_samples.append(analog_input_device.statusData(1, current_samples))
- logging.info("Acquired "+str(current_samples)+" samples")
- acquired_samples = acquired_samples + current_samples
- if current_samples_lost + current_samples_corrupted != 0:
- raise RuntimeError("Samples lost/corrupted: (" + str(current_samples_lost) + "," + str(current_samples_corrupted) + ")")
- if status == DwfState.Done or acquired_samples >= num_samples:
- break
- return numpy.array([numpy.concatenate(output_samples, axis=None)[:num_samples],
- numpy.concatenate(input_samples, axis=None)[:num_samples]])
|