123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- import json
- import os
- import pexpect
- import sys
- import threading
- import time
- import urllib
- import urllib2
- sys.path.append("interface/HUTN")
- from hutn_compiler.compiler import main
# CONSTANTS
# Default username attached to every request sent to the server.
user = "test"
# Default timeout handed to urllib2.urlopen for each request.
timeout = 10
# Grammar file passed to the HUTN compiler entry point (main).
grammar_file = "interface/HUTN/grammars/actionlanguage.g"
class OutletThread(threading.Thread):
    """Consume the output of a subprocess in a background thread.

    Everything read from the process is echoed to sys.stdout.

    obj: pexpect.spawn object
    delay: time (in seconds) to wait between two reads
    """

    def __init__(self, obj, delay=1):
        threading.Thread.__init__(self)
        self.obj = obj
        # True while output should be read; toggled by pause() / resume().
        self.on = True
        self.delay = delay
        # Set by join() to end the polling loop for good. Kept separate from
        # `on` so that pause() does not terminate the thread.
        self._quit_event = threading.Event()

    def run(self):
        """Poll the process for output until join() is called or EOF is hit."""
        while not self._quit_event.is_set():
            if self.on:
                try:
                    chunk = self.obj.read_nonblocking(size=2048, timeout=0)
                    sys.stdout.write(chunk)
                    sys.stdout.flush()
                except pexpect.TIMEOUT:
                    # Nothing to read right now; try again after the delay.
                    pass
                except pexpect.EOF:
                    # The process closed its output; nothing more will come.
                    break
            # Waiting on the event (instead of sleeping) lets join() wake
            # the thread up immediately.
            self._quit_event.wait(self.delay)

    def join(self, timeout=None):
        """Stop reading and wait (up to `timeout` seconds) for the thread."""
        self.on = False
        self._quit_event.set()
        threading.Thread.join(self, timeout)

    def pause(self):
        """Suspend reading; the thread stays alive so resume() can restart it."""
        self.on = False

    def resume(self):
        """Continue reading after a pause()."""
        self.on = True
def pp(obj, s):
    """Run obj.expect(s) and print the output of the spawned process.

    obj: object offering expect() plus the `before` / `after` attributes
         (a pexpect.spawn object in this file)
    s: pattern handed to obj.expect
    """
    obj.expect(s)
    # Text preceding the match followed by the matched text itself.
    captured = obj.before + obj.after
    print(captured)
def execute(scriptname, parameters=None):
    """Spawn the platform-specific launcher script and return the process.

    scriptname: script name without extension; "<name>.bat" is used on
                Windows ("nt") and "./<name>.sh" on POSIX systems
    parameters: optional list of command-line arguments for the script

    Returns the pexpect.spawn object of the started process. Spawning is
    retried until it succeeds.

    Raises Exception when os.name is neither "nt" nor "posix".
    """
    if os.name == "nt":
        command = "%s.bat" % scriptname
    elif os.name == "posix":
        command = "./%s.sh" % scriptname
    else:
        raise Exception("Unknown OS: " + str(os.name))

    # A None default avoids sharing one mutable list between calls.
    arguments = [] if parameters is None else parameters

    while True:
        try:
            return pexpect.spawn(command, arguments)
        except Exception:
            # Keep retrying as before, but let KeyboardInterrupt / SystemExit
            # through and back off briefly instead of spinning at full speed.
            time.sleep(0.1)
def set_input(value, user=user, timeout=timeout):
    """Send `value` as input to the server at http://localhost:8001/.

    value: a JSON-serialisable value, or a tuple of such values; the
           elements of a tuple are sent one by one, in order
    user: username the input is sent for
    timeout: timeout handed to urllib2.urlopen for each request
    """
    if isinstance(value, tuple):
        for v in value:
            # Forward user and timeout: without this the elements of a tuple
            # would always be sent with the module-level defaults.
            set_input(v, user, timeout)
        return

    args = {"op": "set_input",
            "element_type": "V",
            "value": json.dumps(value),
            "username": user}
    response = urllib2.urlopen(
        urllib2.Request("http://localhost:8001/", urllib.urlencode(args)),
        timeout=timeout)
    try:
        # The reply body is read and discarded.
        response.read()
    finally:
        # Release the connection explicitly instead of relying on GC.
        response.close()
def flush_data(data, user=user, timeout=timeout):
    """Send `data` to the server in a single "set_input" request.

    data: JSON-serialisable payload (the callers in this file pass a list
          of [type, value] pairs)
    user: username the data is sent for
    timeout: timeout handed to urllib2.urlopen

    Always returns a new empty list, so callers can write
    `data = flush_data(data)` to reset their buffer.
    """
    payload = urllib.urlencode({"op": "set_input",
                                "data": json.dumps(data),
                                "username": user})
    request = urllib2.Request("http://localhost:8001/", payload)
    # The reply body is read and discarded.
    urllib2.urlopen(request, timeout=timeout).read()
    return []
def get_output(user=user, timeout=timeout):
    """Request the next output for `user` and return the raw reply body.

    user: username whose output is requested
    timeout: timeout handed to urllib2.urlopen
    """
    query = urllib.urlencode({"op": "get_output",
                              "username": user})
    request = urllib2.Request("http://localhost:8001/", query)
    reply = urllib2.urlopen(request, timeout=timeout)
    return reply.read()
def create_user(user):
    """Send `user` as input on behalf of "user_manager" until it succeeds.

    The request is retried for as long as it fails; callers in this file
    invoke it right after spawning the server, so the retries presumably
    cover the time the server needs to start accepting connections.

    user: the username to send
    """
    while True:
        try:
            set_input(user, "user_manager")
            return
        except Exception:
            # Keep retrying as before, but let KeyboardInterrupt / SystemExit
            # through and back off briefly instead of hammering the port.
            time.sleep(0.1)
def run_file(input_file, test_input, test_output, mode="PS"):
    """Compile a file, run it on a fresh server and check its outputs.

    input_file: file name relative to "integration/code/"
    test_input: sequence of inputs, sent one at a time
    test_output: sequence of expected outputs (strings), one per input
    mode: "PS" (interface 0) or "CS" (interface 1); also passed to the
          compiler

    Returns True when every output matches. A mismatch raises
    AssertionError; False is returned instead only when assertions are
    disabled (python -O).

    Raises RuntimeError for an unknown mode.
    """
    # Resolve file
    input_file = "integration/code/%s" % input_file

    # Determine the interface first: an invalid mode must be rejected before
    # the server and the reader thread are started, otherwise they are
    # never shut down.
    if mode == "CS":
        interface = 1
    elif mode == "PS":
        interface = 0
    else:
        raise RuntimeError("Mode be either PS or CS")

    # Run Modelverse server
    mv = execute("run_local_modelverse", ["bootstrap/bootstrap.m"])
    # consume output of MvK and print it in a background thread
    mvo = OutletThread(mv)
    mvo.start()

    try:
        # Everything below runs under the finally clause so that a compile
        # or network failure still shuts the server down.
        # compile to code
        code = main(input_file, grammar_file, mode)
        create_user(user)
        # set interface
        set_input(interface)

        # send code
        if mode == "CS":
            var_list = {}
            data = []
            for p in code:
                if isinstance(p, int):
                    if p not in var_list:
                        # First occurrence of this integer: flush what is
                        # buffered, then bind it to the value the server
                        # reports back.
                        data = flush_data(data)
                        val = get_output()
                        val = val.split("=", 2)[1].split("&", 1)[0]
                        var_list[p] = val
                        continue
                    else:
                        # Later occurrences are sent as "R" entries
                        # carrying the stored value.
                        val = var_list[p]
                        t = "R"
                else:
                    val = p
                    t = "V"
                data.append([t, val])
            data = flush_data(data)
        elif mode == "PS":
            set_input(code)

        # set input and get output
        for i, value in enumerate(test_input):
            set_input(value)
            val = get_output()
            # Keep everything after the second "=" of the reply.
            val = val.split("=", 2)[2]
            print("For input %s, got output %s, expected %s" % (value, val, test_output[i]))
            assert str(val) == test_output[i]
            # Only reachable when assertions are disabled (python -O).
            if str(val) != test_output[i]:
                return False
    finally:
        mvo.join()
        mv.sendintr()

    # All passed!
    return True
def run_barebone(constructors_and_test_input, test_output, interface):
    """Send raw constructors and input to a fresh server and check outputs.

    constructors_and_test_input: flat sequence of items to send; integers
        act as variable numbers (see the loop below), everything else is
        sent as a plain "V" entry
    test_output: sequence of expected outputs (strings), checked in order
    interface: interface number sent to the server before the data

    Returns True when every output matches. A mismatch raises
    AssertionError; False is returned instead only when assertions are
    disabled (python -O).
    """
    # Run Modelverse server
    mv = execute("run_local_modelverse", ["bootstrap/bootstrap.m"])
    # consume output of MvK and print it in a background thread
    mvo = OutletThread(mv)
    mvo.start()

    try:
        # Everything runs under the finally clause so that a failed
        # assertion or a network error still shuts the server down.
        create_user(user)
        # set interface
        set_input(interface)

        # send constructors and input
        var_list = {}
        data = []
        for p in constructors_and_test_input:
            if isinstance(p, int):
                if p not in var_list:
                    # First occurrence of this integer: flush what is
                    # buffered, then bind it to the value the server
                    # reports back.
                    data = flush_data(data)
                    val = get_output()
                    val = val.split("=", 2)[1].split("&", 1)[0]
                    var_list[p] = val
                    continue
                else:
                    # Later occurrences are sent as "R" entries carrying
                    # the stored value.
                    val = var_list[p]
                    t = "R"
            else:
                val = p
                t = "V"
            data.append([t, val])
        data = flush_data(data)

        # NOTE: a disabled variant of this loop used to accept a set of
        # allowed values for an expected output; only plain strings are
        # compared here.
        for e in test_output:
            val = get_output()
            # Keep everything after the second "=" of the reply.
            val = val.split("=", 2)[2]
            print("Got %s, expected %s" % (val, e))
            assert str(val) == e
            # Only reachable when assertions are disabled (python -O).
            if str(val) != e:
                return False
    finally:
        mvo.join()
        mv.sendintr()

    # All passed!
    return True
- # assert(run_file("binary_to_decimal.al",
- # ["1", "10", "11", "100", "001", "1100111101"],
- # ["1", "2", "3", "4", "1", "829"],
- # "PS"))
# Runs at module level: check binary_to_decimal.al through the "CS" mode.
assert run_file(
    "binary_to_decimal.al",
    ["1", "10", "11", "100", "001", "1100111101"],
    ["1", "2", "3", "4", "1", "829"],
    "CS"
)
- # assert(run_file("fibonacci.al", [1, 2, 3, 4], ["1", "1", "2", "3"]))
- # assert(run_file("fibonacci.al", [1, 2, 3, 4], ["1", "1", "2", "3"], mode="CS"))
- # assert(run_file("power.al", [(1, 0), (2, 1), (5, 0), (2, 2), (3, 2), (10, 2), (10, 10)], ["1", "2", "1", "4", "9", "100", "10000000000"]))
- # assert(run_file("binary_to_decimal.al", ["1", "10", "11", "100", "001", "1100111101"], ["1", "2", "3", "4", "1", "829"]))
- # assert(run_file("leap_year.al", [ 2016, 2015, 2014, 2013, 2012, 2001, 2000, 1999],
- # ["True", "False", "False", "False", "True", "False", "False", "False"]))
- #
- #
- # def flatten(lst):
- # new_lst = []
- # for f in lst:
- # if isinstance(f, (list, tuple)):
- # new_lst.extend(flatten(f))
- # else:
- # new_lst.append(f)
- # return new_lst
- #
- #
- # commands = [('"output"', # Output
- # ('"const"', 'true'),
- # 'true', # (has next)
- # ('"return"', # Return
- # 'true', # (has value)
- # ('"const"', 'true'),
- # ),
- # )
- # ]
- #
- #
- # assert(run_barebone(flatten(commands), ["True"], 1))
|