import json
import os
import pexpect
import sys
import threading
import time
import urllib
import urllib2

sys.path.append("interface/HUTN")
from hutn_compiler.compiler import main

# CONSTANTS
user = 'test'
timeout = 10
grammar_file = 'interface/HUTN/grammars/actionlanguage.g'
# Address every request in this file is posted to
server_url = "http://localhost:8001/"


class OutletThread(threading.Thread):
    """Consumes output of some subprocess in a background thread.

    Whatever is read from the wrapped process is echoed on sys.stdout.
    """

    # obj: pexpect.spawn object
    # delay: time (in seconds) to wait between two reads
    def __init__(self, obj, delay = 1):
        threading.Thread.__init__(self)
        self.obj = obj
        # Cleared by join() to make run() return.
        self.on = True
        # Set by pause(); kept separate from `on` so that pausing does not
        # terminate the reader loop and resume() can actually resume it.
        self.paused = False
        self.delay = delay

    # main
    def run(self):
        """Poll the process for output until join() is called or it ends."""
        while self.on:
            if not self.paused:
                try:
                    x = self.obj.read_nonblocking(size=2048, timeout=0)
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except pexpect.TIMEOUT:
                    # Nothing to read right now; try again after the delay.
                    pass
                except pexpect.EOF:
                    # The process closed its output: nothing left to consume.
                    break
            time.sleep(self.delay)

    # utils
    def join(self, timeout=None):
        """Ask the reader loop to stop, then wait for the thread to finish."""
        self.on = False
        threading.Thread.join(self, timeout)

    def pause(self):
        """Temporarily stop reading; the thread itself stays alive."""
        self.paused = True

    def resume(self):
        """Continue reading after a pause()."""
        self.paused = False


def pp(obj, s):
    """Do obj.expect(s) and print the output of the spawned process."""
    obj.expect(s)
    # Parenthesised single-argument form, like the other prints in this file.
    print(obj.before + obj.after)


def execute(scriptname, parameters=None):
    """Spawn the platform-specific launcher script and return the process.

    scriptname: script name without extension ("<name>.bat" is used when
                os.name is "nt", "./<name>.sh" when it is "posix")
    parameters: optional list of command line arguments
    Raises Exception when os.name is neither "nt" nor "posix".
    """
    if parameters is None:
        # A fresh list per call instead of a shared mutable default.
        parameters = []

    if os.name == "nt":
        command = "%s.bat" % scriptname
    elif os.name == "posix":
        command = "./%s.sh" % scriptname
    else:
        raise Exception("Unknown OS: " + str(os.name))

    # Spawning is retried until it succeeds. Only Exception is caught so that
    # KeyboardInterrupt / SystemExit can still break out of the loop.
    while True:
        try:
            return pexpect.spawn(command, parameters)
        except Exception:
            pass


def _post(args, timeout):
    """POST the url-encoded args to the server and return the response body."""
    request = urllib2.Request(server_url, urllib.urlencode(args))
    return urllib2.urlopen(request, timeout=timeout).read()


def set_input(value, user=user, timeout=timeout):
    """Send value to the server as input for the given user.

    A tuple is sent element by element (recursively), using the same user
    and timeout for every element; anything else is JSON-encoded and sent in
    a single "set_input" request.
    """
    if isinstance(value, tuple):
        for v in value:
            set_input(v, user, timeout)
    else:
        _post({"op": "set_input",
               "element_type": "V",
               "value": json.dumps(value),
               "username": user},
              timeout)


def flush_data(data, user=user, timeout=timeout):
    """Send the collected data in one "set_input" request.

    Always returns a new empty list, so callers can write
    `data = flush_data(data)` to send and reset their buffer.
    """
    _post({"op": "set_input",
           "data": json.dumps(data),
           "username": user},
          timeout)
    return []


def get_output(user=user, timeout=timeout):
    """Issue a "get_output" request for user and return the raw response."""
    return _post({"op": "get_output", "username": user}, timeout)


def create_user(user):
    """Register user by sending its name as input of "user_manager".

    The request is retried until it succeeds. Only Exception is caught so
    that KeyboardInterrupt / SystemExit can still break out of the loop.
    """
    while True:
        try:
            set_input(user, "user_manager")
            return
        except Exception:
            pass


def _start_modelverse():
    """Run the Modelverse server; return (process, output reader thread)."""
    mv = execute("run_local_modelverse", ["bootstrap/bootstrap.m"])
    # consume output of MvK and print it in the main thread
    mvo = OutletThread(mv)
    mvo.start()
    return mv, mvo


def _stop_modelverse(mv, mvo):
    """Stop the output reader thread, then interrupt the server process."""
    mvo.join()
    mv.sendintr()


def _send_constructors(items):
    """Send a sequence of constructors, resolving integer placeholders.

    Non-integer items are queued as ["V", item]. The first time an integer
    is seen, the queue is flushed, one output is read from the server and
    the part between its first "=" and the following "&" is remembered for
    that integer; nothing is queued for it. Later occurrences of the same
    integer are queued as ["R", <remembered value>].
    NOTE(review): "V"/"R" presumably mean value/reference -- confirm against
    the server protocol.
    """
    var_list = {}
    data = []
    for p in items:
        if isinstance(p, int):
            if p not in var_list:
                data = flush_data(data)
                val = get_output()
                var_list[p] = val.split("=", 2)[1].split("&", 1)[0]
                continue
            entry = ["R", var_list[p]]
        else:
            entry = ["V", p]
        data.append(entry)
    flush_data(data)


def run_file(input_file, test_input, test_output, mode="PS"):
    """Compile a file, load it in a fresh server and check its outputs.

    input_file:  file name, resolved relative to "integration/code/"
    test_input:  inputs to send, one at a time (tuples are sent element-wise)
    test_output: expected output (as a string) for each input
    mode:        "PS" or "CS"; anything else raises RuntimeError
    Returns True when every output matched. A mismatch raises AssertionError
    (or returns False when asserts are disabled with -O).
    """
    # determine the interface (before anything is started, so that an
    # invalid mode does not leave a server and reader thread behind)
    if mode == "CS":
        interface = 1
    elif mode == "PS":
        interface = 0
    else:
        raise RuntimeError("Mode must be either PS or CS")

    # Resolve file
    input_file = "integration/code/%s" % input_file

    # Run Modelverse server
    mv, mvo = _start_modelverse()
    try:
        # compile to code
        code = main(input_file, grammar_file, mode)

        create_user(user)

        # set interface
        set_input(interface)

        # send code
        if mode == "CS":
            _send_constructors(code)
        else:
            set_input(code)

        # set input and get output
        for i, value in enumerate(test_input):
            set_input(value)
            val = get_output()
            val = val.split("=", 2)[2]
            expected = test_output[i]

            print("For input %s, got output %s, expected %s" % (value, val, expected))
            assert str(val) == expected
            # Only reachable when asserts are stripped (python -O).
            if str(val) != expected:
                return False
    finally:
        # Always release the reader thread and the server, also on failure.
        _stop_modelverse(mv, mvo)

    # All passed!
    return True


def run_barebone(constructors_and_test_input, test_output, interface):
    """Send raw constructors to a fresh server and check its outputs.

    constructors_and_test_input: flat sequence of constructors and inputs
                                 (integers are placeholders, see
                                 _send_constructors)
    test_output: expected outputs (as strings), in order
    interface:   value sent with set_input to select the interface
    Returns True when every output matched. A mismatch raises AssertionError
    (or returns False when asserts are disabled with -O).
    """
    # Run Modelverse server
    mv, mvo = _start_modelverse()
    try:
        create_user(user)

        # set interface
        set_input(interface)

        # send constructors and input
        _send_constructors(constructors_and_test_input)

        for e in test_output:
            val = get_output()
            val = val.split("=", 2)[2]

            print("Got %s, expected %s" % (val, e))
            assert str(val) == e
            # Only reachable when asserts are stripped (python -O).
            if str(val) != e:
                return False

        # Disabled variant in which an expected entry may also be a set of
        # acceptable values:
        # for e in test_output:
        #     c = len(e) if isinstance(e, set) else 1
        #     for _ in range(c):
        #         val = get_output(user)
        #         val = val.split("=", 2)[2]
        #
        #         print("Got %s, expected %s" % (val, e))
        #         if isinstance(e, set):
        #             assert str(val) in e
        #             if str(val) not in e:
        #                 return False
        #         else:
        #             assert str(val) == e
        #             if str(val) != e:
        #                 return False
    finally:
        # Always release the reader thread and the server, also on failure.
        _stop_modelverse(mv, mvo)

    # All passed!
    return True


# assert(run_file("binary_to_decimal.al",
#     ["1", "10", "11", "100", "001", "1100111101"],
#     ["1", "2", "3", "4", "1", "829"],
#     "PS"))

assert run_file("binary_to_decimal.al",
                ["1", "10", "11", "100", "001", "1100111101"],
                ["1", "2", "3", "4", "1", "829"],
                "CS")

# assert(run_file("fibonacci.al", [1, 2, 3, 4], ["1", "1", "2", "3"]))
# assert(run_file("fibonacci.al", [1, 2, 3, 4], ["1", "1", "2", "3"], mode="CS"))
# assert(run_file("power.al", [(1, 0), (2, 1), (5, 0), (2, 2), (3, 2), (10, 2), (10, 10)], ["1", "2", "1", "4", "9", "100", "10000000000"]))
# assert(run_file("binary_to_decimal.al", ["1", "10", "11", "100", "001", "1100111101"], ["1", "2", "3", "4", "1", "829"]))
# assert(run_file("leap_year.al", [ 2016, 2015, 2014, 2013, 2012, 2001, 2000, 1999],
#     ["True", "False", "False", "False", "True", "False", "False", "False"]))
#
#
# def flatten(lst):
#     new_lst = []
#     for f in lst:
#         if isinstance(f, (list, tuple)):
#             new_lst.extend(flatten(f))
#         else:
#             new_lst.append(f)
#     return new_lst
#
#
# commands = [('"output"',                    # Output
#                 ('"const"', 'true'),
#                 'true',                     # (has next)
#                 ('"return"',                # Return
#                     'true',                 # (has value)
#                     ('"const"', 'true'),
#                 ),
#             )
#         ]
#
#
# assert(run_barebone(flatten(commands), ["True"], 1))