"""Defines control flow graph IR data structures."""

from collections import defaultdict

# Let's just agree to disagree on map vs list comprehensions, pylint.
# pylint: disable=I0011,W0141

class SharedCounter(object):
    """Defines a shared counter that hands out consecutive integers, starting at zero."""
    def __init__(self):
        self.index = 0

    def next_value(self):
        """Gets the next value for this counter and advances the counter by one."""
        result = self.index
        self.index += 1
        return result

class BasicBlock(object):
    """Represents a basic block: a list of parameter definitions, a list of
       ordinary definitions and a flow instruction that terminates the block."""
    def __init__(self, counter):
        self.parameters = []
        self.definitions = []
        # 'counter' numbers blocks and definitions alike; 'definition_counter'
        # numbers the definitions within this block only.
        self.counter = counter
        self.index = counter.next_value()
        self.definition_counter = SharedCounter()
        self.flow = UnreachableFlow()

    def append_parameter(self, parameter):
        """Appends a parameter to this basic block. Returns the parameter's definition."""
        result = self.create_definition(parameter)
        self.parameters.append(result)
        # Parameters are numbered before definitions, so existing definitions
        # have to be renumbered once a parameter is added.
        if self.definitions:
            self.renumber_definitions()
        return result

    def remove_parameter(self, parameter):
        """Removes the given parameter definition from this basic block."""
        return self.parameters.remove(parameter)

    def prepend_definition(self, value):
        """Defines the given value at the start of this basic block's definition list.
           Returns the resulting definition."""
        result = self.create_definition(value)
        self.definitions.insert(0, result)
        self.renumber_definitions()
        return result

    def insert_definition_before(self, anchor, value):
        """Inserts the given definition or value ('value') right before the anchor
           definition ('anchor'). Returns the inserted definition.
           Raises a ValueError if no definition in this block matches the anchor."""
        index = None
        # NOTE(review): the anchor is matched by its definition index rather than by
        # identity, so this assumes that the anchor belongs to this block.
        for i, definition in enumerate(self.definitions):
            if definition.definition_index == anchor.definition_index:
                index = i

        if index is None:
            raise ValueError(
                'Cannot insert a definition because the anchor '
                'is not defined in this block.')

        result = self.create_definition(value)
        self.definitions.insert(index, result)
        self.renumber_definitions()
        return result

    def append_definition(self, value):
        """Defines the given value at the end of this basic block's definition list.
           Returns the resulting definition."""
        result = self.create_definition(value)
        self.definitions.append(result)
        return result

    def create_definition(self, value=None):
        """Creates a definition, but does not assign it to this block yet.
           An existing definition is re-parented to this block and renumbered;
           a value (or None) is wrapped in a brand new definition."""
        if isinstance(value, Definition):
            value.block = self
            value.renumber(self.definition_counter.next_value())
            return value
        else:
            assert isinstance(value, Value) or value is None
            return Definition(
                self.counter.next_value(),
                self,
                self.definition_counter.next_value(),
                value)

    def remove_definition(self, definition):
        """Removes the given definition from this basic block."""
        return self.definitions.remove(definition)

    def renumber_definitions(self):
        """Re-numbers all definitions in this basic block: parameters first,
           followed by the ordinary definitions."""
        self.definition_counter = SharedCounter()
        for definition in self.parameters:
            definition.renumber(self.definition_counter.next_value())
        for definition in self.definitions:
            definition.renumber(self.definition_counter.next_value())

    def __str__(self):
        prefix = '!%d(%s):' % (self.index, ', '.join(map(str, self.parameters)))
        return '\n'.join(
            [prefix] +
            [' ' * 4 + str(item) for item in self.definitions + [self.flow]])

class Definition(object):
    """Maps a value to a variable."""
    def __init__(self, index, block, definition_index, value):
        self.index = index
        self.block = block
        self.definition_index = definition_index
        self.value = value
        if value is not None:
            assert isinstance(value, (Value, Definition))

    def redefine(self, new_value):
        """Tweaks this definition to take on the given new value."""
        self.value = new_value
        if new_value is not None:
            assert isinstance(new_value, (Value, Definition))

    def renumber(self, new_definition_index):
        """Updates this definition's index in the block that defines it."""
        self.definition_index = new_definition_index

    def get_all_dependencies(self):
        """Gets all definitions and instructions on which this definition depends,
           along with any dependencies of instruction dependencies."""
        if isinstance(self.value, Definition):
            return [self.value]
        else:
            return self.value.get_all_dependencies()

    def has_side_effects(self):
        """Tests if this definition produces any side-effects."""
        return self.value.has_side_effects()

    def has_value(self):
        """Tells if this definition produces a result that is not None."""
        return self.value.has_value()

    def insert_before(self, value):
        """Inserts the given value or definition before this definition."""
        return self.block.insert_definition_before(self, value)

    def ref_str(self):
        """Gets a string that represents a reference to this definition."""
        return '$%d' % self.index

    def __str__(self):
        return '$%d = %s' % (self.index, self.value.ref_str())

class Instruction(object):
    """Represents an instruction."""
    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        raise NotImplementedError()

    def get_all_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends,
           along with any dependencies of instruction dependencies."""
        direct_dependencies = list(self.get_dependencies())
        results = list(direct_dependencies)
        # Only the direct dependencies are expanded here: every nested instruction
        # already reports its own transitive dependencies, so expanding those again
        # would list them more than once.
        for item in direct_dependencies:
            if not isinstance(item, Definition):
                results.extend(item.get_all_dependencies())

        return results

class Branch(Instruction):
    """Represents a branch from one basic block to another."""
    def __init__(self, block, arguments=None):
        self.block = block
        assert isinstance(block, BasicBlock)
        if arguments is None:
            arguments = []
        self.arguments = arguments
        assert all(isinstance(arg, Definition) for arg in arguments)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return self.arguments

    def __str__(self):
        return '!%d(%s)' % (self.block.index, ', '.join([arg.ref_str() for arg in self.arguments]))

class FlowInstruction(Instruction):
    """Represents a control flow instruction which terminates a basic block."""
    def branches(self):
        """Gets a list of branches (to basic blocks) taken by this flow instruction."""
        raise NotImplementedError()

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        # All flow-instructions have side-effects!
        return True

class JumpFlow(FlowInstruction):
    """Represents a control flow instruction which jumps directly to a basic block."""
    def __init__(self, branch):
        FlowInstruction.__init__(self)
        self.branch = branch
        assert isinstance(branch, Branch)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return self.branches()

    def branches(self):
        """Gets a list of branches (to basic blocks) taken by this flow instruction."""
        return [self.branch]

    def __str__(self):
        return 'jump %s' % self.branch

class SelectFlow(FlowInstruction):
    """Represents a control flow instruction which jumps to one of two basic blocks depending
       on whether a condition is truthy or not."""
    def __init__(self, condition, if_branch, else_branch):
        FlowInstruction.__init__(self)
        self.condition = condition
        assert isinstance(condition, Definition)
        self.if_branch = if_branch
        assert isinstance(if_branch, Branch)
        self.else_branch = else_branch
        assert isinstance(else_branch, Branch)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.condition] + self.branches()

    def branches(self):
        """Gets a list of branches (to basic blocks) taken by this flow instruction."""
        return [self.if_branch, self.else_branch]

    def __str__(self):
        return 'select %s, %s, %s' % (self.condition.ref_str(), self.if_branch, self.else_branch)

class ReturnFlow(FlowInstruction):
    """Represents a control flow instruction which terminates the execution of the current
       function and returns a value."""
    def __init__(self, value):
        FlowInstruction.__init__(self)
        self.value = value
        assert isinstance(value, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.value]

    def branches(self):
        """Gets a list of branches (to basic blocks) taken by this flow instruction."""
        return []

    def __str__(self):
        return 'return %s' % self.value.ref_str()

class ThrowFlow(FlowInstruction):
    """Represents a control flow instruction which throws an exception."""
    def __init__(self, exception):
        FlowInstruction.__init__(self)
        self.exception = exception
        assert isinstance(exception, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.exception]

    def branches(self):
        """Gets a list of branches (to basic blocks) taken by this flow instruction."""
        return []

    def __str__(self):
        return 'throw %s' % self.exception.ref_str()

class UnreachableFlow(FlowInstruction):
    """Represents a control flow instruction which is unreachable."""
    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def branches(self):
        """Gets a list of branches (to basic blocks) taken by this flow instruction."""
        return []

    def __str__(self):
        return 'unreachable'

class Value(Instruction):
    """A value: an instruction that produces some result."""
    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        raise NotImplementedError()

    def has_value(self):
        """Tells if this value produces a result that is not None."""
        return True

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return False

    def ref_str(self):
        """Gets a string that represents this value."""
        return str(self)

class BlockParameter(Value):
    """A basic block parameter."""
    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def __str__(self):
        return 'block-parameter'

class FunctionParameter(Value):
    """A function parameter."""
    def __init__(self, name):
        Value.__init__(self)
        self.name = name

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def __str__(self):
        return 'func-parameter %s' % self.name

class Literal(Value):
    """A literal value."""
    def __init__(self, literal):
        Value.__init__(self)
        self.literal = literal

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def has_value(self):
        """Tells if this value produces a result that is not None."""
        return self.literal is not None

    def __str__(self):
        return 'literal %r' % self.literal

class IndirectFunctionCall(Value):
    """A value that is the result of an indirect function call.
       The argument list is a sequence of (key, definition) pairs."""
    def __init__(self, target, argument_list):
        Value.__init__(self)
        assert isinstance(target, Definition)
        self.target = target
        assert all(isinstance(val, Definition) for _, val in argument_list)
        self.argument_list = argument_list

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.target] + [val for _, val in self.argument_list]

    def __str__(self):
        return 'indirect-call %s(%s)' % (
            self.target.ref_str(),
            ', '.join(['%s=%s' % (key, val.ref_str()) for key, val in self.argument_list]))

# The calling convention for functions that use 'return' statements to return.
# Arguments are matched to parameters based on position.
SIMPLE_POSITIONAL_CALLING_CONVENTION = 'simple-positional'

# The calling convention for jitted functions.
JIT_CALLING_CONVENTION = 'jit'

class DirectFunctionCall(Value):
    """A value that is the result of a direct function call.
       The argument list is a sequence of (key, definition) pairs."""
    def __init__(self, target_name, argument_list, calling_convention=JIT_CALLING_CONVENTION):
        Value.__init__(self)
        self.target_name = target_name
        assert all(isinstance(val, Definition) for _, val in argument_list)
        self.argument_list = argument_list
        self.calling_convention = calling_convention

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [val for _, val in self.argument_list]

    def __str__(self):
        return 'direct-call %r %s(%s)' % (
            self.calling_convention,
            self.target_name,
            ', '.join(['%s=%s' % (key, val.ref_str()) for key, val in self.argument_list]))

class AllocateRootNode(Value):
    """A value that produces a new root node. Typically used in function prologs."""
    def __init__(self):
        Value.__init__(self)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def __str__(self):
        return 'alloc-root-node'

class DeallocateRootNode(Value):
    """A value that deallocates a root node. Typically used in function epilogs."""
    def __init__(self, root_node):
        Value.__init__(self)
        assert isinstance(root_node, Definition)
        self.root_node = root_node

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.root_node]

    def has_value(self):
        """Tells if this value produces a result that is not None."""
        return False

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def __str__(self):
        return 'free-root-node %s' % self.root_node.ref_str()

class DeclareLocal(Value):
    """A value that declares a local variable."""
    def __init__(self, variable, root_node):
        Value.__init__(self)
        self.variable = variable
        self.root_node = root_node

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.root_node]

    def has_value(self):
        """Tells if this value produces a result that is not None."""
        return False

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def __str__(self):
        return 'declare-local %s, %s' % (self.variable, self.root_node.ref_str())

class DeclareGlobal(Value):
    """A value that declares a global variable."""
    def __init__(self, variable):
        Value.__init__(self)
        self.variable = variable

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def has_value(self):
        """Tells if this value produces a result that is not None."""
        return False

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def __str__(self):
        return 'declare-global %s' % self.variable.name

class CheckLocalExists(Value):
    """A value that checks if a local value has been defined (yet)."""
    def __init__(self, variable):
        Value.__init__(self)
        self.variable = variable

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def __str__(self):
        return 'check-local-exists %s' % self.variable

class ResolveLocal(Value):
    """A value that resolves a local as a pointer."""
    def __init__(self, variable):
        Value.__init__(self)
        self.variable = variable

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def __str__(self):
        return 'resolve-local %s' % self.variable

class ResolveGlobal(Value):
    """A value that resolves a global as a pointer."""
    def __init__(self, variable):
        Value.__init__(self)
        self.variable = variable

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def __str__(self):
        return 'resolve-global %s' % self.variable.name

class LoadPointer(Value):
    """A value that loads the value assigned to a pointer."""
    def __init__(self, pointer):
        Value.__init__(self)
        self.pointer = pointer
        assert isinstance(pointer, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.pointer]

    def __str__(self):
        return 'load %s' % self.pointer.ref_str()

class StoreAtPointer(Value):
    """A value that assigns a value to a pointer."""
    def __init__(self, pointer, value):
        Value.__init__(self)
        self.pointer = pointer
        assert isinstance(pointer, Definition)
        self.value = value
        assert isinstance(value, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.pointer, self.value]

    def has_value(self):
        """Tells if this value produces a result that is not None."""
        return False

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def __str__(self):
        return 'store %s, %s' % (self.pointer.ref_str(), self.value.ref_str())

class Read(Value):
    """A value that reads the value stored in a node."""
    def __init__(self, node):
        Value.__init__(self)
        self.node = node
        assert isinstance(node, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.node]

    def __str__(self):
        return 'read %s' % (self.node.ref_str())

class CreateNode(Value):
    """A value that creates a new node."""
    def __init__(self, value):
        Value.__init__(self)
        self.value = value
        assert isinstance(value, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.value]

    def __str__(self):
        return 'create-node %s' % (self.value.ref_str())

class Input(Value):
    """A value that pops a node from the input queue."""
    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return []

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def __str__(self):
        return 'input'

class Output(Value):
    """A value that pushes a node onto the output queue."""
    def __init__(self, value):
        Value.__init__(self)
        self.value = value
        assert isinstance(value, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.value]

    def has_value(self):
        """Tells if this value produces a result that is not None."""
        return False

    def has_side_effects(self):
        """Tells if this instruction has side-effects."""
        return True

    def __str__(self):
        return 'output %s' % self.value.ref_str()

class Binary(Value):
    """A value that applies a binary operator to two other values."""
    def __init__(self, lhs, operator, rhs):
        Value.__init__(self)
        self.lhs = lhs
        assert isinstance(lhs, Definition)
        self.operator = operator
        self.rhs = rhs
        assert isinstance(rhs, Definition)

    def get_dependencies(self):
        """Gets all definitions and instructions on which this instruction depends."""
        return [self.lhs, self.rhs]

    def __str__(self):
        return 'binary %s, %r, %s' % (self.lhs.ref_str(), self.operator, self.rhs.ref_str())

def create_jump(block, arguments=None):
    """Creates a jump to the given block with the given argument list."""
    return JumpFlow(Branch(block, arguments))

def get_def_value(def_or_value):
    """Returns the given value, or the underlying value of the given definition,
       whichever is appropriate. Chains of definitions are followed to the end."""
    if isinstance(def_or_value, Definition):
        return get_def_value(def_or_value.value)
    else:
        return def_or_value

def apply_to_value(function, def_or_value):
    """Applies the given function to the specified value, or the underlying
       value of the given definition."""
    return function(get_def_value(def_or_value))

def is_literal(value):
    """Tests if the given value is a literal."""
    return isinstance(value, Literal)

def is_literal_def(def_or_value):
    """Tests if the given value is a literal or a definition with an underlying literal."""
    return apply_to_value(is_literal, def_or_value)

def is_value_def(def_or_value, class_or_type_or_tuple=Value):
    """Tests if the given definition or value is a value of the given type."""
    return isinstance(get_def_value(def_or_value), class_or_type_or_tuple)

def get_def_variable(def_or_value):
    """Gets the 'variable' attribute of the given value, or the underlying value of the given
       definition, whichever is appropriate."""
    return get_def_value(def_or_value).variable

def get_literal_value(value):
    """Gets the value of the given literal value."""
    return value.literal

def get_literal_def_value(def_or_value):
    """Gets the value of the given literal value or definition with an underlying literal."""
    return apply_to_value(get_literal_value, def_or_value)

def get_all_predecessor_blocks(entry_point):
    """Creates a mapping of blocks to their direct predecessors for every block in the control-flow
       graph defined by the given entry point. Blocks without predecessors get no entry of their
       own, but the result is a defaultdict that produces an empty set for them on lookup."""
    results = defaultdict(set)
    processed = set([entry_point])
    # Depth-first traversal with an explicit stack of (block, remaining branches) pairs,
    # so the depth of the control-flow graph is not bounded by the interpreter's
    # recursion limit.
    worklist = [(entry_point, iter(entry_point.flow.branches()))]
    while worklist:
        block, remaining_branches = worklist[-1]
        for branch in remaining_branches:
            target_block = branch.block
            results[target_block].add(block)
            if target_block not in processed:
                processed.add(target_block)
                worklist.append((target_block, iter(target_block.flow.branches())))
                # Descend into the target first; this block's remaining branches
                # are resumed once the target has been handled.
                break
        else:
            # All branches of this block have been examined.
            worklist.pop()

    return results

def get_directly_reachable_blocks(block):
    """Gets a list of all blocks that can be reached by taking a single branch from the
       given block. A block appears once for every branch that targets it."""
    return [branch.block for branch in block.flow.branches()]

def get_reachable_blocks(entry_point):
    """Constructs the set of all reachable vertices from the given block. The entry point
       itself is included only if some branch leads back to it."""
    # Depth-first traversal with an explicit stack of successor iterators. Every block
    # is added to the result set, and has its successors expanded, at most once.
    results = set()
    worklist = [iter(get_directly_reachable_blocks(entry_point))]
    while worklist:
        for child in worklist[-1]:
            if child not in results:
                results.add(child)
                worklist.append(iter(get_directly_reachable_blocks(child)))
                # Descend into the child first; the current block's remaining
                # successors are resumed afterward.
                break
        else:
            # All successors of the block on top of the stack have been examined.
            worklist.pop()

    return results

def get_all_reachable_blocks(entry_point):
    """Constructs the set of all reachable vertices, for every block that is
       reachable from the given entry point."""
    # This runs one traversal per block.
    # Maybe a faster algorithm is more appropriate here.
    results = {}
    all_blocks = get_reachable_blocks(entry_point)
    results[entry_point] = all_blocks
    for block in all_blocks:
        if block not in results:
            results[block] = get_reachable_blocks(block)
    return results

def get_all_blocks(entry_point):
    """Gets all basic blocks in the control-flow graph defined by the given entry point.
       The entry point is produced first; every block is produced exactly once."""
    yield entry_point
    for block in get_reachable_blocks(entry_point):
        # The entry point is reachable from itself if it is part of a cycle.
        # It has already been produced, so skip it here.
        if block is not entry_point:
            yield block