httpd.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. '''*****************************************************************************
  2. AToMPM - A Tool for Multi-Paradigm Modelling
  3. Copyright (c) 2011 Raphael Mannadiar (raphael.mannadiar@mail.mcgill.ca)
  4. This file is part of AToMPM.
  5. AToMPM is free software: you can redistribute it and/or modify it under the
  6. terms of the GNU Lesser General Public License as published by the Free Software
  7. Foundation, either version 3 of the License, or (at your option) any later
  8. version.
  9. AToMPM is distributed in the hope that it will be useful, but WITHOUT ANY
  10. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  11. PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with AToMPM. If not, see <http://www.gnu.org/licenses/>.
  14. *****************************************************************************'''
  15. import threading, urlparse
  16. from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
  17. from SocketServer import ThreadingMixIn
  18. from mtworker import mtworkerThread
  19. mtw2msgQueue = {} #maps mtworkers to their message queues
  20. mtw2lock = {} #maps workers to locks
  21. '''
  22. http request handler thread: one instance per http request to http server '''
  23. class HTTPRequestHandler(BaseHTTPRequestHandler) :
  24. def do_GET(self) :
  25. self._onrequest()
  26. def do_POST(self) :
  27. self._onrequest()
  28. def do_PUT(self) :
  29. self._onrequest()
  30. def do_DELETE(self) :
  31. self._onrequest()
  32. '''
  33. handle an incoming request '''
  34. def _onrequest(self) :
  35. print(self.command+' '+self.path)
  36. #spawn new worker + respond worker id
  37. if( self.path == '/mtworker' and self.command == 'POST' ) :
  38. mtw = mtworkerThread(mtw2msgQueue,mtw2lock)
  39. mtw.start()
  40. return self._respond(201,'',mtw.wid)
  41. #check for valid worker id
  42. url = urlparse.urlparse(self.path)
  43. query = urlparse.parse_qs(url[4])
  44. if query == '' or 'wid' not in query :
  45. return self._respond(400, 'missing worker id')
  46. wid = query['wid'][0]
  47. if wid not in mtw2msgQueue :
  48. return self._respond(400, 'invalid worker id :: '+wid)
  49. #retrieve reqdata if any
  50. reqData = None
  51. if (self.command == 'PUT' or self.command == 'POST') :
  52. dl = int(self.headers.getheader('Content-Length') or 0)
  53. if dl > 0 :
  54. reqData = self.rfile.read(dl)
  55. #setup lock and response objects + forward request to worker
  56. self.lock = threading.Condition()
  57. self._response = {}
  58. self._postMessage(
  59. wid,
  60. {'method':self.command,
  61. 'uri':self.path,
  62. 'reqData':reqData,
  63. 'resp':self})
  64. #wait on worker's response (necessary completing the execution of a do_*()
  65. #causes an empty response to be sent)
  66. self.lock.acquire()
  67. if self._response == {} :
  68. self.lock.wait()
  69. self.lock.release()
  70. self._respond(
  71. self._response['statusCode'],
  72. self._response['reason'],
  73. self._response['data'],
  74. self._response['headers'])
  75. '''
  76. send a message to an mtworker
  77. 1. acquire lock on its msgQueue (this call may block very briefly if
  78. worker is currently using msgQueue)
  79. 2. add msg to it
  80. 3. release lock and notify worker that a new msg is available '''
  81. def _postMessage(self,wid,msg) :
  82. mtw2lock[wid].acquire()
  83. mtw2msgQueue[wid].append(msg)
  84. mtw2lock[wid].notify()
  85. mtw2lock[wid].release()
  86. '''
  87. send a response
  88. 1. send statusCode
  89. 2. send headers
  90. 3. send data|reason '''
  91. def _respond(self,statusCode,reason='',data='',headers='') :
  92. self.send_response(statusCode)
  93. if headers == '' :
  94. self.send_header('Content-Type','text/plain')
  95. else :
  96. for h,i in headers.iteritems() :
  97. self.send_header(h,i)
  98. self.end_headers()
  99. if round(statusCode/100.0) != 2 :
  100. if reason != '' :
  101. self.wfile.write(reason)
  102. else :
  103. if data != '' :
  104. self.wfile.write(data)
  105. '''
  106. used by worker threads to populate self._response with their results '''
  107. def setResponse(self,msg) :
  108. self._response['statusCode'] = msg['statusCode']
  109. for x in ('reason','data','headers') :
  110. if x in msg :
  111. self._response[x] = msg[x]
  112. else :
  113. self._response[x] = ''
  114. '''
  115. init thread that runs http server '''
  116. class HTTPServerThread(threading.Thread) :
  117. def __init__(self) :
  118. threading.Thread.__init__(self)
  119. def run(self):
  120. self.httpd = MultiThreadedHTTPServer(('127.0.0.1', 8125), HTTPRequestHandler)
  121. self.httpd.serve_forever()
  122. self.httpd.socket.close()
  123. def stop(self) :
  124. self.httpd.shutdown()
  125. for wid in mtw2lock :
  126. mtw2lock[wid].acquire()
  127. mtw2msgQueue[wid].append('DIE')
  128. mtw2lock[wid].notify()
  129. mtw2lock[wid].release()
  130. '''
  131. multi-threaded http server '''
  132. class MultiThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  133. pass