synchronizer.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. '''This file is part of AToMPM - A Tool for Multi-Paradigm Modelling
  2. Copyright 2011 by the AToMPM team and licensed under the LGPL
  3. See COPYING.lesser and README.md in the root of this project for full details'''
  4. from ..util.seeded_random import Random
  5. from .control_primitive import ControlPrimitive
  6. from .messages import TransformationException, NIL_PACKET
  7. class Synchronizer(ControlPrimitive):
  8. '''
  9. Synchonizes all threads of execution by merging the packets.
  10. '''
  11. def __init__(self, threads=2, custom_merge=lambda packets: None):
  12. '''
  13. Synchonizes all threads of execution by merging the packets.
  14. @param threads: Specifies how many threads will be synchronized.
  15. By default, this is 2.
  16. @param custom_merge: Function that defines how to merge the success packets.
  17. By default, this returns None.
  18. '''
  19. super(Synchronizer, self)
  20. assert(threads >= 2)
  21. self.threads = threads
  22. self.custom_merge = custom_merge
  23. def success_in(self, packet):
  24. '''
  25. Receives a successful packet
  26. '''
  27. self.exception = None
  28. self.is_success = False
  29. self.success.append(packet)
  30. def fail_in(self, packet):
  31. '''
  32. Receives a failed packet
  33. '''
  34. self.exception = None
  35. self.is_success = False
  36. self.fail.append(packet)
  37. def _custom_merge(self):
  38. '''
  39. Applies the user-defined merge function
  40. '''
  41. return self.custom_merge(self.success)
  42. def _default_merge(self):
  43. '''
  44. Attempts to merge the packets conservatively
  45. '''
  46. return None
  47. def merge(self):
  48. '''
  49. Attempts to merge the packets into a single one, only if all threads had succeeded.
  50. '''
  51. self.exception = None
  52. self.is_success = False
  53. def failure():
  54. self.is_success = False
  55. self.exception = TransformationException()
  56. self.exception.packet = NIL_PACKET
  57. return NIL_PACKET
  58. if len(self.success) == self.threads:
  59. packet = self._custom_merge()
  60. if packet is not None:
  61. self.is_success = True
  62. self.reset()
  63. return packet
  64. else:
  65. packet = self._default_merge()
  66. if packet is not None:
  67. self.is_success = True
  68. self.reset()
  69. return packet
  70. else:
  71. return failure()
  72. elif len(self.success) + len(self.fail) == self.threads:
  73. self.is_success = False
  74. return Random.choice(self.fail)
  75. else:
  76. return failure()