# __ File: ASG.py ____________________________________________________________________________ # Implements : class ASG # Author : Juan de Lara # Description : Base (Abstract) class for Abstract Syntax Graphs # Modified : 23 Oct 2001 # Changes : # - 23 Oct 2001 : comments added # - 20 Jul 2002 : Added an optional parameter to method writeContents # - 22 Jul 2002 : Added the invalid method, which in the case that the graph is a LHS or RHS # checks that labels are unique. # ______________________________________________________________________________________________ from ASGNode import * from graph_ExternalConnection import * import string import os.path class ASG (ASGNode): # Some useful static constants EDIT = 0 SAVE = 1 CREATE = 2 CONNECT = 3 DELETE = 4 DISCONNECT = 5 TRANSFORM = 6 SELECT = 7 DRAG = 8 DROP = 9 MOVE = 10 def __init__(self, metaModelName, rootASGNode, nodeTypes): # creates a root node for the tree "creates a root node for the tree and a dictionary to store list of the nodes" ASGNode.__init__(self) self.metaModelName = metaModelName# the name of the MetaModel (file with the buttons) self.root = rootASGNode self.listNodes = {} # the dicitionary must be looked-up by the type of the node, it returns a list of all # the nodes of that type self.nodeTypes = nodeTypes self.constraints = {} # dictionary of the constraints implementd, searchable by the name of the constraint for nt in nodeTypes: # initialize the dictionary with the node types self.listNodes[nt] = [] self.mergedASG = [] # ASGs with which this ASG is merged def merge(self, ASGgraph): """ Merges this ASG with another ASG """ self.mergedASG.append(ASGgraph) # add the graph to the list of merged graphs for nodeType in ASGgraph.listNodes.keys(): if not nodeType in self.listNodes.keys(): # node type was not known self.listNodes[nodeType] = ASGgraph.listNodes[nodeType] self.nodeTypes.append(nodeType) else: # node type existed... for node in ASGgraph.listNodes[nodeType]: # add each node of merged graph to actual graph self.listNodes[nodeType].append(node) # copy also the model's attribute for attr in ASGgraph.generatedAttributes.keys(): if attr in self.generatedAttributes.keys(): # Attribute is present! print "Attribute collision: ", attr self.generatedAttributes[attr] = ASGgraph.generatedAttributes[attr] # now create the attribute! self.setAttrValue(attr, ASGgraph.getAttrValue(attr).clone()) def unMerge(self, ASGgraph): """ de-merges this ASG with the given ASG. returns 0 if we do not have an ASG with the same type as ASGgraph, or -1 if we cannot delete the ASG because we have instances od some entity defined by ASGgraph. 1 otherwise. """ # 1st check if the type of ASGgraph is our own type -> so erase ourselves and return another ASG if self.getClass() == ASGgraph.getClass(): return self.selfUnMerge() # 2nd check if we have this ASGgraph isPresent = 0 ASGClass = ASGgraph.getClass() for midx in self.mergedASG: if midx.getClass() == ASGClass: isPresent = 1 break if not isPresent: return 0 # now check that we do not have instances of the entities of the ASG to delete canDelete = 1 for entity in midx.listNodes.keys(): if self.listNodes[entity] != []: canDelete = 0 break if not canDelete: return -1 # now proceed to erase the ASG from mergedASG, the entity types and the generatedAttributes... self.mergedASG.remove(midx) for entity in midx.listNodes.keys(): del self.listNodes[entity] for genattr in midx.generatedAttributes.keys(): del self.generatedAttributes[genattr] def selfUnMerge(self): """ Same as unMerge but the current ASG is who has to be erased!. returns 0 if we do not have anything merged, -1 if we have instances of entities specific of our types and another ASG if we can be de-merged. """ if (not self.mergedASG) or self.mergedASG == []: return 0 # check that we can be erased (do not have instances of entities specific of this ASG) exec "from "+self.getClass()+" import "+self.getClass()+"\n" in self.__dict__, self.__dict__ anASG = eval(self.getClass()+"()", self.__dict__, self.__dict__) canDelete = 1 for entity in anASG.listNodes.keys(): if self.listNodes[entity] != []: canDelete = 0 break if not canDelete: return -1 # now merge the 1st merged ASG with the rest ASG2return = self.mergedASG[0] for merg in self.mergedASG[1:]: ASG2return.merge(merg) # and copy the entities... for entity in ASG2return.listNodes.keys(): ASG2return.listNodes[entity] = self.listNodes[entity] return ASG2return def hasMetaModel(self, metaModel): "returns 1 if metaModel (a string) is one of the meta-models that are loaded" if self.getClass() == metaModel: return 1 # if the meta model is the actual class for mmodels in self.mergedASG: # else check the merged meta-models if mmodels.getClass() == metaModel: return 1 return 0 def open(self): "Method to open an ATOM3 instance to edit the entities (can be overwritten in children)" ATOM3Type.show(self, parent, topWindowParent) return ATOM3(topWindowParent, None , 0, 1, self) def getRoot(self): # returns the root of the tree "returns the root of the tree" return self.root def setRoot(self, newRootNode): # sets the new root node "sets the new root node" self.root = newRootNode def doPrint(self): # prints each node of the tree "prints each node of the tree" self.root.clearPrint() self.root.doPrint() def traverse(self, functionToApply): "Traverses the tree applying the funtion 'functionToApply' to all nodes" for nt in self.listNodes.keys(): # for all kind of nodes... for node in self.listNodes[nt]: # for all nodes of type functionToApply(node) def addNode(self, node): "adds a node to the graph" nl = self.listNodes[node.getClass()] if not node in nl: nl.append(node) # if we don't have a root node, set the root node to it... if self.root == None: self.root = node # set the attribute rootNode of node to self node.rootNode = self def remove(self, node): "removes the node from the graph..." nl = self.listNodes[node.getClass()] if node in nl: nl.remove(node) def getValue(self, attribute): "returns the value associated with that attribute" return self.__dict__[attribute] #def genAttributesCode(self, file): # "Generates code for the attributes value" # # generate the constructor call... # for attr in self.generatedAttributes.keys(): # for each generated attribute # self.getValue(attr).writeValue2File ( file, ' ', 'self.ASGroot.'+attr ) # write its value to file def genCode(self, fileName, allowedTypes, genGraph = 1, isRootNode = 0, metaModelName = None, export = 0, newTypes = None): "generates code for creating this graph. allowedTypes is a dictionary with the allowed types..." file = open(fileName, "w+t" ) dir, fil = os.path.split(fileName) funcName = string.split (fil, ".") # compose class name if export == 0: file.write('from graph_ASG_ERmetaMetaModel import *\n') # just for the case! file.write('from stickylink import *\n') # necessary if we describe some graphLinks... file.write('from widthXfillXdecoration import *\n') # necessary if we describe some graphLinks... # import the subclass ... file.write('from '+self.getClass()+' import *\n') # import all the node types... for nodetype in self.nodeTypes: file.write('from '+nodetype+' import *\n') # Import graphical files only if they exist try: f = open ('graph_'+nodetype+'.py', "rt") except IOError: pass else: file.write('from graph_'+nodetype+' import *\n') f.close() # import the basic types... for typ in allowedTypes.keys(): typeInstance, params = allowedTypes[typ] typeName = typeInstance.__name__ file.write('from '+typeName+' import *\n') # generate the ASG constructor file.write('\ndef '+funcName[0]+'(self, rootNode):\n') # generate code for the own ASGroot attributes if isRootNode: self.genAttributesCode(file, genGraph, "rootNode") self.writeGraph2File(file, genGraph, isRootNode, None, " ", 1, funcName[0]) # generate code for the sub-models counter = 0 for nodetype in self.nodeTypes: for node in self.listNodes[nodetype]: newFile = funcName[0]+str(counter) res = node.genCode(os.path.join(dir, newFile+'.py'), allowedTypes, genGraph, 0) counter = counter + 1 if isRootNode: hierarchical = self.isHierarchical() if export == 0: if hierarchical: file.write('def main'+funcName[0]+'(self, ASGroot):\n') # file.write(' self.ASGroot = '+self.getClass()+'(self)\n') file.write(' self.'+funcName[0]+'(self, ASGroot)\n\n') file.write(' self.'+funcName[0]+'_connections(self, ASGroot)\n\n') file.write('newfunction = main'+funcName[0]+'\n\n') else: file.write('newfunction = '+funcName[0]+'\n\n') if newTypes and len(newTypes)>0: # generate a list of newly added types file.write('loadedTypes = [') counter = 0 for nt in newTypes: if counter > 0: file.write(',') file.write(str(nt)) counter = counter + 1 file.write(']\n') if metaModelName: file.write("loadedMMName = '"+metaModelName+"'\n") file.close() return funcName[0] # this indicates that we've done something def isHierarchical(self): "Decides if it is a Hierarchical graph..." for type in self.listNodes.keys(): if type[:4] == 'ASG_': # All model names begin with ASG_ for node in self.listNodes[type]: # see if it is a subclass of ASG if node.isSubclass(node, 'ASG'): return 1 # has node that is an ASG # No see if we are examining a sub-model if self.rootNode != None: # that means we are not the root node! return 1 return 0 def writeGraph2File(self, file, genGraph=1, isRootNode=0, rootNodeName = "rootNode", indent=" ", genConstraints = 0, fileName = '', genGGcode = 0, parentName="self"): "Writes the graph into a file" # generate code for the nodes... counter =0 for nodetype in self.nodeTypes: # iterate on all the node types... for node in self.listNodes[nodetype]: # Iterate on all the nodes of each type node.genAttributesCode(file, genGraph, None, isRootNode, rootNodeName, indent, genConstraints, 1, genGGcode, parentName) if self.isSubclass(node, 'ASG'): # if it is a subclass of ASG, ws should include the file generated (hierarchical modeling) newFile = fileName+str(counter) file.write(indent+'exec "from '+newFile+' import *\\n" in self.__dict__, self.__dict__\n') file.write(indent+'self.'+newFile+'(self, self.obj'+str(node.objectNumber)+') \n\n') counter = counter + 1 # if fileName has a value, we are saving a model, we must generate a function to hold the connections... if fileName != '': # if we are not dealing with a hierarchical model, an extra method is not needed.. hierarchical = self.isHierarchical() if hierarchical: file.write('\ndef '+fileName+'_connections(self, rootNode):\n') writed = 0 if isRootNode: file.write(indent+'self.drawConnections(') # generate code for the connections... for nodetype in self.nodeTypes: for node in self.listNodes[nodetype]: res = node.genConnectionsCode(file, genGraph, isRootNode, indent, 1, writed) writed = writed or res # update writed flag if isRootNode: file.write(' )\n\n') # if rootNode and I'm generating a function (filename != '') # then call subModel's functions for connections... if isRootNode and fileName != '': # if main model counter = 0 for nodetype in self.nodeTypes: # iterate, to search for all submodels for node in self.listNodes[nodetype]: if self.isSubclass(node, 'ASG'): # found a submodel file.write(indent+'self.'+fileName+str(counter)+'_connections( self, self.obj'+str(node.objectNumber)+')\n') writed = 1 counter = counter + 1 if fileName != '' and (not writed) and hierarchical: # we must write 'pass', because nothing has been writed in the function!! file.write(indent+'pass\n') def removeContents( self, atom3i, genGraphicalInfo = 1): "removes the contents of graph form ATOM3 drawing canvas" for otype in self.listNodes.keys(): listNodes = self.listNodes[otype] # get the list of objects... for obj in listNodes: # for each object, delete from drawing canvas if obj.graphObject_: # if object has an associated graphical object obj.graphObject_.erase(atom3i) # erase the graphical object # now remove physically from memory... for otype in self.listNodes.keys(): listNodes = self.listNodes[otype] # get the list of objects... while len(listNodes) > 0: del listNodes[0] self.listNodes[otype] = [] def createExternalConnection (self, atom3i, fromClass, toClass, objFrom, objTo, From = 1 ): "Creates an external connection between objFrom and objTo" # .............................................................................................. # create an 'externalConnection" object # .............................................................................................. # 1st.: decide the position to be created... xpos, ypos = 0, 0 x, y = objFrom.graphObject_.getCoordsVertex() if x < 350: xpos = 110 else: xpos = 577 if y < 250: ypos = 85 else: ypos = 425 # 2nd. create the object, add the tag of peer's model gobj = graph_ExternalConnection(xpos, ypos) gobj.DrawObject(atom3i.UMLmodel) gobj.semanticObject = objTo.rootNode # link to the model of peer node modelTag = objTo.rootNode.getClass() atom3i.UMLmodel.addtag_withtag(modelTag, gobj.tag) # 3rd. connect (only graphically) both objects if From == 1: atom3i.showConnection( fromClass, gobj.tag) else: atom3i.showConnection( gobj.tag, toClass) def getNumberOfDrawn(self, objDest, list): number = 0 for tuple in list: if tuple[0] == objDest and tuple[1] != None: number = number + 1 if tuple[1] == None: return number return number def setConnectionHandler (self, handler, obj, list): index = list.index((obj, None)) list[index] = (obj, handler) def writeContents( self, atom3i, genGraphicalInfo = 1, rewriteObjects = 1, objects2Show = [] ): """ Calls the ATOM3 API to show its contents. The last parameter can be a list of objects, and then only these objects are shown (added 20 July 2002, JL) """ obj2show = [] if objects2Show == []: # Then show all objects... for otype in self.listNodes.keys(): # fill objects2Show list with all the objects in the graph for obj in self.listNodes[otype]: obj2show.append(obj) else: obj2show = []+objects2Show for obj in obj2show: # for each object, show itself atom3i.drawEntity(obj, obj.getClass()) if genGraphicalInfo: # ............................................................................. # Repeat the iterations, this time to show the connections... # ............................................................................. # 1st. : to each connection in "in_connections_" and "out_connections_" convert it in a tuple (, handler) # For the moment set the handler to None (because it has not been drawn yet). Create also in the graphical # objects a dictionary to store the new handler information for obj in obj2show: for index in range(len(obj.in_connections_)): obj.in_connections_[index] = ( obj.in_connections_[index], None) for index in range(len(obj.out_connections_)): obj.out_connections_[index] = ( obj.out_connections_[index], None) obj.graphObject_.OLD_out_connections_Points = {} for handler in obj.graphObject_.out_connections_Points.keys(): obj.graphObject_.OLD_out_connections_Points[handler] = obj.graphObject_.out_connections_Points[handler] obj.graphObject_.out_connections_Points = {} obj.graphObject_.connections = [] # delete the graphical connections # 2nd. : Iterate over the objects, drawing each connection, if it has not been drawn yet for obj in obj2show: # for each object... fromClass = obj.graphObject_.tag # get if of the origin of the connection for peerTuple in obj.out_connections_: # for each connection (, handle) of each object... peer, handle = peerTuple # unwrap tuple... if type(peer.in_connections_[0]) == TupleType: if handle == None: # do not do anything if the connection's been drawn yet... toClass = peer.graphObject_.tag # get connection destination connCounter = self.getNumberOfDrawn(peer, obj.out_connections_) # number of connections to that object drawn so far ret = obj.graphObject_.getInfoTuple(fromClass, toClass, connCounter+1, obj.graphObject_.OLD_out_connections_Points) # obtain connection information .... if ret: infoTuple, oldHandler = ret interPts, smoothConn, num1st = infoTuple[2:] x0, y0, x1, y1 = interPts[0], interPts[1], interPts[len(interPts)-2], interPts[len(interPts)-1] if len(interPts) > 4: atom3i.inter_connect_points = peer.graphObject_.reverseList2by2(interPts[2:len(interPts)-2]) if issubclass(obj.graphObject_.__class__, graphEntity): retuple = atom3i.showConnection(fromClass, toClass, smoothConn, x0, y0, x1, y1, num1st) # call the API else: retuple = atom3i.showConnection(fromClass, toClass, smoothConn, x1, y1, x0, y0, num1st) # call the API else: retuple = atom3i.showConnection(fromClass, toClass) # call the API self.setConnectionHandler(retuple[0], peer, obj.out_connections_) # store the new handler self.setConnectionHandler(retuple[0], obj, peer.in_connections_) # store the new handler in the peer object else : # umm! this means that we have a connection across hierarchies... # check if we need an 'externalConnection' object or a connection # to a submodel!. This last case happens when Level(TO) > Level(FROM), that is, we have # a connection down. from msvcrt import getch getch() if obj.level() > peer.level(): # this level > other level ! -> create external connection self.createExternalConnection(atom3i, fromClass, toClass, obj, peer, 1) else: # this level < other level ! -> create a connection to the model icon atom3i.showConnection ( fromClass, peer.rootNode.graphObject_.tag ) # it won't work with multi-level connections # now look for incoming connections! toClass = obj.graphObject_.tag # As we are inspecting incoming connections, obj is the destination... for peerTuple in obj.in_connections_: # for each connection (, handle) of each object... peer, handler = peerTuple # unwrap tuple... if type(peer.out_connections_[0]) == TupleType: if handler == None: # if they have not been connected yet... # obtain source and destinations graphical and semantic objects... fromClass = peer.graphObject_.tag connCounter = self.getNumberOfDrawn(obj, peer.out_connections_) # number of connections to that object drawn so far ret = peer.graphObject_.getInfoTuple(fromClass, toClass, connCounter+1, peer.graphObject_.OLD_out_connections_Points) # obtain connection information .... if ret: infoTuple, oldHandler = ret interPts, smoothConn, num1st = infoTuple[2:] x0, y0, x1, y1 = interPts[0], interPts[1], interPts[len(interPts)-2], interPts[len(interPts)-1] if len(interPts) > 4: atom3i.inter_connect_points = interPts[2:len(interPts)-2] if issubclass(obj.graphObject_.__class__, graphEntity): retuple = atom3i.showConnection(fromClass, toClass, smoothConn, x1, y1, x0, y0, num1st) # call the API else: retuple = atom3i.showConnection(fromClass, toClass, smoothConn, x0, y0, x1, y1, num1st) # call the API else: retuple = atom3i.showConnection(fromClass, toClass) # call the API self.setConnectionHandler(retuple[0], peer, obj.in_connections_) self.setConnectionHandler(retuple[0], obj, peer.out_connections_) else : # umm! this means that we have a connection across hierarchies... if obj.level() > peer.level(): # this level > other level ! -> create external connection self.createExternalConnection(atom3i, fromClass, toClass, obj, peer, 0) else: # this level < other level ! -> create a connection to the model icon atom3i.showConnection ( peer.rootNode.graphObject_.tag, toClass ) # it won't work with multi-level connections # 3rd. : make the tuples in "in_connections_" and "out_connections_" single elements again for obj in obj2show: del obj.graphObject_.OLD_out_connections_Points for index in range(len(obj.in_connections_)): obj.in_connections_[index] = obj.in_connections_[index][0] for index in range(len(obj.out_connections_)): obj.out_connections_[index] = obj.out_connections_[index][0] def preAction(self, actionID, * params): pass def postAction(self, actionID, * params): pass def preCondition(self, actionID, * params): pass def postCondition(self, actionID, * params): pass def findLabel(self, label): "Searchs graph for a node with label 'label'" for nt in self.listNodes.keys(): # for all kind of nodes... for node in self.listNodes[nt]: # for all nodes of type if node.GGLabel.getValue() == label: # Found it return node # Couldn't find a match return None def getAncestors(self,node): "returns a list of the ancestros of node" ancestors = [] for nt in self.listNodes.keys(): # for all kind of nodes... for nod in self.listNodes[nt]: # for all nodes of type nod.ancProc = 0 # create new label for ancestor calculation node.ancProc = 1 # make sure we don't include the node itself... self.getParents(node, ancestors) for nt in self.listNodes.keys(): # for all kind of nodes... for nod in self.listNodes[nt]: # for all nodes of type del nod.ancProc # delete new label for ancestor calculation return ancestors def getParents(self, node, ancestors): for anc in node.in_connections_: # iterate over input connections... ancestors.append(anc) if not anc.ancProc: # if not processed, calculate ancestors recursively... self.getParents(anc, ancestors) anc.ancProc = 1 def nodeWithLabel(self, label): """returns a node with the label 'label'""" for nt in self.listNodes.keys(): # for all kind of nodes... for node in self.listNodes[nt]: # for all nodes of type if node.GGLabel.getValue() == label: # check if the node's label is what we are looking for... return node # a node has been found! return None # no appropriate node has been found def evaluateLocalPreCondition(self, which, * params): "Traverses the tree applying the funtion 'functionToApply' to all nodes" for nt in self.listNodes.keys(): # for all kind of nodes... for node in self.listNodes[nt]: # for all nodes of type res = apply(node.preCondition, [which]+list(params)) if res: return res return None def evaluateLocalPostCondition(self, which, * params): "Traverses the tree applying the funtion 'functionToApply' to all nodes" for nt in self.listNodes.keys(): # for all kind of nodes... for node in self.listNodes[nt]: # for all nodes of type res = apply (node.postCondition, [which]+list(params)) if res: return res return None def isEmpty(self): "returns 1 if has some node inside, else return 0" for nt in self.listNodes.keys(): if self.listNodes[nt] != []: return 0 return 1 def destroyNodes(self): """ Applies the destroy method to the graphical objects of each node. This function is called when the canvas in which this graph is being shown is closed. """ for nt in self.listNodes.keys(): # for all kind of nodes... for node in self.listNodes[nt]: # for all nodes of type if node.graphObject_: node.graphObject_.destroy() def invalid(self): """ In the case we are the LHS or RHS of a Graph Grammar, verifies that GG labels are different Created 21 Jul 2002 by JL """ inGG = 0 if self.root and self.root.parent: inGG = self.root.parent.editGGLabel for asg in self.mergedASG: if asg.root and asg.root.parent: inGG = inGG or asg.root.parent.editGGLabel if inGG: # This means we are a LHS or RHS of a GG rule GGlabels = [] # A list to obtain all the GG labels... for type in self.listNodes.keys(): for node in self.listNodes[type]: label = node.GGLabel.getValue() # Get the node's GG label if label in GGlabels: # it means it is repeated return "Graph Grammar labels are not unique." else: GGlabels.append(label) return ASGNode.invalid(self) # perform the checking in my parent's invalid code