123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import unittest
- import sys
- import os
- import sys
- import time
- import json
- import urllib
- import urllib2
- import subprocess
- import signal
- import random
- sys.path.append("interface/HUTN")
- sys.path.append("scripts")
- from hutn_compiler.compiler import main as do_compile
- from check_objects import to_recompile
- taskname = "test_task"
- parallel_push = True
- INIT_TIMEOUT = 30
- TIMEOUT = 2000
- try:
- import pytest
- slow = pytest.mark.skipif(
- not pytest.config.getoption("--runslow"),
- reason="need --runslow option to run"
- )
- except:
- slow = lambda i:i
- ports = set()
- def getFreePort():
- """Gets a unique new port."""
- while 1:
- port = random.randint(10000, 20000)
- # Check if this port is in the set of ports.
- if port not in ports:
- # We have found a unique port. Add it to the set and return.
- ports.add(port)
- return port
- def execute(scriptname, parameters=[], wait=False):
- if os.name not in ["nt", "posix"]:
- # Stop now, as we would have no clue on how to kill its subtree
- raise Exception("Unknown OS version: " + str(os.name))
- command = [sys.executable, "-u", "scripts/%s.py" % scriptname] + parameters
- if wait:
- return subprocess.call(command, shell=False)
- else:
- return subprocess.Popen(command, shell=False, stdout=subprocess.PIPE)
- def kill(process):
- if os.name == "nt":
- subprocess.call(["taskkill", "/F", "/T", "/PID", "%i" % process.pid])
- elif os.name == "posix":
- # Kill parents
- subprocess.call(["pkill", "-P", "%i" % process.pid])
- # Kill self
- subprocess.call(["kill", "%i" % process.pid])
- def flush_data(address, data):
- if data:
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "data": json.dumps(data), "taskname": taskname})), timeout=INIT_TIMEOUT).read()
- return []
- def start_mvc():
- port = getFreePort()
- address = "http://127.0.0.1:%s" % port
- proc = execute("run_local_modelverse", [str(port)], wait=False)
- return proc, address
- def get_constructor(code):
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open("__constraint.alc", "w") as f:
- f.write(code)
- f.flush()
- constructors = do_compile("__constraint.alc", "interface/HUTN/grammars/actionlanguage.g", "CS")
- return constructors
- def get_model_constructor(code):
- # First change multiple spaces to a tab
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open("__model.mvc", "w") as f:
- f.write(code)
- f.flush()
- return get_model_constructor_2("__model.mvc")
- def get_model_constructor_2(f):
- return do_compile(f, "interface/HUTN/grammars/modelling.g", "M") + ["exit"]
|