Skip to content

Dataflow

oqd_compiler_infrastructure.dataflow

GraphProtocol

Bases: Protocol[NodeType]

Any object passed to ForwardDataflowAnalysis.analyze must provide this interface. The protocol is intentionally minimal so it can adapt to Control Flow Graphs (CFGs), dependency graphs, custom IR graphs, etc.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
class GraphProtocol(Protocol[NodeType]):
    """Structural interface for graphs accepted by `ForwardDataflowAnalysis.analyze`.

    Only three queries are required: all nodes, the predecessors of a node,
    and the successors of a node. Keeping the protocol this small lets it
    adapt to Control Flow Graphs (CFGs), dependency graphs, custom IR graphs,
    etc.
    """

    def nodes(self) -> Iterable[NodeType]:
        """Return every node in the graph."""

    def predecessors(self, node: NodeType) -> Iterable[NodeType]:
        """Return every predecessor of `node`."""

    def successors(self, node: NodeType) -> Iterable[NodeType]:
        """Return every successor of `node`."""

nodes() -> Iterable[NodeType]

Returns all nodes in the graph.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
def nodes(self) -> Iterable[NodeType]:
    """Return every node in the graph."""

predecessors(node: NodeType) -> Iterable[NodeType]

Returns all predecessors of a given node.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
def predecessors(self, node: NodeType) -> Iterable[NodeType]:
    """Return every predecessor of `node`."""

successors(node: NodeType) -> Iterable[NodeType]

Returns all successors of a given node.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
def successors(self, node: NodeType) -> Iterable[NodeType]:
    """Return every successor of `node`."""

DataflowResult dataclass

Bases: Generic[NodeType, StateType]

The result of a dataflow analysis.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
@dataclass(frozen=True)
class DataflowResult(Generic[NodeType, StateType]):
    """Frozen record holding the outcome of a dataflow analysis.

    `frozen=True` prevents the fields from being reassigned; the dictionaries
    they refer to are ordinary mutable dicts.
    """

    # Per-node input state (the merged state fed to the node's transfer).
    in_states: dict[NodeType, StateType]
    # Per-node output state (the result of the node's transfer).
    out_states: dict[NodeType, StateType]
    # Number of iterations the analysis performed.
    iterations: int

DataflowAnalysis

Bases: ABC, Generic[NodeType, StateType]

Base class that defines what every dataflow analysis must implement.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
class DataflowAnalysis(ABC, Generic[NodeType, StateType]):
    """Abstract contract that every dataflow analysis has to fulfil.

    Subclasses provide the initial state, the per-node boundary state, the
    merge of several states and the per-node transfer function. State
    comparison has a default implementation that may be overridden.
    """

    @abstractmethod
    def bottom(self) -> StateType:
        """Return the default starting state assigned to every node."""

    @abstractmethod
    def boundary_state(self, node: NodeType) -> StateType:
        """Return the extra state injected at `node`."""

    @abstractmethod
    def merge(self, states: Iterable[StateType]) -> StateType:
        """Return the single state obtained by combining the incoming `states`."""

    @abstractmethod
    def transfer(self, node: NodeType, state_in: StateType) -> StateType:
        """Return the state of `node` after applying its transfer to `state_in`."""

    def states_equal(self, t1: StateType, t2: StateType) -> bool:
        """Return True when `t1` and `t2` compare equal with `==`.

        Override this when the state type needs a different notion of equality.
        """
        same = t1 == t2
        return same

bottom() -> StateType abstractmethod

Returns the default starting state for all nodes.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
@abstractmethod
def bottom(self) -> StateType:
    """Return the default starting state assigned to every node."""

boundary_state(node: NodeType) -> StateType abstractmethod

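Returns the extra state injected at a given node. ForwardDataflowAnalysis.analyze merges this state with the output states of the node's predecessors to form the node's input state.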

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
@abstractmethod
def boundary_state(self, node: NodeType) -> StateType:
    """Return the extra state injected at `node`."""

merge(states: Iterable[StateType]) -> StateType abstractmethod

Returns the combined state of incoming states.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
@abstractmethod
def merge(self, states: Iterable[StateType]) -> StateType:
    """Return the single state obtained by combining the incoming `states`."""

transfer(node: NodeType, state_in: StateType) -> StateType abstractmethod

Returns the state of a given node after transfer.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
@abstractmethod
def transfer(self, node: NodeType, state_in: StateType) -> StateType:
    """Return the state of `node` after applying its transfer to `state_in`."""

states_equal(t1: StateType, t2: StateType) -> bool

Returns True if two states are equal.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
def states_equal(self, t1: StateType, t2: StateType) -> bool:
    """Return True when `t1` and `t2` compare equal with `==`.

    Override this when the state type needs a different notion of equality.
    """
    same = t1 == t2
    return same

ForwardDataflowAnalysis

Bases: DataflowAnalysis[NodeType, StateType], Generic[NodeType, StateType]

Forward dataflow analysis framework. This class implements the fixed point loop.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
class ForwardDataflowAnalysis(DataflowAnalysis[NodeType, StateType], Generic[NodeType, StateType]):
    """
    Forward dataflow analysis framework.
    This class implements the fixed point loop.
    """
    def analyze(self, graph: GraphProtocol[NodeType]) -> DataflowResult[NodeType, StateType]:
        """
        Runs the worklist algorithm and returns the result of the dataflow analysis.

        Steps:

        - Initializes in/out states with `bottom()`.
        - Puts all nodes in a worklist (each distinct node once).
        - Recomputes each node's input by merging `boundary_state(node)` with
          the outputs of its predecessors, then applies `transfer`.
        - If a node output changes, schedules its successors again.
        - Returns final states and iteration count.

        The loop ends only when the worklist is empty, i.e. when no `transfer`
        result differs (per `states_equal`) from the stored output.

        Args:
            graph: Graph to analyze. Its nodes are used as dictionary keys and
                set members, so they must be hashable.

        Returns:
            A `DataflowResult` with the input state and output state of every
            node and the number of nodes popped from the worklist.

        Raises:
            KeyError: If `graph.predecessors` or `graph.successors` yields a
                node that `graph.nodes()` did not return.
        """
        # `nodes()` is only an Iterable and may repeat a node. Duplicates would
        # sit in the deque twice while `queued` holds them once, breaking the
        # "queued == currently in the worklist" invariant and causing redundant
        # work. `dict.fromkeys` removes duplicates and keeps first-seen order.
        nodes = list(dict.fromkeys(graph.nodes()))
        # `bottom()` is called per node so that states are never shared objects.
        in_states = {node: self.bottom() for node in nodes}
        out_states = {node: self.bottom() for node in nodes}

        worklist = deque(nodes)
        queued = set(nodes)  # mirrors worklist membership for O(1) lookups
        iterations = 0

        while worklist:
            node = worklist.popleft()
            queued.discard(node)
            iterations += 1

            pred_states = [out_states[pred] for pred in graph.predecessors(node)]
            merged_input = self.merge([self.boundary_state(node), *pred_states])

            # Keep the previously stored object when the new input is "equal".
            if not self.states_equal(in_states[node], merged_input):
                in_states[node] = merged_input

            next_out = self.transfer(node, merged_input)
            if self.states_equal(out_states[node], next_out):
                # Output is stable: successors do not need to be revisited.
                continue

            out_states[node] = next_out
            for succ in graph.successors(node):
                if succ not in queued:
                    worklist.append(succ)
                    queued.add(succ)

        return DataflowResult(in_states=in_states, out_states=out_states, iterations=iterations)

analyze(graph: GraphProtocol[NodeType]) -> DataflowResult[NodeType, StateType]

Runs the worklist algorithm and returns the result of the dataflow analysis.

Steps:

- Initializes in/out states with bottom().
- Puts all nodes in a worklist.
- Recomputes each node from predecessor outputs.
- If a node output changes, schedules its successors again.
- Returns final states and iteration count.

Source code in oqd-compiler-infrastructure/src/oqd_compiler_infrastructure/dataflow.py
def analyze(self, graph: GraphProtocol[NodeType]) -> DataflowResult[NodeType, StateType]:
    """
    Runs the worklist algorithm and returns the result of the dataflow analysis.

    Steps:

    - Initializes in/out states with `bottom()`.
    - Puts all nodes in a worklist (each distinct node once).
    - Recomputes each node's input by merging `boundary_state(node)` with
      the outputs of its predecessors, then applies `transfer`.
    - If a node output changes, schedules its successors again.
    - Returns final states and iteration count.

    The loop ends only when the worklist is empty, i.e. when no `transfer`
    result differs (per `states_equal`) from the stored output.

    Args:
        graph: Graph to analyze. Its nodes are used as dictionary keys and
            set members, so they must be hashable.

    Returns:
        A `DataflowResult` with the input state and output state of every
        node and the number of nodes popped from the worklist.

    Raises:
        KeyError: If `graph.predecessors` or `graph.successors` yields a
            node that `graph.nodes()` did not return.
    """
    # `nodes()` is only an Iterable and may repeat a node. Duplicates would
    # sit in the deque twice while `queued` holds them once, breaking the
    # "queued == currently in the worklist" invariant and causing redundant
    # work. `dict.fromkeys` removes duplicates and keeps first-seen order.
    nodes = list(dict.fromkeys(graph.nodes()))
    # `bottom()` is called per node so that states are never shared objects.
    in_states = {node: self.bottom() for node in nodes}
    out_states = {node: self.bottom() for node in nodes}

    worklist = deque(nodes)
    queued = set(nodes)  # mirrors worklist membership for O(1) lookups
    iterations = 0

    while worklist:
        node = worklist.popleft()
        queued.discard(node)
        iterations += 1

        pred_states = [out_states[pred] for pred in graph.predecessors(node)]
        merged_input = self.merge([self.boundary_state(node), *pred_states])

        # Keep the previously stored object when the new input is "equal".
        if not self.states_equal(in_states[node], merged_input):
            in_states[node] = merged_input

        next_out = self.transfer(node, merged_input)
        if self.states_equal(out_states[node], next_out):
            # Output is stable: successors do not need to be revisited.
            continue

        out_states[node] = next_out
        for succ in graph.successors(node):
            if succ not in queued:
                worklist.append(succ)
                queued.add(succ)

    return DataflowResult(in_states=in_states, out_states=out_states, iterations=iterations)