mtworker.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. '''This file is part of AToMPM - A Tool for Multi-Paradigm Modelling
  2. Copyright 2011 by the AToMPM team and licensed under the LGPL
  3. See COPYING.lesser and README.md in the root of this project for full details'''
  4. import re, threading, json, logging
  5. from ws import WebSocket
  6. from ptcal.ptcal import PyTCoreAbstractionLayer
  7. from ptcal.utils import Utilities as utils
  8. '''
  9. message handler thread: mtworkers delegate the handling of each message they
  10. receive to a new instance of this thread, which
  11. terminates after handling the said message
  12. _msg the message to handle
  13. _onmsg the message handler function '''
  14. class messageHandlerThread(threading.Thread) :
  15. def __init__(self,onmsg,msg) :
  16. threading.Thread.__init__(self)
  17. self._onmsg = onmsg
  18. self._msg = msg
  19. def run(self) :
  20. self._onmsg(self._msg)
  21. '''
  22. mtworker thread: handles every request to given mtworker
  23. wid this worker's id
  24. _msgQueue this worker's message queue (stores incoming REST queries)
  25. _lock a lock that ensures synchronous access to the message queue
  26. and that causes the worker to sleep while waiting for messages
  27. _ws this worker's websocket (listens for its asworker's changelogs)
  28. _aswid this worker's asworker's wid
  29. _wContext a 'summary' of this mtworker used by _ptcal
  30. _ptcal this worker's PyTCoreAbstractionLayer instance
  31. _stopped this flag becomes true when this worker should terminate '''
  32. class mtworkerThread(threading.Thread) :
  33. nextID = 0
  34. '''
  35. sets up instance vars + stores references to _msgQueue and _lock in
  36. argument data structures '''
  37. def __init__(self,mtw2msgQueue,mtw2lock) :
  38. threading.Thread.__init__(self)
  39. self.wid = str(mtworkerThread.nextID)
  40. mtworkerThread.nextID += 1
  41. self._msgQueue = []
  42. mtw2msgQueue[self.wid] = self._msgQueue
  43. self._lock = threading.Condition()
  44. mtw2lock[self.wid] = self._lock
  45. self._ws = None
  46. self._aswid = None
  47. self._ptcal = None
  48. self._stopped = False
  49. '''
  50. init basic mtworker behavior
  51. 0. loop on the steps below until someone stops this thread
  52. 1. acquire self._lock
  53. 2. if self._msgQueue is empty, release self._lock and block until awakened
  54. by notify()... will occur in 1 of 2 cases:
  55. a) when the main thread adds something to the self._msgQueue
  56. b) on self.stop()
  57. to distinguish between both cases, we check if self._msgQueue is empty
  58. ... when it is, we break out of the loop (which terminates the current
  59. mtworkerThread)... when it isn't, we continue to step 3
  60. 3. remove oldest element from self._msgQueue
  61. 4. release the self._lock
  62. 5. delegate the handling of the message from step 3 to a new
  63. messageHandlerThread
  64. NOTE:: self._lock is used here for 2 purposes... 1st, to ensure
  65. synchronous access to self._msgQueue... 2nd, to ensure the worker
  66. thread sleeps while self._msgQueue is empty '''
  67. def run(self):
  68. while not self._stopped :
  69. self._lock.acquire()
  70. if len(self._msgQueue) == 0 :
  71. self._lock.wait()
  72. if len(self._msgQueue) == 0 :
  73. break
  74. msg = self._msgQueue.pop(0)
  75. self._lock.release()
  76. messageHandlerThread(self._onmessage,msg).start()
  77. '''
  78. send a request to this worker's asworker
  79. TBI:: the use of '127.0.0.1' implies that the atompm server is running on
  80. the same machine as the transformation engine... '''
  81. def _aswHttpReq(self,method,uri,data) :
  82. return utils.httpReq(
  83. method,
  84. '127.0.0.1:8124',
  85. uri+'?wid='+self._aswid,
  86. data)
  87. '''
  88. handle an incoming message from the server '''
  89. def _onmessage(self,msg):
  90. if msg == 'DIE' :
  91. return self.stop()
  92. logging.debug(self.wid+' >> #'+str(id(msg['resp']))+' '+\
  93. msg['method']+' '+msg['uri'])
  94. if msg['method'] == 'PUT' and re.match('/aswSubscription',msg['uri']) :
  95. if self._ws != None :
  96. self._postMessage(
  97. msg['resp'],
  98. {'statusCode':403,
  99. 'reason':'already subscribed to an asworker'})
  100. else :
  101. self._aswid = str(json.loads(msg['reqData'])['aswid'])
  102. self._ptcal = PyTCoreAbstractionLayer(
  103. {'httpReq':self._aswHttpReq, 'wid':self._aswid}, self.wid)
  104. try :
  105. self._ws = WebSocket(self._ptcal)
  106. except Exception, e :
  107. self._postMessage(
  108. msg['resp'],
  109. {'statusCode':500,
  110. 'reason':str(e)})
  111. self._ws.subscribe(self._aswid)
  112. def respond(resp) :
  113. if self._ws.subscribed == False :
  114. self._ws.close()
  115. self._postMessage(
  116. resp,
  117. {'statusCode':500,
  118. 'reason':'subscription to asworker failed'})
  119. elif self._ws.subscribed == True :
  120. self._postMessage(resp,{'statusCode':200})
  121. else :
  122. t = threading.Timer(0.5,respond,[resp])
  123. t.start()
  124. respond(msg['resp'])
  125. elif msg['method'] == 'PUT' and re.match('/envvars',msg['uri']) :
  126. if self._ptcal.username != None :
  127. self._postMessage(
  128. msg['resp'],
  129. {'statusCode':403,
  130. 'reason':'already provided environment variables'})
  131. else :
  132. reqData = json.loads(msg['reqData'])
  133. self._ptcal.username = reqData['username']
  134. self._ptcal.defaultDCL = reqData['defaultDCL']
  135. self._postMessage(msg['resp'],{'statusCode':200})
  136. elif msg['method'] == 'PUT' and re.match('/current.model',msg['uri']) :
  137. m = json.loads(msg['reqData'])['m']
  138. mms = json.loads(msg['reqData'])['mms']
  139. sn = json.loads(msg['reqData'])['sequence#']
  140. self._ptcal.loadModel(m,mms,sn)
  141. self._postMessage(msg['resp'],{'statusCode':200})
  142. elif msg['method'] == 'PUT' and re.match('/current.transform',msg['uri']):
  143. try :
  144. if not self._ptcal.isStopped() :
  145. self._postMessage(
  146. msg['resp'],
  147. {'statusCode':403,
  148. 'reason':'not allowed to (re)load during '+\
  149. 'ongoing transformation(s)'})
  150. else :
  151. transfs = json.loads(msg['reqData'])['transfs']
  152. transfs.reverse()
  153. self._ptcal.loadTransforms(transfs)
  154. self._postMessage(msg['resp'],{'statusCode':200})
  155. except Exception, e :
  156. self._postMessage(
  157. msg['resp'],
  158. {'statusCode':500,
  159. 'reason':"Error in model transformation worker: " + str(e)})
  160. elif msg['method'] == 'PUT' and re.match('/query.transform',msg['uri']):
  161. try :
  162. self._ptcal.processQuery(json.loads(msg['reqData']))
  163. self._postMessage(msg['resp'],{'statusCode':200})
  164. except Exception, e :
  165. self._postMessage(
  166. msg['resp'],
  167. {'statusCode':500,
  168. 'reason':'There\'s something wrong with the query: '+str(e)})
  169. elif msg['method'] == 'PUT' and re.match('^/execmode',msg['uri']) :
  170. legalModes = ['play','stop','pause','step']
  171. mode = json.loads(msg['reqData'])['mode']
  172. if mode in legalModes :
  173. if self._ptcal.isStopping() :
  174. self._postMessage(
  175. msg['resp'],
  176. {'statusCode':503,
  177. 'reason':'currently processing a STOP request'})
  178. else :
  179. self._postMessage(msg['resp'],{'statusCode':200})
  180. getattr(self._ptcal,mode.lower())()
  181. else :
  182. self._postMessage(
  183. msg['resp'],
  184. {'statusCode':400,
  185. 'reason':'invalid execution command :: '+mode})
  186. elif msg['method'] == 'POST' and re.match('^/toggledebug',msg['uri']) :
  187. self._ptcal.toggleDebugMode()
  188. self._postMessage(msg['resp'],{'statusCode':200})
  189. elif msg['method'] == 'POST' and re.match('^/debugClient',msg['uri']) :
  190. self._ptcal.registerDebugClient(msg['reqData'])
  191. self._postMessage(msg['resp'],{'statusCode':200})
  192. #modular analysis
  193. elif msg['method'] == 'POST' and re.match('^/analyzePN',msg['uri']) :
  194. #self._ptcal.toggleDebugMode()
  195. self._ptcal.analyzePN();
  196. self._postMessage(msg['resp'],{'statusCode':204})
  197. #flat reachability analysis
  198. elif msg['method'] == 'POST' and re.match('^/PNFull',msg['uri']) :
  199. f = json.loads(msg['reqData'])['fname']
  200. #self._ptcal.toggleDebugMode()
  201. self._ptcal.PNFull(fname=f);
  202. self._postMessage(msg['resp'],{'statusCode':204})
  203. elif msg['method'] == 'POST' and re.match('^/dotPN',msg['uri']) :
  204. #self._ptcal.toggleDebugMode()
  205. f = json.loads(msg['reqData'])['fname']
  206. self._ptcal.PNFull(fname=f,dot=True);
  207. self._postMessage(msg['resp'],{'statusCode':204})
  208. elif msg['method'] == 'POST' and re.match('^/bdapiresp',msg['uri']) :
  209. resp = json.loads(msg['reqData'])
  210. self._ptcal._queueBDAPI(resp)
  211. self._postMessage(msg['resp'],{'statusCode':204})
  212. else :
  213. self._postMessage(msg['resp'],{'statusCode':501})
  214. '''
  215. post response back to server '''
  216. def _postMessage(self,resp,msg) :
  217. logging.debug(self.wid+' << #'+str(id(resp))+' '+str(msg))
  218. resp.lock.acquire()
  219. resp.setResponse(msg)
  220. resp.lock.notify()
  221. resp.lock.release()
  222. '''
  223. cause the loop in run() to be interrupted '''
  224. def stop(self):
  225. self._stopped = True
  226. self._lock.acquire()
  227. self._lock.notify()
  228. self._lock.release()