import unittest import sys import os import sys import time import json import urllib import urllib2 import subprocess import signal import random sys.path.append("interface/HUTN") sys.path.append("scripts") from hutn_compiler.compiler import main as do_compile taskname = "test_task" INIT_TIMEOUT = 30 try: import pytest slow = pytest.mark.skipif( not pytest.config.getoption("--runslow"), reason="need --runslow option to run" ) except: slow = lambda i:i ports = set() def getFreePort(): """Gets a unique new port.""" while 1: port = random.randint(10000, 20000) # Check if this port is in the set of ports. if port not in ports: # We have found a unique port. Add it to the set and return. ports.add(port) return port def execute(scriptname, parameters=[], wait=False): if os.name not in ["nt", "posix"]: # Stop now, as we would have no clue on how to kill its subtree raise Exception("Unknown OS version: " + str(os.name)) command = [sys.executable, "-u", "scripts/%s.py" % scriptname] + parameters if wait: return subprocess.call(command, shell=False) else: return subprocess.Popen(command, shell=False, stdout=subprocess.PIPE) def kill(process): if os.name == "nt": subprocess.call(["taskkill", "/F", "/T", "/PID", "%i" % process.pid]) elif os.name == "posix": # Kill parents subprocess.call(["pkill", "-9", "-P", "%i" % process.pid]) # Kill self subprocess.call(["kill", "-9", "%i" % process.pid]) def flush_data(address, data): if data: urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "data": json.dumps(data), "taskname": taskname})), timeout=INIT_TIMEOUT).read() return [] def start_mvc(): port = getFreePort() address = "http://127.0.0.1:%s" % port proc = execute("run_local_modelverse", [str(port)], wait=False) return proc, address def get_constructor(code): code_fragments = code.split("\n") code_fragments = [i for i in code_fragments if i.strip() != ""] code_fragments = [i.replace(" ", "\t") for i in code_fragments] initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments]) code_fragments = [i[initial_tabs:] for i in code_fragments] code_fragments.append("") code = "\n".join(code_fragments) with open("__constraint.alc", "w") as f: f.write(code) f.flush() constructors = do_compile("__constraint.alc", "interface/HUTN/grammars/actionlanguage.g", "CS") return constructors def get_model_constructor(code): # First change multiple spaces to a tab code_fragments = code.split("\n") code_fragments = [i for i in code_fragments if i.strip() != ""] code_fragments = [i.replace(" ", "\t") for i in code_fragments] initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments]) code_fragments = [i[initial_tabs:] for i in code_fragments] code_fragments.append("") code = "\n".join(code_fragments) with open("__model.mvc", "w") as f: f.write(code) f.flush() return get_model_constructor_2("__model.mvc") def get_model_constructor_2(f): return do_compile(f, "interface/HUTN/grammars/modelling.g", "M") + ["exit"]