# utils.py
  1. import unittest
  2. import sys
  3. import os
  4. import sys
  5. import time
  6. import json
  7. import urllib
  8. import urllib2
  9. import subprocess
  10. import signal
  11. import random
  12. sys.path.append("interface/HUTN")
  13. sys.path.append("scripts")
  14. from hutn_compiler.compiler import main as do_compile
  15. from check_objects import to_recompile
  16. taskname = "test_task"
  17. parallel_push = True
  18. INIT_TIMEOUT = 30
  19. TIMEOUT = 2000
  20. try:
  21. import pytest
  22. slow = pytest.mark.skipif(
  23. not pytest.config.getoption("--runslow"),
  24. reason="need --runslow option to run"
  25. )
  26. except:
  27. slow = lambda i:i
  28. ports = set()
  29. def getFreePort():
  30. """Gets a unique new port."""
  31. while 1:
  32. port = random.randint(10000, 20000)
  33. # Check if this port is in the set of ports.
  34. if port not in ports:
  35. # We have found a unique port. Add it to the set and return.
  36. ports.add(port)
  37. return port
  38. def execute(scriptname, parameters=[], wait=False):
  39. if os.name not in ["nt", "posix"]:
  40. # Stop now, as we would have no clue on how to kill its subtree
  41. raise Exception("Unknown OS version: " + str(os.name))
  42. command = [sys.executable, "-u", "scripts/%s.py" % scriptname] + parameters
  43. if wait:
  44. return subprocess.call(command, shell=False)
  45. else:
  46. return subprocess.Popen(command, shell=False, stdout=subprocess.PIPE)
  47. def kill(process):
  48. if os.name == "nt":
  49. subprocess.call(["taskkill", "/F", "/T", "/PID", "%i" % process.pid])
  50. elif os.name == "posix":
  51. # Kill parents
  52. subprocess.call(["pkill", "-P", "%i" % process.pid])
  53. # Kill self
  54. subprocess.call(["kill", "%i" % process.pid])
  55. def flush_data(address, data):
  56. if data:
  57. urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "data": json.dumps(data), "taskname": taskname})), timeout=INIT_TIMEOUT).read()
  58. return []
  59. def start_mvc():
  60. port = getFreePort()
  61. address = "http://127.0.0.1:%s" % port
  62. proc = execute("run_local_modelverse", [str(port)], wait=False)
  63. return proc, address
  64. def get_constructor(code):
  65. code_fragments = code.split("\n")
  66. code_fragments = [i for i in code_fragments if i.strip() != ""]
  67. code_fragments = [i.replace(" ", "\t") for i in code_fragments]
  68. initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
  69. code_fragments = [i[initial_tabs:] for i in code_fragments]
  70. code_fragments.append("")
  71. code = "\n".join(code_fragments)
  72. with open("__constraint.alc", "w") as f:
  73. f.write(code)
  74. f.flush()
  75. constructors = do_compile("__constraint.alc", "interface/HUTN/grammars/actionlanguage.g", "CS")
  76. return constructors
  77. def get_model_constructor(code):
  78. # First change multiple spaces to a tab
  79. code_fragments = code.split("\n")
  80. code_fragments = [i for i in code_fragments if i.strip() != ""]
  81. code_fragments = [i.replace(" ", "\t") for i in code_fragments]
  82. initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
  83. code_fragments = [i[initial_tabs:] for i in code_fragments]
  84. code_fragments.append("")
  85. code = "\n".join(code_fragments)
  86. with open("__model.mvc", "w") as f:
  87. f.write(code)
  88. f.flush()
  89. return get_model_constructor_2("__model.mvc")
  90. def get_model_constructor_2(f):
  91. return do_compile(f, "interface/HUTN/grammars/modelling.g", "M") + ["exit"]