Skip to content

Processing Node

graphorchestrator.nodes.nodes.ProcessingNode

Bases: Node

A node that processes the state.

This node takes a function that operates on a State object, processes it, and returns a modified State object.

Source code in graphorchestrator\nodes\nodes.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class ProcessingNode(Node):
    """
    A node that processes the state.

    This node takes a function that operates on a State object, processes it,
    and returns a modified State object.
    """

    def __init__(self, node_id: str, func: Callable[[State], State]) -> None:
        super().__init__(node_id)
        self.func = func
        if not getattr(func, "is_node_action", False):
            raise NodeActionNotDecoratedError(func)

        GraphLogger.get().info(
            **wrap_constants(
                message="ProcessingNode created",
                **{
                    LC.EVENT_TYPE: "node",
                    LC.NODE_ID: self.node_id,
                    LC.NODE_TYPE: "ProcessingNode",
                    LC.ACTION: "node_created",
                    LC.CUSTOM: {"function": func.__name__},
                },
            )
        )

    async def execute(self, state: State) -> State:
        """
        Executes the processing logic of the node.

        Args:
            state (State): The input state for the node.

        Returns:
            State: The modified state after processing.
        """
        log = GraphLogger.get()

        log.info(
            **wrap_constants(
                message="ProcessingNode execution started",
                **{
                    LC.EVENT_TYPE: "node",
                    LC.NODE_ID: self.node_id,
                    LC.NODE_TYPE: "ProcessingNode",
                    LC.ACTION: "execute_start",
                    LC.INPUT_SIZE: len(state.messages),
                },
            )
        )

        result = (
            await self.func(state)
            if asyncio.iscoroutinefunction(self.func)
            else self.func(state)
        )

        log.info(
            **wrap_constants(
                message="ProcessingNode execution completed",
                **{
                    LC.EVENT_TYPE: "node",
                    LC.NODE_ID: self.node_id,
                    LC.NODE_TYPE: "ProcessingNode",
                    LC.ACTION: "execute_end",
                    LC.OUTPUT_SIZE: len(result.messages),
                },
            )
        )

        return result

execute(state) async

Executes the processing logic of the node.

Parameters:

Name Type Description Default
state State

The input state for the node.

required

Returns:

Name Type Description
State State

The modified state after processing.

Source code in graphorchestrator\nodes\nodes.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def execute(self, state: State) -> State:
    """
    Executes the processing logic of the node.

    Args:
        state (State): The input state for the node.

    Returns:
        State: The modified state after processing.
    """
    log = GraphLogger.get()

    log.info(
        **wrap_constants(
            message="ProcessingNode execution started",
            **{
                LC.EVENT_TYPE: "node",
                LC.NODE_ID: self.node_id,
                LC.NODE_TYPE: "ProcessingNode",
                LC.ACTION: "execute_start",
                LC.INPUT_SIZE: len(state.messages),
            },
        )
    )

    result = (
        await self.func(state)
        if asyncio.iscoroutinefunction(self.func)
        else self.func(state)
    )

    log.info(
        **wrap_constants(
            message="ProcessingNode execution completed",
            **{
                LC.EVENT_TYPE: "node",
                LC.NODE_ID: self.node_id,
                LC.NODE_TYPE: "ProcessingNode",
                LC.ACTION: "execute_end",
                LC.OUTPUT_SIZE: len(result.messages),
            },
        )
    )

    return result