httpd.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. '''This file is part of AToMPM - A Tool for Multi-Paradigm Modelling
  2. Copyright 2011 by the AToMPM team and licensed under the LGPL
  3. See COPYING.lesser and README.md in the root of this project for full details'''
  4. import threading, urlparse
  5. from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
  6. from SocketServer import ThreadingMixIn
  7. from mtworker import mtworkerThread
  8. mtw2msgQueue = {} #maps mtworkers to their message queues
  9. mtw2lock = {} #maps workers to locks
  10. '''
  11. http request handler thread: one instance per http request to http server '''
  12. class HTTPRequestHandler(BaseHTTPRequestHandler) :
  13. def do_GET(self) :
  14. self._onrequest()
  15. def do_POST(self) :
  16. self._onrequest()
  17. def do_PUT(self) :
  18. self._onrequest()
  19. def do_DELETE(self) :
  20. self._onrequest()
  21. '''
  22. handle an incoming request '''
  23. def _onrequest(self) :
  24. print(self.command+' '+self.path)
  25. #spawn new worker + respond worker id
  26. if( self.path == '/mtworker' and self.command == 'POST' ) :
  27. mtw = mtworkerThread(mtw2msgQueue,mtw2lock)
  28. mtw.start()
  29. return self._respond(201,'',mtw.wid)
  30. #check for valid worker id
  31. url = urlparse.urlparse(self.path)
  32. query = urlparse.parse_qs(url[4])
  33. if query == '' or 'wid' not in query :
  34. return self._respond(400, 'missing worker id')
  35. wid = query['wid'][0]
  36. if wid not in mtw2msgQueue :
  37. return self._respond(400, 'invalid worker id :: '+wid)
  38. #retrieve reqdata if any
  39. reqData = None
  40. if (self.command == 'PUT' or self.command == 'POST') :
  41. dl = int(self.headers.getheader('Content-Length') or 0)
  42. if dl > 0 :
  43. reqData = self.rfile.read(dl)
  44. #setup lock and response objects + forward request to worker
  45. self.lock = threading.Condition()
  46. self._response = {}
  47. self._postMessage(
  48. wid,
  49. {'method':self.command,
  50. 'uri':self.path,
  51. 'reqData':reqData,
  52. 'resp':self})
  53. #wait on worker's response (necessary completing the execution of a do_*()
  54. #causes an empty response to be sent)
  55. self.lock.acquire()
  56. if self._response == {} :
  57. self.lock.wait()
  58. self.lock.release()
  59. self._respond(
  60. self._response['statusCode'],
  61. self._response['reason'],
  62. self._response['data'],
  63. self._response['headers'])
  64. '''
  65. send a message to an mtworker
  66. 1. acquire lock on its msgQueue (this call may block very briefly if
  67. worker is currently using msgQueue)
  68. 2. add msg to it
  69. 3. release lock and notify worker that a new msg is available '''
  70. def _postMessage(self,wid,msg) :
  71. mtw2lock[wid].acquire()
  72. mtw2msgQueue[wid].append(msg)
  73. mtw2lock[wid].notify()
  74. mtw2lock[wid].release()
  75. '''
  76. send a response
  77. 1. send statusCode
  78. 2. send headers
  79. 3. send data|reason '''
  80. def _respond(self,statusCode,reason='',data='',headers='') :
  81. self.send_response(statusCode)
  82. if headers == '' :
  83. self.send_header('Content-Type','text/plain')
  84. else :
  85. for h,i in headers.iteritems() :
  86. self.send_header(h,i)
  87. self.end_headers()
  88. if round(statusCode/100.0) != 2 :
  89. if reason != '' :
  90. self.wfile.write(reason)
  91. else :
  92. if data != '' :
  93. self.wfile.write(data)
  94. '''
  95. used by worker threads to populate self._response with their results '''
  96. def setResponse(self,msg) :
  97. self._response['statusCode'] = msg['statusCode']
  98. for x in ('reason','data','headers') :
  99. if x in msg :
  100. self._response[x] = msg[x]
  101. else :
  102. self._response[x] = ''
  103. '''
  104. init thread that runs http server '''
  105. class HTTPServerThread(threading.Thread) :
  106. def __init__(self) :
  107. threading.Thread.__init__(self)
  108. def run(self):
  109. self.httpd = MultiThreadedHTTPServer(('127.0.0.1', 8125), HTTPRequestHandler)
  110. self.httpd.serve_forever()
  111. self.httpd.socket.close()
  112. def stop(self) :
  113. self.httpd.shutdown()
  114. for wid in mtw2lock :
  115. mtw2lock[wid].acquire()
  116. mtw2msgQueue[wid].append('DIE')
  117. mtw2lock[wid].notify()
  118. mtw2lock[wid].release()
  119. '''
  120. multi-threaded http server '''
  121. class MultiThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  122. pass