httpd.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. '''This file is part of AToMPM - A Tool for Multi-Paradigm Modelling
  2. Copyright 2011 by the AToMPM team and licensed under the LGPL
  3. See COPYING.lesser and README.md in the root of this project for full details'''
  4. import sys
  5. if sys.version_info[0] < 3:
  6. import threading, urlparse
  7. from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
  8. from SocketServer import ThreadingMixIn
  9. from mtworker import mtworkerThread
  10. else:
  11. import threading
  12. import urllib.parse as urlparse
  13. from http.server import BaseHTTPRequestHandler, HTTPServer
  14. from socketserver import ThreadingMixIn
  15. from mtworker import mtworkerThread
  16. mtw2msgQueue = {} #maps mtworkers to their message queues
  17. mtw2lock = {} #maps workers to locks
  18. '''
  19. http request handler thread: one instance per http request to http server '''
  20. class HTTPRequestHandler(BaseHTTPRequestHandler) :
  21. def do_GET(self) :
  22. self._onrequest()
  23. def do_POST(self) :
  24. self._onrequest()
  25. def do_PUT(self) :
  26. self._onrequest()
  27. def do_DELETE(self) :
  28. self._onrequest()
  29. '''
  30. handle an incoming request '''
  31. def _onrequest(self) :
  32. print(self.command+' '+self.path)
  33. #spawn new worker + respond worker id
  34. if( self.path == '/mtworker' and self.command == 'POST' ) :
  35. mtw = mtworkerThread(mtw2msgQueue,mtw2lock)
  36. mtw.start()
  37. return self._respond(201,'',mtw.wid)
  38. #check for valid worker id
  39. url = urlparse.urlparse(self.path)
  40. query = urlparse.parse_qs(url[4])
  41. if query == '' or 'wid' not in query :
  42. return self._respond(400, 'missing worker id')
  43. wid = query['wid'][0]
  44. if wid not in mtw2msgQueue :
  45. return self._respond(400, 'invalid worker id :: '+wid)
  46. #retrieve reqdata if any
  47. reqData = None
  48. if (self.command == 'PUT' or self.command == 'POST') :
  49. if sys.version_info < (3, 0):
  50. header = self.headers.getheader('Content-Length')
  51. else:
  52. header = self.headers.get('Content-Length')
  53. dl = int(header or 0)
  54. if dl > 0 :
  55. reqData = self.rfile.read(dl)
  56. #setup lock and response objects + forward request to worker
  57. self.lock = threading.Condition()
  58. self._response = {}
  59. self._postMessage(
  60. wid,
  61. {'method':self.command,
  62. 'uri':self.path,
  63. 'reqData':reqData,
  64. 'resp':self})
  65. #wait on worker's response (necessary completing the execution of a do_*()
  66. #causes an empty response to be sent)
  67. self.lock.acquire()
  68. if self._response == {} :
  69. self.lock.wait()
  70. self.lock.release()
  71. self._respond(
  72. self._response['statusCode'],
  73. self._response['reason'],
  74. self._response['data'],
  75. self._response['headers'])
  76. '''
  77. send a message to an mtworker
  78. 1. acquire lock on its msgQueue (this call may block very briefly if
  79. worker is currently using msgQueue)
  80. 2. add msg to it
  81. 3. release lock and notify worker that a new msg is available '''
  82. def _postMessage(self,wid,msg) :
  83. mtw2lock[wid].acquire()
  84. mtw2msgQueue[wid].append(msg)
  85. mtw2lock[wid].notify()
  86. mtw2lock[wid].release()
  87. '''
  88. send a response
  89. 1. send statusCode
  90. 2. send headers
  91. 3. send data|reason '''
  92. def _respond(self,statusCode,reason='',data='',headers='') :
  93. self.send_response(statusCode)
  94. if headers == '' :
  95. self.send_header('Content-Type','text/plain')
  96. else :
  97. for h,i in headers.items() :
  98. self.send_header(h,i)
  99. self.end_headers()
  100. if round(statusCode/100.0) != 2 :
  101. if reason != '' :
  102. if sys.version_info < (3, 0):
  103. reason = bytes(reason)
  104. else:
  105. reason = bytes(reason, 'utf8')
  106. self.wfile.write(reason)
  107. else :
  108. if data != '' :
  109. if sys.version_info < (3, 0):
  110. data = bytes(data)
  111. else:
  112. data = bytes(data, 'utf8')
  113. self.wfile.write(data)
  114. '''
  115. used by worker threads to populate self._response with their results '''
  116. def setResponse(self,msg) :
  117. self._response['statusCode'] = msg['statusCode']
  118. for x in ('reason','data','headers') :
  119. if x in msg :
  120. self._response[x] = msg[x]
  121. else :
  122. self._response[x] = ''
  123. '''
  124. init thread that runs http server '''
  125. class HTTPServerThread(threading.Thread) :
  126. def __init__(self) :
  127. threading.Thread.__init__(self)
  128. def run(self):
  129. self.httpd = MultiThreadedHTTPServer(('127.0.0.1', 8125), HTTPRequestHandler)
  130. self.httpd.serve_forever()
  131. self.httpd.socket.close()
  132. def stop(self) :
  133. self.httpd.shutdown()
  134. for wid in mtw2lock :
  135. mtw2lock[wid].acquire()
  136. mtw2msgQueue[wid].append('DIE')
  137. mtw2lock[wid].notify()
  138. mtw2lock[wid].release()
  139. '''
  140. multi-threaded http server '''
  141. class MultiThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  142. pass