123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
- # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from pypdevs.tracers.tracerBase import BaseTracer
- from pypdevs.util import runTraceAtController, toStr, DEVSException
- from math import floor
class VCDRecord(object):
    """
    Plain record describing one VCD variable, i.e. one port of one model.
    """

    def __init__(self, identifier_nr, model_name, port_name):
        """
        Build the record for a single wire.

        :param identifier_nr: the identifier assigned to this variable
        :param model_name: name of the model owning the port
        :param port_name: name of the port
        """
        self.model_name, self.port_name = model_name, port_name
        self.identifier = identifier_nr
        # The width of the wire is unknown as long as no event was seen on it;
        # None marks it as "still to be filled in" by whoever sees the first event.
        self.bit_size = None
class TracerVCD(BaseTracer):
    """
    A tracer for VCD output. Should only be used for binary signals!

    Only the tracer whose server is named 0 (the controller) is given a
    filename and therefore opens and writes a file; on every other node the
    start/stop calls are no-ops.
    """

    # Number of VCD ticks per unit of simulation time. A factor of 10 keeps
    # one decimal digit of the simulation time: 5.0 --> 50, 5.5 --> 55.
    # The scale must be fixed (not derived from the number of digits of each
    # individual time) so that the emitted timestamps stay monotonic.
    TIME_SCALE = 10

    def __init__(self, uid, server, filename):
        """
        Constructor

        :param uid: the UID of the tracer
        :param server: the server to make remote requests on
        :param filename: file to save the trace to
        """
        super(TracerVCD, self).__init__(uid, server)
        # Only the controller (server name 0) keeps the filename
        self.filename = filename if server.getName() == 0 else None

    def startTracer(self, recover):
        """
        Starts up the tracer

        :param recover: whether or not this is a recovery call (so whether or not the file should be appended to)
        """
        if self.filename is None:
            # Nothing to do here as we aren't the controller
            return
        # Append on recovery, start from an empty file otherwise
        mode = 'ab+' if recover else 'wb'
        self.vcd_file = open(self.filename, mode)
        # NOTE(review): the variable list is rebuilt and the header written in
        # both modes -- confirm that re-emitting the header on recovery is wanted.
        self.vcd_var_list = []
        self.vcd_prevtime = 0.0
        self.vcdHeader()

    def stopTracer(self):
        """
        Stop the tracer

        Flushes the trace file if this tracer opened one; does nothing on
        nodes where startTracer returned without opening a file.
        """
        vcd_file = getattr(self, "vcd_file", None)
        if vcd_file is not None:
            vcd_file.flush()

    def vcdHeader(self):
        """
        Create the VCD file header by doing calls to the coordinator

        Fills self.vcd_var_list with one VCDRecord per (model, port) pair
        returned by the proxy's getVCDVariables() and writes the declaration
        section followed by the initial $dumpvars section.
        """
        from datetime import date

        variables = self.server.getProxy(0).getVCDVariables()
        for counter, (model, port) in enumerate(variables):
            self.vcd_var_list.append(VCDRecord(counter, model, port))

        # Unique model names, in order of first appearance
        modules = []
        for record in self.vcd_var_list:
            if record.model_name not in modules:
                modules.append(record.model_name)

        lines = ["$date\n",
                 "\t" + date.today().isoformat() + "\n",
                 "$end\n",
                 "$version\n",
                 "\tPyDEVS VCD export\n",
                 "$end\n",
                 "$comment\n",
                 "\tGenerated from DEVS-code\n",
                 "$end\n",
                 "$timescale 1ns $end\n"]

        # One scope per model, containing a wire declaration for each port
        for module in modules:
            lines.append("$scope %s %s $end\n" % (module, module))
            for record in self.vcd_var_list:
                if record.model_name != module:
                    continue
                # A wire without a known size is declared as 1 bit wide
                width = 1 if record.bit_size is None else record.bit_size
                lines.append("$var wire %s %s %s $end\n"
                             % (width, record.identifier, record.port_name))
            lines.append("$upscope $end\n")
        lines.append("$enddefinitions $end\n")

        # Initial values: every wire starts out as 'z' on all of its bits
        lines.append("$dumpvars \n")
        for record in self.vcd_var_list:
            # No size known means that no event was seen on the wire yet,
            # so assume it is 1 bit long
            width = 1 if record.bit_size is None else record.bit_size
            lines.append("b" + "z" * width + " %s\n" % record.identifier)
        lines.append("$end\n")

        # Encode once and write the complete header in a single call
        self.vcd_file.write("".join(lines).encode())

    def trace(self, model_name, time, port_name, vcd_state):
        """
        Trace a VCD entry

        :param model_name: name of the model
        :param time: time at which transition happened; only its first
                     element (time[0]) is used for the VCD timestamp
        :param port_name: name of the port
        :param vcd_state: state to trace on the specified port, as a string
                          of the form 'b' followed by characters out of
                          0, 1, E and x
        :raises DEVSException: if the state is not a binary signal, if the
                               (model, port) pair is not a known VCD variable
                               or if the bit size of the wire changes
        """
        # Check if the signal is a valid binary signal
        valid_bits = ("0", "1", "E", "x")
        if (vcd_state[:1] != 'b' or
                any(bit not in valid_bits for bit in vcd_state[1:])):
            raise DEVSException(("Port %s in model %s does not carry " +
                                 "a binary signal\n" +
                                 "VCD exports require a binary signal, " +
                                 "not: %s") % (port_name, model_name, vcd_state))

        # Find the record of this wire
        for record in self.vcd_var_list:
            if (record.model_name == model_name and
                    record.port_name == port_name):
                break
        else:
            # Fail explicitly instead of touching an unrelated record
            raise DEVSException("Port %s in model %s is not a known VCD variable"
                                % (port_name, model_name))
        identifier = str(record.identifier)

        # If the bit_size is not yet defined, define it now
        bit_size = len(vcd_state) - 1
        if record.bit_size is None:
            record.bit_size = bit_size
        elif record.bit_size != bit_size:
            raise DEVSException("Wire has changing bitsize!\n" +
                                "You are probably not using bit encoding!")

        # Now we have to convert between logisim and VCD notation
        vcd_state = vcd_state.replace('x', 'z')
        vcd_state = vcd_state.replace('E', 'x')

        # Convert the simulation time to an integer number of VCD ticks
        # ex. 5.0 --> 50, 5.5 --> 55
        vcd_time = int(round(time[0] * self.TIME_SCALE))
        if vcd_time > self.vcd_prevtime:
            # The time has passed, so add a new VCD timestamp
            self.vcd_file.write(("#" + str(vcd_time) + "\n").encode())
            self.vcd_prevtime = vcd_time

        self.vcd_file.write((vcd_state + " " + identifier + "\n").encode())

    def _traceBags(self, aDEVS, name, ports, bags):
        """
        Send one trace call to the controller for every signal found on the given ports

        :param aDEVS: the model that transitioned
        :param name: full name of the model, already converted with toStr
        :param ports: the ports to inspect
        :param bags: mapping of port to the bag of signals on that port
        """
        for port in ports:
            signal_bag = bags.get(port, [])
            if signal_bag is None:
                continue
            port_name = toStr(port.getPortName())
            for port_signal in signal_bag:
                runTraceAtController(self.server,
                                     self.uid,
                                     aDEVS,
                                     [name,
                                      aDEVS.time_last,
                                      port_name,
                                      toStr(port_signal)])

    def traceConfluent(self, aDEVS):
        """
        The trace functionality for VCD output at a confluent transition

        Traces the signals on the input ports first, then those on the
        output ports.

        :param aDEVS: the model that transitioned
        """
        name = toStr(aDEVS.getModelFullName())
        self._traceBags(aDEVS, name, aDEVS.IPorts, aDEVS.my_input)
        self._traceBags(aDEVS, name, aDEVS.OPorts, aDEVS.my_output)

    def traceInternal(self, aDEVS):
        """
        The trace functionality for VCD output at an internal transition

        Traces the signals on the output ports.

        :param aDEVS: the model that transitioned
        """
        name = toStr(aDEVS.getModelFullName())
        self._traceBags(aDEVS, name, aDEVS.OPorts, aDEVS.my_output)

    def traceExternal(self, aDEVS):
        """
        The trace functionality for VCD output at an external transition

        Traces the signals on the input ports.

        :param aDEVS: the model that transitioned
        """
        name = toStr(aDEVS.getModelFullName())
        self._traceBags(aDEVS, name, aDEVS.IPorts, aDEVS.my_input)

    def traceInit(self, aDEVS, t):
        """
        The trace functionality for VCD output at initialisation

        Nothing is written to the VCD file at initialisation.

        :param aDEVS: the model that was initialized
        :param t: time at which it should be traced
        """
        pass
|