Skip to content

Checkpoint

graphorchestrator.core.checkpoint

CheckpointData

Represents the data to be checkpointed, including graph state, and execution metadata.

Source code in graphorchestrator\core\checkpoint.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class CheckpointData:
    """Represents the data to be checkpointed, including graph state, and execution metadata."""

    def __init__(
        self,
        graph: Graph,
        initial_state: State,
        active_states: Dict[str, List[State]],
        superstep: int,
        final_state: Optional[State],
        retry_policy: RetryPolicy,
        max_workers: int,
    ):
        """
        Initializes the CheckpointData with the necessary components for checkpointing.

        Args:
            graph (Graph): The graph structure.
            initial_state (State): The initial state of the graph execution.
            active_states (Dict[str, List[State]]): The states of active nodes.
            superstep (int): The current superstep number.
            final_state (Optional[State]): The final state of the graph execution, if available.
            retry_policy (RetryPolicy): The retry policy applied to the graph execution.
            max_workers (int): The maximum number of workers used in execution.
        """

        # Assign the provided parameters to the object's attributes.
        self.graph = graph
        self.initial_state = initial_state
        self.active_states = active_states
        self.superstep = superstep
        self.final_state = final_state
        self.retry_policy = retry_policy
        self.max_workers = max_workers

    def save(self, path: str) -> None:
        """
        Serializes and saves the checkpoint data to the specified path.

        Args:
            path (str): The file path where the checkpoint will be saved.
        """

        # Get the graph logger instance.
        log = GraphLogger.get()

        # Serialize the checkpoint data and save it to the specified file path.
        with open(path, "wb") as f:
            pickle.dump(self, f)

        log.info(
            **wrap_constants(
                message="Checkpoint saved to disk",
                level="INFO",
                # Prepare log entry with essential checkpointing information.
                **{
                    LC.EVENT_TYPE: "graph",
                    LC.ACTION: "checkpoint_save",
                    LC.SUPERSTEP: self.superstep,
                    LC.CUSTOM: {
                        "path": path,
                        "final_state_message_count": (
                            len(self.final_state.messages) if self.final_state else None
                        ),
                        "active_nodes": list(self.active_states.keys()),
                    },
                }
            )
        )

    @staticmethod
    def load(path: str) -> "CheckpointData":
        """
        Loads checkpoint data from the specified path.

        Args:
            path (str): The file path from which to load the checkpoint.

        Returns:
            CheckpointData: The loaded checkpoint data.
        """
        # Get the graph logger instance.
        log = GraphLogger.get()
        # Deserialize the checkpoint data from the specified file path.
        with open(path, "rb") as f:
            data: CheckpointData = pickle.load(f)

        log.info(
            **wrap_constants(
                message="Checkpoint loaded from disk",
                level="INFO",
                # Prepare log entry with essential checkpoint loading information.
                **{
                    LC.EVENT_TYPE: "graph",
                    LC.ACTION: "checkpoint_load",
                    LC.SUPERSTEP: data.superstep,
                    LC.CUSTOM: {
                        "path": path,
                        "active_nodes": list(data.active_states.keys()),
                    },
                }
            )
        )
        # Return the loaded checkpoint data.
        return data

__init__(graph, initial_state, active_states, superstep, final_state, retry_policy, max_workers)

Initializes the CheckpointData with the necessary components for checkpointing.

Parameters:

Name Type Description Default
graph Graph

The graph structure.

required
initial_state State

The initial state of the graph execution.

required
active_states Dict[str, List[State]]

The states of active nodes.

required
superstep int

The current superstep number.

required
final_state Optional[State]

The final state of the graph execution, if available.

required
retry_policy RetryPolicy

The retry policy applied to the graph execution.

required
max_workers int

The maximum number of workers used in execution.

required
Source code in graphorchestrator\core\checkpoint.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self,
    graph: Graph,
    initial_state: State,
    active_states: Dict[str, List[State]],
    superstep: int,
    final_state: Optional[State],
    retry_policy: RetryPolicy,
    max_workers: int,
):
    """
    Initializes the CheckpointData with the necessary components for checkpointing.

    Args:
        graph (Graph): The graph structure.
        initial_state (State): The initial state of the graph execution.
        active_states (Dict[str, List[State]]): The states of active nodes.
        superstep (int): The current superstep number.
        final_state (Optional[State]): The final state of the graph execution, if available.
        retry_policy (RetryPolicy): The retry policy applied to the graph execution.
        max_workers (int): The maximum number of workers used in execution.
    """

    # Assign the provided parameters to the object's attributes.
    self.graph = graph
    self.initial_state = initial_state
    self.active_states = active_states
    self.superstep = superstep
    self.final_state = final_state
    self.retry_policy = retry_policy
    self.max_workers = max_workers

load(path) staticmethod

Loads checkpoint data from the specified path.

Parameters:

Name Type Description Default
path str

The file path from which to load the checkpoint.

required

Returns:

Name Type Description
CheckpointData CheckpointData

The loaded checkpoint data.

Source code in graphorchestrator\core\checkpoint.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@staticmethod
def load(path: str) -> "CheckpointData":
    """
    Loads checkpoint data from the specified path.

    Args:
        path (str): The file path from which to load the checkpoint.

    Returns:
        CheckpointData: The loaded checkpoint data.
    """
    # Get the graph logger instance.
    log = GraphLogger.get()
    # Deserialize the checkpoint data from the specified file path.
    with open(path, "rb") as f:
        data: CheckpointData = pickle.load(f)

    log.info(
        **wrap_constants(
            message="Checkpoint loaded from disk",
            level="INFO",
            # Prepare log entry with essential checkpoint loading information.
            **{
                LC.EVENT_TYPE: "graph",
                LC.ACTION: "checkpoint_load",
                LC.SUPERSTEP: data.superstep,
                LC.CUSTOM: {
                    "path": path,
                    "active_nodes": list(data.active_states.keys()),
                },
            }
        )
    )
    # Return the loaded checkpoint data.
    return data

save(path)

Serializes and saves the checkpoint data to the specified path.

Parameters:

Name Type Description Default
path str

The file path where the checkpoint will be saved.

required
Source code in graphorchestrator\core\checkpoint.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def save(self, path: str) -> None:
    """
    Serializes and saves the checkpoint data to the specified path.

    Args:
        path (str): The file path where the checkpoint will be saved.
    """

    # Get the graph logger instance.
    log = GraphLogger.get()

    # Serialize the checkpoint data and save it to the specified file path.
    with open(path, "wb") as f:
        pickle.dump(self, f)

    log.info(
        **wrap_constants(
            message="Checkpoint saved to disk",
            level="INFO",
            # Prepare log entry with essential checkpointing information.
            **{
                LC.EVENT_TYPE: "graph",
                LC.ACTION: "checkpoint_save",
                LC.SUPERSTEP: self.superstep,
                LC.CUSTOM: {
                    "path": path,
                    "final_state_message_count": (
                        len(self.final_state.messages) if self.final_state else None
                    ),
                    "active_nodes": list(self.active_states.keys()),
                },
            }
        )
    )