# Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
# McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pypdevs.tracers.tracerBase import BaseTracer
from pypdevs.util import runTraceAtController, toStr
import sys, re
class TracerXML(BaseTracer):
"""
A tracer for XML tracing output
"""
def __init__(self, uid, server, filename):
"""
Constructor
:param uid: the UID of this tracer
:param server: the server to make remote calls on
:param filename: file to save the trace to
"""
super(TracerXML, self).__init__(uid, server)
if server.getName() == 0:
self.filename = filename
else:
self.filename = None
def write_py23(self, string):
try:
self.xml_file.write(string)
except TypeError:
self.xml_file.write(string.encode())
def startTracer(self, recover):
"""
Starts up the tracer
:param recover: whether or not this is a recovery call (so whether or not the file should be appended to)
"""
if self.filename is None:
# Nothing to do here as we aren't the controller
return
elif recover:
self.xml_file = open(self.filename, 'a+')
else:
self.xml_file = open(self.filename, 'w')
self.write_py23("\n" + "\n")
def stopTracer(self):
"""
Stop the tracer
"""
self.write_py23("")
self.xml_file.flush()
def trace(self, model_name, timestamp, event_kind, port_info, xml_state, str_state):
"""
Save an XML entry for the provided parameters, basically wraps it in the necessary tags
:param model_name: name of the model
:param timestamp: timestamp of the transition
:param event_kind: kind of event that happened, e.g. internal, external, ...
:param port_info: actual information about the port
:param xml_state: XML representation of the state
:param str_state: normal string representation of the state
"""
self.write_py23("\n"
+ "" + model_name + "\n"
+ "\n"
+ "" + event_kind + "\n"
+ port_info
+ "\n"+ xml_state + "\n\n"
+ "\n")
def traceInternal(self, aDEVS):
"""
The trace functionality for XML output at an internal transition
:param aDEVS: the model that transitioned
"""
port_info = ""
for I in range(len(aDEVS.OPorts)):
if (aDEVS.OPorts[I] in aDEVS.my_output and
aDEVS.my_output[aDEVS.OPorts[I]] is not None):
port_info += '\n'
for j in aDEVS.my_output.get(aDEVS.OPorts[I], []):
port_info += "" + str(j) + "\n"
port_info += "\n"
runTraceAtController(self.server,
self.uid,
aDEVS,
[toStr(aDEVS.getModelFullName()),
aDEVS.time_last,
"'IN'",
toStr(port_info),
toStr(TracerXML.toXML(aDEVS.state)),
toStr(aDEVS.state)])
def traceExternal(self, aDEVS):
"""
The trace functionality for XML output at an external transition
:param aDEVS: the model that transitioned
"""
port_info = ""
for I in range(len(aDEVS.IPorts)):
port_info += '\n'
for j in aDEVS.my_input.get(aDEVS.IPorts[I], []):
port_info += "" + str(j) + "\n"
port_info += "\n"
runTraceAtController(self.server,
self.uid,
aDEVS,
[toStr(aDEVS.getModelFullName()),
aDEVS.time_last,
"'EX'",
toStr(port_info),
toStr(TracerXML.toXML(aDEVS.state)),
toStr(aDEVS.state)])
def traceConfluent(self, aDEVS):
"""
The trace functionality for XML output at a confluent transition
:param aDEVS: the model that transitioned
"""
port_info = ""
for I in range(len(aDEVS.IPorts)):
port_info += '\n'
for j in aDEVS.my_input.get(aDEVS.IPorts[I], []):
port_info += "" + str(j) + "\n"
port_info += "\n"
runTraceAtController(self.server,
self.uid,
aDEVS,
[toStr(aDEVS.getModelFullName()),
aDEVS.time_last,
"'EX'",
toStr(port_info),
toStr(TracerXML.toXML(aDEVS.state)),
toStr(aDEVS.state)])
port_info = ""
for I in range(len(aDEVS.OPorts)):
if aDEVS.OPorts[I] in aDEVS.my_output:
port_info += '\n'
for j in aDEVS.my_output.get(aDEVS.OPorts[I], []):
port_info += "" + str(j) + "\n"
port_info += "\n"
runTraceAtController(self.server,
self.uid,
aDEVS,
[toStr(aDEVS.getModelFullName()),
aDEVS.time_last,
"'IN'",
toStr(port_info),
toStr(TracerXML.toXML(aDEVS.state)),
toStr(aDEVS.state)])
def traceInit(self, aDEVS, t):
"""
The trace functionality for XML output at initialization
:param aDEVS: the model that transitioned
:param t: time at which it should be traced
"""
runTraceAtController(self.server,
self.uid,
aDEVS,
[toStr(aDEVS.getModelFullName()),
t,
"'EX'",
"''",
toStr(TracerXML.toXML(aDEVS.state)),
toStr(aDEVS.state)])
@staticmethod
def toXML(state):
primitives = {
int: "Integer",
float: "Float",
str: "String",
bool: "Boolean"
}
def create_multi_attrib(name, elem):
cat = "C"
if type(elem) in primitives:
cat = "P"
type_ = primitives[type(elem)]
return "%s%s%s" % (
cat, name, type_, str(elem))
else:
type_ = "Unknown"
value = TracerXML.toXML(elem)
return "%s%s%s" % (
cat, name, type_, str(value))
if isinstance(state, (str, int, float, bool)):
return "state%s%s" % (primitives[type(state)], str(state))
elif isinstance(state, dict):
res = ""
for k, v in state.items():
name = re.sub("[^a-zA-Z0-9_]", "", k)
res += create_multi_attrib(name, v)
return "stateMap%s" % res
elif isinstance(state, (list, tuple)):
res = ""
for ix, item in enumerate(state):
name = "item-%d" % ix
res += create_multi_attrib(name, item)
return "stateList%s" % res
elif hasattr(state, "toXML"):
return state.toXML()
elif hasattr(state, "__str__"):
return TracerXML.toXML(str(state))
return TracerXML.toXML({k: getattr(state, k) for k in dir(state) if not k.startswith("_") and not callable(getattr(state, k))})