mtworker.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. '''*****************************************************************************
  2. AToMPM - A Tool for Multi-Paradigm Modelling
  3. Copyright (c) 2011 Raphael Mannadiar (raphael.mannadiar@mail.mcgill.ca)
  4. This file is part of AToMPM.
  5. AToMPM is free software: you can redistribute it and/or modify it under the
  6. terms of the GNU Lesser General Public License as published by the Free Software
  7. Foundation, either version 3 of the License, or (at your option) any later
  8. version.
  9. AToMPM is distributed in the hope that it will be useful, but WITHOUT ANY
  10. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  11. PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with AToMPM. If not, see <http://www.gnu.org/licenses/>.
  14. *****************************************************************************'''
  15. import re, threading, json, logging
  16. from ws import WebSocket
  17. from ptcal.ptcal import PyTCoreAbstractionLayer
  18. from ptcal.utils import Utilities as utils
  19. '''
  20. message handler thread: mtworkers delegate the handling of each message they
  21. receive to a new instance of this thread, which
  22. terminates after handling the said message
  23. _msg the message to handle
  24. _onmsg the message handler function '''
  25. class messageHandlerThread(threading.Thread) :
  26. def __init__(self,onmsg,msg) :
  27. threading.Thread.__init__(self)
  28. self._onmsg = onmsg
  29. self._msg = msg
  30. def run(self) :
  31. self._onmsg(self._msg)
  32. '''
  33. mtworker thread: handles every request to given mtworker
  34. wid this worker's id
  35. _msgQueue this worker's message queue (stores incoming REST queries)
  36. _lock a lock that ensures synchronous access to the message queue
  37. and that causes the worker to sleep while waiting for messages
  38. _ws this worker's websocket (listens for its asworker's changelogs)
  39. _aswid this worker's asworker's wid
  40. _wContext a 'summary' of this mtworker used by _ptcal
  41. _ptcal this worker's PyTCoreAbstractionLayer instance
  42. _stopped this flag becomes true when this worker should terminate '''
  43. class mtworkerThread(threading.Thread) :
  44. nextID = 0
  45. '''
  46. sets up instance vars + stores references to _msgQueue and _lock in
  47. argument data structures '''
  48. def __init__(self,mtw2msgQueue,mtw2lock) :
  49. threading.Thread.__init__(self)
  50. self.wid = str(mtworkerThread.nextID)
  51. mtworkerThread.nextID += 1
  52. self._msgQueue = []
  53. mtw2msgQueue[self.wid] = self._msgQueue
  54. self._lock = threading.Condition()
  55. mtw2lock[self.wid] = self._lock
  56. self._ws = None
  57. self._aswid = None
  58. self._ptcal = None
  59. self._stopped = False
  60. '''
  61. init basic mtworker behavior
  62. 0. loop on the steps below until someone stops this thread
  63. 1. acquire self._lock
  64. 2. if self._msgQueue is empty, release self._lock and block until awakened
  65. by notify()... will occur in 1 of 2 cases:
  66. a) when the main thread adds something to the self._msgQueue
  67. b) on self.stop()
  68. to distinguish between both cases, we check if self._msgQueue is empty
  69. ... when it is, we break out of the loop (which terminates the current
  70. mtworkerThread)... when it isn't, we continue to step 3
  71. 3. remove oldest element from self._msgQueue
  72. 4. release the self._lock
  73. 5. delegate the handling of the message from step 3 to a new
  74. messageHandlerThread
  75. NOTE:: self._lock is used here for 2 purposes... 1st, to ensure
  76. synchronous access to self._msgQueue... 2nd, to ensure the worker
  77. thread sleeps while self._msgQueue is empty '''
  78. def run(self):
  79. while not self._stopped :
  80. self._lock.acquire()
  81. if len(self._msgQueue) == 0 :
  82. self._lock.wait()
  83. if len(self._msgQueue) == 0 :
  84. break
  85. msg = self._msgQueue.pop(0)
  86. self._lock.release()
  87. messageHandlerThread(self._onmessage,msg).start()
  88. '''
  89. send a request to this worker's asworker
  90. TBI:: the use of '127.0.0.1' implies that the atompm server is running on
  91. the same machine as the transformation engine... '''
  92. def _aswHttpReq(self,method,uri,data) :
  93. return utils.httpReq(
  94. method,
  95. '127.0.0.1:8124',
  96. uri+'?wid='+self._aswid,
  97. data)
  98. '''
  99. handle an incoming message from the server '''
  100. def _onmessage(self,msg):
  101. if msg == 'DIE' :
  102. return self.stop()
  103. logging.debug(self.wid+' >> #'+str(id(msg['resp']))+' '+\
  104. msg['method']+' '+msg['uri'])
  105. if msg['method'] == 'PUT' and re.match('/aswSubscription',msg['uri']) :
  106. if self._ws != None :
  107. self._postMessage(
  108. msg['resp'],
  109. {'statusCode':403,
  110. 'reason':'already subscribed to an asworker'})
  111. else :
  112. self._aswid = str(json.loads(msg['reqData'])['aswid'])
  113. self._ptcal = PyTCoreAbstractionLayer(
  114. {'httpReq':self._aswHttpReq, 'wid':self._aswid}, self.wid)
  115. try :
  116. self._ws = WebSocket(self._ptcal)
  117. except Exception, e :
  118. self._postMessage(
  119. msg['resp'],
  120. {'statusCode':500,
  121. 'reason':str(e)})
  122. self._ws.subscribe(self._aswid)
  123. def respond(resp) :
  124. if self._ws.subscribed == False :
  125. self._ws.close()
  126. self._postMessage(
  127. resp,
  128. {'statusCode':500,
  129. 'reason':'subscription to asworker failed'})
  130. elif self._ws.subscribed == True :
  131. self._postMessage(resp,{'statusCode':200})
  132. else :
  133. t = threading.Timer(0.5,respond,[resp])
  134. t.start()
  135. respond(msg['resp'])
  136. elif msg['method'] == 'PUT' and re.match('/envvars',msg['uri']) :
  137. if self._ptcal.username != None :
  138. self._postMessage(
  139. msg['resp'],
  140. {'statusCode':403,
  141. 'reason':'already provided environment variables'})
  142. else :
  143. reqData = json.loads(msg['reqData'])
  144. self._ptcal.username = reqData['username']
  145. self._ptcal.defaultDCL = reqData['defaultDCL']
  146. self._postMessage(msg['resp'],{'statusCode':200})
  147. elif msg['method'] == 'PUT' and re.match('/current.model',msg['uri']) :
  148. m = json.loads(msg['reqData'])['m']
  149. mms = json.loads(msg['reqData'])['mms']
  150. sn = json.loads(msg['reqData'])['sequence#']
  151. self._ptcal.loadModel(m,mms,sn)
  152. self._postMessage(msg['resp'],{'statusCode':200})
  153. elif msg['method'] == 'PUT' and re.match('/current.transform',msg['uri']):
  154. try :
  155. if not self._ptcal.isStopped() :
  156. self._postMessage(
  157. msg['resp'],
  158. {'statusCode':403,
  159. 'reason':'not allowed to (re)load during '+\
  160. 'ongoing transformation(s)'})
  161. else :
  162. transfs = json.loads(msg['reqData'])['transfs']
  163. transfs.reverse()
  164. self._ptcal.loadTransforms(transfs)
  165. self._postMessage(msg['resp'],{'statusCode':200})
  166. except Exception, e :
  167. self._postMessage(
  168. msg['resp'],
  169. {'statusCode':500,
  170. 'reason':str(e)})
  171. elif msg['method'] == 'PUT' and re.match('/query.transform',msg['uri']):
  172. try :
  173. self._ptcal.processQuery(json.loads(msg['reqData']))
  174. self._postMessage(msg['resp'],{'statusCode':200})
  175. except Exception, e :
  176. self._postMessage(
  177. msg['resp'],
  178. {'statusCode':500,
  179. 'reason':'There\'s something wrong with the query: '+str(e)})
  180. elif msg['method'] == 'PUT' and re.match('^/execmode',msg['uri']) :
  181. legalModes = ['play','stop','pause','step']
  182. mode = json.loads(msg['reqData'])['mode']
  183. if mode in legalModes :
  184. if self._ptcal.isStopping() :
  185. self._postMessage(
  186. msg['resp'],
  187. {'statusCode':503,
  188. 'reason':'currently processing a STOP request'})
  189. else :
  190. self._postMessage(msg['resp'],{'statusCode':200})
  191. getattr(self._ptcal,mode.lower())()
  192. else :
  193. self._postMessage(
  194. msg['resp'],
  195. {'statusCode':400,
  196. 'reason':'invalid execution command :: '+mode})
  197. elif msg['method'] == 'POST' and re.match('^/toggledebug',msg['uri']) :
  198. self._ptcal.toggleDebugMode()
  199. self._postMessage(msg['resp'],{'statusCode':200})
  200. elif msg['method'] == 'POST' and re.match('^/debugClient',msg['uri']) :
  201. self._ptcal.registerDebugClient(msg['reqData'])
  202. self._postMessage(msg['resp'],{'statusCode':200})
  203. #modular analysis
  204. elif msg['method'] == 'POST' and re.match('^/analyzePN',msg['uri']) :
  205. #self._ptcal.toggleDebugMode()
  206. self._ptcal.analyzePN();
  207. self._postMessage(msg['resp'],{'statusCode':204})
  208. #flat reachability analysis
  209. elif msg['method'] == 'POST' and re.match('^/PNFull',msg['uri']) :
  210. f = json.loads(msg['reqData'])['fname']
  211. #self._ptcal.toggleDebugMode()
  212. self._ptcal.PNFull(fname=f);
  213. self._postMessage(msg['resp'],{'statusCode':204})
  214. elif msg['method'] == 'POST' and re.match('^/dotPN',msg['uri']) :
  215. #self._ptcal.toggleDebugMode()
  216. f = json.loads(msg['reqData'])['fname']
  217. self._ptcal.PNFull(fname=f,dot=True);
  218. self._postMessage(msg['resp'],{'statusCode':204})
  219. elif msg['method'] == 'POST' and re.match('^/bdapiresp',msg['uri']) :
  220. resp = json.loads(msg['reqData'])
  221. self._ptcal._queueBDAPI(resp)
  222. self._postMessage(msg['resp'],{'statusCode':204})
  223. else :
  224. self._postMessage(msg['resp'],{'statusCode':501})
  225. '''
  226. post response back to server '''
  227. def _postMessage(self,resp,msg) :
  228. logging.debug(self.wid+' << #'+str(id(resp))+' '+str(msg))
  229. resp.lock.acquire()
  230. resp.setResponse(msg)
  231. resp.lock.notify()
  232. resp.lock.release()
  233. '''
  234. cause the loop in run() to be interrupted '''
  235. def stop(self):
  236. self._stopped = True
  237. self._lock.acquire()
  238. self._lock.notify()
  239. self._lock.release()