synchronizer.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. '''*****************************************************************************
  2. AToMPM - A Tool for Multi-Paradigm Modelling
  3. Copyright (c) 2011 Eugene Syriani
  4. This file is part of AToMPM.
  5. AToMPM is free software: you can redistribute it and/or modify it under the
  6. terms of the GNU Lesser General Public License as published by the Free Software
  7. Foundation, either version 3 of the License, or (at your option) any later
  8. version.
  9. AToMPM is distributed in the hope that it will be useful, but WITHOUT ANY
  10. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  11. PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License along
  13. with AToMPM. If not, see <http://www.gnu.org/licenses/>.
  14. *****************************************************************************'''
  15. from ..util.seeded_random import Random
  16. from control_primitive import ControlPrimitive
  17. from messages import TransformationException, NIL_PACKET
  18. class Synchronizer(ControlPrimitive):
  19. '''
  20. Synchonizes all threads of execution by merging the packets.
  21. '''
  22. def __init__(self, threads=2, custom_merge=lambda packets: None):
  23. '''
  24. Synchonizes all threads of execution by merging the packets.
  25. @param threads: Specifies how many threads will be synchronized.
  26. By default, this is 2.
  27. @param custom_merge: Function that defines how to merge the success packets.
  28. By default, this returns None.
  29. '''
  30. super(Synchronizer, self)
  31. assert(threads >= 2)
  32. self.threads = threads
  33. self.custom_merge = custom_merge
  34. def success_in(self, packet):
  35. '''
  36. Receives a successful packet
  37. '''
  38. self.exception = None
  39. self.is_success = False
  40. self.success.append(packet)
  41. def fail_in(self, packet):
  42. '''
  43. Receives a failed packet
  44. '''
  45. self.exception = None
  46. self.is_success = False
  47. self.fail.append(packet)
  48. def _custom_merge(self):
  49. '''
  50. Applies the user-defined merge function
  51. '''
  52. return self.custom_merge(self.success)
  53. def _default_merge(self):
  54. '''
  55. Attempts to merge the packets conservatively
  56. '''
  57. return None
  58. def merge(self):
  59. '''
  60. Attempts to merge the packets into a single one, only if all threads had succeeded.
  61. '''
  62. self.exception = None
  63. self.is_success = False
  64. def failure():
  65. self.is_success = False
  66. self.exception = TransformationException()
  67. self.exception.packet = NIL_PACKET
  68. return NIL_PACKET
  69. if len(self.success) == self.threads:
  70. packet = self._custom_merge()
  71. if packet is not None:
  72. self.is_success = True
  73. self.reset()
  74. return packet
  75. else:
  76. packet = self._default_merge()
  77. if packet is not None:
  78. self.is_success = True
  79. self.reset()
  80. return packet
  81. else:
  82. return failure()
  83. elif len(self.success) + len(self.fail) == self.threads:
  84. self.is_success = False
  85. return Random.choice(self.fail)
  86. else:
  87. return failure()