mtworker.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. '''This file is part of AToMPM - A Tool for Multi-Paradigm Modelling
  2. Copyright 2011 by the AToMPM team and licensed under the LGPL
  3. See COPYING.lesser and README.md in the root of this project for full details'''
  4. import re, threading, json, logging
  5. from ws import WebSocket
  6. from ptcal.ptcal import PyTCoreAbstractionLayer
  7. from ptcal.utils import Utilities as utils
  8. import traceback
  9. '''
  10. message handler thread: mtworkers delegate the handling of each message they
  11. receive to a new instance of this thread, which
  12. terminates after handling the said message
  13. _msg the message to handle
  14. _onmsg the message handler function '''
  15. class messageHandlerThread(threading.Thread) :
  16. def __init__(self,onmsg,msg) :
  17. threading.Thread.__init__(self)
  18. self._onmsg = onmsg
  19. self._msg = msg
  20. def run(self) :
  21. self._onmsg(self._msg)
  22. '''
  23. mtworker thread: handles every request to given mtworker
  24. wid this worker's id
  25. _msgQueue this worker's message queue (stores incoming REST queries)
  26. _lock a lock that ensures synchronous access to the message queue
  27. and that causes the worker to sleep while waiting for messages
  28. _ws this worker's websocket (listens for its asworker's changelogs)
  29. _aswid this worker's asworker's wid
  30. _wContext a 'summary' of this mtworker used by _ptcal
  31. _ptcal this worker's PyTCoreAbstractionLayer instance
  32. _stopped this flag becomes true when this worker should terminate '''
  33. class mtworkerThread(threading.Thread) :
  34. nextID = 0
  35. '''
  36. sets up instance vars + stores references to _msgQueue and _lock in
  37. argument data structures '''
  38. def __init__(self,mtw2msgQueue,mtw2lock) :
  39. threading.Thread.__init__(self)
  40. self.wid = str(mtworkerThread.nextID)
  41. mtworkerThread.nextID += 1
  42. self._msgQueue = []
  43. mtw2msgQueue[self.wid] = self._msgQueue
  44. self._lock = threading.Condition()
  45. mtw2lock[self.wid] = self._lock
  46. self._ws = None
  47. self._aswid = None
  48. self._ptcal = None
  49. self._stopped = False
  50. '''
  51. init basic mtworker behavior
  52. 0. loop on the steps below until someone stops this thread
  53. 1. acquire self._lock
  54. 2. if self._msgQueue is empty, release self._lock and block until awakened
  55. by notify()... will occur in 1 of 2 cases:
  56. a) when the main thread adds something to the self._msgQueue
  57. b) on self.stop()
  58. to distinguish between both cases, we check if self._msgQueue is empty
  59. ... when it is, we break out of the loop (which terminates the current
  60. mtworkerThread)... when it isn't, we continue to step 3
  61. 3. remove oldest element from self._msgQueue
  62. 4. release the self._lock
  63. 5. delegate the handling of the message from step 3 to a new
  64. messageHandlerThread
  65. NOTE:: self._lock is used here for 2 purposes... 1st, to ensure
  66. synchronous access to self._msgQueue... 2nd, to ensure the worker
  67. thread sleeps while self._msgQueue is empty '''
  68. def run(self):
  69. while not self._stopped :
  70. self._lock.acquire()
  71. if len(self._msgQueue) == 0 :
  72. self._lock.wait()
  73. if len(self._msgQueue) == 0 :
  74. break
  75. msg = self._msgQueue.pop(0)
  76. self._lock.release()
  77. messageHandlerThread(self._onmessage,msg).start()
  78. '''
  79. send a request to this worker's asworker
  80. TBI:: the use of '127.0.0.1' implies that the atompm server is running on
  81. the same machine as the transformation engine... '''
  82. def _aswHttpReq(self,method,uri,data) :
  83. return utils.httpReq(
  84. method,
  85. '127.0.0.1:8124',
  86. uri+'?wid='+self._aswid,
  87. data)
  88. '''
  89. handle an incoming message from the server '''
  90. def _onmessage(self,msg):
  91. if msg == 'DIE' :
  92. return self.stop()
  93. logging.debug(self.wid+' >> #'+str(id(msg['resp']))+' '+ \
  94. msg['method']+' '+msg['uri'])
  95. if msg['method'] == 'PUT' and re.match('/aswSubscription',msg['uri']) :
  96. if self._ws != None :
  97. self._postMessage(
  98. msg['resp'],
  99. {'statusCode':403,
  100. 'reason':'already subscribed to an asworker'})
  101. else :
  102. self._aswid = str(json.loads(msg['reqData'])['aswid'])
  103. self._ptcal = PyTCoreAbstractionLayer(
  104. {'httpReq':self._aswHttpReq, 'wid':self._aswid}, self.wid)
  105. try :
  106. self._ws = WebSocket(self._ptcal)
  107. except Exception as e :
  108. self._postMessage(
  109. msg['resp'],
  110. {'statusCode':500,
  111. 'reason':str(e)})
  112. self._ws.subscribe(self._aswid)
  113. def respond(resp) :
  114. if self._ws.subscribed == False :
  115. self._ws.close()
  116. self._postMessage(
  117. resp,
  118. {'statusCode':500,
  119. 'reason':'subscription to asworker failed'})
  120. elif self._ws.subscribed == True :
  121. self._postMessage(resp,{'statusCode':200})
  122. else :
  123. t = threading.Timer(0.5,respond,[resp])
  124. t.start()
  125. respond(msg['resp'])
  126. elif msg['method'] == 'PUT' and re.match('/envvars',msg['uri']) :
  127. if self._ptcal.username != None :
  128. self._postMessage(
  129. msg['resp'],
  130. {'statusCode':403,
  131. 'reason':'already provided environment variables'})
  132. else :
  133. reqData = json.loads(msg['reqData'])
  134. self._ptcal.username = reqData['username']
  135. self._ptcal.defaultDCL = reqData['defaultDCL']
  136. self._postMessage(msg['resp'],{'statusCode':200})
  137. elif msg['method'] == 'PUT' and re.match('/current.model',msg['uri']) :
  138. m = json.loads(msg['reqData'])['m']
  139. mms = json.loads(msg['reqData'])['mms']
  140. sn = json.loads(msg['reqData'])['sequence#']
  141. self._ptcal.loadModel(m,mms,sn)
  142. self._postMessage(msg['resp'],{'statusCode':200})
  143. elif msg['method'] == 'PUT' and re.match('/current.transform',msg['uri']):
  144. try :
  145. if not self._ptcal.isStopped() :
  146. self._postMessage(
  147. msg['resp'],
  148. {'statusCode':403,
  149. 'reason':'not allowed to (re)load during '+ \
  150. 'ongoing transformation(s)'})
  151. else :
  152. transfs = json.loads(msg['reqData'])['transfs']
  153. transfs.reverse()
  154. self._ptcal.loadTransforms(transfs)
  155. self._postMessage(msg['resp'],{'statusCode':200})
  156. except Exception as e :
  157. print("Exception: " + str(e))
  158. traceback.print_exc()
  159. self._postMessage(
  160. msg['resp'],
  161. {'statusCode':500,
  162. 'reason':"Error in model transformation worker: " + str(e)})
  163. elif msg['method'] == 'PUT' and re.match('/query.transform',msg['uri']):
  164. try :
  165. self._ptcal.processQuery(json.loads(msg['reqData']))
  166. self._postMessage(msg['resp'],{'statusCode':200})
  167. except Exception as e :
  168. self._postMessage(
  169. msg['resp'],
  170. {'statusCode':500,
  171. 'reason':'There\'s something wrong with the query: '+str(e)})
  172. elif msg['method'] == 'PUT' and re.match('^/execmode',msg['uri']) :
  173. legalModes = ['play','stop','pause','step']
  174. mode = json.loads(msg['reqData'])['mode']
  175. if mode in legalModes :
  176. if self._ptcal.isStopping() :
  177. self._postMessage(
  178. msg['resp'],
  179. {'statusCode':503,
  180. 'reason':'currently processing a STOP request'})
  181. else :
  182. self._postMessage(msg['resp'],{'statusCode':200})
  183. getattr(self._ptcal,mode.lower())()
  184. else :
  185. self._postMessage(
  186. msg['resp'],
  187. {'statusCode':400,
  188. 'reason':'invalid execution command :: '+mode})
  189. elif msg['method'] == 'POST' and re.match('^/toggledebug',msg['uri']) :
  190. self._ptcal.toggleDebugMode()
  191. self._postMessage(msg['resp'],{'statusCode':200})
  192. elif msg['method'] == 'POST' and re.match('^/debugClient',msg['uri']) :
  193. self._ptcal.registerDebugClient(msg['reqData'])
  194. self._postMessage(msg['resp'],{'statusCode':200})
  195. #modular analysis
  196. elif msg['method'] == 'POST' and re.match('^/analyzePN',msg['uri']) :
  197. #self._ptcal.toggleDebugMode()
  198. self._ptcal.analyzePN();
  199. self._postMessage(msg['resp'],{'statusCode':204})
  200. #flat reachability analysis
  201. elif msg['method'] == 'POST' and re.match('^/PNFull',msg['uri']) :
  202. f = json.loads(msg['reqData'])['fname']
  203. #self._ptcal.toggleDebugMode()
  204. self._ptcal.PNFull(fname=f);
  205. self._postMessage(msg['resp'],{'statusCode':204})
  206. elif msg['method'] == 'POST' and re.match('^/dotPN',msg['uri']) :
  207. #self._ptcal.toggleDebugMode()
  208. f = json.loads(msg['reqData'])['fname']
  209. self._ptcal.PNFull(fname=f,dot=True);
  210. self._postMessage(msg['resp'],{'statusCode':204})
  211. elif msg['method'] == 'POST' and re.match('^/bdapiresp',msg['uri']) :
  212. resp = json.loads(msg['reqData'])
  213. self._ptcal._queueBDAPI(resp)
  214. self._postMessage(msg['resp'],{'statusCode':204})
  215. else :
  216. self._postMessage(msg['resp'],{'statusCode':501})
  217. '''
  218. post response back to server '''
  219. def _postMessage(self,resp,msg) :
  220. logging.debug(self.wid+' << #'+str(id(resp))+' '+str(msg))
  221. resp.lock.acquire()
  222. resp.setResponse(msg)
  223. resp.lock.notify()
  224. resp.lock.release()
  225. '''
  226. cause the loop in run() to be interrupted '''
  227. def stop(self):
  228. self._stopped = True
  229. self._lock.acquire()
  230. self._lock.notify()
  231. self._lock.release()